tabledefs.h \
threads.c \
threads-types.h \
- threadpool-ms.c \
- threadpool-ms.h \
- threadpool-ms-io.c \
- threadpool-ms-io.h \
+ threadpool.c \
+ threadpool.h \
+ threadpool-worker-default.c \
+ threadpool-worker.h \
+ threadpool-io.c \
+ threadpool-io.h \
verify.c \
verify-internals.h \
wrapper-types.h \
verify.h
EXTRA_DIST = $(win32_sources) $(unix_sources) $(null_sources) runtime.h \
- threadpool-ms-io-poll.c threadpool-ms-io-epoll.c threadpool-ms-io-kqueue.c sgen-dynarray.h
+ threadpool-io-poll.c threadpool-io-epoll.c threadpool-io-kqueue.c sgen-dynarray.h
#include <mono/metadata/exception.h>
#include <mono/metadata/exception-internals.h>
#include <mono/metadata/threads.h>
-#include <mono/metadata/threadpool-ms.h>
+#include <mono/metadata/threadpool.h>
#include <mono/metadata/socket-io.h>
#include <mono/metadata/tabledefs.h>
#include <mono/metadata/gc-internals.h>
goto failure;
}
- if (!mono_threadpool_ms_remove_domain_jobs (domain, -1)) {
+ if (!mono_threadpool_remove_domain_jobs (domain, -1)) {
data->failure_reason = g_strdup_printf ("Cleanup of threadpool jobs of domain %s timed out.", domain->friendly_name);
goto failure;
}
#include <mono/metadata/domain-internals.h>
#include <mono/metadata/gc-internals.h>
#include <mono/metadata/metadata.h>
-#include <mono/metadata/threadpool-ms.h>
+#include <mono/metadata/threadpool.h>
#include <mono/utils/mono-signal-handler.h>
#include <mono/utils/mono-proclib.h>
#include <mono/io-layer/io-layer.h>
method = mono_class_get_method_from_name (klass, "BeginInvoke", -1);
g_assert (method != NULL);
- mono_threadpool_ms_begin_invoke (domain, (MonoObject*) load_value, method, NULL, &error);
+ mono_threadpool_begin_invoke (domain, (MonoObject*) load_value, method, NULL, &error);
if (!is_ok (&error)) {
g_warning ("Couldn't invoke System.Console cancel handler due to %s", mono_error_get_message (&error));
mono_error_cleanup (&error);
#include <mono/metadata/metadata-internals.h>
#include <mono/metadata/mono-mlist.h>
#include <mono/metadata/threads-types.h>
-#include <mono/metadata/threadpool-ms.h>
+#include <mono/metadata/threadpool.h>
#include <mono/sgen/sgen-conf.h>
#include <mono/sgen/sgen-gc.h>
#include <mono/utils/mono-logger-internals.h>
}
if (domain == mono_get_root_domain ()) {
- mono_threadpool_ms_cleanup ();
+ mono_threadpool_cleanup ();
mono_gc_finalize_threadpool_threads ();
}
#include <mono/metadata/object.h>
#include <mono/metadata/threads.h>
#include <mono/metadata/threads-types.h>
-#include <mono/metadata/threadpool-ms.h>
-#include <mono/metadata/threadpool-ms-io.h>
+#include <mono/metadata/threadpool.h>
+#include <mono/metadata/threadpool-io.h>
#include <mono/metadata/monitor.h>
#include <mono/metadata/reflection.h>
#include <mono/metadata/image-internals.h>
#include "mono/metadata/cominterop.h"
#include "mono/metadata/remoting.h"
#include "mono/metadata/reflection-internals.h"
-#include "mono/metadata/threadpool-ms.h"
+#include "mono/metadata/threadpool.h"
#include "mono/metadata/handle.h"
#include "mono/utils/mono-counters.h"
#include "mono/utils/mono-tls.h"
method = mono_get_delegate_invoke (klass);
g_assert (method);
- MonoAsyncResult *result = mono_threadpool_ms_begin_invoke (mono_domain_get (), (MonoObject*) delegate, method, params, &error);
+ MonoAsyncResult *result = mono_threadpool_begin_invoke (mono_domain_get (), (MonoObject*) delegate, method, params, &error);
mono_error_set_pending_exception (&error);
return result;
}
} else
#endif
{
- res = mono_threadpool_ms_end_invoke (ares, &out_args, &exc, &error);
+ res = mono_threadpool_end_invoke (ares, &out_args, &exc, &error);
if (mono_error_set_pending_exception (&error))
return NULL;
}
#include <mono/metadata/runtime.h>
#include <mono/metadata/monitor.h>
#include <mono/metadata/threads-types.h>
-#include <mono/metadata/threadpool-ms.h>
+#include <mono/metadata/threadpool.h>
#include <mono/metadata/marshal.h>
#include <mono/utils/atomic.h>
mono_runtime_set_shutting_down ();
/* This will kill the tp threads which cannot be suspended */
- mono_threadpool_ms_cleanup ();
+ mono_threadpool_cleanup ();
/*TODO move the follow to here:
mono_thread_suspend_all_other_threads (); OR mono_thread_wait_all_other_threads
#include <mono/metadata/file-io.h>
#include <mono/metadata/threads.h>
#include <mono/metadata/threads-types.h>
-#include <mono/metadata/threadpool-ms-io.h>
+#include <mono/metadata/threadpool-io.h>
#include <mono/utils/mono-poll.h>
/* FIXME change this code to not mess so much with the internals */
#include <mono/metadata/class-internals.h>
/* Clear any pending work item from this socket if the underlying
* polling system does not notify when the socket is closed */
- mono_threadpool_ms_io_remove_socket (GPOINTER_TO_INT (sock));
+ mono_threadpool_io_remove_socket (GPOINTER_TO_INT (sock));
MONO_ENTER_GC_SAFE;
closesocket (sock);
--- /dev/null
+
+#if defined(HAVE_EPOLL)
+
+#include <sys/epoll.h>
+
+#if defined(HOST_WIN32)
+/* We assume that epoll is not available on windows */
+#error
+#endif
+
+#define EPOLL_NEVENTS 128
+
+static gint epoll_fd;
+static struct epoll_event *epoll_events;
+
+static gboolean
+epoll_init (gint wakeup_pipe_fd)
+{
+ struct epoll_event event;
+
+#ifdef EPOOL_CLOEXEC
+ epoll_fd = epoll_create1 (EPOLL_CLOEXEC);
+#else
+ epoll_fd = epoll_create (256);
+ fcntl (epoll_fd, F_SETFD, FD_CLOEXEC);
+#endif
+
+ if (epoll_fd == -1) {
+#ifdef EPOOL_CLOEXEC
+ g_error ("epoll_init: epoll (EPOLL_CLOEXEC) failed, error (%d) %s\n", errno, g_strerror (errno));
+#else
+ g_error ("epoll_init: epoll (256) failed, error (%d) %s\n", errno, g_strerror (errno));
+#endif
+ return FALSE;
+ }
+
+ event.events = EPOLLIN;
+ event.data.fd = wakeup_pipe_fd;
+ if (epoll_ctl (epoll_fd, EPOLL_CTL_ADD, event.data.fd, &event) == -1) {
+ g_error ("epoll_init: epoll_ctl () failed, error (%d) %s", errno, g_strerror (errno));
+ close (epoll_fd);
+ return FALSE;
+ }
+
+ epoll_events = g_new0 (struct epoll_event, EPOLL_NEVENTS);
+
+ return TRUE;
+}
+
+static void
+epoll_register_fd (gint fd, gint events, gboolean is_new)
+{
+ struct epoll_event event;
+
+#ifndef EPOLLONESHOT
+/* it was only defined on android in May 2013 */
+#define EPOLLONESHOT 0x40000000
+#endif
+
+ event.data.fd = fd;
+ event.events = EPOLLONESHOT;
+ if ((events & EVENT_IN) != 0)
+ event.events |= EPOLLIN;
+ if ((events & EVENT_OUT) != 0)
+ event.events |= EPOLLOUT;
+
+ if (epoll_ctl (epoll_fd, is_new ? EPOLL_CTL_ADD : EPOLL_CTL_MOD, event.data.fd, &event) == -1)
+ g_error ("epoll_register_fd: epoll_ctl(%s) failed, error (%d) %s", is_new ? "EPOLL_CTL_ADD" : "EPOLL_CTL_MOD", errno, g_strerror (errno));
+}
+
+static void
+epoll_remove_fd (gint fd)
+{
+ if (epoll_ctl (epoll_fd, EPOLL_CTL_DEL, fd, NULL) == -1)
+ g_error ("epoll_remove_fd: epoll_ctl (EPOLL_CTL_DEL) failed, error (%d) %s", errno, g_strerror (errno));
+}
+
+static gint
+epoll_event_wait (void (*callback) (gint fd, gint events, gpointer user_data), gpointer user_data)
+{
+ gint i, ready;
+
+ memset (epoll_events, 0, sizeof (struct epoll_event) * EPOLL_NEVENTS);
+
+ mono_gc_set_skip_thread (TRUE);
+
+ MONO_ENTER_GC_SAFE;
+ ready = epoll_wait (epoll_fd, epoll_events, EPOLL_NEVENTS, -1);
+ MONO_EXIT_GC_SAFE;
+
+ mono_gc_set_skip_thread (FALSE);
+
+ if (ready == -1) {
+ switch (errno) {
+ case EINTR:
+ mono_thread_internal_check_for_interruption_critical (mono_thread_internal_current ());
+ ready = 0;
+ break;
+ default:
+ g_error ("epoll_event_wait: epoll_wait () failed, error (%d) %s", errno, g_strerror (errno));
+ break;
+ }
+ }
+
+ if (ready == -1)
+ return -1;
+
+ for (i = 0; i < ready; ++i) {
+ gint fd, events = 0;
+
+ fd = epoll_events [i].data.fd;
+ if (epoll_events [i].events & (EPOLLIN | EPOLLERR | EPOLLHUP))
+ events |= EVENT_IN;
+ if (epoll_events [i].events & (EPOLLOUT | EPOLLERR | EPOLLHUP))
+ events |= EVENT_OUT;
+
+ callback (fd, events, user_data);
+ }
+
+ return 0;
+}
+
+static ThreadPoolIOBackend backend_epoll = {
+ .init = epoll_init,
+ .register_fd = epoll_register_fd,
+ .remove_fd = epoll_remove_fd,
+ .event_wait = epoll_event_wait,
+};
+
+#endif
--- /dev/null
+
+#if defined(HAVE_KQUEUE)
+
+#include <sys/types.h>
+#include <sys/event.h>
+#include <sys/time.h>
+
+#if defined(HOST_WIN32)
+/* We assume that kqueue is not available on windows */
+#error
+#endif
+
+#define KQUEUE_NEVENTS 128
+
+static gint kqueue_fd;
+static struct kevent *kqueue_events;
+
+static gint
+KQUEUE_INIT_FD (gint fd, gint events, gint flags)
+{
+ struct kevent event;
+ EV_SET (&event, fd, events, flags, 0, 0, 0);
+ return kevent (kqueue_fd, &event, 1, NULL, 0, NULL);
+}
+
+static gboolean
+kqueue_init (gint wakeup_pipe_fd)
+{
+ kqueue_fd = kqueue ();
+ if (kqueue_fd == -1) {
+ g_error ("kqueue_init: kqueue () failed, error (%d) %s", errno, g_strerror (errno));
+ return FALSE;
+ }
+
+ if (KQUEUE_INIT_FD (wakeup_pipe_fd, EVFILT_READ, EV_ADD | EV_ENABLE) == -1) {
+ g_error ("kqueue_init: kevent () failed, error (%d) %s", errno, g_strerror (errno));
+ close (kqueue_fd);
+ return FALSE;
+ }
+
+ kqueue_events = g_new0 (struct kevent, KQUEUE_NEVENTS);
+
+ return TRUE;
+}
+
+static void
+kqueue_register_fd (gint fd, gint events, gboolean is_new)
+{
+ if (events & EVENT_IN) {
+ if (KQUEUE_INIT_FD (fd, EVFILT_READ, EV_ADD | EV_ENABLE) == -1)
+ g_error ("kqueue_register_fd: kevent(read,enable) failed, error (%d) %s", errno, g_strerror (errno));
+ } else {
+ if (KQUEUE_INIT_FD (fd, EVFILT_READ, EV_ADD | EV_DISABLE) == -1)
+ g_error ("kqueue_register_fd: kevent(read,disable) failed, error (%d) %s", errno, g_strerror (errno));
+ }
+ if (events & EVENT_OUT) {
+ if (KQUEUE_INIT_FD (fd, EVFILT_WRITE, EV_ADD | EV_ENABLE) == -1)
+ g_error ("kqueue_register_fd: kevent(write,enable) failed, error (%d) %s", errno, g_strerror (errno));
+ } else {
+ if (KQUEUE_INIT_FD (fd, EVFILT_WRITE, EV_ADD | EV_DISABLE) == -1)
+ g_error ("kqueue_register_fd: kevent(write,disable) failed, error (%d) %s", errno, g_strerror (errno));
+ }
+}
+
+static void
+kqueue_remove_fd (gint fd)
+{
+ /* FIXME: a race between closing and adding operation in the Socket managed code trigger a ENOENT error */
+ if (KQUEUE_INIT_FD (fd, EVFILT_READ, EV_DELETE) == -1)
+ g_error ("kqueue_register_fd: kevent(read,delete) failed, error (%d) %s", errno, g_strerror (errno));
+ if (KQUEUE_INIT_FD (fd, EVFILT_WRITE, EV_DELETE) == -1)
+ g_error ("kqueue_register_fd: kevent(write,delete) failed, error (%d) %s", errno, g_strerror (errno));
+}
+
+static gint
+kqueue_event_wait (void (*callback) (gint fd, gint events, gpointer user_data), gpointer user_data)
+{
+ gint i, ready;
+
+ memset (kqueue_events, 0, sizeof (struct kevent) * KQUEUE_NEVENTS);
+
+ mono_gc_set_skip_thread (TRUE);
+
+ MONO_ENTER_GC_SAFE;
+ ready = kevent (kqueue_fd, NULL, 0, kqueue_events, KQUEUE_NEVENTS, NULL);
+ MONO_EXIT_GC_SAFE;
+
+ mono_gc_set_skip_thread (FALSE);
+
+ if (ready == -1) {
+ switch (errno) {
+ case EINTR:
+ mono_thread_internal_check_for_interruption_critical (mono_thread_internal_current ());
+ ready = 0;
+ break;
+ default:
+ g_error ("kqueue_event_wait: kevent () failed, error (%d) %s", errno, g_strerror (errno));
+ break;
+ }
+ }
+
+ if (ready == -1)
+ return -1;
+
+ for (i = 0; i < ready; ++i) {
+ gint fd, events = 0;
+
+ fd = kqueue_events [i].ident;
+ if (kqueue_events [i].filter == EVFILT_READ || (kqueue_events [i].flags & EV_ERROR) != 0)
+ events |= EVENT_IN;
+ if (kqueue_events [i].filter == EVFILT_WRITE || (kqueue_events [i].flags & EV_ERROR) != 0)
+ events |= EVENT_OUT;
+
+ callback (fd, events, user_data);
+ }
+
+ return 0;
+}
+
+static ThreadPoolIOBackend backend_kqueue = {
+ .init = kqueue_init,
+ .register_fd = kqueue_register_fd,
+ .remove_fd = kqueue_remove_fd,
+ .event_wait = kqueue_event_wait,
+};
+
+#endif
--- /dev/null
+
+#include "utils/mono-poll.h"
+
+static mono_pollfd *poll_fds;
+static guint poll_fds_capacity;
+static guint poll_fds_size;
+
+static inline void
+POLL_INIT_FD (mono_pollfd *poll_fd, gint fd, gint events)
+{
+ poll_fd->fd = fd;
+ poll_fd->events = events;
+ poll_fd->revents = 0;
+}
+
+static gboolean
+poll_init (gint wakeup_pipe_fd)
+{
+ g_assert (wakeup_pipe_fd >= 0);
+
+ poll_fds_size = 1;
+ poll_fds_capacity = 64;
+
+ poll_fds = g_new0 (mono_pollfd, poll_fds_capacity);
+
+ POLL_INIT_FD (&poll_fds [0], wakeup_pipe_fd, MONO_POLLIN);
+
+ return TRUE;
+}
+
+static void
+poll_register_fd (gint fd, gint events, gboolean is_new)
+{
+ gint i;
+ gint poll_event;
+
+ g_assert (fd >= 0);
+ g_assert (poll_fds_size <= poll_fds_capacity);
+
+ g_assert ((events & ~(EVENT_IN | EVENT_OUT)) == 0);
+
+ poll_event = 0;
+ if (events & EVENT_IN)
+ poll_event |= MONO_POLLIN;
+ if (events & EVENT_OUT)
+ poll_event |= MONO_POLLOUT;
+
+ for (i = 0; i < poll_fds_size; ++i) {
+ if (poll_fds [i].fd == fd) {
+ g_assert (!is_new);
+ POLL_INIT_FD (&poll_fds [i], fd, poll_event);
+ return;
+ }
+ }
+
+ g_assert (is_new);
+
+ for (i = 0; i < poll_fds_size; ++i) {
+ if (poll_fds [i].fd == -1) {
+ POLL_INIT_FD (&poll_fds [i], fd, poll_event);
+ return;
+ }
+ }
+
+ poll_fds_size += 1;
+
+ if (poll_fds_size > poll_fds_capacity) {
+ poll_fds_capacity *= 2;
+ g_assert (poll_fds_size <= poll_fds_capacity);
+
+ poll_fds = (mono_pollfd *)g_renew (mono_pollfd, poll_fds, poll_fds_capacity);
+ }
+
+ POLL_INIT_FD (&poll_fds [poll_fds_size - 1], fd, poll_event);
+}
+
+static void
+poll_remove_fd (gint fd)
+{
+ gint i;
+
+ g_assert (fd >= 0);
+
+ for (i = 0; i < poll_fds_size; ++i) {
+ if (poll_fds [i].fd == fd) {
+ POLL_INIT_FD (&poll_fds [i], -1, 0);
+ break;
+ }
+ }
+
+ /* if we don't find the fd in poll_fds,
+ * it means we try to delete it twice */
+ g_assert (i < poll_fds_size);
+
+ /* if we find it again, it means we added
+ * it twice */
+ for (; i < poll_fds_size; ++i)
+ g_assert (poll_fds [i].fd != fd);
+
+ /* reduce the value of poll_fds_size so we
+ * do not keep it too big */
+ while (poll_fds_size > 1 && poll_fds [poll_fds_size - 1].fd == -1)
+ poll_fds_size -= 1;
+}
+
+static inline gint
+poll_mark_bad_fds (mono_pollfd *poll_fds, gint poll_fds_size)
+{
+ gint i, ready = 0;
+
+ for (i = 0; i < poll_fds_size; i++) {
+ if (poll_fds [i].fd == -1)
+ continue;
+
+ switch (mono_poll (&poll_fds [i], 1, 0)) {
+ case 1:
+ ready++;
+ break;
+ case -1:
+ if (errno == EBADF)
+ {
+ poll_fds [i].revents |= MONO_POLLNVAL;
+ ready++;
+ }
+ break;
+ }
+ }
+
+ return ready;
+}
+
+static gint
+poll_event_wait (void (*callback) (gint fd, gint events, gpointer user_data), gpointer user_data)
+{
+ gint i, ready;
+
+ for (i = 0; i < poll_fds_size; ++i)
+ poll_fds [i].revents = 0;
+
+ mono_gc_set_skip_thread (TRUE);
+
+ MONO_ENTER_GC_SAFE;
+ ready = mono_poll (poll_fds, poll_fds_size, -1);
+ MONO_EXIT_GC_SAFE;
+
+ mono_gc_set_skip_thread (FALSE);
+
+ if (ready == -1) {
+ /*
+ * Apart from EINTR, we only check EBADF, for the rest:
+ * EINVAL: mono_poll() 'protects' us from descriptor
+ * numbers above the limit if using select() by marking
+ * then as POLLERR. If a system poll() is being
+ * used, the number of descriptor we're passing will not
+ * be over sysconf(_SC_OPEN_MAX), as the error would have
+ * happened when opening.
+ *
+ * EFAULT: we own the memory pointed by pfds.
+ * ENOMEM: we're doomed anyway
+ *
+ */
+ switch (errno)
+ {
+ case EINTR:
+ {
+ mono_thread_internal_check_for_interruption_critical (mono_thread_internal_current ());
+ ready = 0;
+ break;
+ }
+ case EBADF:
+ {
+ ready = poll_mark_bad_fds (poll_fds, poll_fds_size);
+ break;
+ }
+ default:
+ g_error ("poll_event_wait: mono_poll () failed, error (%d) %s", errno, g_strerror (errno));
+ break;
+ }
+ }
+
+ if (ready == -1)
+ return -1;
+ if (ready == 0)
+ return 0;
+
+ g_assert (ready > 0);
+
+ for (i = 0; i < poll_fds_size; ++i) {
+ gint fd, events = 0;
+
+ if (poll_fds [i].fd == -1)
+ continue;
+ if (poll_fds [i].revents == 0)
+ continue;
+
+ fd = poll_fds [i].fd;
+ if (poll_fds [i].revents & (MONO_POLLIN | MONO_POLLERR | MONO_POLLHUP | MONO_POLLNVAL))
+ events |= EVENT_IN;
+ if (poll_fds [i].revents & (MONO_POLLOUT | MONO_POLLERR | MONO_POLLHUP | MONO_POLLNVAL))
+ events |= EVENT_OUT;
+ if (poll_fds [i].revents & (MONO_POLLERR | MONO_POLLHUP | MONO_POLLNVAL))
+ events |= EVENT_ERR;
+
+ callback (fd, events, user_data);
+
+ if (--ready == 0)
+ break;
+ }
+
+ return 0;
+}
+
+static ThreadPoolIOBackend backend_poll = {
+ .init = poll_init,
+ .register_fd = poll_register_fd,
+ .remove_fd = poll_remove_fd,
+ .event_wait = poll_event_wait,
+};
--- /dev/null
+/*
+ * threadpool-io.c: Microsoft IO threadpool runtime support
+ *
+ * Author:
+ * Ludovic Henry (ludovic.henry@xamarin.com)
+ *
+ * Copyright 2015 Xamarin, Inc (http://www.xamarin.com)
+ * Licensed under the MIT license. See LICENSE file in the project root for full license information.
+ */
+
+#include <config.h>
+
+#ifndef DISABLE_SOCKETS
+
+#include <glib.h>
+
+#if defined(HOST_WIN32)
+#include <windows.h>
+#else
+#include <errno.h>
+#include <fcntl.h>
+#endif
+
+#include <mono/metadata/gc-internals.h>
+#include <mono/metadata/mono-mlist.h>
+#include <mono/metadata/threadpool.h>
+#include <mono/metadata/threadpool-io.h>
+#include <mono/utils/atomic.h>
+#include <mono/utils/mono-threads.h>
+#include <mono/utils/mono-lazy-init.h>
+#include <mono/utils/mono-logger-internals.h>
+
+typedef struct {
+ gboolean (*init) (gint wakeup_pipe_fd);
+ void (*register_fd) (gint fd, gint events, gboolean is_new);
+ void (*remove_fd) (gint fd);
+ gint (*event_wait) (void (*callback) (gint fd, gint events, gpointer user_data), gpointer user_data);
+} ThreadPoolIOBackend;
+
+/* Keep in sync with System.IOOperation in mcs/class/System/System/IOSelector.cs */
+enum MonoIOOperation {
+ EVENT_IN = 1 << 0,
+ EVENT_OUT = 1 << 1,
+ EVENT_ERR = 1 << 2, /* not in managed */
+};
+
+#include "threadpool-io-epoll.c"
+#include "threadpool-io-kqueue.c"
+#include "threadpool-io-poll.c"
+
+#define UPDATES_CAPACITY 128
+
+/* Keep in sync with System.IOSelectorJob in mcs/class/System/System/IOSelector.cs */
+struct _MonoIOSelectorJob {
+ MonoObject object;
+ gint32 operation;
+ MonoObject *callback;
+ MonoObject *state;
+};
+
+typedef enum {
+ UPDATE_EMPTY = 0,
+ UPDATE_ADD,
+ UPDATE_REMOVE_SOCKET,
+ UPDATE_REMOVE_DOMAIN,
+} ThreadPoolIOUpdateType;
+
+typedef struct {
+ gint fd;
+ MonoIOSelectorJob *job;
+} ThreadPoolIOUpdate_Add;
+
+typedef struct {
+ gint fd;
+} ThreadPoolIOUpdate_RemoveSocket;
+
+typedef struct {
+ MonoDomain *domain;
+} ThreadPoolIOUpdate_RemoveDomain;
+
+typedef struct {
+ ThreadPoolIOUpdateType type;
+ union {
+ ThreadPoolIOUpdate_Add add;
+ ThreadPoolIOUpdate_RemoveSocket remove_socket;
+ ThreadPoolIOUpdate_RemoveDomain remove_domain;
+ } data;
+} ThreadPoolIOUpdate;
+
+typedef struct {
+ ThreadPoolIOBackend backend;
+
+ ThreadPoolIOUpdate updates [UPDATES_CAPACITY];
+ gint updates_size;
+ MonoCoopMutex updates_lock;
+ MonoCoopCond updates_cond;
+
+#if !defined(HOST_WIN32)
+ gint wakeup_pipes [2];
+#else
+ SOCKET wakeup_pipes [2];
+#endif
+} ThreadPoolIO;
+
+static mono_lazy_init_t io_status = MONO_LAZY_INIT_STATUS_NOT_INITIALIZED;
+
+static gboolean io_selector_running = FALSE;
+
+static ThreadPoolIO* threadpool_io;
+
+static MonoIOSelectorJob*
+get_job_for_event (MonoMList **list, gint32 event)
+{
+ MonoMList *current;
+
+ g_assert (list);
+
+ for (current = *list; current; current = mono_mlist_next (current)) {
+ MonoIOSelectorJob *job = (MonoIOSelectorJob*) mono_mlist_get_data (current);
+ if (job->operation == event) {
+ *list = mono_mlist_remove_item (*list, current);
+ return job;
+ }
+ }
+
+ return NULL;
+}
+
+static gint
+get_operations_for_jobs (MonoMList *list)
+{
+ MonoMList *current;
+ gint operations = 0;
+
+ for (current = list; current; current = mono_mlist_next (current))
+ operations |= ((MonoIOSelectorJob*) mono_mlist_get_data (current))->operation;
+
+ return operations;
+}
+
+static void
+selector_thread_wakeup (void)
+{
+ gchar msg = 'c';
+ gint written;
+
+ for (;;) {
+#if !defined(HOST_WIN32)
+ written = write (threadpool_io->wakeup_pipes [1], &msg, 1);
+ if (written == 1)
+ break;
+ if (written == -1) {
+ g_warning ("selector_thread_wakeup: write () failed, error (%d) %s\n", errno, g_strerror (errno));
+ break;
+ }
+#else
+ written = send (threadpool_io->wakeup_pipes [1], &msg, 1, 0);
+ if (written == 1)
+ break;
+ if (written == SOCKET_ERROR) {
+ g_warning ("selector_thread_wakeup: write () failed, error (%d)\n", WSAGetLastError ());
+ break;
+ }
+#endif
+ }
+}
+
+static void
+selector_thread_wakeup_drain_pipes (void)
+{
+ gchar buffer [128];
+ gint received;
+
+ for (;;) {
+#if !defined(HOST_WIN32)
+ received = read (threadpool_io->wakeup_pipes [0], buffer, sizeof (buffer));
+ if (received == 0)
+ break;
+ if (received == -1) {
+ if (errno != EINTR && errno != EAGAIN)
+ g_warning ("selector_thread_wakeup_drain_pipes: read () failed, error (%d) %s\n", errno, g_strerror (errno));
+ break;
+ }
+#else
+ received = recv (threadpool_io->wakeup_pipes [0], buffer, sizeof (buffer), 0);
+ if (received == 0)
+ break;
+ if (received == SOCKET_ERROR) {
+ if (WSAGetLastError () != WSAEINTR && WSAGetLastError () != WSAEWOULDBLOCK)
+ g_warning ("selector_thread_wakeup_drain_pipes: recv () failed, error (%d) %s\n", WSAGetLastError ());
+ break;
+ }
+#endif
+ }
+}
+
+typedef struct {
+ MonoDomain *domain;
+ MonoGHashTable *states;
+} FilterSockaresForDomainData;
+
+static void
+filter_jobs_for_domain (gpointer key, gpointer value, gpointer user_data)
+{
+ FilterSockaresForDomainData *data;
+ MonoMList *list = (MonoMList *)value, *element;
+ MonoDomain *domain;
+ MonoGHashTable *states;
+
+ g_assert (user_data);
+ data = (FilterSockaresForDomainData *)user_data;
+ domain = data->domain;
+ states = data->states;
+
+ for (element = list; element; element = mono_mlist_next (element)) {
+ MonoIOSelectorJob *job = (MonoIOSelectorJob*) mono_mlist_get_data (element);
+ if (mono_object_domain (job) == domain)
+ mono_mlist_set_data (element, NULL);
+ }
+
+ /* we skip all the first elements which are NULL */
+ for (; list; list = mono_mlist_next (list)) {
+ if (mono_mlist_get_data (list))
+ break;
+ }
+
+ if (list) {
+ g_assert (mono_mlist_get_data (list));
+
+ /* we delete all the NULL elements after the first one */
+ for (element = list; element;) {
+ MonoMList *next;
+ if (!(next = mono_mlist_next (element)))
+ break;
+ if (mono_mlist_get_data (next))
+ element = next;
+ else
+ mono_mlist_set_next (element, mono_mlist_next (next));
+ }
+ }
+
+ mono_g_hash_table_replace (states, key, list);
+}
+
+static void
+wait_callback (gint fd, gint events, gpointer user_data)
+{
+ MonoError error;
+
+ if (mono_runtime_is_shutting_down ())
+ return;
+
+ if (fd == threadpool_io->wakeup_pipes [0]) {
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: wke");
+ selector_thread_wakeup_drain_pipes ();
+ } else {
+ MonoGHashTable *states;
+ MonoMList *list = NULL;
+ gpointer k;
+ gboolean remove_fd = FALSE;
+ gint operations;
+
+ g_assert (user_data);
+ states = (MonoGHashTable *)user_data;
+
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: cal fd %3d, events = %2s | %2s | %3s",
+ fd, (events & EVENT_IN) ? "RD" : "..", (events & EVENT_OUT) ? "WR" : "..", (events & EVENT_ERR) ? "ERR" : "...");
+
+ if (!mono_g_hash_table_lookup_extended (states, GINT_TO_POINTER (fd), &k, (gpointer*) &list))
+ g_error ("wait_callback: fd %d not found in states table", fd);
+
+ if (list && (events & EVENT_IN) != 0) {
+ MonoIOSelectorJob *job = get_job_for_event (&list, EVENT_IN);
+ if (job) {
+ mono_threadpool_enqueue_work_item (((MonoObject*) job)->vtable->domain, (MonoObject*) job, &error);
+ mono_error_assert_ok (&error);
+ }
+
+ }
+ if (list && (events & EVENT_OUT) != 0) {
+ MonoIOSelectorJob *job = get_job_for_event (&list, EVENT_OUT);
+ if (job) {
+ mono_threadpool_enqueue_work_item (((MonoObject*) job)->vtable->domain, (MonoObject*) job, &error);
+ mono_error_assert_ok (&error);
+ }
+ }
+
+ remove_fd = (events & EVENT_ERR) == EVENT_ERR;
+ if (!remove_fd) {
+ mono_g_hash_table_replace (states, GINT_TO_POINTER (fd), list);
+
+ operations = get_operations_for_jobs (list);
+
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: res fd %3d, events = %2s | %2s | %3s",
+ fd, (operations & EVENT_IN) ? "RD" : "..", (operations & EVENT_OUT) ? "WR" : "..", (operations & EVENT_ERR) ? "ERR" : "...");
+
+ threadpool_io->backend.register_fd (fd, operations, FALSE);
+ } else {
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: err fd %d", fd);
+
+ mono_g_hash_table_remove (states, GINT_TO_POINTER (fd));
+
+ threadpool_io->backend.remove_fd (fd);
+ }
+ }
+}
+
+static void
+selector_thread (gpointer data)
+{
+ MonoError error;
+ MonoGHashTable *states;
+
+ io_selector_running = TRUE;
+
+ if (mono_runtime_is_shutting_down ()) {
+ io_selector_running = FALSE;
+ return;
+ }
+
+ states = mono_g_hash_table_new_type (g_direct_hash, g_direct_equal, MONO_HASH_VALUE_GC, MONO_ROOT_SOURCE_THREAD_POOL, "i/o thread pool states table");
+
+ for (;;) {
+ gint i, j;
+ gint res;
+
+ mono_coop_mutex_lock (&threadpool_io->updates_lock);
+
+ for (i = 0; i < threadpool_io->updates_size; ++i) {
+ ThreadPoolIOUpdate *update = &threadpool_io->updates [i];
+
+ switch (update->type) {
+ case UPDATE_EMPTY:
+ break;
+ case UPDATE_ADD: {
+ gint fd;
+ gint operations;
+ gpointer k;
+ gboolean exists;
+ MonoMList *list = NULL;
+ MonoIOSelectorJob *job;
+
+ fd = update->data.add.fd;
+ g_assert (fd >= 0);
+
+ job = update->data.add.job;
+ g_assert (job);
+
+ exists = mono_g_hash_table_lookup_extended (states, GINT_TO_POINTER (fd), &k, (gpointer*) &list);
+ list = mono_mlist_append_checked (list, (MonoObject*) job, &error);
+ mono_error_assert_ok (&error);
+ mono_g_hash_table_replace (states, GINT_TO_POINTER (fd), list);
+
+ operations = get_operations_for_jobs (list);
+
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: %3s fd %3d, operations = %2s | %2s | %3s",
+ exists ? "mod" : "add", fd, (operations & EVENT_IN) ? "RD" : "..", (operations & EVENT_OUT) ? "WR" : "..", (operations & EVENT_ERR) ? "ERR" : "...");
+
+ threadpool_io->backend.register_fd (fd, operations, !exists);
+
+ break;
+ }
+ case UPDATE_REMOVE_SOCKET: {
+ gint fd;
+ gpointer k;
+ MonoMList *list = NULL;
+
+ fd = update->data.remove_socket.fd;
+ g_assert (fd >= 0);
+
+ if (mono_g_hash_table_lookup_extended (states, GINT_TO_POINTER (fd), &k, (gpointer*) &list)) {
+ mono_g_hash_table_remove (states, GINT_TO_POINTER (fd));
+
+ for (j = i + 1; j < threadpool_io->updates_size; ++j) {
+ ThreadPoolIOUpdate *update = &threadpool_io->updates [j];
+ if (update->type == UPDATE_ADD && update->data.add.fd == fd)
+ memset (update, 0, sizeof (ThreadPoolIOUpdate));
+ }
+
+ for (; list; list = mono_mlist_remove_item (list, list)) {
+ mono_threadpool_enqueue_work_item (mono_object_domain (mono_mlist_get_data (list)), mono_mlist_get_data (list), &error);
+ mono_error_assert_ok (&error);
+ }
+
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: del fd %3d", fd);
+ threadpool_io->backend.remove_fd (fd);
+ }
+
+ break;
+ }
+ case UPDATE_REMOVE_DOMAIN: {
+ MonoDomain *domain;
+
+ domain = update->data.remove_domain.domain;
+ g_assert (domain);
+
+ FilterSockaresForDomainData user_data = { .domain = domain, .states = states };
+ mono_g_hash_table_foreach (states, filter_jobs_for_domain, &user_data);
+
+ for (j = i + 1; j < threadpool_io->updates_size; ++j) {
+ ThreadPoolIOUpdate *update = &threadpool_io->updates [j];
+ if (update->type == UPDATE_ADD && mono_object_domain (update->data.add.job) == domain)
+ memset (update, 0, sizeof (ThreadPoolIOUpdate));
+ }
+
+ break;
+ }
+ default:
+ g_assert_not_reached ();
+ }
+ }
+
+ mono_coop_cond_broadcast (&threadpool_io->updates_cond);
+
+ if (threadpool_io->updates_size > 0) {
+ threadpool_io->updates_size = 0;
+ memset (&threadpool_io->updates, 0, UPDATES_CAPACITY * sizeof (ThreadPoolIOUpdate));
+ }
+
+ mono_coop_mutex_unlock (&threadpool_io->updates_lock);
+
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: wai");
+
+ res = threadpool_io->backend.event_wait (wait_callback, states);
+
+ if (res == -1 || mono_runtime_is_shutting_down ())
+ break;
+ }
+
+ mono_g_hash_table_destroy (states);
+
+ io_selector_running = FALSE;
+}
+
+/* Locking: threadpool_io->updates_lock must be held */
+static ThreadPoolIOUpdate*
+update_get_new (void)
+{
+ ThreadPoolIOUpdate *update = NULL;
+ g_assert (threadpool_io->updates_size <= UPDATES_CAPACITY);
+
+ while (threadpool_io->updates_size == UPDATES_CAPACITY) {
+ /* we wait for updates to be applied in the selector_thread and we loop
+ * as long as none are available. if it happends too much, then we need
+ * to increase UPDATES_CAPACITY */
+ mono_coop_cond_wait (&threadpool_io->updates_cond, &threadpool_io->updates_lock);
+ }
+
+ g_assert (threadpool_io->updates_size < UPDATES_CAPACITY);
+
+ update = &threadpool_io->updates [threadpool_io->updates_size ++];
+
+ return update;
+}
+
+static void
+wakeup_pipes_init (void)
+{
+#if !defined(HOST_WIN32)
+ if (pipe (threadpool_io->wakeup_pipes) == -1)
+ g_error ("wakeup_pipes_init: pipe () failed, error (%d) %s\n", errno, g_strerror (errno));
+ if (fcntl (threadpool_io->wakeup_pipes [0], F_SETFL, O_NONBLOCK) == -1)
+ g_error ("wakeup_pipes_init: fcntl () failed, error (%d) %s\n", errno, g_strerror (errno));
+#else
+ struct sockaddr_in client;
+ struct sockaddr_in server;
+ SOCKET server_sock;
+ gulong arg;
+ gint size;
+
+ server_sock = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
+ g_assert (server_sock != INVALID_SOCKET);
+ threadpool_io->wakeup_pipes [1] = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
+ g_assert (threadpool_io->wakeup_pipes [1] != INVALID_SOCKET);
+
+ server.sin_family = AF_INET;
+ server.sin_addr.s_addr = inet_addr ("127.0.0.1");
+ server.sin_port = 0;
+ if (bind (server_sock, (SOCKADDR*) &server, sizeof (server)) == SOCKET_ERROR) {
+ closesocket (server_sock);
+ g_error ("wakeup_pipes_init: bind () failed, error (%d)\n", WSAGetLastError ());
+ }
+
+ size = sizeof (server);
+ if (getsockname (server_sock, (SOCKADDR*) &server, &size) == SOCKET_ERROR) {
+ closesocket (server_sock);
+ g_error ("wakeup_pipes_init: getsockname () failed, error (%d)\n", WSAGetLastError ());
+ }
+ if (listen (server_sock, 1024) == SOCKET_ERROR) {
+ closesocket (server_sock);
+ g_error ("wakeup_pipes_init: listen () failed, error (%d)\n", WSAGetLastError ());
+ }
+ if (connect ((SOCKET) threadpool_io->wakeup_pipes [1], (SOCKADDR*) &server, sizeof (server)) == SOCKET_ERROR) {
+ closesocket (server_sock);
+ g_error ("wakeup_pipes_init: connect () failed, error (%d)\n", WSAGetLastError ());
+ }
+
+ size = sizeof (client);
+ threadpool_io->wakeup_pipes [0] = accept (server_sock, (SOCKADDR *) &client, &size);
+ g_assert (threadpool_io->wakeup_pipes [0] != INVALID_SOCKET);
+
+ arg = 1;
+ if (ioctlsocket (threadpool_io->wakeup_pipes [0], FIONBIO, &arg) == SOCKET_ERROR) {
+ closesocket (threadpool_io->wakeup_pipes [0]);
+ closesocket (server_sock);
+ g_error ("wakeup_pipes_init: ioctlsocket () failed, error (%d)\n", WSAGetLastError ());
+ }
+
+ closesocket (server_sock);
+#endif
+}
+
+static void
+initialize (void)
+{
+ g_assert (!threadpool_io);
+ threadpool_io = g_new0 (ThreadPoolIO, 1);
+ g_assert (threadpool_io);
+
+ mono_coop_mutex_init (&threadpool_io->updates_lock);
+ mono_coop_cond_init (&threadpool_io->updates_cond);
+ mono_gc_register_root ((char *)&threadpool_io->updates [0], sizeof (threadpool_io->updates), MONO_GC_DESCRIPTOR_NULL, MONO_ROOT_SOURCE_THREAD_POOL, "i/o thread pool updates list");
+
+ threadpool_io->updates_size = 0;
+
+ threadpool_io->backend = backend_poll;
+ if (g_getenv ("MONO_ENABLE_AIO") != NULL) {
+#if defined(HAVE_EPOLL)
+ threadpool_io->backend = backend_epoll;
+#elif defined(HAVE_KQUEUE)
+ threadpool_io->backend = backend_kqueue;
+#endif
+ }
+
+ wakeup_pipes_init ();
+
+ if (!threadpool_io->backend.init (threadpool_io->wakeup_pipes [0]))
+ g_error ("initialize: backend->init () failed");
+
+ MonoError error;
+ if (!mono_thread_create_internal (mono_get_root_domain (), selector_thread, NULL, TRUE, SMALL_STACK, &error))
+ g_error ("initialize: mono_thread_create_internal () failed due to %s", mono_error_get_message (&error));
+}
+
+static void
+cleanup (void)
+{
+ /* we make the assumption along the code that we are
+ * cleaning up only if the runtime is shutting down */
+ g_assert (mono_runtime_is_shutting_down ());
+
+ selector_thread_wakeup ();
+ while (io_selector_running)
+ mono_thread_info_usleep (1000);
+}
+
+void
+mono_threadpool_io_cleanup (void)
+{
+ mono_lazy_cleanup (&io_status, cleanup);
+}
+
+void
+ves_icall_System_IOSelector_Add (gpointer handle, MonoIOSelectorJob *job)
+{
+ ThreadPoolIOUpdate *update;
+
+ g_assert (handle);
+
+ g_assert ((job->operation == EVENT_IN) ^ (job->operation == EVENT_OUT));
+ g_assert (job->callback);
+
+ if (mono_runtime_is_shutting_down ())
+ return;
+ if (mono_domain_is_unloading (mono_object_domain (job)))
+ return;
+
+ mono_lazy_initialize (&io_status, initialize);
+
+ mono_coop_mutex_lock (&threadpool_io->updates_lock);
+
+ update = update_get_new ();
+ update->type = UPDATE_ADD;
+ update->data.add.fd = GPOINTER_TO_INT (handle);
+ update->data.add.job = job;
+ mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */
+
+ selector_thread_wakeup ();
+
+ mono_coop_mutex_unlock (&threadpool_io->updates_lock);
+}
+
+void
+ves_icall_System_IOSelector_Remove (gpointer handle)
+{
+ mono_threadpool_io_remove_socket (GPOINTER_TO_INT (handle));
+}
+
+void
+mono_threadpool_io_remove_socket (int fd)
+{
+ ThreadPoolIOUpdate *update;
+
+ if (!mono_lazy_is_initialized (&io_status))
+ return;
+
+ mono_coop_mutex_lock (&threadpool_io->updates_lock);
+
+ update = update_get_new ();
+ update->type = UPDATE_REMOVE_SOCKET;
+ update->data.add.fd = fd;
+ mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */
+
+ selector_thread_wakeup ();
+
+ mono_coop_cond_wait (&threadpool_io->updates_cond, &threadpool_io->updates_lock);
+
+ mono_coop_mutex_unlock (&threadpool_io->updates_lock);
+}
+
+void
+mono_threadpool_io_remove_domain_jobs (MonoDomain *domain)
+{
+ ThreadPoolIOUpdate *update;
+
+ if (!mono_lazy_is_initialized (&io_status))
+ return;
+
+ mono_coop_mutex_lock (&threadpool_io->updates_lock);
+
+ update = update_get_new ();
+ update->type = UPDATE_REMOVE_DOMAIN;
+ update->data.remove_domain.domain = domain;
+ mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */
+
+ selector_thread_wakeup ();
+
+ mono_coop_cond_wait (&threadpool_io->updates_cond, &threadpool_io->updates_lock);
+
+ mono_coop_mutex_unlock (&threadpool_io->updates_lock);
+}
+
+#else
+
+void
+ves_icall_System_IOSelector_Add (gpointer handle, MonoIOSelectorJob *job)
+{
+ g_assert_not_reached ();
+}
+
+void
+ves_icall_System_IOSelector_Remove (gpointer handle)
+{
+ g_assert_not_reached ();
+}
+
+void
+mono_threadpool_io_cleanup (void)
+{
+ g_assert_not_reached ();
+}
+
+void
+mono_threadpool_io_remove_socket (int fd)
+{
+ g_assert_not_reached ();
+}
+
+void
+mono_threadpool_io_remove_domain_jobs (MonoDomain *domain)
+{
+ g_assert_not_reached ();
+}
+
+#endif
--- /dev/null
+
+#ifndef _MONO_METADATA_THREADPOOL_IO_H_
+#define _MONO_METADATA_THREADPOOL_IO_H_
+
+#include <config.h>
+#include <glib.h>
+
+#include <mono/metadata/object-internals.h>
+#include <mono/metadata/socket-io.h>
+
+typedef struct _MonoIOSelectorJob MonoIOSelectorJob;
+
+void
+ves_icall_System_IOSelector_Add (gpointer handle, MonoIOSelectorJob *job);
+
+void
+ves_icall_System_IOSelector_Remove (gpointer handle);
+
+void
+mono_threadpool_io_remove_socket (int fd);
+void
+mono_threadpool_io_remove_domain_jobs (MonoDomain *domain);
+void
+mono_threadpool_io_cleanup (void);
+
+#endif /* _MONO_METADATA_THREADPOOL_IO_H_ */
+++ /dev/null
-
-#if defined(HAVE_EPOLL)
-
-#include <sys/epoll.h>
-
-#if defined(HOST_WIN32)
-/* We assume that epoll is not available on windows */
-#error
-#endif
-
-#define EPOLL_NEVENTS 128
-
-static gint epoll_fd;
-static struct epoll_event *epoll_events;
-
-static gboolean
-epoll_init (gint wakeup_pipe_fd)
-{
- struct epoll_event event;
-
-#ifdef EPOOL_CLOEXEC
- epoll_fd = epoll_create1 (EPOLL_CLOEXEC);
-#else
- epoll_fd = epoll_create (256);
- fcntl (epoll_fd, F_SETFD, FD_CLOEXEC);
-#endif
-
- if (epoll_fd == -1) {
-#ifdef EPOOL_CLOEXEC
- g_error ("epoll_init: epoll (EPOLL_CLOEXEC) failed, error (%d) %s\n", errno, g_strerror (errno));
-#else
- g_error ("epoll_init: epoll (256) failed, error (%d) %s\n", errno, g_strerror (errno));
-#endif
- return FALSE;
- }
-
- event.events = EPOLLIN;
- event.data.fd = wakeup_pipe_fd;
- if (epoll_ctl (epoll_fd, EPOLL_CTL_ADD, event.data.fd, &event) == -1) {
- g_error ("epoll_init: epoll_ctl () failed, error (%d) %s", errno, g_strerror (errno));
- close (epoll_fd);
- return FALSE;
- }
-
- epoll_events = g_new0 (struct epoll_event, EPOLL_NEVENTS);
-
- return TRUE;
-}
-
-static void
-epoll_register_fd (gint fd, gint events, gboolean is_new)
-{
- struct epoll_event event;
-
-#ifndef EPOLLONESHOT
-/* it was only defined on android in May 2013 */
-#define EPOLLONESHOT 0x40000000
-#endif
-
- event.data.fd = fd;
- event.events = EPOLLONESHOT;
- if ((events & EVENT_IN) != 0)
- event.events |= EPOLLIN;
- if ((events & EVENT_OUT) != 0)
- event.events |= EPOLLOUT;
-
- if (epoll_ctl (epoll_fd, is_new ? EPOLL_CTL_ADD : EPOLL_CTL_MOD, event.data.fd, &event) == -1)
- g_error ("epoll_register_fd: epoll_ctl(%s) failed, error (%d) %s", is_new ? "EPOLL_CTL_ADD" : "EPOLL_CTL_MOD", errno, g_strerror (errno));
-}
-
-static void
-epoll_remove_fd (gint fd)
-{
- if (epoll_ctl (epoll_fd, EPOLL_CTL_DEL, fd, NULL) == -1)
- g_error ("epoll_remove_fd: epoll_ctl (EPOLL_CTL_DEL) failed, error (%d) %s", errno, g_strerror (errno));
-}
-
-static gint
-epoll_event_wait (void (*callback) (gint fd, gint events, gpointer user_data), gpointer user_data)
-{
- gint i, ready;
-
- memset (epoll_events, 0, sizeof (struct epoll_event) * EPOLL_NEVENTS);
-
- mono_gc_set_skip_thread (TRUE);
-
- MONO_ENTER_GC_SAFE;
- ready = epoll_wait (epoll_fd, epoll_events, EPOLL_NEVENTS, -1);
- MONO_EXIT_GC_SAFE;
-
- mono_gc_set_skip_thread (FALSE);
-
- if (ready == -1) {
- switch (errno) {
- case EINTR:
- mono_thread_internal_check_for_interruption_critical (mono_thread_internal_current ());
- ready = 0;
- break;
- default:
- g_error ("epoll_event_wait: epoll_wait () failed, error (%d) %s", errno, g_strerror (errno));
- break;
- }
- }
-
- if (ready == -1)
- return -1;
-
- for (i = 0; i < ready; ++i) {
- gint fd, events = 0;
-
- fd = epoll_events [i].data.fd;
- if (epoll_events [i].events & (EPOLLIN | EPOLLERR | EPOLLHUP))
- events |= EVENT_IN;
- if (epoll_events [i].events & (EPOLLOUT | EPOLLERR | EPOLLHUP))
- events |= EVENT_OUT;
-
- callback (fd, events, user_data);
- }
-
- return 0;
-}
-
-static ThreadPoolIOBackend backend_epoll = {
- .init = epoll_init,
- .register_fd = epoll_register_fd,
- .remove_fd = epoll_remove_fd,
- .event_wait = epoll_event_wait,
-};
-
-#endif
+++ /dev/null
-
-#if defined(HAVE_KQUEUE)
-
-#include <sys/types.h>
-#include <sys/event.h>
-#include <sys/time.h>
-
-#if defined(HOST_WIN32)
-/* We assume that kqueue is not available on windows */
-#error
-#endif
-
-#define KQUEUE_NEVENTS 128
-
-static gint kqueue_fd;
-static struct kevent *kqueue_events;
-
-static gint
-KQUEUE_INIT_FD (gint fd, gint events, gint flags)
-{
- struct kevent event;
- EV_SET (&event, fd, events, flags, 0, 0, 0);
- return kevent (kqueue_fd, &event, 1, NULL, 0, NULL);
-}
-
-static gboolean
-kqueue_init (gint wakeup_pipe_fd)
-{
- kqueue_fd = kqueue ();
- if (kqueue_fd == -1) {
- g_error ("kqueue_init: kqueue () failed, error (%d) %s", errno, g_strerror (errno));
- return FALSE;
- }
-
- if (KQUEUE_INIT_FD (wakeup_pipe_fd, EVFILT_READ, EV_ADD | EV_ENABLE) == -1) {
- g_error ("kqueue_init: kevent () failed, error (%d) %s", errno, g_strerror (errno));
- close (kqueue_fd);
- return FALSE;
- }
-
- kqueue_events = g_new0 (struct kevent, KQUEUE_NEVENTS);
-
- return TRUE;
-}
-
-static void
-kqueue_register_fd (gint fd, gint events, gboolean is_new)
-{
- if (events & EVENT_IN) {
- if (KQUEUE_INIT_FD (fd, EVFILT_READ, EV_ADD | EV_ENABLE) == -1)
- g_error ("kqueue_register_fd: kevent(read,enable) failed, error (%d) %s", errno, g_strerror (errno));
- } else {
- if (KQUEUE_INIT_FD (fd, EVFILT_READ, EV_ADD | EV_DISABLE) == -1)
- g_error ("kqueue_register_fd: kevent(read,disable) failed, error (%d) %s", errno, g_strerror (errno));
- }
- if (events & EVENT_OUT) {
- if (KQUEUE_INIT_FD (fd, EVFILT_WRITE, EV_ADD | EV_ENABLE) == -1)
- g_error ("kqueue_register_fd: kevent(write,enable) failed, error (%d) %s", errno, g_strerror (errno));
- } else {
- if (KQUEUE_INIT_FD (fd, EVFILT_WRITE, EV_ADD | EV_DISABLE) == -1)
- g_error ("kqueue_register_fd: kevent(write,disable) failed, error (%d) %s", errno, g_strerror (errno));
- }
-}
-
-static void
-kqueue_remove_fd (gint fd)
-{
- /* FIXME: a race between closing and adding operation in the Socket managed code trigger a ENOENT error */
- if (KQUEUE_INIT_FD (fd, EVFILT_READ, EV_DELETE) == -1)
- g_error ("kqueue_register_fd: kevent(read,delete) failed, error (%d) %s", errno, g_strerror (errno));
- if (KQUEUE_INIT_FD (fd, EVFILT_WRITE, EV_DELETE) == -1)
- g_error ("kqueue_register_fd: kevent(write,delete) failed, error (%d) %s", errno, g_strerror (errno));
-}
-
-static gint
-kqueue_event_wait (void (*callback) (gint fd, gint events, gpointer user_data), gpointer user_data)
-{
- gint i, ready;
-
- memset (kqueue_events, 0, sizeof (struct kevent) * KQUEUE_NEVENTS);
-
- mono_gc_set_skip_thread (TRUE);
-
- MONO_ENTER_GC_SAFE;
- ready = kevent (kqueue_fd, NULL, 0, kqueue_events, KQUEUE_NEVENTS, NULL);
- MONO_EXIT_GC_SAFE;
-
- mono_gc_set_skip_thread (FALSE);
-
- if (ready == -1) {
- switch (errno) {
- case EINTR:
- mono_thread_internal_check_for_interruption_critical (mono_thread_internal_current ());
- ready = 0;
- break;
- default:
- g_error ("kqueue_event_wait: kevent () failed, error (%d) %s", errno, g_strerror (errno));
- break;
- }
- }
-
- if (ready == -1)
- return -1;
-
- for (i = 0; i < ready; ++i) {
- gint fd, events = 0;
-
- fd = kqueue_events [i].ident;
- if (kqueue_events [i].filter == EVFILT_READ || (kqueue_events [i].flags & EV_ERROR) != 0)
- events |= EVENT_IN;
- if (kqueue_events [i].filter == EVFILT_WRITE || (kqueue_events [i].flags & EV_ERROR) != 0)
- events |= EVENT_OUT;
-
- callback (fd, events, user_data);
- }
-
- return 0;
-}
-
-static ThreadPoolIOBackend backend_kqueue = {
- .init = kqueue_init,
- .register_fd = kqueue_register_fd,
- .remove_fd = kqueue_remove_fd,
- .event_wait = kqueue_event_wait,
-};
-
-#endif
+++ /dev/null
-
-#include "utils/mono-poll.h"
-
-static mono_pollfd *poll_fds;
-static guint poll_fds_capacity;
-static guint poll_fds_size;
-
-static inline void
-POLL_INIT_FD (mono_pollfd *poll_fd, gint fd, gint events)
-{
- poll_fd->fd = fd;
- poll_fd->events = events;
- poll_fd->revents = 0;
-}
-
-static gboolean
-poll_init (gint wakeup_pipe_fd)
-{
- g_assert (wakeup_pipe_fd >= 0);
-
- poll_fds_size = 1;
- poll_fds_capacity = 64;
-
- poll_fds = g_new0 (mono_pollfd, poll_fds_capacity);
-
- POLL_INIT_FD (&poll_fds [0], wakeup_pipe_fd, MONO_POLLIN);
-
- return TRUE;
-}
-
-static void
-poll_register_fd (gint fd, gint events, gboolean is_new)
-{
- gint i;
- gint poll_event;
-
- g_assert (fd >= 0);
- g_assert (poll_fds_size <= poll_fds_capacity);
-
- g_assert ((events & ~(EVENT_IN | EVENT_OUT)) == 0);
-
- poll_event = 0;
- if (events & EVENT_IN)
- poll_event |= MONO_POLLIN;
- if (events & EVENT_OUT)
- poll_event |= MONO_POLLOUT;
-
- for (i = 0; i < poll_fds_size; ++i) {
- if (poll_fds [i].fd == fd) {
- g_assert (!is_new);
- POLL_INIT_FD (&poll_fds [i], fd, poll_event);
- return;
- }
- }
-
- g_assert (is_new);
-
- for (i = 0; i < poll_fds_size; ++i) {
- if (poll_fds [i].fd == -1) {
- POLL_INIT_FD (&poll_fds [i], fd, poll_event);
- return;
- }
- }
-
- poll_fds_size += 1;
-
- if (poll_fds_size > poll_fds_capacity) {
- poll_fds_capacity *= 2;
- g_assert (poll_fds_size <= poll_fds_capacity);
-
- poll_fds = (mono_pollfd *)g_renew (mono_pollfd, poll_fds, poll_fds_capacity);
- }
-
- POLL_INIT_FD (&poll_fds [poll_fds_size - 1], fd, poll_event);
-}
-
-static void
-poll_remove_fd (gint fd)
-{
- gint i;
-
- g_assert (fd >= 0);
-
- for (i = 0; i < poll_fds_size; ++i) {
- if (poll_fds [i].fd == fd) {
- POLL_INIT_FD (&poll_fds [i], -1, 0);
- break;
- }
- }
-
- /* if we don't find the fd in poll_fds,
- * it means we try to delete it twice */
- g_assert (i < poll_fds_size);
-
- /* if we find it again, it means we added
- * it twice */
- for (; i < poll_fds_size; ++i)
- g_assert (poll_fds [i].fd != fd);
-
- /* reduce the value of poll_fds_size so we
- * do not keep it too big */
- while (poll_fds_size > 1 && poll_fds [poll_fds_size - 1].fd == -1)
- poll_fds_size -= 1;
-}
-
-static inline gint
-poll_mark_bad_fds (mono_pollfd *poll_fds, gint poll_fds_size)
-{
- gint i, ready = 0;
-
- for (i = 0; i < poll_fds_size; i++) {
- if (poll_fds [i].fd == -1)
- continue;
-
- switch (mono_poll (&poll_fds [i], 1, 0)) {
- case 1:
- ready++;
- break;
- case -1:
- if (errno == EBADF)
- {
- poll_fds [i].revents |= MONO_POLLNVAL;
- ready++;
- }
- break;
- }
- }
-
- return ready;
-}
-
-static gint
-poll_event_wait (void (*callback) (gint fd, gint events, gpointer user_data), gpointer user_data)
-{
- gint i, ready;
-
- for (i = 0; i < poll_fds_size; ++i)
- poll_fds [i].revents = 0;
-
- mono_gc_set_skip_thread (TRUE);
-
- MONO_ENTER_GC_SAFE;
- ready = mono_poll (poll_fds, poll_fds_size, -1);
- MONO_EXIT_GC_SAFE;
-
- mono_gc_set_skip_thread (FALSE);
-
- if (ready == -1) {
- /*
- * Apart from EINTR, we only check EBADF, for the rest:
- * EINVAL: mono_poll() 'protects' us from descriptor
- * numbers above the limit if using select() by marking
- * then as POLLERR. If a system poll() is being
- * used, the number of descriptor we're passing will not
- * be over sysconf(_SC_OPEN_MAX), as the error would have
- * happened when opening.
- *
- * EFAULT: we own the memory pointed by pfds.
- * ENOMEM: we're doomed anyway
- *
- */
- switch (errno)
- {
- case EINTR:
- {
- mono_thread_internal_check_for_interruption_critical (mono_thread_internal_current ());
- ready = 0;
- break;
- }
- case EBADF:
- {
- ready = poll_mark_bad_fds (poll_fds, poll_fds_size);
- break;
- }
- default:
- g_error ("poll_event_wait: mono_poll () failed, error (%d) %s", errno, g_strerror (errno));
- break;
- }
- }
-
- if (ready == -1)
- return -1;
- if (ready == 0)
- return 0;
-
- g_assert (ready > 0);
-
- for (i = 0; i < poll_fds_size; ++i) {
- gint fd, events = 0;
-
- if (poll_fds [i].fd == -1)
- continue;
- if (poll_fds [i].revents == 0)
- continue;
-
- fd = poll_fds [i].fd;
- if (poll_fds [i].revents & (MONO_POLLIN | MONO_POLLERR | MONO_POLLHUP | MONO_POLLNVAL))
- events |= EVENT_IN;
- if (poll_fds [i].revents & (MONO_POLLOUT | MONO_POLLERR | MONO_POLLHUP | MONO_POLLNVAL))
- events |= EVENT_OUT;
- if (poll_fds [i].revents & (MONO_POLLERR | MONO_POLLHUP | MONO_POLLNVAL))
- events |= EVENT_ERR;
-
- callback (fd, events, user_data);
-
- if (--ready == 0)
- break;
- }
-
- return 0;
-}
-
-static ThreadPoolIOBackend backend_poll = {
- .init = poll_init,
- .register_fd = poll_register_fd,
- .remove_fd = poll_remove_fd,
- .event_wait = poll_event_wait,
-};
+++ /dev/null
-/*
- * threadpool-ms-io.c: Microsoft IO threadpool runtime support
- *
- * Author:
- * Ludovic Henry (ludovic.henry@xamarin.com)
- *
- * Copyright 2015 Xamarin, Inc (http://www.xamarin.com)
- * Licensed under the MIT license. See LICENSE file in the project root for full license information.
- */
-
-#include <config.h>
-
-#ifndef DISABLE_SOCKETS
-
-#include <glib.h>
-
-#if defined(HOST_WIN32)
-#include <windows.h>
-#else
-#include <errno.h>
-#include <fcntl.h>
-#endif
-
-#include <mono/metadata/gc-internals.h>
-#include <mono/metadata/mono-mlist.h>
-#include <mono/metadata/threadpool-ms.h>
-#include <mono/metadata/threadpool-ms-io.h>
-#include <mono/utils/atomic.h>
-#include <mono/utils/mono-threads.h>
-#include <mono/utils/mono-lazy-init.h>
-#include <mono/utils/mono-logger-internals.h>
-
-typedef struct {
- gboolean (*init) (gint wakeup_pipe_fd);
- void (*register_fd) (gint fd, gint events, gboolean is_new);
- void (*remove_fd) (gint fd);
- gint (*event_wait) (void (*callback) (gint fd, gint events, gpointer user_data), gpointer user_data);
-} ThreadPoolIOBackend;
-
-/* Keep in sync with System.IOOperation in mcs/class/System/System/IOSelector.cs */
-enum MonoIOOperation {
- EVENT_IN = 1 << 0,
- EVENT_OUT = 1 << 1,
- EVENT_ERR = 1 << 2, /* not in managed */
-};
-
-#include "threadpool-ms-io-epoll.c"
-#include "threadpool-ms-io-kqueue.c"
-#include "threadpool-ms-io-poll.c"
-
-#define UPDATES_CAPACITY 128
-
-/* Keep in sync with System.IOSelectorJob in mcs/class/System/System/IOSelector.cs */
-struct _MonoIOSelectorJob {
- MonoObject object;
- gint32 operation;
- MonoObject *callback;
- MonoObject *state;
-};
-
-typedef enum {
- UPDATE_EMPTY = 0,
- UPDATE_ADD,
- UPDATE_REMOVE_SOCKET,
- UPDATE_REMOVE_DOMAIN,
-} ThreadPoolIOUpdateType;
-
-typedef struct {
- gint fd;
- MonoIOSelectorJob *job;
-} ThreadPoolIOUpdate_Add;
-
-typedef struct {
- gint fd;
-} ThreadPoolIOUpdate_RemoveSocket;
-
-typedef struct {
- MonoDomain *domain;
-} ThreadPoolIOUpdate_RemoveDomain;
-
-typedef struct {
- ThreadPoolIOUpdateType type;
- union {
- ThreadPoolIOUpdate_Add add;
- ThreadPoolIOUpdate_RemoveSocket remove_socket;
- ThreadPoolIOUpdate_RemoveDomain remove_domain;
- } data;
-} ThreadPoolIOUpdate;
-
-typedef struct {
- ThreadPoolIOBackend backend;
-
- ThreadPoolIOUpdate updates [UPDATES_CAPACITY];
- gint updates_size;
- MonoCoopMutex updates_lock;
- MonoCoopCond updates_cond;
-
-#if !defined(HOST_WIN32)
- gint wakeup_pipes [2];
-#else
- SOCKET wakeup_pipes [2];
-#endif
-} ThreadPoolIO;
-
-static mono_lazy_init_t io_status = MONO_LAZY_INIT_STATUS_NOT_INITIALIZED;
-
-static gboolean io_selector_running = FALSE;
-
-static ThreadPoolIO* threadpool_io;
-
-static MonoIOSelectorJob*
-get_job_for_event (MonoMList **list, gint32 event)
-{
- MonoMList *current;
-
- g_assert (list);
-
- for (current = *list; current; current = mono_mlist_next (current)) {
- MonoIOSelectorJob *job = (MonoIOSelectorJob*) mono_mlist_get_data (current);
- if (job->operation == event) {
- *list = mono_mlist_remove_item (*list, current);
- return job;
- }
- }
-
- return NULL;
-}
-
-static gint
-get_operations_for_jobs (MonoMList *list)
-{
- MonoMList *current;
- gint operations = 0;
-
- for (current = list; current; current = mono_mlist_next (current))
- operations |= ((MonoIOSelectorJob*) mono_mlist_get_data (current))->operation;
-
- return operations;
-}
-
-static void
-selector_thread_wakeup (void)
-{
- gchar msg = 'c';
- gint written;
-
- for (;;) {
-#if !defined(HOST_WIN32)
- written = write (threadpool_io->wakeup_pipes [1], &msg, 1);
- if (written == 1)
- break;
- if (written == -1) {
- g_warning ("selector_thread_wakeup: write () failed, error (%d) %s\n", errno, g_strerror (errno));
- break;
- }
-#else
- written = send (threadpool_io->wakeup_pipes [1], &msg, 1, 0);
- if (written == 1)
- break;
- if (written == SOCKET_ERROR) {
- g_warning ("selector_thread_wakeup: write () failed, error (%d)\n", WSAGetLastError ());
- break;
- }
-#endif
- }
-}
-
-static void
-selector_thread_wakeup_drain_pipes (void)
-{
- gchar buffer [128];
- gint received;
-
- for (;;) {
-#if !defined(HOST_WIN32)
- received = read (threadpool_io->wakeup_pipes [0], buffer, sizeof (buffer));
- if (received == 0)
- break;
- if (received == -1) {
- if (errno != EINTR && errno != EAGAIN)
- g_warning ("selector_thread_wakeup_drain_pipes: read () failed, error (%d) %s\n", errno, g_strerror (errno));
- break;
- }
-#else
- received = recv (threadpool_io->wakeup_pipes [0], buffer, sizeof (buffer), 0);
- if (received == 0)
- break;
- if (received == SOCKET_ERROR) {
- if (WSAGetLastError () != WSAEINTR && WSAGetLastError () != WSAEWOULDBLOCK)
- g_warning ("selector_thread_wakeup_drain_pipes: recv () failed, error (%d) %s\n", WSAGetLastError ());
- break;
- }
-#endif
- }
-}
-
-typedef struct {
- MonoDomain *domain;
- MonoGHashTable *states;
-} FilterSockaresForDomainData;
-
-static void
-filter_jobs_for_domain (gpointer key, gpointer value, gpointer user_data)
-{
- FilterSockaresForDomainData *data;
- MonoMList *list = (MonoMList *)value, *element;
- MonoDomain *domain;
- MonoGHashTable *states;
-
- g_assert (user_data);
- data = (FilterSockaresForDomainData *)user_data;
- domain = data->domain;
- states = data->states;
-
- for (element = list; element; element = mono_mlist_next (element)) {
- MonoIOSelectorJob *job = (MonoIOSelectorJob*) mono_mlist_get_data (element);
- if (mono_object_domain (job) == domain)
- mono_mlist_set_data (element, NULL);
- }
-
- /* we skip all the first elements which are NULL */
- for (; list; list = mono_mlist_next (list)) {
- if (mono_mlist_get_data (list))
- break;
- }
-
- if (list) {
- g_assert (mono_mlist_get_data (list));
-
- /* we delete all the NULL elements after the first one */
- for (element = list; element;) {
- MonoMList *next;
- if (!(next = mono_mlist_next (element)))
- break;
- if (mono_mlist_get_data (next))
- element = next;
- else
- mono_mlist_set_next (element, mono_mlist_next (next));
- }
- }
-
- mono_g_hash_table_replace (states, key, list);
-}
-
-static void
-wait_callback (gint fd, gint events, gpointer user_data)
-{
- MonoError error;
-
- if (mono_runtime_is_shutting_down ())
- return;
-
- if (fd == threadpool_io->wakeup_pipes [0]) {
- mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: wke");
- selector_thread_wakeup_drain_pipes ();
- } else {
- MonoGHashTable *states;
- MonoMList *list = NULL;
- gpointer k;
- gboolean remove_fd = FALSE;
- gint operations;
-
- g_assert (user_data);
- states = (MonoGHashTable *)user_data;
-
- mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: cal fd %3d, events = %2s | %2s | %3s",
- fd, (events & EVENT_IN) ? "RD" : "..", (events & EVENT_OUT) ? "WR" : "..", (events & EVENT_ERR) ? "ERR" : "...");
-
- if (!mono_g_hash_table_lookup_extended (states, GINT_TO_POINTER (fd), &k, (gpointer*) &list))
- g_error ("wait_callback: fd %d not found in states table", fd);
-
- if (list && (events & EVENT_IN) != 0) {
- MonoIOSelectorJob *job = get_job_for_event (&list, EVENT_IN);
- if (job) {
- mono_threadpool_ms_enqueue_work_item (((MonoObject*) job)->vtable->domain, (MonoObject*) job, &error);
- mono_error_assert_ok (&error);
- }
-
- }
- if (list && (events & EVENT_OUT) != 0) {
- MonoIOSelectorJob *job = get_job_for_event (&list, EVENT_OUT);
- if (job) {
- mono_threadpool_ms_enqueue_work_item (((MonoObject*) job)->vtable->domain, (MonoObject*) job, &error);
- mono_error_assert_ok (&error);
- }
- }
-
- remove_fd = (events & EVENT_ERR) == EVENT_ERR;
- if (!remove_fd) {
- mono_g_hash_table_replace (states, GINT_TO_POINTER (fd), list);
-
- operations = get_operations_for_jobs (list);
-
- mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: res fd %3d, events = %2s | %2s | %3s",
- fd, (operations & EVENT_IN) ? "RD" : "..", (operations & EVENT_OUT) ? "WR" : "..", (operations & EVENT_ERR) ? "ERR" : "...");
-
- threadpool_io->backend.register_fd (fd, operations, FALSE);
- } else {
- mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: err fd %d", fd);
-
- mono_g_hash_table_remove (states, GINT_TO_POINTER (fd));
-
- threadpool_io->backend.remove_fd (fd);
- }
- }
-}
-
-static void
-selector_thread (gpointer data)
-{
- MonoError error;
- MonoGHashTable *states;
-
- io_selector_running = TRUE;
-
- if (mono_runtime_is_shutting_down ()) {
- io_selector_running = FALSE;
- return;
- }
-
- states = mono_g_hash_table_new_type (g_direct_hash, g_direct_equal, MONO_HASH_VALUE_GC, MONO_ROOT_SOURCE_THREAD_POOL, "i/o thread pool states table");
-
- for (;;) {
- gint i, j;
- gint res;
-
- mono_coop_mutex_lock (&threadpool_io->updates_lock);
-
- for (i = 0; i < threadpool_io->updates_size; ++i) {
- ThreadPoolIOUpdate *update = &threadpool_io->updates [i];
-
- switch (update->type) {
- case UPDATE_EMPTY:
- break;
- case UPDATE_ADD: {
- gint fd;
- gint operations;
- gpointer k;
- gboolean exists;
- MonoMList *list = NULL;
- MonoIOSelectorJob *job;
-
- fd = update->data.add.fd;
- g_assert (fd >= 0);
-
- job = update->data.add.job;
- g_assert (job);
-
- exists = mono_g_hash_table_lookup_extended (states, GINT_TO_POINTER (fd), &k, (gpointer*) &list);
- list = mono_mlist_append_checked (list, (MonoObject*) job, &error);
- mono_error_assert_ok (&error);
- mono_g_hash_table_replace (states, GINT_TO_POINTER (fd), list);
-
- operations = get_operations_for_jobs (list);
-
- mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: %3s fd %3d, operations = %2s | %2s | %3s",
- exists ? "mod" : "add", fd, (operations & EVENT_IN) ? "RD" : "..", (operations & EVENT_OUT) ? "WR" : "..", (operations & EVENT_ERR) ? "ERR" : "...");
-
- threadpool_io->backend.register_fd (fd, operations, !exists);
-
- break;
- }
- case UPDATE_REMOVE_SOCKET: {
- gint fd;
- gpointer k;
- MonoMList *list = NULL;
-
- fd = update->data.remove_socket.fd;
- g_assert (fd >= 0);
-
- if (mono_g_hash_table_lookup_extended (states, GINT_TO_POINTER (fd), &k, (gpointer*) &list)) {
- mono_g_hash_table_remove (states, GINT_TO_POINTER (fd));
-
- for (j = i + 1; j < threadpool_io->updates_size; ++j) {
- ThreadPoolIOUpdate *update = &threadpool_io->updates [j];
- if (update->type == UPDATE_ADD && update->data.add.fd == fd)
- memset (update, 0, sizeof (ThreadPoolIOUpdate));
- }
-
- for (; list; list = mono_mlist_remove_item (list, list)) {
- mono_threadpool_ms_enqueue_work_item (mono_object_domain (mono_mlist_get_data (list)), mono_mlist_get_data (list), &error);
- mono_error_assert_ok (&error);
- }
-
- mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: del fd %3d", fd);
- threadpool_io->backend.remove_fd (fd);
- }
-
- break;
- }
- case UPDATE_REMOVE_DOMAIN: {
- MonoDomain *domain;
-
- domain = update->data.remove_domain.domain;
- g_assert (domain);
-
- FilterSockaresForDomainData user_data = { .domain = domain, .states = states };
- mono_g_hash_table_foreach (states, filter_jobs_for_domain, &user_data);
-
- for (j = i + 1; j < threadpool_io->updates_size; ++j) {
- ThreadPoolIOUpdate *update = &threadpool_io->updates [j];
- if (update->type == UPDATE_ADD && mono_object_domain (update->data.add.job) == domain)
- memset (update, 0, sizeof (ThreadPoolIOUpdate));
- }
-
- break;
- }
- default:
- g_assert_not_reached ();
- }
- }
-
- mono_coop_cond_broadcast (&threadpool_io->updates_cond);
-
- if (threadpool_io->updates_size > 0) {
- threadpool_io->updates_size = 0;
- memset (&threadpool_io->updates, 0, UPDATES_CAPACITY * sizeof (ThreadPoolIOUpdate));
- }
-
- mono_coop_mutex_unlock (&threadpool_io->updates_lock);
-
- mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: wai");
-
- res = threadpool_io->backend.event_wait (wait_callback, states);
-
- if (res == -1 || mono_runtime_is_shutting_down ())
- break;
- }
-
- mono_g_hash_table_destroy (states);
-
- io_selector_running = FALSE;
-}
-
-/* Locking: threadpool_io->updates_lock must be held */
-static ThreadPoolIOUpdate*
-update_get_new (void)
-{
- ThreadPoolIOUpdate *update = NULL;
- g_assert (threadpool_io->updates_size <= UPDATES_CAPACITY);
-
- while (threadpool_io->updates_size == UPDATES_CAPACITY) {
- /* we wait for updates to be applied in the selector_thread and we loop
- * as long as none are available. if it happends too much, then we need
- * to increase UPDATES_CAPACITY */
- mono_coop_cond_wait (&threadpool_io->updates_cond, &threadpool_io->updates_lock);
- }
-
- g_assert (threadpool_io->updates_size < UPDATES_CAPACITY);
-
- update = &threadpool_io->updates [threadpool_io->updates_size ++];
-
- return update;
-}
-
-static void
-wakeup_pipes_init (void)
-{
-#if !defined(HOST_WIN32)
- if (pipe (threadpool_io->wakeup_pipes) == -1)
- g_error ("wakeup_pipes_init: pipe () failed, error (%d) %s\n", errno, g_strerror (errno));
- if (fcntl (threadpool_io->wakeup_pipes [0], F_SETFL, O_NONBLOCK) == -1)
- g_error ("wakeup_pipes_init: fcntl () failed, error (%d) %s\n", errno, g_strerror (errno));
-#else
- struct sockaddr_in client;
- struct sockaddr_in server;
- SOCKET server_sock;
- gulong arg;
- gint size;
-
- server_sock = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
- g_assert (server_sock != INVALID_SOCKET);
- threadpool_io->wakeup_pipes [1] = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
- g_assert (threadpool_io->wakeup_pipes [1] != INVALID_SOCKET);
-
- server.sin_family = AF_INET;
- server.sin_addr.s_addr = inet_addr ("127.0.0.1");
- server.sin_port = 0;
- if (bind (server_sock, (SOCKADDR*) &server, sizeof (server)) == SOCKET_ERROR) {
- closesocket (server_sock);
- g_error ("wakeup_pipes_init: bind () failed, error (%d)\n", WSAGetLastError ());
- }
-
- size = sizeof (server);
- if (getsockname (server_sock, (SOCKADDR*) &server, &size) == SOCKET_ERROR) {
- closesocket (server_sock);
- g_error ("wakeup_pipes_init: getsockname () failed, error (%d)\n", WSAGetLastError ());
- }
- if (listen (server_sock, 1024) == SOCKET_ERROR) {
- closesocket (server_sock);
- g_error ("wakeup_pipes_init: listen () failed, error (%d)\n", WSAGetLastError ());
- }
- if (connect ((SOCKET) threadpool_io->wakeup_pipes [1], (SOCKADDR*) &server, sizeof (server)) == SOCKET_ERROR) {
- closesocket (server_sock);
- g_error ("wakeup_pipes_init: connect () failed, error (%d)\n", WSAGetLastError ());
- }
-
- size = sizeof (client);
- threadpool_io->wakeup_pipes [0] = accept (server_sock, (SOCKADDR *) &client, &size);
- g_assert (threadpool_io->wakeup_pipes [0] != INVALID_SOCKET);
-
- arg = 1;
- if (ioctlsocket (threadpool_io->wakeup_pipes [0], FIONBIO, &arg) == SOCKET_ERROR) {
- closesocket (threadpool_io->wakeup_pipes [0]);
- closesocket (server_sock);
- g_error ("wakeup_pipes_init: ioctlsocket () failed, error (%d)\n", WSAGetLastError ());
- }
-
- closesocket (server_sock);
-#endif
-}
-
-static void
-initialize (void)
-{
- g_assert (!threadpool_io);
- threadpool_io = g_new0 (ThreadPoolIO, 1);
- g_assert (threadpool_io);
-
- mono_coop_mutex_init (&threadpool_io->updates_lock);
- mono_coop_cond_init (&threadpool_io->updates_cond);
- mono_gc_register_root ((char *)&threadpool_io->updates [0], sizeof (threadpool_io->updates), MONO_GC_DESCRIPTOR_NULL, MONO_ROOT_SOURCE_THREAD_POOL, "i/o thread pool updates list");
-
- threadpool_io->updates_size = 0;
-
- threadpool_io->backend = backend_poll;
- if (g_getenv ("MONO_ENABLE_AIO") != NULL) {
-#if defined(HAVE_EPOLL)
- threadpool_io->backend = backend_epoll;
-#elif defined(HAVE_KQUEUE)
- threadpool_io->backend = backend_kqueue;
-#endif
- }
-
- wakeup_pipes_init ();
-
- if (!threadpool_io->backend.init (threadpool_io->wakeup_pipes [0]))
- g_error ("initialize: backend->init () failed");
-
- MonoError error;
- if (!mono_thread_create_internal (mono_get_root_domain (), selector_thread, NULL, TRUE, SMALL_STACK, &error))
- g_error ("initialize: mono_thread_create_internal () failed due to %s", mono_error_get_message (&error));
-}
-
-static void
-cleanup (void)
-{
- /* we make the assumption along the code that we are
- * cleaning up only if the runtime is shutting down */
- g_assert (mono_runtime_is_shutting_down ());
-
- selector_thread_wakeup ();
- while (io_selector_running)
- mono_thread_info_usleep (1000);
-}
-
-void
-mono_threadpool_ms_io_cleanup (void)
-{
- mono_lazy_cleanup (&io_status, cleanup);
-}
-
-void
-ves_icall_System_IOSelector_Add (gpointer handle, MonoIOSelectorJob *job)
-{
- ThreadPoolIOUpdate *update;
-
- g_assert (handle);
-
- g_assert ((job->operation == EVENT_IN) ^ (job->operation == EVENT_OUT));
- g_assert (job->callback);
-
- if (mono_runtime_is_shutting_down ())
- return;
- if (mono_domain_is_unloading (mono_object_domain (job)))
- return;
-
- mono_lazy_initialize (&io_status, initialize);
-
- mono_coop_mutex_lock (&threadpool_io->updates_lock);
-
- update = update_get_new ();
- update->type = UPDATE_ADD;
- update->data.add.fd = GPOINTER_TO_INT (handle);
- update->data.add.job = job;
- mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */
-
- selector_thread_wakeup ();
-
- mono_coop_mutex_unlock (&threadpool_io->updates_lock);
-}
-
-void
-ves_icall_System_IOSelector_Remove (gpointer handle)
-{
- mono_threadpool_ms_io_remove_socket (GPOINTER_TO_INT (handle));
-}
-
-void
-mono_threadpool_ms_io_remove_socket (int fd)
-{
- ThreadPoolIOUpdate *update;
-
- if (!mono_lazy_is_initialized (&io_status))
- return;
-
- mono_coop_mutex_lock (&threadpool_io->updates_lock);
-
- update = update_get_new ();
- update->type = UPDATE_REMOVE_SOCKET;
- update->data.add.fd = fd;
- mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */
-
- selector_thread_wakeup ();
-
- mono_coop_cond_wait (&threadpool_io->updates_cond, &threadpool_io->updates_lock);
-
- mono_coop_mutex_unlock (&threadpool_io->updates_lock);
-}
-
-void
-mono_threadpool_ms_io_remove_domain_jobs (MonoDomain *domain)
-{
- ThreadPoolIOUpdate *update;
-
- if (!mono_lazy_is_initialized (&io_status))
- return;
-
- mono_coop_mutex_lock (&threadpool_io->updates_lock);
-
- update = update_get_new ();
- update->type = UPDATE_REMOVE_DOMAIN;
- update->data.remove_domain.domain = domain;
- mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */
-
- selector_thread_wakeup ();
-
- mono_coop_cond_wait (&threadpool_io->updates_cond, &threadpool_io->updates_lock);
-
- mono_coop_mutex_unlock (&threadpool_io->updates_lock);
-}
-
-#else
-
-void
-ves_icall_System_IOSelector_Add (gpointer handle, MonoIOSelectorJob *job)
-{
- g_assert_not_reached ();
-}
-
-void
-ves_icall_System_IOSelector_Remove (gpointer handle)
-{
- g_assert_not_reached ();
-}
-
-void
-mono_threadpool_ms_io_cleanup (void)
-{
- g_assert_not_reached ();
-}
-
-void
-mono_threadpool_ms_io_remove_socket (int fd)
-{
- g_assert_not_reached ();
-}
-
-void
-mono_threadpool_ms_io_remove_domain_jobs (MonoDomain *domain)
-{
- g_assert_not_reached ();
-}
-
-#endif
+++ /dev/null
-#ifndef _MONO_THREADPOOL_MS_IO_H_
-#define _MONO_THREADPOOL_MS_IO_H_
-
-#include <config.h>
-#include <glib.h>
-
-#include <mono/metadata/object-internals.h>
-#include <mono/metadata/socket-io.h>
-
-typedef struct _MonoIOSelectorJob MonoIOSelectorJob;
-
-void
-ves_icall_System_IOSelector_Add (gpointer handle, MonoIOSelectorJob *job);
-
-void
-ves_icall_System_IOSelector_Remove (gpointer handle);
-
-void
-mono_threadpool_ms_io_remove_socket (int fd);
-void
-mono_threadpool_ms_io_remove_domain_jobs (MonoDomain *domain);
-void
-mono_threadpool_ms_io_cleanup (void);
-
-#endif /* _MONO_THREADPOOL_MS_IO_H_ */
+++ /dev/null
-/*
- * threadpool-ms.c: Microsoft threadpool runtime support
- *
- * Author:
- * Ludovic Henry (ludovic.henry@xamarin.com)
- *
- * Copyright 2015 Xamarin, Inc (http://www.xamarin.com)
- * Licensed under the MIT license. See LICENSE file in the project root for full license information.
- */
-
-//
-// Copyright (c) Microsoft. All rights reserved.
-// Licensed under the MIT license. See LICENSE file in the project root for full license information.
-//
-// Files:
-// - src/vm/comthreadpool.cpp
-// - src/vm/win32threadpoolcpp
-// - src/vm/threadpoolrequest.cpp
-// - src/vm/hillclimbing.cpp
-//
-// Ported from C++ to C and adjusted to Mono runtime
-
-#include <stdlib.h>
-#define _USE_MATH_DEFINES // needed by MSVC to define math constants
-#include <math.h>
-#include <config.h>
-#include <glib.h>
-
-#include <mono/metadata/class-internals.h>
-#include <mono/metadata/exception.h>
-#include <mono/metadata/gc-internals.h>
-#include <mono/metadata/object.h>
-#include <mono/metadata/object-internals.h>
-#include <mono/metadata/threadpool-ms.h>
-#include <mono/metadata/threadpool-ms-io.h>
-#include <mono/metadata/w32event.h>
-#include <mono/utils/atomic.h>
-#include <mono/utils/mono-compiler.h>
-#include <mono/utils/mono-complex.h>
-#include <mono/utils/mono-lazy-init.h>
-#include <mono/utils/mono-logger.h>
-#include <mono/utils/mono-logger-internals.h>
-#include <mono/utils/mono-proclib.h>
-#include <mono/utils/mono-threads.h>
-#include <mono/utils/mono-time.h>
-#include <mono/utils/mono-rand.h>
-#include <mono/io-layer/io-layer.h>
-
-#define CPU_USAGE_LOW 80
-#define CPU_USAGE_HIGH 95
-
-#define MONITOR_INTERVAL 500 // ms
-#define MONITOR_MINIMAL_LIFETIME 60 * 1000 // ms
-
-#define WORKER_CREATION_MAX_PER_SEC 10
-
-/* The exponent to apply to the gain. 1.0 means to use linear gain,
- * higher values will enhance large moves and damp small ones.
- * default: 2.0 */
-#define HILL_CLIMBING_GAIN_EXPONENT 2.0
-
-/* The 'cost' of a thread. 0 means drive for increased throughput regardless
- * of thread count, higher values bias more against higher thread counts.
- * default: 0.15 */
-#define HILL_CLIMBING_BIAS 0.15
-
-#define HILL_CLIMBING_WAVE_PERIOD 4
-#define HILL_CLIMBING_MAX_WAVE_MAGNITUDE 20
-#define HILL_CLIMBING_WAVE_MAGNITUDE_MULTIPLIER 1.0
-#define HILL_CLIMBING_WAVE_HISTORY_SIZE 8
-#define HILL_CLIMBING_TARGET_SIGNAL_TO_NOISE_RATIO 3.0
-#define HILL_CLIMBING_MAX_CHANGE_PER_SECOND 4
-#define HILL_CLIMBING_MAX_CHANGE_PER_SAMPLE 20
-#define HILL_CLIMBING_SAMPLE_INTERVAL_LOW 10
-#define HILL_CLIMBING_SAMPLE_INTERVAL_HIGH 200
-#define HILL_CLIMBING_ERROR_SMOOTHING_FACTOR 0.01
-#define HILL_CLIMBING_MAX_SAMPLE_ERROR_PERCENT 0.15
-
-typedef union {
- struct {
- gint16 max_working; /* determined by heuristic */
- gint16 active; /* executing worker_thread */
- gint16 working; /* actively executing worker_thread, not parked */
- gint16 parked; /* parked */
- } _;
- gint64 as_gint64;
-} ThreadPoolCounter;
-
-typedef struct {
- MonoDomain *domain;
- /* Number of outstanding jobs */
- gint32 outstanding_request;
- /* Number of currently executing jobs */
- int threadpool_jobs;
- /* Signalled when threadpool_jobs + outstanding_request is 0 */
- /* Protected by threadpool->domains_lock */
- MonoCoopCond cleanup_cond;
-} ThreadPoolDomain;
-
-typedef MonoInternalThread ThreadPoolWorkingThread;
-
-typedef struct {
- gint32 wave_period;
- gint32 samples_to_measure;
- gdouble target_throughput_ratio;
- gdouble target_signal_to_noise_ratio;
- gdouble max_change_per_second;
- gdouble max_change_per_sample;
- gint32 max_thread_wave_magnitude;
- gint32 sample_interval_low;
- gdouble thread_magnitude_multiplier;
- gint32 sample_interval_high;
- gdouble throughput_error_smoothing_factor;
- gdouble gain_exponent;
- gdouble max_sample_error;
-
- gdouble current_control_setting;
- gint64 total_samples;
- gint16 last_thread_count;
- gdouble elapsed_since_last_change;
- gdouble completions_since_last_change;
-
- gdouble average_throughput_noise;
-
- gdouble *samples;
- gdouble *thread_counts;
-
- guint32 current_sample_interval;
- gpointer random_interval_generator;
-
- gint32 accumulated_completion_count;
- gdouble accumulated_sample_duration;
-} ThreadPoolHillClimbing;
-
-typedef struct {
- ThreadPoolCounter counters;
-
- GPtrArray *domains; // ThreadPoolDomain* []
- MonoCoopMutex domains_lock;
-
- GPtrArray *working_threads; // ThreadPoolWorkingThread* []
- gint32 parked_threads_count;
- MonoCoopCond parked_threads_cond;
- MonoCoopMutex active_threads_lock; /* protect access to working_threads and parked_threads */
-
- guint32 worker_creation_current_second;
- guint32 worker_creation_current_count;
- MonoCoopMutex worker_creation_lock;
-
- gint32 heuristic_completions;
- gint64 heuristic_sample_start;
- gint64 heuristic_last_dequeue; // ms
- gint64 heuristic_last_adjustment; // ms
- gint64 heuristic_adjustment_interval; // ms
- ThreadPoolHillClimbing heuristic_hill_climbing;
- MonoCoopMutex heuristic_lock;
-
- gint32 limit_worker_min;
- gint32 limit_worker_max;
- gint32 limit_io_min;
- gint32 limit_io_max;
-
- MonoCpuUsageState *cpu_usage_state;
- gint32 cpu_usage;
-
- /* suspended by the debugger */
- gboolean suspended;
-} ThreadPool;
-
-typedef enum {
- TRANSITION_WARMUP,
- TRANSITION_INITIALIZING,
- TRANSITION_RANDOM_MOVE,
- TRANSITION_CLIMBING_MOVE,
- TRANSITION_CHANGE_POINT,
- TRANSITION_STABILIZING,
- TRANSITION_STARVATION,
- TRANSITION_THREAD_TIMED_OUT,
- TRANSITION_UNDEFINED,
-} ThreadPoolHeuristicStateTransition;
-
-static mono_lazy_init_t status = MONO_LAZY_INIT_STATUS_NOT_INITIALIZED;
-
-enum {
- MONITOR_STATUS_REQUESTED,
- MONITOR_STATUS_WAITING_FOR_REQUEST,
- MONITOR_STATUS_NOT_RUNNING,
-};
-
-static gint32 monitor_status = MONITOR_STATUS_NOT_RUNNING;
-
-static ThreadPool* threadpool;
-
-#define COUNTER_CHECK(counter) \
- do { \
- g_assert (counter._.max_working > 0); \
- g_assert (counter._.working >= 0); \
- g_assert (counter._.active >= 0); \
- } while (0)
-
-#define COUNTER_READ() (InterlockedRead64 (&threadpool->counters.as_gint64))
-
-#define COUNTER_ATOMIC(var,block) \
- do { \
- ThreadPoolCounter __old; \
- do { \
- g_assert (threadpool); \
- __old.as_gint64 = COUNTER_READ (); \
- (var) = __old; \
- { block; } \
- COUNTER_CHECK (var); \
- } while (InterlockedCompareExchange64 (&threadpool->counters.as_gint64, (var).as_gint64, __old.as_gint64) != __old.as_gint64); \
- } while (0)
-
-#define COUNTER_TRY_ATOMIC(res,var,block) \
- do { \
- ThreadPoolCounter __old; \
- do { \
- g_assert (threadpool); \
- __old.as_gint64 = COUNTER_READ (); \
- (var) = __old; \
- (res) = FALSE; \
- { block; } \
- COUNTER_CHECK (var); \
- (res) = InterlockedCompareExchange64 (&threadpool->counters.as_gint64, (var).as_gint64, __old.as_gint64) == __old.as_gint64; \
- } while (0); \
- } while (0)
-
-static inline void
-domains_lock (void)
-{
- mono_coop_mutex_lock (&threadpool->domains_lock);
-}
-
-static inline void
-domains_unlock (void)
-{
- mono_coop_mutex_unlock (&threadpool->domains_lock);
-}
-
-static gpointer
-rand_create (void)
-{
- mono_rand_open ();
- return mono_rand_init (NULL, 0);
-}
-
-static guint32
-rand_next (gpointer *handle, guint32 min, guint32 max)
-{
- MonoError error;
- guint32 val;
- mono_rand_try_get_uint32 (handle, &val, min, max, &error);
- // FIXME handle error
- mono_error_assert_ok (&error);
- return val;
-}
-
-static void
-rand_free (gpointer handle)
-{
- mono_rand_close (handle);
-}
-
-static void
-initialize (void)
-{
- ThreadPoolHillClimbing *hc;
- const char *threads_per_cpu_env;
- gint threads_per_cpu;
- gint threads_count;
-
- g_assert (!threadpool);
- threadpool = g_new0 (ThreadPool, 1);
- g_assert (threadpool);
-
- threadpool->domains = g_ptr_array_new ();
- mono_coop_mutex_init (&threadpool->domains_lock);
-
- threadpool->parked_threads_count = 0;
- mono_coop_cond_init (&threadpool->parked_threads_cond);
- threadpool->working_threads = g_ptr_array_new ();
- mono_coop_mutex_init (&threadpool->active_threads_lock);
-
- threadpool->worker_creation_current_second = -1;
- mono_coop_mutex_init (&threadpool->worker_creation_lock);
-
- threadpool->heuristic_adjustment_interval = 10;
- mono_coop_mutex_init (&threadpool->heuristic_lock);
-
- mono_rand_open ();
-
- hc = &threadpool->heuristic_hill_climbing;
-
- hc->wave_period = HILL_CLIMBING_WAVE_PERIOD;
- hc->max_thread_wave_magnitude = HILL_CLIMBING_MAX_WAVE_MAGNITUDE;
- hc->thread_magnitude_multiplier = (gdouble) HILL_CLIMBING_WAVE_MAGNITUDE_MULTIPLIER;
- hc->samples_to_measure = hc->wave_period * HILL_CLIMBING_WAVE_HISTORY_SIZE;
- hc->target_throughput_ratio = (gdouble) HILL_CLIMBING_BIAS;
- hc->target_signal_to_noise_ratio = (gdouble) HILL_CLIMBING_TARGET_SIGNAL_TO_NOISE_RATIO;
- hc->max_change_per_second = (gdouble) HILL_CLIMBING_MAX_CHANGE_PER_SECOND;
- hc->max_change_per_sample = (gdouble) HILL_CLIMBING_MAX_CHANGE_PER_SAMPLE;
- hc->sample_interval_low = HILL_CLIMBING_SAMPLE_INTERVAL_LOW;
- hc->sample_interval_high = HILL_CLIMBING_SAMPLE_INTERVAL_HIGH;
- hc->throughput_error_smoothing_factor = (gdouble) HILL_CLIMBING_ERROR_SMOOTHING_FACTOR;
- hc->gain_exponent = (gdouble) HILL_CLIMBING_GAIN_EXPONENT;
- hc->max_sample_error = (gdouble) HILL_CLIMBING_MAX_SAMPLE_ERROR_PERCENT;
- hc->current_control_setting = 0;
- hc->total_samples = 0;
- hc->last_thread_count = 0;
- hc->average_throughput_noise = 0;
- hc->elapsed_since_last_change = 0;
- hc->accumulated_completion_count = 0;
- hc->accumulated_sample_duration = 0;
- hc->samples = g_new0 (gdouble, hc->samples_to_measure);
- hc->thread_counts = g_new0 (gdouble, hc->samples_to_measure);
- hc->random_interval_generator = rand_create ();
- hc->current_sample_interval = rand_next (&hc->random_interval_generator, hc->sample_interval_low, hc->sample_interval_high);
-
- if (!(threads_per_cpu_env = g_getenv ("MONO_THREADS_PER_CPU")))
- threads_per_cpu = 1;
- else
- threads_per_cpu = CLAMP (atoi (threads_per_cpu_env), 1, 50);
-
- threads_count = mono_cpu_count () * threads_per_cpu;
-
- threadpool->limit_worker_min = threadpool->limit_io_min = threads_count;
-
-#if defined (PLATFORM_ANDROID) || defined (HOST_IOS)
- threadpool->limit_worker_max = threadpool->limit_io_max = CLAMP (threads_count * 100, MIN (threads_count, 200), MAX (threads_count, 200));
-#else
- threadpool->limit_worker_max = threadpool->limit_io_max = threads_count * 100;
-#endif
-
- threadpool->counters._.max_working = threadpool->limit_worker_min;
-
- threadpool->cpu_usage_state = g_new0 (MonoCpuUsageState, 1);
-
- threadpool->suspended = FALSE;
-}
-
-static void worker_kill (ThreadPoolWorkingThread *thread);
-
-static void
-cleanup (void)
-{
- guint i;
-
- /* we make the assumption along the code that we are
- * cleaning up only if the runtime is shutting down */
- g_assert (mono_runtime_is_shutting_down ());
-
- while (monitor_status != MONITOR_STATUS_NOT_RUNNING)
- mono_thread_info_sleep (1, NULL);
-
- mono_coop_mutex_lock (&threadpool->active_threads_lock);
-
- /* stop all threadpool->working_threads */
- for (i = 0; i < threadpool->working_threads->len; ++i)
- worker_kill ((ThreadPoolWorkingThread*) g_ptr_array_index (threadpool->working_threads, i));
-
- /* unpark all threadpool->parked_threads */
- mono_coop_cond_broadcast (&threadpool->parked_threads_cond);
-
- mono_coop_mutex_unlock (&threadpool->active_threads_lock);
-}
-
-gboolean
-mono_threadpool_ms_enqueue_work_item (MonoDomain *domain, MonoObject *work_item, MonoError *error)
-{
- static MonoClass *threadpool_class = NULL;
- static MonoMethod *unsafe_queue_custom_work_item_method = NULL;
- MonoDomain *current_domain;
- MonoBoolean f;
- gpointer args [2];
-
- mono_error_init (error);
- g_assert (work_item);
-
- if (!threadpool_class)
- threadpool_class = mono_class_load_from_name (mono_defaults.corlib, "System.Threading", "ThreadPool");
-
- if (!unsafe_queue_custom_work_item_method)
- unsafe_queue_custom_work_item_method = mono_class_get_method_from_name (threadpool_class, "UnsafeQueueCustomWorkItem", 2);
- g_assert (unsafe_queue_custom_work_item_method);
-
- f = FALSE;
-
- args [0] = (gpointer) work_item;
- args [1] = (gpointer) &f;
-
- current_domain = mono_domain_get ();
- if (current_domain == domain) {
- mono_runtime_invoke_checked (unsafe_queue_custom_work_item_method, NULL, args, error);
- return_val_if_nok (error, FALSE);
- } else {
- mono_thread_push_appdomain_ref (domain);
- if (mono_domain_set (domain, FALSE)) {
- mono_runtime_invoke_checked (unsafe_queue_custom_work_item_method, NULL, args, error);
- if (!is_ok (error)) {
- mono_thread_pop_appdomain_ref ();
- return FALSE;
- }
- mono_domain_set (current_domain, TRUE);
- }
- mono_thread_pop_appdomain_ref ();
- }
- return TRUE;
-}
-
-/* LOCKING: domains_lock must be held */
-static void
-tpdomain_add (ThreadPoolDomain *tpdomain)
-{
- guint i, len;
-
- g_assert (tpdomain);
-
- len = threadpool->domains->len;
- for (i = 0; i < len; ++i) {
- if (g_ptr_array_index (threadpool->domains, i) == tpdomain)
- break;
- }
-
- if (i == len)
- g_ptr_array_add (threadpool->domains, tpdomain);
-}
-
-/* LOCKING: domains_lock must be held. */
-static gboolean
-tpdomain_remove (ThreadPoolDomain *tpdomain)
-{
- g_assert (tpdomain);
- return g_ptr_array_remove (threadpool->domains, tpdomain);
-}
-
-/* LOCKING: domains_lock must be held */
-static ThreadPoolDomain *
-tpdomain_get (MonoDomain *domain, gboolean create)
-{
- guint i;
- ThreadPoolDomain *tpdomain;
-
- g_assert (domain);
-
- for (i = 0; i < threadpool->domains->len; ++i) {
- ThreadPoolDomain *tpdomain;
-
- tpdomain = (ThreadPoolDomain *)g_ptr_array_index (threadpool->domains, i);
- if (tpdomain->domain == domain)
- return tpdomain;
- }
-
- if (!create)
- return NULL;
-
- tpdomain = g_new0 (ThreadPoolDomain, 1);
- tpdomain->domain = domain;
- mono_coop_cond_init (&tpdomain->cleanup_cond);
-
- tpdomain_add (tpdomain);
-
- return tpdomain;
-}
-
-static void
-tpdomain_free (ThreadPoolDomain *tpdomain)
-{
- g_free (tpdomain);
-}
-
-/* LOCKING: domains_lock must be held */
-static gboolean
-domain_any_has_request (void)
-{
- guint i;
-
- for (i = 0; i < threadpool->domains->len; ++i) {
- ThreadPoolDomain *tmp = (ThreadPoolDomain *)g_ptr_array_index (threadpool->domains, i);
- if (tmp->outstanding_request > 0)
- return TRUE;
- }
-
- return FALSE;
-}
-
-/* LOCKING: domains_lock must be held */
-static ThreadPoolDomain *
-tpdomain_get_next (ThreadPoolDomain *current)
-{
- ThreadPoolDomain *tpdomain = NULL;
- guint len;
-
- len = threadpool->domains->len;
- if (len > 0) {
- guint i, current_idx = -1;
- if (current) {
- for (i = 0; i < len; ++i) {
- if (current == g_ptr_array_index (threadpool->domains, i)) {
- current_idx = i;
- break;
- }
- }
- g_assert (current_idx != (guint)-1);
- }
- for (i = current_idx + 1; i < len + current_idx + 1; ++i) {
- ThreadPoolDomain *tmp = (ThreadPoolDomain *)g_ptr_array_index (threadpool->domains, i % len);
- if (tmp->outstanding_request > 0) {
- tpdomain = tmp;
- break;
- }
- }
- }
-
- return tpdomain;
-}
-
-static void
-worker_wait_interrupt (gpointer data)
-{
- mono_coop_mutex_lock (&threadpool->active_threads_lock);
- mono_coop_cond_signal (&threadpool->parked_threads_cond);
- mono_coop_mutex_unlock (&threadpool->active_threads_lock);
-}
-
-/* return TRUE if timeout, FALSE otherwise (worker unpark or interrupt) */
-static gboolean
-worker_park (void)
-{
- gboolean timeout = FALSE;
-
- mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] current worker parking", mono_native_thread_id_get ());
-
- mono_gc_set_skip_thread (TRUE);
-
- mono_coop_mutex_lock (&threadpool->active_threads_lock);
-
- if (!mono_runtime_is_shutting_down ()) {
- static gpointer rand_handle = NULL;
- MonoInternalThread *thread_internal;
- gboolean interrupted = FALSE;
-
- if (!rand_handle)
- rand_handle = rand_create ();
- g_assert (rand_handle);
-
- thread_internal = mono_thread_internal_current ();
- g_assert (thread_internal);
-
- threadpool->parked_threads_count += 1;
- g_ptr_array_remove_fast (threadpool->working_threads, thread_internal);
-
- mono_thread_info_install_interrupt (worker_wait_interrupt, NULL, &interrupted);
- if (interrupted)
- goto done;
-
- if (mono_coop_cond_timedwait (&threadpool->parked_threads_cond, &threadpool->active_threads_lock, rand_next (&rand_handle, 5 * 1000, 60 * 1000)) != 0)
- timeout = TRUE;
-
- mono_thread_info_uninstall_interrupt (&interrupted);
-
-done:
- g_ptr_array_add (threadpool->working_threads, thread_internal);
- threadpool->parked_threads_count -= 1;
- }
-
- mono_coop_mutex_unlock (&threadpool->active_threads_lock);
-
- mono_gc_set_skip_thread (FALSE);
-
- mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] current worker unparking, timeout? %s", mono_native_thread_id_get (), timeout ? "yes" : "no");
-
- return timeout;
-}
-
-static gboolean
-worker_try_unpark (void)
-{
- gboolean res = FALSE;
-
- mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try unpark worker", mono_native_thread_id_get ());
-
- mono_coop_mutex_lock (&threadpool->active_threads_lock);
- if (threadpool->parked_threads_count > 0) {
- mono_coop_cond_signal (&threadpool->parked_threads_cond);
- res = TRUE;
- }
- mono_coop_mutex_unlock (&threadpool->active_threads_lock);
-
- mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try unpark worker, success? %s", mono_native_thread_id_get (), res ? "yes" : "no");
-
- return res;
-}
-
-static void
-worker_kill (ThreadPoolWorkingThread *thread)
-{
- if (thread == mono_thread_internal_current ())
- return;
-
- mono_thread_internal_abort ((MonoInternalThread*) thread);
-}
-
-static void
-worker_thread (gpointer data)
-{
- MonoError error;
- MonoInternalThread *thread;
- ThreadPoolDomain *tpdomain, *previous_tpdomain;
- ThreadPoolCounter counter;
- gboolean retire = FALSE;
-
- mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] worker starting", mono_native_thread_id_get ());
-
- g_assert (threadpool);
-
- thread = mono_thread_internal_current ();
- g_assert (thread);
-
- mono_thread_set_name_internal (thread, mono_string_new (mono_get_root_domain (), "Threadpool worker"), FALSE, &error);
- mono_error_assert_ok (&error);
-
- mono_coop_mutex_lock (&threadpool->active_threads_lock);
- g_ptr_array_add (threadpool->working_threads, thread);
- mono_coop_mutex_unlock (&threadpool->active_threads_lock);
-
- previous_tpdomain = NULL;
-
- domains_lock ();
-
- while (!mono_runtime_is_shutting_down ()) {
- tpdomain = NULL;
-
- if ((thread->state & (ThreadState_AbortRequested | ThreadState_SuspendRequested)) != 0) {
- domains_unlock ();
- mono_thread_interruption_checkpoint ();
- domains_lock ();
- }
-
- if (retire || !(tpdomain = tpdomain_get_next (previous_tpdomain))) {
- gboolean timeout;
-
- COUNTER_ATOMIC (counter, {
- counter._.working --;
- counter._.parked ++;
- });
-
- domains_unlock ();
- timeout = worker_park ();
- domains_lock ();
-
- COUNTER_ATOMIC (counter, {
- counter._.working ++;
- counter._.parked --;
- });
-
- if (timeout)
- break;
-
- if (retire)
- retire = FALSE;
-
- /* The tpdomain->domain might have unloaded, while this thread was parked */
- previous_tpdomain = NULL;
-
- continue;
- }
-
- tpdomain->outstanding_request --;
- g_assert (tpdomain->outstanding_request >= 0);
-
- mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] worker running in domain %p (outstanding requests %d) ",
- mono_native_thread_id_get (), tpdomain->domain, tpdomain->outstanding_request);
-
- g_assert (tpdomain->domain);
- g_assert (tpdomain->threadpool_jobs >= 0);
- tpdomain->threadpool_jobs ++;
-
- /*
- * This is needed so there is always an lmf frame in the runtime invoke call below,
- * so ThreadAbortExceptions are caught even if the thread is in native code.
- */
- mono_defaults.threadpool_perform_wait_callback_method->save_lmf = TRUE;
-
- domains_unlock ();
-
- mono_thread_push_appdomain_ref (tpdomain->domain);
- if (mono_domain_set (tpdomain->domain, FALSE)) {
- MonoObject *exc = NULL, *res;
-
- res = mono_runtime_try_invoke (mono_defaults.threadpool_perform_wait_callback_method, NULL, NULL, &exc, &error);
- if (exc || !mono_error_ok(&error)) {
- if (exc == NULL)
- exc = (MonoObject *) mono_error_convert_to_exception (&error);
- else
- mono_error_cleanup (&error);
- mono_thread_internal_unhandled_exception (exc);
- } else if (res && *(MonoBoolean*) mono_object_unbox (res) == FALSE)
- retire = TRUE;
-
- mono_thread_clr_state (thread, (MonoThreadState)~ThreadState_Background);
- if (!mono_thread_test_state (thread , ThreadState_Background))
- ves_icall_System_Threading_Thread_SetState (thread, ThreadState_Background);
-
- mono_domain_set (mono_get_root_domain (), TRUE);
- }
- mono_thread_pop_appdomain_ref ();
-
- domains_lock ();
-
- tpdomain->threadpool_jobs --;
- g_assert (tpdomain->threadpool_jobs >= 0);
-
- if (tpdomain->outstanding_request + tpdomain->threadpool_jobs == 0 && mono_domain_is_unloading (tpdomain->domain)) {
- gboolean removed;
-
- removed = tpdomain_remove (tpdomain);
- g_assert (removed);
-
- mono_coop_cond_signal (&tpdomain->cleanup_cond);
- tpdomain = NULL;
- }
-
- previous_tpdomain = tpdomain;
- }
-
- domains_unlock ();
-
- mono_coop_mutex_lock (&threadpool->active_threads_lock);
- g_ptr_array_remove_fast (threadpool->working_threads, thread);
- mono_coop_mutex_unlock (&threadpool->active_threads_lock);
-
- COUNTER_ATOMIC (counter, {
- counter._.working--;
- counter._.active --;
- });
-
- mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] worker finishing", mono_native_thread_id_get ());
-}
-
-static gboolean
-worker_try_create (void)
-{
- ThreadPoolCounter counter;
- MonoInternalThread *thread;
- gint64 current_ticks;
- gint32 now;
-
- mono_coop_mutex_lock (&threadpool->worker_creation_lock);
-
- mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker", mono_native_thread_id_get ());
- current_ticks = mono_100ns_ticks ();
- now = current_ticks / (10 * 1000 * 1000);
- if (0 == current_ticks) {
- g_warning ("failed to get 100ns ticks");
- } else {
- if (threadpool->worker_creation_current_second != now) {
- threadpool->worker_creation_current_second = now;
- threadpool->worker_creation_current_count = 0;
- } else {
- g_assert (threadpool->worker_creation_current_count <= WORKER_CREATION_MAX_PER_SEC);
- if (threadpool->worker_creation_current_count == WORKER_CREATION_MAX_PER_SEC) {
- mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker, failed: maximum number of worker created per second reached, current count = %d",
- mono_native_thread_id_get (), threadpool->worker_creation_current_count);
- mono_coop_mutex_unlock (&threadpool->worker_creation_lock);
- return FALSE;
- }
- }
- }
-
- COUNTER_ATOMIC (counter, {
- if (counter._.working >= counter._.max_working) {
- mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker, failed: maximum number of working threads reached",
- mono_native_thread_id_get ());
- mono_coop_mutex_unlock (&threadpool->worker_creation_lock);
- return FALSE;
- }
- counter._.working ++;
- counter._.active ++;
- });
-
- MonoError error;
- if ((thread = mono_thread_create_internal (mono_get_root_domain (), worker_thread, NULL, TRUE, 0, &error)) != NULL) {
- threadpool->worker_creation_current_count += 1;
-
- mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker, created %p, now = %d count = %d", mono_native_thread_id_get (), GUINT_TO_POINTER(thread->tid), now, threadpool->worker_creation_current_count);
- mono_coop_mutex_unlock (&threadpool->worker_creation_lock);
- return TRUE;
- }
-
- mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker, failed: could not create thread due to %s", mono_native_thread_id_get (), mono_error_get_message (&error));
- mono_error_cleanup (&error);
-
- COUNTER_ATOMIC (counter, {
- counter._.working --;
- counter._.active --;
- });
-
- mono_coop_mutex_unlock (&threadpool->worker_creation_lock);
- return FALSE;
-}
-
-static void monitor_ensure_running (void);
-
-static gboolean
-worker_request (MonoDomain *domain)
-{
- ThreadPoolDomain *tpdomain;
-
- g_assert (domain);
- g_assert (threadpool);
-
- if (mono_runtime_is_shutting_down ())
- return FALSE;
-
- domains_lock ();
-
- /* synchronize check with worker_thread */
- if (mono_domain_is_unloading (domain)) {
- domains_unlock ();
- return FALSE;
- }
-
- tpdomain = tpdomain_get (domain, TRUE);
- g_assert (tpdomain);
- tpdomain->outstanding_request ++;
-
- mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, domain = %p, outstanding_request = %d",
- mono_native_thread_id_get (), tpdomain->domain, tpdomain->outstanding_request);
-
- domains_unlock ();
-
- if (threadpool->suspended)
- return FALSE;
-
- monitor_ensure_running ();
-
- if (worker_try_unpark ()) {
- mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, unparked", mono_native_thread_id_get ());
- return TRUE;
- }
-
- if (worker_try_create ()) {
- mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, created", mono_native_thread_id_get ());
- return TRUE;
- }
-
- mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, failed", mono_native_thread_id_get ());
- return FALSE;
-}
-
-static gboolean
-monitor_should_keep_running (void)
-{
- static gint64 last_should_keep_running = -1;
-
- g_assert (monitor_status == MONITOR_STATUS_WAITING_FOR_REQUEST || monitor_status == MONITOR_STATUS_REQUESTED);
-
- if (InterlockedExchange (&monitor_status, MONITOR_STATUS_WAITING_FOR_REQUEST) == MONITOR_STATUS_WAITING_FOR_REQUEST) {
- gboolean should_keep_running = TRUE, force_should_keep_running = FALSE;
-
- if (mono_runtime_is_shutting_down ()) {
- should_keep_running = FALSE;
- } else {
- domains_lock ();
- if (!domain_any_has_request ())
- should_keep_running = FALSE;
- domains_unlock ();
-
- if (!should_keep_running) {
- if (last_should_keep_running == -1 || mono_100ns_ticks () - last_should_keep_running < MONITOR_MINIMAL_LIFETIME * 1000 * 10) {
- should_keep_running = force_should_keep_running = TRUE;
- }
- }
- }
-
- if (should_keep_running) {
- if (last_should_keep_running == -1 || !force_should_keep_running)
- last_should_keep_running = mono_100ns_ticks ();
- } else {
- last_should_keep_running = -1;
- if (InterlockedCompareExchange (&monitor_status, MONITOR_STATUS_NOT_RUNNING, MONITOR_STATUS_WAITING_FOR_REQUEST) == MONITOR_STATUS_WAITING_FOR_REQUEST)
- return FALSE;
- }
- }
-
- g_assert (monitor_status == MONITOR_STATUS_WAITING_FOR_REQUEST || monitor_status == MONITOR_STATUS_REQUESTED);
-
- return TRUE;
-}
-
-static gboolean
-monitor_sufficient_delay_since_last_dequeue (void)
-{
- gint64 threshold;
-
- g_assert (threadpool);
-
- if (threadpool->cpu_usage < CPU_USAGE_LOW) {
- threshold = MONITOR_INTERVAL;
- } else {
- ThreadPoolCounter counter;
- counter.as_gint64 = COUNTER_READ();
- threshold = counter._.max_working * MONITOR_INTERVAL * 2;
- }
-
- return mono_msec_ticks () >= threadpool->heuristic_last_dequeue + threshold;
-}
-
-static void hill_climbing_force_change (gint16 new_thread_count, ThreadPoolHeuristicStateTransition transition);
-
-static void
-monitor_thread (void)
-{
- MonoInternalThread *current_thread = mono_thread_internal_current ();
- guint i;
-
- mono_cpu_usage (threadpool->cpu_usage_state);
-
- mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, started", mono_native_thread_id_get ());
-
- do {
- ThreadPoolCounter counter;
- gboolean limit_worker_max_reached;
- gint32 interval_left = MONITOR_INTERVAL;
- gint32 awake = 0; /* number of spurious awakes we tolerate before doing a round of rebalancing */
-
- g_assert (monitor_status != MONITOR_STATUS_NOT_RUNNING);
-
- mono_gc_set_skip_thread (TRUE);
-
- do {
- gint64 ts;
- gboolean alerted = FALSE;
-
- if (mono_runtime_is_shutting_down ())
- break;
-
- ts = mono_msec_ticks ();
- if (mono_thread_info_sleep (interval_left, &alerted) == 0)
- break;
- interval_left -= mono_msec_ticks () - ts;
-
- mono_gc_set_skip_thread (FALSE);
- if ((current_thread->state & (ThreadState_StopRequested | ThreadState_SuspendRequested)) != 0)
- mono_thread_interruption_checkpoint ();
- mono_gc_set_skip_thread (TRUE);
- } while (interval_left > 0 && ++awake < 10);
-
- mono_gc_set_skip_thread (FALSE);
-
- if (threadpool->suspended)
- continue;
-
- if (mono_runtime_is_shutting_down ())
- continue;
-
- domains_lock ();
- if (!domain_any_has_request ()) {
- domains_unlock ();
- continue;
- }
- domains_unlock ();
-
- threadpool->cpu_usage = mono_cpu_usage (threadpool->cpu_usage_state);
-
- if (!monitor_sufficient_delay_since_last_dequeue ())
- continue;
-
- limit_worker_max_reached = FALSE;
-
- COUNTER_ATOMIC (counter, {
- if (counter._.max_working >= threadpool->limit_worker_max) {
- limit_worker_max_reached = TRUE;
- break;
- }
- counter._.max_working ++;
- });
-
- if (limit_worker_max_reached)
- continue;
-
- hill_climbing_force_change (counter._.max_working, TRANSITION_STARVATION);
-
- for (i = 0; i < 5; ++i) {
- if (mono_runtime_is_shutting_down ())
- break;
-
- if (worker_try_unpark ()) {
- mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, unparked", mono_native_thread_id_get ());
- break;
- }
-
- if (worker_try_create ()) {
- mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, created", mono_native_thread_id_get ());
- break;
- }
- }
- } while (monitor_should_keep_running ());
-
- mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, finished", mono_native_thread_id_get ());
-}
-
-static void
-monitor_ensure_running (void)
-{
- MonoError error;
- for (;;) {
- switch (monitor_status) {
- case MONITOR_STATUS_REQUESTED:
- return;
- case MONITOR_STATUS_WAITING_FOR_REQUEST:
- InterlockedCompareExchange (&monitor_status, MONITOR_STATUS_REQUESTED, MONITOR_STATUS_WAITING_FOR_REQUEST);
- break;
- case MONITOR_STATUS_NOT_RUNNING:
- if (mono_runtime_is_shutting_down ())
- return;
- if (InterlockedCompareExchange (&monitor_status, MONITOR_STATUS_REQUESTED, MONITOR_STATUS_NOT_RUNNING) == MONITOR_STATUS_NOT_RUNNING) {
- if (!mono_thread_create_internal (mono_get_root_domain (), monitor_thread, NULL, TRUE, SMALL_STACK, &error)) {
- monitor_status = MONITOR_STATUS_NOT_RUNNING;
- mono_error_cleanup (&error);
- }
- return;
- }
- break;
- default: g_assert_not_reached ();
- }
- }
-}
-
-static void
-hill_climbing_change_thread_count (gint16 new_thread_count, ThreadPoolHeuristicStateTransition transition)
-{
- ThreadPoolHillClimbing *hc;
-
- g_assert (threadpool);
-
- hc = &threadpool->heuristic_hill_climbing;
-
- mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] hill climbing, change max number of threads %d", mono_native_thread_id_get (), new_thread_count);
-
- hc->last_thread_count = new_thread_count;
- hc->current_sample_interval = rand_next (&hc->random_interval_generator, hc->sample_interval_low, hc->sample_interval_high);
- hc->elapsed_since_last_change = 0;
- hc->completions_since_last_change = 0;
-}
-
-static void
-hill_climbing_force_change (gint16 new_thread_count, ThreadPoolHeuristicStateTransition transition)
-{
- ThreadPoolHillClimbing *hc;
-
- g_assert (threadpool);
-
- hc = &threadpool->heuristic_hill_climbing;
-
- if (new_thread_count != hc->last_thread_count) {
- hc->current_control_setting += new_thread_count - hc->last_thread_count;
- hill_climbing_change_thread_count (new_thread_count, transition);
- }
-}
-
-static double_complex
-hill_climbing_get_wave_component (gdouble *samples, guint sample_count, gdouble period)
-{
- ThreadPoolHillClimbing *hc;
- gdouble w, cosine, sine, coeff, q0, q1, q2;
- guint i;
-
- g_assert (threadpool);
- g_assert (sample_count >= period);
- g_assert (period >= 2);
-
- hc = &threadpool->heuristic_hill_climbing;
-
- w = 2.0 * M_PI / period;
- cosine = cos (w);
- sine = sin (w);
- coeff = 2.0 * cosine;
- q0 = q1 = q2 = 0;
-
- for (i = 0; i < sample_count; ++i) {
- q0 = coeff * q1 - q2 + samples [(hc->total_samples - sample_count + i) % hc->samples_to_measure];
- q2 = q1;
- q1 = q0;
- }
-
- return mono_double_complex_scalar_div (mono_double_complex_make (q1 - q2 * cosine, (q2 * sine)), ((gdouble)sample_count));
-}
-
-static gint16
-hill_climbing_update (gint16 current_thread_count, guint32 sample_duration, gint32 completions, gint64 *adjustment_interval)
-{
- ThreadPoolHillClimbing *hc;
- ThreadPoolHeuristicStateTransition transition;
- gdouble throughput;
- gdouble throughput_error_estimate;
- gdouble confidence;
- gdouble move;
- gdouble gain;
- gint sample_index;
- gint sample_count;
- gint new_thread_wave_magnitude;
- gint new_thread_count;
- double_complex thread_wave_component;
- double_complex throughput_wave_component;
- double_complex ratio;
-
- g_assert (threadpool);
- g_assert (adjustment_interval);
-
- hc = &threadpool->heuristic_hill_climbing;
-
- /* If someone changed the thread count without telling us, update our records accordingly. */
- if (current_thread_count != hc->last_thread_count)
- hill_climbing_force_change (current_thread_count, TRANSITION_INITIALIZING);
-
- /* Update the cumulative stats for this thread count */
- hc->elapsed_since_last_change += sample_duration;
- hc->completions_since_last_change += completions;
-
- /* Add in any data we've already collected about this sample */
- sample_duration += hc->accumulated_sample_duration;
- completions += hc->accumulated_completion_count;
-
- /* We need to make sure we're collecting reasonably accurate data. Since we're just counting the end
- * of each work item, we are goinng to be missing some data about what really happened during the
- * sample interval. The count produced by each thread includes an initial work item that may have
- * started well before the start of the interval, and each thread may have been running some new
- * work item for some time before the end of the interval, which did not yet get counted. So
- * our count is going to be off by +/- threadCount workitems.
- *
- * The exception is that the thread that reported to us last time definitely wasn't running any work
- * at that time, and the thread that's reporting now definitely isn't running a work item now. So
- * we really only need to consider threadCount-1 threads.
- *
- * Thus the percent error in our count is +/- (threadCount-1)/numCompletions.
- *
- * We cannot rely on the frequency-domain analysis we'll be doing later to filter out this error, because
- * of the way it accumulates over time. If this sample is off by, say, 33% in the negative direction,
- * then the next one likely will be too. The one after that will include the sum of the completions
- * we missed in the previous samples, and so will be 33% positive. So every three samples we'll have
- * two "low" samples and one "high" sample. This will appear as periodic variation right in the frequency
- * range we're targeting, which will not be filtered by the frequency-domain translation. */
- if (hc->total_samples > 0 && ((current_thread_count - 1.0) / completions) >= hc->max_sample_error) {
- /* Not accurate enough yet. Let's accumulate the data so
- * far, and tell the ThreadPool to collect a little more. */
- hc->accumulated_sample_duration = sample_duration;
- hc->accumulated_completion_count = completions;
- *adjustment_interval = 10;
- return current_thread_count;
- }
-
- /* We've got enouugh data for our sample; reset our accumulators for next time. */
- hc->accumulated_sample_duration = 0;
- hc->accumulated_completion_count = 0;
-
- /* Add the current thread count and throughput sample to our history. */
- throughput = ((gdouble) completions) / sample_duration;
-
- sample_index = hc->total_samples % hc->samples_to_measure;
- hc->samples [sample_index] = throughput;
- hc->thread_counts [sample_index] = current_thread_count;
- hc->total_samples ++;
-
- /* Set up defaults for our metrics. */
- thread_wave_component = mono_double_complex_make(0, 0);
- throughput_wave_component = mono_double_complex_make(0, 0);
- throughput_error_estimate = 0;
- ratio = mono_double_complex_make(0, 0);
- confidence = 0;
-
- transition = TRANSITION_WARMUP;
-
- /* How many samples will we use? It must be at least the three wave periods we're looking for, and it must also
- * be a whole multiple of the primary wave's period; otherwise the frequency we're looking for will fall between
- * two frequency bands in the Fourier analysis, and we won't be able to measure it accurately. */
- sample_count = ((gint) MIN (hc->total_samples - 1, hc->samples_to_measure) / hc->wave_period) * hc->wave_period;
-
- if (sample_count > hc->wave_period) {
- guint i;
- gdouble average_throughput;
- gdouble average_thread_count;
- gdouble sample_sum = 0;
- gdouble thread_sum = 0;
-
- /* Average the throughput and thread count samples, so we can scale the wave magnitudes later. */
- for (i = 0; i < sample_count; ++i) {
- guint j = (hc->total_samples - sample_count + i) % hc->samples_to_measure;
- sample_sum += hc->samples [j];
- thread_sum += hc->thread_counts [j];
- }
-
- average_throughput = sample_sum / sample_count;
- average_thread_count = thread_sum / sample_count;
-
- if (average_throughput > 0 && average_thread_count > 0) {
- gdouble noise_for_confidence, adjacent_period_1, adjacent_period_2;
-
- /* Calculate the periods of the adjacent frequency bands we'll be using to
- * measure noise levels. We want the two adjacent Fourier frequency bands. */
- adjacent_period_1 = sample_count / (((gdouble) sample_count) / ((gdouble) hc->wave_period) + 1);
- adjacent_period_2 = sample_count / (((gdouble) sample_count) / ((gdouble) hc->wave_period) - 1);
-
- /* Get the the three different frequency components of the throughput (scaled by average
- * throughput). Our "error" estimate (the amount of noise that might be present in the
- * frequency band we're really interested in) is the average of the adjacent bands. */
- throughput_wave_component = mono_double_complex_scalar_div (hill_climbing_get_wave_component (hc->samples, sample_count, hc->wave_period), average_throughput);
- throughput_error_estimate = cabs (mono_double_complex_scalar_div (hill_climbing_get_wave_component (hc->samples, sample_count, adjacent_period_1), average_throughput));
-
- if (adjacent_period_2 <= sample_count) {
- throughput_error_estimate = MAX (throughput_error_estimate, cabs (mono_double_complex_scalar_div (hill_climbing_get_wave_component (
- hc->samples, sample_count, adjacent_period_2), average_throughput)));
- }
-
- /* Do the same for the thread counts, so we have something to compare to. We don't
- * measure thread count noise, because there is none; these are exact measurements. */
- thread_wave_component = mono_double_complex_scalar_div (hill_climbing_get_wave_component (hc->thread_counts, sample_count, hc->wave_period), average_thread_count);
-
- /* Update our moving average of the throughput noise. We'll use this
- * later as feedback to determine the new size of the thread wave. */
- if (hc->average_throughput_noise == 0) {
- hc->average_throughput_noise = throughput_error_estimate;
- } else {
- hc->average_throughput_noise = (hc->throughput_error_smoothing_factor * throughput_error_estimate)
- + ((1.0 + hc->throughput_error_smoothing_factor) * hc->average_throughput_noise);
- }
-
- if (cabs (thread_wave_component) > 0) {
- /* Adjust the throughput wave so it's centered around the target wave,
- * and then calculate the adjusted throughput/thread ratio. */
- ratio = mono_double_complex_div (mono_double_complex_sub (throughput_wave_component, mono_double_complex_scalar_mul(thread_wave_component, hc->target_throughput_ratio)), thread_wave_component);
- transition = TRANSITION_CLIMBING_MOVE;
- } else {
- ratio = mono_double_complex_make (0, 0);
- transition = TRANSITION_STABILIZING;
- }
-
- noise_for_confidence = MAX (hc->average_throughput_noise, throughput_error_estimate);
- if (noise_for_confidence > 0) {
- confidence = cabs (thread_wave_component) / noise_for_confidence / hc->target_signal_to_noise_ratio;
- } else {
- /* there is no noise! */
- confidence = 1.0;
- }
- }
- }
-
- /* We use just the real part of the complex ratio we just calculated. If the throughput signal
- * is exactly in phase with the thread signal, this will be the same as taking the magnitude of
- * the complex move and moving that far up. If they're 180 degrees out of phase, we'll move
- * backward (because this indicates that our changes are having the opposite of the intended effect).
- * If they're 90 degrees out of phase, we won't move at all, because we can't tell wether we're
- * having a negative or positive effect on throughput. */
- move = creal (ratio);
- move = CLAMP (move, -1.0, 1.0);
-
- /* Apply our confidence multiplier. */
- move *= CLAMP (confidence, -1.0, 1.0);
-
- /* Now apply non-linear gain, such that values around zero are attenuated, while higher values
- * are enhanced. This allows us to move quickly if we're far away from the target, but more slowly
- * if we're getting close, giving us rapid ramp-up without wild oscillations around the target. */
- gain = hc->max_change_per_second * sample_duration;
- move = pow (fabs (move), hc->gain_exponent) * (move >= 0.0 ? 1 : -1) * gain;
- move = MIN (move, hc->max_change_per_sample);
-
- /* If the result was positive, and CPU is > 95%, refuse the move. */
- if (move > 0.0 && threadpool->cpu_usage > CPU_USAGE_HIGH)
- move = 0.0;
-
- /* Apply the move to our control setting. */
- hc->current_control_setting += move;
-
- /* Calculate the new thread wave magnitude, which is based on the moving average we've been keeping of the
- * throughput error. This average starts at zero, so we'll start with a nice safe little wave at first. */
- new_thread_wave_magnitude = (gint)(0.5 + (hc->current_control_setting * hc->average_throughput_noise
- * hc->target_signal_to_noise_ratio * hc->thread_magnitude_multiplier * 2.0));
- new_thread_wave_magnitude = CLAMP (new_thread_wave_magnitude, 1, hc->max_thread_wave_magnitude);
-
- /* Make sure our control setting is within the ThreadPool's limits. */
- hc->current_control_setting = CLAMP (hc->current_control_setting, threadpool->limit_worker_min, threadpool->limit_worker_max - new_thread_wave_magnitude);
-
- /* Calculate the new thread count (control setting + square wave). */
- new_thread_count = (gint)(hc->current_control_setting + new_thread_wave_magnitude * ((hc->total_samples / (hc->wave_period / 2)) % 2));
-
- /* Make sure the new thread count doesn't exceed the ThreadPool's limits. */
- new_thread_count = CLAMP (new_thread_count, threadpool->limit_worker_min, threadpool->limit_worker_max);
-
- if (new_thread_count != current_thread_count)
- hill_climbing_change_thread_count (new_thread_count, transition);
-
- if (creal (ratio) < 0.0 && new_thread_count == threadpool->limit_worker_min)
- *adjustment_interval = (gint)(0.5 + hc->current_sample_interval * (10.0 * MAX (-1.0 * creal (ratio), 1.0)));
- else
- *adjustment_interval = hc->current_sample_interval;
-
- return new_thread_count;
-}
-
-static void
-heuristic_notify_work_completed (void)
-{
- g_assert (threadpool);
-
- InterlockedIncrement (&threadpool->heuristic_completions);
- threadpool->heuristic_last_dequeue = mono_msec_ticks ();
-}
-
-static gboolean
-heuristic_should_adjust (void)
-{
- g_assert (threadpool);
-
- if (threadpool->heuristic_last_dequeue > threadpool->heuristic_last_adjustment + threadpool->heuristic_adjustment_interval) {
- ThreadPoolCounter counter;
- counter.as_gint64 = COUNTER_READ();
- if (counter._.working <= counter._.max_working)
- return TRUE;
- }
-
- return FALSE;
-}
-
-static void
-heuristic_adjust (void)
-{
- g_assert (threadpool);
-
- if (mono_coop_mutex_trylock (&threadpool->heuristic_lock) == 0) {
- gint32 completions = InterlockedExchange (&threadpool->heuristic_completions, 0);
- gint64 sample_end = mono_msec_ticks ();
- gint64 sample_duration = sample_end - threadpool->heuristic_sample_start;
-
- if (sample_duration >= threadpool->heuristic_adjustment_interval / 2) {
- ThreadPoolCounter counter;
- gint16 new_thread_count;
-
- counter.as_gint64 = COUNTER_READ ();
- new_thread_count = hill_climbing_update (counter._.max_working, sample_duration, completions, &threadpool->heuristic_adjustment_interval);
-
- COUNTER_ATOMIC (counter, { counter._.max_working = new_thread_count; });
-
- if (new_thread_count > counter._.max_working)
- worker_request (mono_domain_get ());
-
- threadpool->heuristic_sample_start = sample_end;
- threadpool->heuristic_last_adjustment = mono_msec_ticks ();
- }
-
- mono_coop_mutex_unlock (&threadpool->heuristic_lock);
- }
-}
-
-void
-mono_threadpool_ms_cleanup (void)
-{
-#ifndef DISABLE_SOCKETS
- mono_threadpool_ms_io_cleanup ();
-#endif
- mono_lazy_cleanup (&status, cleanup);
-}
-
-MonoAsyncResult *
-mono_threadpool_ms_begin_invoke (MonoDomain *domain, MonoObject *target, MonoMethod *method, gpointer *params, MonoError *error)
-{
- static MonoClass *async_call_klass = NULL;
- MonoMethodMessage *message;
- MonoAsyncResult *async_result;
- MonoAsyncCall *async_call;
- MonoDelegate *async_callback = NULL;
- MonoObject *state = NULL;
-
- if (!async_call_klass)
- async_call_klass = mono_class_load_from_name (mono_defaults.corlib, "System", "MonoAsyncCall");
-
- mono_lazy_initialize (&status, initialize);
-
- mono_error_init (error);
-
- message = mono_method_call_message_new (method, params, mono_get_delegate_invoke (method->klass), (params != NULL) ? (&async_callback) : NULL, (params != NULL) ? (&state) : NULL, error);
- return_val_if_nok (error, NULL);
-
- async_call = (MonoAsyncCall*) mono_object_new_checked (domain, async_call_klass, error);
- return_val_if_nok (error, NULL);
-
- MONO_OBJECT_SETREF (async_call, msg, message);
- MONO_OBJECT_SETREF (async_call, state, state);
-
- if (async_callback) {
- MONO_OBJECT_SETREF (async_call, cb_method, mono_get_delegate_invoke (((MonoObject*) async_callback)->vtable->klass));
- MONO_OBJECT_SETREF (async_call, cb_target, async_callback);
- }
-
- async_result = mono_async_result_new (domain, NULL, async_call->state, NULL, (MonoObject*) async_call, error);
- return_val_if_nok (error, NULL);
- MONO_OBJECT_SETREF (async_result, async_delegate, target);
-
- mono_threadpool_ms_enqueue_work_item (domain, (MonoObject*) async_result, error);
- return_val_if_nok (error, NULL);
-
- return async_result;
-}
-
-MonoObject *
-mono_threadpool_ms_end_invoke (MonoAsyncResult *ares, MonoArray **out_args, MonoObject **exc, MonoError *error)
-{
- MonoAsyncCall *ac;
-
- mono_error_init (error);
- g_assert (exc);
- g_assert (out_args);
-
- *exc = NULL;
- *out_args = NULL;
-
- /* check if already finished */
- mono_monitor_enter ((MonoObject*) ares);
-
- if (ares->endinvoke_called) {
- mono_error_set_invalid_operation(error, "Delegate EndInvoke method called more than once");
- mono_monitor_exit ((MonoObject*) ares);
- return NULL;
- }
-
- ares->endinvoke_called = 1;
-
- /* wait until we are really finished */
- if (ares->completed) {
- mono_monitor_exit ((MonoObject *) ares);
- } else {
- gpointer wait_event;
- if (ares->handle) {
- wait_event = mono_wait_handle_get_handle ((MonoWaitHandle*) ares->handle);
- } else {
- wait_event = mono_w32event_create (TRUE, FALSE);
- g_assert(wait_event);
- MonoWaitHandle *wait_handle = mono_wait_handle_new (mono_object_domain (ares), wait_event, error);
- if (!is_ok (error)) {
- CloseHandle (wait_event);
- return NULL;
- }
- MONO_OBJECT_SETREF (ares, handle, (MonoObject*) wait_handle);
- }
- mono_monitor_exit ((MonoObject*) ares);
- MONO_ENTER_GC_SAFE;
-#ifdef HOST_WIN32
- WaitForSingleObjectEx (wait_event, INFINITE, TRUE);
-#else
- mono_w32handle_wait_one (wait_event, MONO_INFINITE_WAIT, TRUE);
-#endif
- MONO_EXIT_GC_SAFE;
- }
-
- ac = (MonoAsyncCall*) ares->object_data;
- g_assert (ac);
-
- *exc = ac->msg->exc; /* FIXME: GC add write barrier */
- *out_args = ac->out_args;
- return ac->res;
-}
-
-gboolean
-mono_threadpool_ms_remove_domain_jobs (MonoDomain *domain, int timeout)
-{
- gint64 end;
- ThreadPoolDomain *tpdomain;
- gboolean ret;
-
- g_assert (domain);
- g_assert (timeout >= -1);
-
- g_assert (mono_domain_is_unloading (domain));
-
- if (timeout != -1)
- end = mono_msec_ticks () + timeout;
-
-#ifndef DISABLE_SOCKETS
- mono_threadpool_ms_io_remove_domain_jobs (domain);
- if (timeout != -1) {
- if (mono_msec_ticks () > end)
- return FALSE;
- }
-#endif
-
- /*
- * Wait for all threads which execute jobs in the domain to exit.
- * The is_unloading () check in worker_request () ensures that
- * no new jobs are added after we enter the lock below.
- */
- mono_lazy_initialize (&status, initialize);
- domains_lock ();
-
- tpdomain = tpdomain_get (domain, FALSE);
- if (!tpdomain) {
- domains_unlock ();
- return TRUE;
- }
-
- ret = TRUE;
-
- while (tpdomain->outstanding_request + tpdomain->threadpool_jobs > 0) {
- if (timeout == -1) {
- mono_coop_cond_wait (&tpdomain->cleanup_cond, &threadpool->domains_lock);
- } else {
- gint64 now;
- gint res;
-
- now = mono_msec_ticks();
- if (now > end) {
- ret = FALSE;
- break;
- }
-
- res = mono_coop_cond_timedwait (&tpdomain->cleanup_cond, &threadpool->domains_lock, end - now);
- if (res != 0) {
- ret = FALSE;
- break;
- }
- }
- }
-
- /* Remove from the list the worker threads look at */
- tpdomain_remove (tpdomain);
-
- domains_unlock ();
-
- mono_coop_cond_destroy (&tpdomain->cleanup_cond);
- tpdomain_free (tpdomain);
-
- return ret;
-}
-
-void
-mono_threadpool_ms_suspend (void)
-{
- if (threadpool)
- threadpool->suspended = TRUE;
-}
-
-void
-mono_threadpool_ms_resume (void)
-{
- if (threadpool)
- threadpool->suspended = FALSE;
-}
-
-void
-ves_icall_System_Threading_ThreadPool_GetAvailableThreadsNative (gint32 *worker_threads, gint32 *completion_port_threads)
-{
- ThreadPoolCounter counter;
-
- if (!worker_threads || !completion_port_threads)
- return;
-
- mono_lazy_initialize (&status, initialize);
-
- counter.as_gint64 = COUNTER_READ ();
-
- *worker_threads = MAX (0, threadpool->limit_worker_max - counter._.active);
- *completion_port_threads = threadpool->limit_io_max;
-}
-
-void
-ves_icall_System_Threading_ThreadPool_GetMinThreadsNative (gint32 *worker_threads, gint32 *completion_port_threads)
-{
- if (!worker_threads || !completion_port_threads)
- return;
-
- mono_lazy_initialize (&status, initialize);
-
- *worker_threads = threadpool->limit_worker_min;
- *completion_port_threads = threadpool->limit_io_min;
-}
-
-void
-ves_icall_System_Threading_ThreadPool_GetMaxThreadsNative (gint32 *worker_threads, gint32 *completion_port_threads)
-{
- if (!worker_threads || !completion_port_threads)
- return;
-
- mono_lazy_initialize (&status, initialize);
-
- *worker_threads = threadpool->limit_worker_max;
- *completion_port_threads = threadpool->limit_io_max;
-}
-
-MonoBoolean
-ves_icall_System_Threading_ThreadPool_SetMinThreadsNative (gint32 worker_threads, gint32 completion_port_threads)
-{
- mono_lazy_initialize (&status, initialize);
-
- if (worker_threads <= 0 || worker_threads > threadpool->limit_worker_max)
- return FALSE;
- if (completion_port_threads <= 0 || completion_port_threads > threadpool->limit_io_max)
- return FALSE;
-
- threadpool->limit_worker_min = worker_threads;
- threadpool->limit_io_min = completion_port_threads;
-
- return TRUE;
-}
-
-MonoBoolean
-ves_icall_System_Threading_ThreadPool_SetMaxThreadsNative (gint32 worker_threads, gint32 completion_port_threads)
-{
- gint cpu_count = mono_cpu_count ();
-
- mono_lazy_initialize (&status, initialize);
-
- if (worker_threads < threadpool->limit_worker_min || worker_threads < cpu_count)
- return FALSE;
- if (completion_port_threads < threadpool->limit_io_min || completion_port_threads < cpu_count)
- return FALSE;
-
- threadpool->limit_worker_max = worker_threads;
- threadpool->limit_io_max = completion_port_threads;
-
- return TRUE;
-}
-
-void
-ves_icall_System_Threading_ThreadPool_InitializeVMTp (MonoBoolean *enable_worker_tracking)
-{
- if (enable_worker_tracking) {
- // TODO implement some kind of switch to have the possibily to use it
- *enable_worker_tracking = FALSE;
- }
-
- mono_lazy_initialize (&status, initialize);
-}
-
-MonoBoolean
-ves_icall_System_Threading_ThreadPool_NotifyWorkItemComplete (void)
-{
- ThreadPoolCounter counter;
-
- if (mono_domain_is_unloading (mono_domain_get ()) || mono_runtime_is_shutting_down ())
- return FALSE;
-
- heuristic_notify_work_completed ();
-
- if (heuristic_should_adjust ())
- heuristic_adjust ();
-
- counter.as_gint64 = COUNTER_READ ();
- return counter._.working <= counter._.max_working;
-}
-
-void
-ves_icall_System_Threading_ThreadPool_NotifyWorkItemProgressNative (void)
-{
- heuristic_notify_work_completed ();
-
- if (heuristic_should_adjust ())
- heuristic_adjust ();
-}
-
-void
-ves_icall_System_Threading_ThreadPool_ReportThreadStatus (MonoBoolean is_working)
-{
- // TODO
- MonoError error;
- mono_error_set_not_implemented (&error, "");
- mono_error_set_pending_exception (&error);
-}
-
-MonoBoolean
-ves_icall_System_Threading_ThreadPool_RequestWorkerThread (void)
-{
- return worker_request (mono_domain_get ());
-}
-
-MonoBoolean G_GNUC_UNUSED
-ves_icall_System_Threading_ThreadPool_PostQueuedCompletionStatus (MonoNativeOverlapped *native_overlapped)
-{
- /* This copy the behavior of the current Mono implementation */
- MonoError error;
- mono_error_set_not_implemented (&error, "");
- mono_error_set_pending_exception (&error);
- return FALSE;
-}
-
-MonoBoolean G_GNUC_UNUSED
-ves_icall_System_Threading_ThreadPool_BindIOCompletionCallbackNative (gpointer file_handle)
-{
- /* This copy the behavior of the current Mono implementation */
- return TRUE;
-}
-
-MonoBoolean G_GNUC_UNUSED
-ves_icall_System_Threading_ThreadPool_IsThreadPoolHosted (void)
-{
- return FALSE;
-}
+++ /dev/null
-#ifndef _MONO_THREADPOOL_MICROSOFT_H_
-#define _MONO_THREADPOOL_MICROSOFT_H_
-
-#include <config.h>
-#include <glib.h>
-
-#include <mono/metadata/exception.h>
-#include <mono/metadata/object-internals.h>
-
-#define SMALL_STACK (sizeof (gpointer) * 32 * 1024)
-
-typedef struct _MonoNativeOverlapped MonoNativeOverlapped;
-
-void
-mono_threadpool_ms_cleanup (void);
-
-MonoAsyncResult *
-mono_threadpool_ms_begin_invoke (MonoDomain *domain, MonoObject *target, MonoMethod *method, gpointer *params, MonoError *error);
-MonoObject *
-mono_threadpool_ms_end_invoke (MonoAsyncResult *ares, MonoArray **out_args, MonoObject **exc, MonoError *error);
-
-gboolean
-mono_threadpool_ms_remove_domain_jobs (MonoDomain *domain, int timeout);
-
-void
-mono_threadpool_ms_suspend (void);
-void
-mono_threadpool_ms_resume (void);
-
-void
-ves_icall_System_Threading_ThreadPool_GetAvailableThreadsNative (gint32 *worker_threads, gint32 *completion_port_threads);
-void
-ves_icall_System_Threading_ThreadPool_GetMinThreadsNative (gint32 *worker_threads, gint32 *completion_port_threads);
-void
-ves_icall_System_Threading_ThreadPool_GetMaxThreadsNative (gint32 *worker_threads, gint32 *completion_port_threads);
-MonoBoolean
-ves_icall_System_Threading_ThreadPool_SetMinThreadsNative (gint32 worker_threads, gint32 completion_port_threads);
-MonoBoolean
-ves_icall_System_Threading_ThreadPool_SetMaxThreadsNative (gint32 worker_threads, gint32 completion_port_threads);
-void
-ves_icall_System_Threading_ThreadPool_InitializeVMTp (MonoBoolean *enable_worker_tracking);
-MonoBoolean
-ves_icall_System_Threading_ThreadPool_NotifyWorkItemComplete (void);
-void
-ves_icall_System_Threading_ThreadPool_NotifyWorkItemProgressNative (void);
-void
-ves_icall_System_Threading_ThreadPool_ReportThreadStatus (MonoBoolean is_working);
-MonoBoolean
-ves_icall_System_Threading_ThreadPool_RequestWorkerThread (void);
-
-MonoBoolean
-ves_icall_System_Threading_ThreadPool_PostQueuedCompletionStatus (MonoNativeOverlapped *native_overlapped);
-
-MonoBoolean
-ves_icall_System_Threading_ThreadPool_BindIOCompletionCallbackNative (gpointer file_handle);
-
-MonoBoolean
-ves_icall_System_Threading_ThreadPool_IsThreadPoolHosted (void);
-
-/* Internals */
-
-gboolean
-mono_threadpool_ms_enqueue_work_item (MonoDomain *domain, MonoObject *work_item, MonoError *error);
-
-#endif // _MONO_THREADPOOL_MICROSOFT_H_
--- /dev/null
+/*
+ * threadpool-worker.c: native threadpool worker
+ *
+ * Author:
+ * Ludovic Henry (ludovic.henry@xamarin.com)
+ *
+ * Licensed under the MIT license. See LICENSE file in the project root for full license information.
+ */
+
+#include <stdlib.h>
+#define _USE_MATH_DEFINES // needed by MSVC to define math constants
+#include <math.h>
+#include <config.h>
+#include <glib.h>
+
+#include <mono/metadata/class-internals.h>
+#include <mono/metadata/exception.h>
+#include <mono/metadata/gc-internals.h>
+#include <mono/metadata/object.h>
+#include <mono/metadata/object-internals.h>
+#include <mono/metadata/threadpool.h>
+#include <mono/metadata/threadpool-worker.h>
+#include <mono/metadata/threadpool-io.h>
+#include <mono/metadata/w32event.h>
+#include <mono/utils/atomic.h>
+#include <mono/utils/mono-compiler.h>
+#include <mono/utils/mono-complex.h>
+#include <mono/utils/mono-lazy-init.h>
+#include <mono/utils/mono-logger.h>
+#include <mono/utils/mono-logger-internals.h>
+#include <mono/utils/mono-proclib.h>
+#include <mono/utils/mono-threads.h>
+#include <mono/utils/mono-time.h>
+#include <mono/utils/mono-rand.h>
+#include <mono/utils/refcount.h>
+
+#define CPU_USAGE_LOW 80
+#define CPU_USAGE_HIGH 95
+
+#define MONITOR_INTERVAL 500 // ms
+#define MONITOR_MINIMAL_LIFETIME 60 * 1000 // ms
+
+#define WORKER_CREATION_MAX_PER_SEC 10
+
+/* The exponent to apply to the gain. 1.0 means to use linear gain,
+ * higher values will enhance large moves and damp small ones.
+ * default: 2.0 */
+#define HILL_CLIMBING_GAIN_EXPONENT 2.0
+
+/* The 'cost' of a thread. 0 means drive for increased throughput regardless
+ * of thread count, higher values bias more against higher thread counts.
+ * default: 0.15 */
+#define HILL_CLIMBING_BIAS 0.15
+
+#define HILL_CLIMBING_WAVE_PERIOD 4
+#define HILL_CLIMBING_MAX_WAVE_MAGNITUDE 20
+#define HILL_CLIMBING_WAVE_MAGNITUDE_MULTIPLIER 1.0
+#define HILL_CLIMBING_WAVE_HISTORY_SIZE 8
+#define HILL_CLIMBING_TARGET_SIGNAL_TO_NOISE_RATIO 3.0
+#define HILL_CLIMBING_MAX_CHANGE_PER_SECOND 4
+#define HILL_CLIMBING_MAX_CHANGE_PER_SAMPLE 20
+#define HILL_CLIMBING_SAMPLE_INTERVAL_LOW 10
+#define HILL_CLIMBING_SAMPLE_INTERVAL_HIGH 200
+#define HILL_CLIMBING_ERROR_SMOOTHING_FACTOR 0.01
+#define HILL_CLIMBING_MAX_SAMPLE_ERROR_PERCENT 0.15
+
+typedef enum {
+ TRANSITION_WARMUP,
+ TRANSITION_INITIALIZING,
+ TRANSITION_RANDOM_MOVE,
+ TRANSITION_CLIMBING_MOVE,
+ TRANSITION_CHANGE_POINT,
+ TRANSITION_STABILIZING,
+ TRANSITION_STARVATION,
+ TRANSITION_THREAD_TIMED_OUT,
+ TRANSITION_UNDEFINED,
+} ThreadPoolHeuristicStateTransition;
+
+typedef struct {
+ gint32 wave_period;
+ gint32 samples_to_measure;
+ gdouble target_throughput_ratio;
+ gdouble target_signal_to_noise_ratio;
+ gdouble max_change_per_second;
+ gdouble max_change_per_sample;
+ gint32 max_thread_wave_magnitude;
+ gint32 sample_interval_low;
+ gdouble thread_magnitude_multiplier;
+ gint32 sample_interval_high;
+ gdouble throughput_error_smoothing_factor;
+ gdouble gain_exponent;
+ gdouble max_sample_error;
+
+ gdouble current_control_setting;
+ gint64 total_samples;
+ gint16 last_thread_count;
+ gdouble elapsed_since_last_change;
+ gdouble completions_since_last_change;
+
+ gdouble average_throughput_noise;
+
+ gdouble *samples;
+ gdouble *thread_counts;
+
+ guint32 current_sample_interval;
+ gpointer random_interval_generator;
+
+ gint32 accumulated_completion_count;
+ gdouble accumulated_sample_duration;
+} ThreadPoolHillClimbing;
+
+typedef struct {
+ MonoThreadPoolWorkerCallback callback;
+ gpointer data;
+} ThreadPoolWorkItem;
+
+typedef union {
+ struct {
+ gint16 max_working; /* determined by heuristic */
+ gint16 starting; /* starting, but not yet in worker_thread */
+ gint16 working; /* executing worker_thread */
+ gint16 parked; /* parked */
+ } _;
+ gint64 as_gint64;
+} ThreadPoolWorkerCounter;
+
+typedef MonoInternalThread ThreadPoolWorkerThread;
+
+struct MonoThreadPoolWorker {
+ MonoRefCount ref;
+
+ ThreadPoolWorkerCounter counters;
+
+ GPtrArray *threads; // ThreadPoolWorkerThread* []
+ MonoCoopMutex threads_lock; /* protect access to working_threads and parked_threads */
+ gint32 parked_threads_count;
+ MonoCoopCond parked_threads_cond;
+ MonoCoopCond threads_exit_cond;
+
+ ThreadPoolWorkItem *work_items; // ThreadPoolWorkItem []
+ gint32 work_items_count;
+ gint32 work_items_size;
+ MonoCoopMutex work_items_lock;
+
+ guint32 worker_creation_current_second;
+ guint32 worker_creation_current_count;
+ MonoCoopMutex worker_creation_lock;
+
+ gint32 heuristic_completions;
+ gint64 heuristic_sample_start;
+ gint64 heuristic_last_dequeue; // ms
+ gint64 heuristic_last_adjustment; // ms
+ gint64 heuristic_adjustment_interval; // ms
+ ThreadPoolHillClimbing heuristic_hill_climbing;
+ MonoCoopMutex heuristic_lock;
+
+ gint32 limit_worker_min;
+ gint32 limit_worker_max;
+
+ MonoCpuUsageState *cpu_usage_state;
+ gint32 cpu_usage;
+
+ /* suspended by the debugger */
+ gboolean suspended;
+
+ gint32 monitor_status;
+};
+
+enum {
+ MONITOR_STATUS_REQUESTED,
+ MONITOR_STATUS_WAITING_FOR_REQUEST,
+ MONITOR_STATUS_NOT_RUNNING,
+};
+
+#define COUNTER_CHECK(counter) \
+ do { \
+ g_assert (counter._.max_working > 0); \
+ g_assert (counter._.starting >= 0); \
+ g_assert (counter._.working >= 0); \
+ } while (0)
+
+#define COUNTER_ATOMIC(worker,var,block) \
+ do { \
+ ThreadPoolWorkerCounter __old; \
+ do { \
+ g_assert (worker); \
+ __old = COUNTER_READ (worker); \
+ (var) = __old; \
+ { block; } \
+ COUNTER_CHECK (var); \
+ } while (InterlockedCompareExchange64 (&worker->counters.as_gint64, (var).as_gint64, __old.as_gint64) != __old.as_gint64); \
+ } while (0)
+
+static inline ThreadPoolWorkerCounter
+COUNTER_READ (MonoThreadPoolWorker *worker)
+{
+ ThreadPoolWorkerCounter counter;
+ counter.as_gint64 = InterlockedRead64 (&worker->counters.as_gint64);
+ return counter;
+}
+
+static gpointer
+rand_create (void)
+{
+ mono_rand_open ();
+ return mono_rand_init (NULL, 0);
+}
+
+static guint32
+rand_next (gpointer *handle, guint32 min, guint32 max)
+{
+ MonoError error;
+ guint32 val;
+ mono_rand_try_get_uint32 (handle, &val, min, max, &error);
+ // FIXME handle error
+ mono_error_assert_ok (&error);
+ return val;
+}
+
+static void
+destroy (gpointer data)
+{
+ MonoThreadPoolWorker *worker;
+
+ worker = (MonoThreadPoolWorker*) data;
+ g_assert (worker);
+
+ // FIXME destroy everything
+
+ g_free (worker);
+}
+
+void
+mono_threadpool_worker_init (MonoThreadPoolWorker **worker)
+{
+ MonoThreadPoolWorker *wk;
+ ThreadPoolHillClimbing *hc;
+ const char *threads_per_cpu_env;
+ gint threads_per_cpu;
+ gint threads_count;
+
+ g_assert (worker);
+
+ wk = *worker = g_new0 (MonoThreadPoolWorker, 1);
+
+ mono_refcount_init (wk, destroy);
+
+ wk->threads = g_ptr_array_new ();
+ mono_coop_mutex_init (&wk->threads_lock);
+ wk->parked_threads_count = 0;
+ mono_coop_cond_init (&wk->parked_threads_cond);
+ mono_coop_cond_init (&wk->threads_exit_cond);
+
+ /* wk->work_items_size is inited to 0 */
+ mono_coop_mutex_init (&wk->work_items_lock);
+
+ wk->worker_creation_current_second = -1;
+ mono_coop_mutex_init (&wk->worker_creation_lock);
+
+ wk->heuristic_adjustment_interval = 10;
+ mono_coop_mutex_init (&wk->heuristic_lock);
+
+ mono_rand_open ();
+
+ hc = &wk->heuristic_hill_climbing;
+
+ hc->wave_period = HILL_CLIMBING_WAVE_PERIOD;
+ hc->max_thread_wave_magnitude = HILL_CLIMBING_MAX_WAVE_MAGNITUDE;
+ hc->thread_magnitude_multiplier = (gdouble) HILL_CLIMBING_WAVE_MAGNITUDE_MULTIPLIER;
+ hc->samples_to_measure = hc->wave_period * HILL_CLIMBING_WAVE_HISTORY_SIZE;
+ hc->target_throughput_ratio = (gdouble) HILL_CLIMBING_BIAS;
+ hc->target_signal_to_noise_ratio = (gdouble) HILL_CLIMBING_TARGET_SIGNAL_TO_NOISE_RATIO;
+ hc->max_change_per_second = (gdouble) HILL_CLIMBING_MAX_CHANGE_PER_SECOND;
+ hc->max_change_per_sample = (gdouble) HILL_CLIMBING_MAX_CHANGE_PER_SAMPLE;
+ hc->sample_interval_low = HILL_CLIMBING_SAMPLE_INTERVAL_LOW;
+ hc->sample_interval_high = HILL_CLIMBING_SAMPLE_INTERVAL_HIGH;
+ hc->throughput_error_smoothing_factor = (gdouble) HILL_CLIMBING_ERROR_SMOOTHING_FACTOR;
+ hc->gain_exponent = (gdouble) HILL_CLIMBING_GAIN_EXPONENT;
+ hc->max_sample_error = (gdouble) HILL_CLIMBING_MAX_SAMPLE_ERROR_PERCENT;
+ hc->current_control_setting = 0;
+ hc->total_samples = 0;
+ hc->last_thread_count = 0;
+ hc->average_throughput_noise = 0;
+ hc->elapsed_since_last_change = 0;
+ hc->accumulated_completion_count = 0;
+ hc->accumulated_sample_duration = 0;
+ hc->samples = g_new0 (gdouble, hc->samples_to_measure);
+ hc->thread_counts = g_new0 (gdouble, hc->samples_to_measure);
+ hc->random_interval_generator = rand_create ();
+ hc->current_sample_interval = rand_next (&hc->random_interval_generator, hc->sample_interval_low, hc->sample_interval_high);
+
+ if (!(threads_per_cpu_env = g_getenv ("MONO_THREADS_PER_CPU")))
+ threads_per_cpu = 1;
+ else
+ threads_per_cpu = CLAMP (atoi (threads_per_cpu_env), 1, 50);
+
+ threads_count = mono_cpu_count () * threads_per_cpu;
+
+ wk->limit_worker_min = threads_count;
+
+#if defined (PLATFORM_ANDROID) || defined (HOST_IOS)
+ wk->limit_worker_max = CLAMP (threads_count * 100, MIN (threads_count, 200), MAX (threads_count, 200));
+#else
+ wk->limit_worker_max = threads_count * 100;
+#endif
+
+ wk->counters._.max_working = wk->limit_worker_min;
+
+ wk->cpu_usage_state = g_new0 (MonoCpuUsageState, 1);
+
+ wk->suspended = FALSE;
+
+ wk->monitor_status = MONITOR_STATUS_NOT_RUNNING;
+}
+
+void
+mono_threadpool_worker_cleanup (MonoThreadPoolWorker *worker)
+{
+ MonoInternalThread *current;
+
+ /* we make the assumption along the code that we are
+ * cleaning up only if the runtime is shutting down */
+ g_assert (mono_runtime_is_shutting_down ());
+
+ current = mono_thread_internal_current ();
+
+ while (worker->monitor_status != MONITOR_STATUS_NOT_RUNNING)
+ mono_thread_info_sleep (1, NULL);
+
+ mono_coop_mutex_lock (&worker->threads_lock);
+
+ /* unpark all worker->parked_threads */
+ mono_coop_cond_broadcast (&worker->parked_threads_cond);
+
+ for (;;) {
+ ThreadPoolWorkerCounter counter;
+
+ counter = COUNTER_READ (worker);
+ if (counter._.starting + counter._.working + counter._.parked == 0)
+ break;
+
+ if (counter._.starting + counter._.working + counter._.parked == 1) {
+ if (worker->threads->len == 1 && g_ptr_array_index (worker->threads, 0) == current) {
+ /* We are waiting on ourselves */
+ break;
+ }
+ }
+
+ mono_coop_cond_wait (&worker->threads_exit_cond, &worker->threads_lock);
+ }
+
+ mono_coop_mutex_unlock (&worker->threads_lock);
+
+ mono_refcount_dec (worker);
+}
+
+static void
+work_item_lock (MonoThreadPoolWorker *worker)
+{
+ mono_coop_mutex_lock (&worker->work_items_lock);
+}
+
+static void
+work_item_unlock (MonoThreadPoolWorker *worker)
+{
+ mono_coop_mutex_unlock (&worker->work_items_lock);
+}
+
+static void
+work_item_push (MonoThreadPoolWorker *worker, MonoThreadPoolWorkerCallback callback, gpointer data)
+{
+ ThreadPoolWorkItem work_item;
+
+ g_assert (worker);
+ g_assert (callback);
+
+ work_item.callback = callback;
+ work_item.data = data;
+
+ work_item_lock (worker);
+
+ g_assert (worker->work_items_count <= worker->work_items_size);
+
+ if (G_UNLIKELY (worker->work_items_count == worker->work_items_size)) {
+ worker->work_items_size += 64;
+ worker->work_items = g_renew (ThreadPoolWorkItem, worker->work_items, worker->work_items_size);
+ }
+
+ g_assert (worker->work_items);
+
+ worker->work_items [worker->work_items_count ++] = work_item;
+
+ // printf ("[push] worker->work_items = %p, worker->work_items_count = %d, worker->work_items_size = %d\n",
+ // worker->work_items, worker->work_items_count, worker->work_items_size);
+
+ work_item_unlock (worker);
+}
+
+static gboolean
+work_item_try_pop (MonoThreadPoolWorker *worker, ThreadPoolWorkItem *work_item)
+{
+ g_assert (worker);
+ g_assert (work_item);
+
+ work_item_lock (worker);
+
+ // printf ("[pop] worker->work_items = %p, worker->work_items_count = %d, worker->work_items_size = %d\n",
+ // worker->work_items, worker->work_items_count, worker->work_items_size);
+
+ if (worker->work_items_count == 0) {
+ work_item_unlock (worker);
+ return FALSE;
+ }
+
+ *work_item = worker->work_items [-- worker->work_items_count];
+
+ if (G_UNLIKELY (worker->work_items_count >= 64 * 3 && worker->work_items_count < worker->work_items_size / 2)) {
+ worker->work_items_size -= 64;
+ worker->work_items = g_renew (ThreadPoolWorkItem, worker->work_items, worker->work_items_size);
+ }
+
+ work_item_unlock (worker);
+
+ return TRUE;
+}
+
+static gint32
+work_item_count (MonoThreadPoolWorker *worker)
+{
+ gint32 count;
+
+ work_item_lock (worker);
+ count = worker->work_items_count;
+ work_item_unlock (worker);
+
+ return count;
+}
+
+static void worker_request (MonoThreadPoolWorker *worker);
+
+void
+mono_threadpool_worker_enqueue (MonoThreadPoolWorker *worker, MonoThreadPoolWorkerCallback callback, gpointer data)
+{
+ work_item_push (worker, callback, data);
+
+ worker_request (worker);
+}
+
+static void
+worker_wait_interrupt (gpointer data)
+{
+ MonoThreadPoolWorker *worker;
+
+ worker = (MonoThreadPoolWorker*) data;
+ g_assert (worker);
+
+ mono_coop_mutex_lock (&worker->threads_lock);
+ mono_coop_cond_signal (&worker->parked_threads_cond);
+ mono_coop_mutex_unlock (&worker->threads_lock);
+
+ mono_refcount_dec (worker);
+}
+
+/* return TRUE if timeout, FALSE otherwise (worker unpark or interrupt) */
+static gboolean
+worker_park (MonoThreadPoolWorker *worker)
+{
+ gboolean timeout = FALSE;
+
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] worker parking", mono_native_thread_id_get ());
+
+ mono_coop_mutex_lock (&worker->threads_lock);
+
+ if (!mono_runtime_is_shutting_down ()) {
+ static gpointer rand_handle = NULL;
+ MonoInternalThread *thread;
+ gboolean interrupted = FALSE;
+ ThreadPoolWorkerCounter counter;
+
+ if (!rand_handle)
+ rand_handle = rand_create ();
+ g_assert (rand_handle);
+
+ thread = mono_thread_internal_current ();
+ g_assert (thread);
+
+ COUNTER_ATOMIC (worker, counter, {
+ counter._.working --;
+ counter._.parked ++;
+ });
+
+ worker->parked_threads_count += 1;
+
+ mono_thread_info_install_interrupt (worker_wait_interrupt, mono_refcount_inc (worker), &interrupted);
+ if (interrupted) {
+ mono_refcount_dec (worker);
+ goto done;
+ }
+
+ if (mono_coop_cond_timedwait (&worker->parked_threads_cond, &worker->threads_lock, rand_next (&rand_handle, 5 * 1000, 60 * 1000)) != 0)
+ timeout = TRUE;
+
+ mono_thread_info_uninstall_interrupt (&interrupted);
+ if (!interrupted)
+ mono_refcount_dec (worker);
+
+done:
+ worker->parked_threads_count -= 1;
+
+ COUNTER_ATOMIC (worker, counter, {
+ counter._.working ++;
+ counter._.parked --;
+ });
+ }
+
+ mono_coop_mutex_unlock (&worker->threads_lock);
+
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] worker unparking, timeout? %s", mono_native_thread_id_get (), timeout ? "yes" : "no");
+
+ return timeout;
+}
+
+static gboolean
+worker_try_unpark (MonoThreadPoolWorker *worker)
+{
+ gboolean res = FALSE;
+
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try unpark worker", mono_native_thread_id_get ());
+
+ mono_coop_mutex_lock (&worker->threads_lock);
+ if (worker->parked_threads_count > 0) {
+ mono_coop_cond_signal (&worker->parked_threads_cond);
+ res = TRUE;
+ }
+ mono_coop_mutex_unlock (&worker->threads_lock);
+
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try unpark worker, success? %s", mono_native_thread_id_get (), res ? "yes" : "no");
+
+ return res;
+}
+
+static void
+worker_thread (gpointer data)
+{
+ MonoThreadPoolWorker *worker;
+ MonoError error;
+ MonoInternalThread *thread;
+ ThreadPoolWorkerCounter counter;
+
+ mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] worker starting", mono_native_thread_id_get ());
+
+ worker = (MonoThreadPoolWorker*) data;
+ g_assert (worker);
+
+ COUNTER_ATOMIC (worker, counter, {
+ counter._.starting --;
+ counter._.working ++;
+ });
+
+ thread = mono_thread_internal_current ();
+ g_assert (thread);
+
+ mono_coop_mutex_lock (&worker->threads_lock);
+ g_ptr_array_add (worker->threads, thread);
+ mono_coop_mutex_unlock (&worker->threads_lock);
+
+ mono_thread_set_name_internal (thread, mono_string_new (mono_get_root_domain (), "Threadpool worker"), FALSE, &error);
+ mono_error_assert_ok (&error);
+
+ while (!mono_runtime_is_shutting_down ()) {
+ ThreadPoolWorkItem work_item;
+
+ if (mono_thread_interruption_checkpoint ())
+ continue;
+
+ if (!work_item_try_pop (worker, &work_item)) {
+ gboolean timeout;
+
+ timeout = worker_park (worker);
+ if (timeout)
+ break;
+
+ continue;
+ }
+
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] worker executing %p (%p)",
+ mono_native_thread_id_get (), work_item.callback, work_item.data);
+
+ work_item.callback (work_item.data);
+ }
+
+ mono_coop_mutex_lock (&worker->threads_lock);
+
+ COUNTER_ATOMIC (worker, counter, {
+ counter._.working --;
+ });
+
+ g_ptr_array_remove (worker->threads, thread);
+
+ mono_coop_cond_signal (&worker->threads_exit_cond);
+
+ mono_coop_mutex_unlock (&worker->threads_lock);
+
+ mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] worker finishing", mono_native_thread_id_get ());
+
+ mono_refcount_dec (worker);
+}
+
+static gboolean
+worker_try_create (MonoThreadPoolWorker *worker)
+{
+ MonoError error;
+ MonoInternalThread *thread;
+ gint64 current_ticks;
+ gint32 now;
+ ThreadPoolWorkerCounter counter;
+
+ if (mono_runtime_is_shutting_down ())
+ return FALSE;
+
+ mono_coop_mutex_lock (&worker->worker_creation_lock);
+
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker", mono_native_thread_id_get ());
+
+ current_ticks = mono_100ns_ticks ();
+ if (0 == current_ticks) {
+ g_warning ("failed to get 100ns ticks");
+ } else {
+ now = current_ticks / (10 * 1000 * 1000);
+ if (worker->worker_creation_current_second != now) {
+ worker->worker_creation_current_second = now;
+ worker->worker_creation_current_count = 0;
+ } else {
+ g_assert (worker->worker_creation_current_count <= WORKER_CREATION_MAX_PER_SEC);
+ if (worker->worker_creation_current_count == WORKER_CREATION_MAX_PER_SEC) {
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker, failed: maximum number of worker created per second reached, current count = %d",
+ mono_native_thread_id_get (), worker->worker_creation_current_count);
+ mono_coop_mutex_unlock (&worker->worker_creation_lock);
+ return FALSE;
+ }
+ }
+ }
+
+ COUNTER_ATOMIC (worker, counter, {
+ if (counter._.working >= counter._.max_working) {
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker, failed: maximum number of working threads reached",
+ mono_native_thread_id_get ());
+ mono_coop_mutex_unlock (&worker->worker_creation_lock);
+ return FALSE;
+ }
+ counter._.starting ++;
+ });
+
+ thread = mono_thread_create_internal (mono_get_root_domain (), worker_thread, mono_refcount_inc (worker), TRUE, 0, &error);
+ if (!thread) {
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker, failed: could not create thread due to %s", mono_native_thread_id_get (), mono_error_get_message (&error));
+ mono_error_cleanup (&error);
+
+ COUNTER_ATOMIC (worker, counter, {
+ counter._.starting --;
+ });
+
+ mono_coop_mutex_unlock (&worker->worker_creation_lock);
+
+ mono_refcount_dec (worker);
+
+ return FALSE;
+ }
+
+ worker->worker_creation_current_count += 1;
+
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker, created %p, now = %d count = %d",
+ mono_native_thread_id_get (), (gpointer) thread->tid, now, worker->worker_creation_current_count);
+
+ mono_coop_mutex_unlock (&worker->worker_creation_lock);
+ return TRUE;
+}
+
+static void monitor_ensure_running (MonoThreadPoolWorker *worker);
+
+static void
+worker_request (MonoThreadPoolWorker *worker)
+{
+ g_assert (worker);
+
+ if (worker->suspended)
+ return;
+
+ monitor_ensure_running (worker);
+
+ if (worker_try_unpark (worker)) {
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, unparked", mono_native_thread_id_get ());
+ return;
+ }
+
+ if (worker_try_create (worker)) {
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, created", mono_native_thread_id_get ());
+ return;
+ }
+
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, failed", mono_native_thread_id_get ());
+}
+
+static gboolean
+monitor_should_keep_running (MonoThreadPoolWorker *worker)
+{
+ static gint64 last_should_keep_running = -1;
+
+ g_assert (worker->monitor_status == MONITOR_STATUS_WAITING_FOR_REQUEST || worker->monitor_status == MONITOR_STATUS_REQUESTED);
+
+ if (InterlockedExchange (&worker->monitor_status, MONITOR_STATUS_WAITING_FOR_REQUEST) == MONITOR_STATUS_WAITING_FOR_REQUEST) {
+ gboolean should_keep_running = TRUE, force_should_keep_running = FALSE;
+
+ if (mono_runtime_is_shutting_down ()) {
+ should_keep_running = FALSE;
+ } else {
+ if (work_item_count (worker) == 0)
+ should_keep_running = FALSE;
+
+ if (!should_keep_running) {
+ if (last_should_keep_running == -1 || mono_100ns_ticks () - last_should_keep_running < MONITOR_MINIMAL_LIFETIME * 1000 * 10) {
+ should_keep_running = force_should_keep_running = TRUE;
+ }
+ }
+ }
+
+ if (should_keep_running) {
+ if (last_should_keep_running == -1 || !force_should_keep_running)
+ last_should_keep_running = mono_100ns_ticks ();
+ } else {
+ last_should_keep_running = -1;
+ if (InterlockedCompareExchange (&worker->monitor_status, MONITOR_STATUS_NOT_RUNNING, MONITOR_STATUS_WAITING_FOR_REQUEST) == MONITOR_STATUS_WAITING_FOR_REQUEST)
+ return FALSE;
+ }
+ }
+
+ g_assert (worker->monitor_status == MONITOR_STATUS_WAITING_FOR_REQUEST || worker->monitor_status == MONITOR_STATUS_REQUESTED);
+
+ return TRUE;
+}
+
+static gboolean
+monitor_sufficient_delay_since_last_dequeue (MonoThreadPoolWorker *worker)
+{
+ gint64 threshold;
+
+ g_assert (worker);
+
+ if (worker->cpu_usage < CPU_USAGE_LOW) {
+ threshold = MONITOR_INTERVAL;
+ } else {
+ ThreadPoolWorkerCounter counter;
+ counter = COUNTER_READ (worker);
+ threshold = counter._.max_working * MONITOR_INTERVAL * 2;
+ }
+
+ return mono_msec_ticks () >= worker->heuristic_last_dequeue + threshold;
+}
+
+static void hill_climbing_force_change (MonoThreadPoolWorker *worker, gint16 new_thread_count, ThreadPoolHeuristicStateTransition transition);
+
+static void
+monitor_thread (gpointer data)
+{
+ MonoThreadPoolWorker *worker;
+ MonoInternalThread *internal;
+ guint i;
+
+ worker = (MonoThreadPoolWorker*) data;
+ g_assert (worker);
+
+ internal = mono_thread_internal_current ();
+ g_assert (internal);
+
+ mono_cpu_usage (worker->cpu_usage_state);
+
+ // printf ("monitor_thread: start\n");
+
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, started", mono_native_thread_id_get ());
+
+ do {
+ ThreadPoolWorkerCounter counter;
+ gboolean limit_worker_max_reached;
+ gint32 interval_left = MONITOR_INTERVAL;
+ gint32 awake = 0; /* number of spurious awakes we tolerate before doing a round of rebalancing */
+
+ g_assert (worker->monitor_status != MONITOR_STATUS_NOT_RUNNING);
+
+ // counter = COUNTER_READ (worker);
+ // printf ("monitor_thread: starting = %d working = %d parked = %d max_working = %d\n",
+ // counter._.starting, counter._.working, counter._.parked, counter._.max_working);
+
+ do {
+ gint64 ts;
+ gboolean alerted = FALSE;
+
+ if (mono_runtime_is_shutting_down ())
+ break;
+
+ ts = mono_msec_ticks ();
+ if (mono_thread_info_sleep (interval_left, &alerted) == 0)
+ break;
+ interval_left -= mono_msec_ticks () - ts;
+
+ g_assert (!(internal->state & ThreadState_StopRequested));
+ mono_thread_interruption_checkpoint ();
+ } while (interval_left > 0 && ++awake < 10);
+
+ if (mono_runtime_is_shutting_down ())
+ continue;
+
+ if (worker->suspended)
+ continue;
+
+ if (work_item_count (worker) == 0)
+ continue;
+
+ worker->cpu_usage = mono_cpu_usage (worker->cpu_usage_state);
+
+ if (!monitor_sufficient_delay_since_last_dequeue (worker))
+ continue;
+
+ limit_worker_max_reached = FALSE;
+
+ COUNTER_ATOMIC (worker, counter, {
+ if (counter._.max_working >= worker->limit_worker_max) {
+ limit_worker_max_reached = TRUE;
+ break;
+ }
+ counter._.max_working ++;
+ });
+
+ if (limit_worker_max_reached)
+ continue;
+
+ hill_climbing_force_change (worker, counter._.max_working, TRANSITION_STARVATION);
+
+ for (i = 0; i < 5; ++i) {
+ if (mono_runtime_is_shutting_down ())
+ break;
+
+ if (worker_try_unpark (worker)) {
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, unparked", mono_native_thread_id_get ());
+ break;
+ }
+
+ if (worker_try_create (worker)) {
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, created", mono_native_thread_id_get ());
+ break;
+ }
+ }
+ } while (monitor_should_keep_running (worker));
+
+ // printf ("monitor_thread: stop\n");
+
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, finished", mono_native_thread_id_get ());
+}
+
+static void
+monitor_ensure_running (MonoThreadPoolWorker *worker)
+{
+ MonoError error;
+ for (;;) {
+ switch (worker->monitor_status) {
+ case MONITOR_STATUS_REQUESTED:
+ // printf ("monitor_thread: requested\n");
+ return;
+ case MONITOR_STATUS_WAITING_FOR_REQUEST:
+ // printf ("monitor_thread: waiting for request\n");
+ InterlockedCompareExchange (&worker->monitor_status, MONITOR_STATUS_REQUESTED, MONITOR_STATUS_WAITING_FOR_REQUEST);
+ break;
+ case MONITOR_STATUS_NOT_RUNNING:
+ // printf ("monitor_thread: not running\n");
+ if (mono_runtime_is_shutting_down ())
+ return;
+ if (InterlockedCompareExchange (&worker->monitor_status, MONITOR_STATUS_REQUESTED, MONITOR_STATUS_NOT_RUNNING) == MONITOR_STATUS_NOT_RUNNING) {
+ // printf ("monitor_thread: creating\n");
+ if (!mono_thread_create_internal (mono_get_root_domain (), monitor_thread, worker, TRUE, SMALL_STACK, &error)) {
+ // printf ("monitor_thread: creating failed\n");
+ worker->monitor_status = MONITOR_STATUS_NOT_RUNNING;
+ mono_error_cleanup (&error);
+ }
+ return;
+ }
+ break;
+ default: g_assert_not_reached ();
+ }
+ }
+}
+
+static void
+hill_climbing_change_thread_count (MonoThreadPoolWorker *worker, gint16 new_thread_count, ThreadPoolHeuristicStateTransition transition)
+{
+ ThreadPoolHillClimbing *hc;
+
+ g_assert (worker);
+
+ hc = &worker->heuristic_hill_climbing;
+
+ mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] hill climbing, change max number of threads %d", mono_native_thread_id_get (), new_thread_count);
+
+ hc->last_thread_count = new_thread_count;
+ hc->current_sample_interval = rand_next (&hc->random_interval_generator, hc->sample_interval_low, hc->sample_interval_high);
+ hc->elapsed_since_last_change = 0;
+ hc->completions_since_last_change = 0;
+}
+
+static void
+hill_climbing_force_change (MonoThreadPoolWorker *worker, gint16 new_thread_count, ThreadPoolHeuristicStateTransition transition)
+{
+ ThreadPoolHillClimbing *hc;
+
+ g_assert (worker);
+
+ hc = &worker->heuristic_hill_climbing;
+
+ if (new_thread_count != hc->last_thread_count) {
+ hc->current_control_setting += new_thread_count - hc->last_thread_count;
+ hill_climbing_change_thread_count (worker, new_thread_count, transition);
+ }
+}
+
+static double_complex
+hill_climbing_get_wave_component (MonoThreadPoolWorker *worker, gdouble *samples, guint sample_count, gdouble period)
+{
+ ThreadPoolHillClimbing *hc;
+ gdouble w, cosine, sine, coeff, q0, q1, q2;
+ guint i;
+
+ g_assert (worker);
+ g_assert (sample_count >= period);
+ g_assert (period >= 2);
+
+ hc = &worker->heuristic_hill_climbing;
+
+ w = 2.0 * M_PI / period;
+ cosine = cos (w);
+ sine = sin (w);
+ coeff = 2.0 * cosine;
+ q0 = q1 = q2 = 0;
+
+ for (i = 0; i < sample_count; ++i) {
+ q0 = coeff * q1 - q2 + samples [(hc->total_samples - sample_count + i) % hc->samples_to_measure];
+ q2 = q1;
+ q1 = q0;
+ }
+
+ return mono_double_complex_scalar_div (mono_double_complex_make (q1 - q2 * cosine, (q2 * sine)), ((gdouble)sample_count));
+}
+
+static gint16
+hill_climbing_update (MonoThreadPoolWorker *worker, gint16 current_thread_count, guint32 sample_duration, gint32 completions, gint64 *adjustment_interval)
+{
+ ThreadPoolHillClimbing *hc;
+ ThreadPoolHeuristicStateTransition transition;
+ gdouble throughput;
+ gdouble throughput_error_estimate;
+ gdouble confidence;
+ gdouble move;
+ gdouble gain;
+ gint sample_index;
+ gint sample_count;
+ gint new_thread_wave_magnitude;
+ gint new_thread_count;
+ double_complex thread_wave_component;
+ double_complex throughput_wave_component;
+ double_complex ratio;
+
+ g_assert (worker);
+ g_assert (adjustment_interval);
+
+ hc = &worker->heuristic_hill_climbing;
+
+ /* If someone changed the thread count without telling us, update our records accordingly. */
+ if (current_thread_count != hc->last_thread_count)
+ hill_climbing_force_change (worker, current_thread_count, TRANSITION_INITIALIZING);
+
+ /* Update the cumulative stats for this thread count */
+ hc->elapsed_since_last_change += sample_duration;
+ hc->completions_since_last_change += completions;
+
+ /* Add in any data we've already collected about this sample */
+ sample_duration += hc->accumulated_sample_duration;
+ completions += hc->accumulated_completion_count;
+
+ /* We need to make sure we're collecting reasonably accurate data. Since we're just counting the end
+ * of each work item, we are goinng to be missing some data about what really happened during the
+ * sample interval. The count produced by each thread includes an initial work item that may have
+ * started well before the start of the interval, and each thread may have been running some new
+ * work item for some time before the end of the interval, which did not yet get counted. So
+ * our count is going to be off by +/- threadCount workitems.
+ *
+ * The exception is that the thread that reported to us last time definitely wasn't running any work
+ * at that time, and the thread that's reporting now definitely isn't running a work item now. So
+ * we really only need to consider threadCount-1 threads.
+ *
+ * Thus the percent error in our count is +/- (threadCount-1)/numCompletions.
+ *
+ * We cannot rely on the frequency-domain analysis we'll be doing later to filter out this error, because
+ * of the way it accumulates over time. If this sample is off by, say, 33% in the negative direction,
+ * then the next one likely will be too. The one after that will include the sum of the completions
+ * we missed in the previous samples, and so will be 33% positive. So every three samples we'll have
+ * two "low" samples and one "high" sample. This will appear as periodic variation right in the frequency
+ * range we're targeting, which will not be filtered by the frequency-domain translation. */
+ if (hc->total_samples > 0 && ((current_thread_count - 1.0) / completions) >= hc->max_sample_error) {
+ /* Not accurate enough yet. Let's accumulate the data so
+ * far, and tell the MonoThreadPoolWorker to collect a little more. */
+ hc->accumulated_sample_duration = sample_duration;
+ hc->accumulated_completion_count = completions;
+ *adjustment_interval = 10;
+ return current_thread_count;
+ }
+
+ /* We've got enouugh data for our sample; reset our accumulators for next time. */
+ hc->accumulated_sample_duration = 0;
+ hc->accumulated_completion_count = 0;
+
+ /* Add the current thread count and throughput sample to our history. */
+ throughput = ((gdouble) completions) / sample_duration;
+
+ sample_index = hc->total_samples % hc->samples_to_measure;
+ hc->samples [sample_index] = throughput;
+ hc->thread_counts [sample_index] = current_thread_count;
+ hc->total_samples ++;
+
+ /* Set up defaults for our metrics. */
+ thread_wave_component = mono_double_complex_make(0, 0);
+ throughput_wave_component = mono_double_complex_make(0, 0);
+ throughput_error_estimate = 0;
+ ratio = mono_double_complex_make(0, 0);
+ confidence = 0;
+
+ transition = TRANSITION_WARMUP;
+
+ /* How many samples will we use? It must be at least the three wave periods we're looking for, and it must also
+ * be a whole multiple of the primary wave's period; otherwise the frequency we're looking for will fall between
+ * two frequency bands in the Fourier analysis, and we won't be able to measure it accurately. */
+ sample_count = ((gint) MIN (hc->total_samples - 1, hc->samples_to_measure) / hc->wave_period) * hc->wave_period;
+
+ if (sample_count > hc->wave_period) {
+ guint i;
+ gdouble average_throughput;
+ gdouble average_thread_count;
+ gdouble sample_sum = 0;
+ gdouble thread_sum = 0;
+
+ /* Average the throughput and thread count samples, so we can scale the wave magnitudes later. */
+ for (i = 0; i < sample_count; ++i) {
+ guint j = (hc->total_samples - sample_count + i) % hc->samples_to_measure;
+ sample_sum += hc->samples [j];
+ thread_sum += hc->thread_counts [j];
+ }
+
+ average_throughput = sample_sum / sample_count;
+ average_thread_count = thread_sum / sample_count;
+
+ if (average_throughput > 0 && average_thread_count > 0) {
+ gdouble noise_for_confidence, adjacent_period_1, adjacent_period_2;
+
+ /* Calculate the periods of the adjacent frequency bands we'll be using to
+ * measure noise levels. We want the two adjacent Fourier frequency bands. */
+ adjacent_period_1 = sample_count / (((gdouble) sample_count) / ((gdouble) hc->wave_period) + 1);
+ adjacent_period_2 = sample_count / (((gdouble) sample_count) / ((gdouble) hc->wave_period) - 1);
+
+ /* Get the the three different frequency components of the throughput (scaled by average
+ * throughput). Our "error" estimate (the amount of noise that might be present in the
+ * frequency band we're really interested in) is the average of the adjacent bands. */
+ throughput_wave_component = mono_double_complex_scalar_div (hill_climbing_get_wave_component (worker, hc->samples, sample_count, hc->wave_period), average_throughput);
+ throughput_error_estimate = cabs (mono_double_complex_scalar_div (hill_climbing_get_wave_component (worker, hc->samples, sample_count, adjacent_period_1), average_throughput));
+
+ if (adjacent_period_2 <= sample_count) {
+ throughput_error_estimate = MAX (throughput_error_estimate, cabs (mono_double_complex_scalar_div (hill_climbing_get_wave_component (
+ worker, hc->samples, sample_count, adjacent_period_2), average_throughput)));
+ }
+
+ /* Do the same for the thread counts, so we have something to compare to. We don't
+ * measure thread count noise, because there is none; these are exact measurements. */
+ thread_wave_component = mono_double_complex_scalar_div (hill_climbing_get_wave_component (worker, hc->thread_counts, sample_count, hc->wave_period), average_thread_count);
+
+ /* Update our moving average of the throughput noise. We'll use this
+ * later as feedback to determine the new size of the thread wave. */
+ if (hc->average_throughput_noise == 0) {
+ hc->average_throughput_noise = throughput_error_estimate;
+ } else {
+ hc->average_throughput_noise = (hc->throughput_error_smoothing_factor * throughput_error_estimate)
+ + ((1.0 + hc->throughput_error_smoothing_factor) * hc->average_throughput_noise);
+ }
+
+ if (cabs (thread_wave_component) > 0) {
+ /* Adjust the throughput wave so it's centered around the target wave,
+ * and then calculate the adjusted throughput/thread ratio. */
+ ratio = mono_double_complex_div (mono_double_complex_sub (throughput_wave_component, mono_double_complex_scalar_mul(thread_wave_component, hc->target_throughput_ratio)), thread_wave_component);
+ transition = TRANSITION_CLIMBING_MOVE;
+ } else {
+ ratio = mono_double_complex_make (0, 0);
+ transition = TRANSITION_STABILIZING;
+ }
+
+ noise_for_confidence = MAX (hc->average_throughput_noise, throughput_error_estimate);
+ if (noise_for_confidence > 0) {
+ confidence = cabs (thread_wave_component) / noise_for_confidence / hc->target_signal_to_noise_ratio;
+ } else {
+ /* there is no noise! */
+ confidence = 1.0;
+ }
+ }
+ }
+
+ /* We use just the real part of the complex ratio we just calculated. If the throughput signal
+ * is exactly in phase with the thread signal, this will be the same as taking the magnitude of
+ * the complex move and moving that far up. If they're 180 degrees out of phase, we'll move
+ * backward (because this indicates that our changes are having the opposite of the intended effect).
+ * If they're 90 degrees out of phase, we won't move at all, because we can't tell wether we're
+ * having a negative or positive effect on throughput. */
+ move = creal (ratio);
+ move = CLAMP (move, -1.0, 1.0);
+
+ /* Apply our confidence multiplier. */
+ move *= CLAMP (confidence, -1.0, 1.0);
+
+ /* Now apply non-linear gain, such that values around zero are attenuated, while higher values
+ * are enhanced. This allows us to move quickly if we're far away from the target, but more slowly
+ * if we're getting close, giving us rapid ramp-up without wild oscillations around the target. */
+ gain = hc->max_change_per_second * sample_duration;
+ move = pow (fabs (move), hc->gain_exponent) * (move >= 0.0 ? 1 : -1) * gain;
+ move = MIN (move, hc->max_change_per_sample);
+
+ /* If the result was positive, and CPU is > 95%, refuse the move. */
+ if (move > 0.0 && worker->cpu_usage > CPU_USAGE_HIGH)
+ move = 0.0;
+
+ /* Apply the move to our control setting. */
+ hc->current_control_setting += move;
+
+ /* Calculate the new thread wave magnitude, which is based on the moving average we've been keeping of the
+ * throughput error. This average starts at zero, so we'll start with a nice safe little wave at first. */
+ new_thread_wave_magnitude = (gint)(0.5 + (hc->current_control_setting * hc->average_throughput_noise
+ * hc->target_signal_to_noise_ratio * hc->thread_magnitude_multiplier * 2.0));
+ new_thread_wave_magnitude = CLAMP (new_thread_wave_magnitude, 1, hc->max_thread_wave_magnitude);
+
+ /* Make sure our control setting is within the MonoThreadPoolWorker's limits. */
+ hc->current_control_setting = CLAMP (hc->current_control_setting, worker->limit_worker_min, worker->limit_worker_max - new_thread_wave_magnitude);
+
+ /* Calculate the new thread count (control setting + square wave). */
+ new_thread_count = (gint)(hc->current_control_setting + new_thread_wave_magnitude * ((hc->total_samples / (hc->wave_period / 2)) % 2));
+
+ /* Make sure the new thread count doesn't exceed the MonoThreadPoolWorker's limits. */
+ new_thread_count = CLAMP (new_thread_count, worker->limit_worker_min, worker->limit_worker_max);
+
+ if (new_thread_count != current_thread_count)
+ hill_climbing_change_thread_count (worker, new_thread_count, transition);
+
+ if (creal (ratio) < 0.0 && new_thread_count == worker->limit_worker_min)
+ *adjustment_interval = (gint)(0.5 + hc->current_sample_interval * (10.0 * MAX (-1.0 * creal (ratio), 1.0)));
+ else
+ *adjustment_interval = hc->current_sample_interval;
+
+ return new_thread_count;
+}
+
+static gboolean
+heuristic_should_adjust (MonoThreadPoolWorker *worker)
+{
+ if (worker->heuristic_last_dequeue > worker->heuristic_last_adjustment + worker->heuristic_adjustment_interval) {
+ ThreadPoolWorkerCounter counter;
+ counter = COUNTER_READ (worker);
+ if (counter._.working <= counter._.max_working)
+ return TRUE;
+ }
+
+ return FALSE;
+}
+
+static void
+heuristic_adjust (MonoThreadPoolWorker *worker)
+{
+ if (mono_coop_mutex_trylock (&worker->heuristic_lock) == 0) {
+ gint32 completions = InterlockedExchange (&worker->heuristic_completions, 0);
+ gint64 sample_end = mono_msec_ticks ();
+ gint64 sample_duration = sample_end - worker->heuristic_sample_start;
+
+ if (sample_duration >= worker->heuristic_adjustment_interval / 2) {
+ ThreadPoolWorkerCounter counter;
+ gint16 new_thread_count;
+
+ counter = COUNTER_READ (worker);
+ new_thread_count = hill_climbing_update (worker, counter._.max_working, sample_duration, completions, &worker->heuristic_adjustment_interval);
+
+ COUNTER_ATOMIC (worker, counter, {
+ counter._.max_working = new_thread_count;
+ });
+
+ if (new_thread_count > counter._.max_working)
+ worker_request (worker);
+
+ worker->heuristic_sample_start = sample_end;
+ worker->heuristic_last_adjustment = mono_msec_ticks ();
+ }
+
+ mono_coop_mutex_unlock (&worker->heuristic_lock);
+ }
+}
+
+static void
+heuristic_notify_work_completed (MonoThreadPoolWorker *worker)
+{
+ g_assert (worker);
+
+ InterlockedIncrement (&worker->heuristic_completions);
+ worker->heuristic_last_dequeue = mono_msec_ticks ();
+
+ if (heuristic_should_adjust (worker))
+ heuristic_adjust (worker);
+}
+
+gboolean
+mono_threadpool_worker_notify_completed (MonoThreadPoolWorker *worker)
+{
+ ThreadPoolWorkerCounter counter;
+
+ heuristic_notify_work_completed (worker);
+
+ counter = COUNTER_READ (worker);
+ return counter._.working <= counter._.max_working;
+}
+
+gint32
+mono_threadpool_worker_get_min (MonoThreadPoolWorker *worker)
+{
+ return worker->limit_worker_min;
+}
+
+gboolean
+mono_threadpool_worker_set_min (MonoThreadPoolWorker *worker, gint32 value)
+{
+ if (value <= 0 || value > worker->limit_worker_max)
+ return FALSE;
+
+ worker->limit_worker_min = value;
+ return TRUE;
+}
+
+gint32
+mono_threadpool_worker_get_max (MonoThreadPoolWorker *worker)
+{
+ return worker->limit_worker_max;
+}
+
+gboolean
+mono_threadpool_worker_set_max (MonoThreadPoolWorker *worker, gint32 value)
+{
+ gint32 cpu_count = mono_cpu_count ();
+
+ if (value < worker->limit_worker_min || value < cpu_count)
+ return FALSE;
+
+ worker->limit_worker_max = value;
+ return TRUE;
+}
+
+void
+mono_threadpool_worker_set_suspended (MonoThreadPoolWorker *worker, gboolean suspended)
+{
+ worker->suspended = suspended;
+ if (!suspended)
+ worker_request (worker);
+}
--- /dev/null
+
+#ifndef _MONO_METADATA_THREADPOOL_WORKER_H
+#define _MONO_METADATA_THREADPOOL_WORKER_H
+
+typedef struct MonoThreadPoolWorker MonoThreadPoolWorker;
+
+typedef void (*MonoThreadPoolWorkerCallback)(gpointer);
+
+void
+mono_threadpool_worker_init (MonoThreadPoolWorker **worker);
+
+void
+mono_threadpool_worker_cleanup (MonoThreadPoolWorker *worker);
+
+void
+mono_threadpool_worker_enqueue (MonoThreadPoolWorker *worker, MonoThreadPoolWorkerCallback callback, gpointer data);
+
+gboolean
+mono_threadpool_worker_notify_completed (MonoThreadPoolWorker *worker);
+
+gint32
+mono_threadpool_worker_get_min (MonoThreadPoolWorker *worker);
+gboolean
+mono_threadpool_worker_set_min (MonoThreadPoolWorker *worker, gint32 value);
+
+gint32
+mono_threadpool_worker_get_max (MonoThreadPoolWorker *worker);
+gboolean
+mono_threadpool_worker_set_max (MonoThreadPoolWorker *worker, gint32 value);
+
+void
+mono_threadpool_worker_set_suspended (MonoThreadPoolWorker *worker, gboolean suspended);
+
+#endif /* _MONO_METADATA_THREADPOOL_WORKER_H */
--- /dev/null
+/*
+ * threadpool.c: Microsoft threadpool runtime support
+ *
+ * Author:
+ * Ludovic Henry (ludovic.henry@xamarin.com)
+ *
+ * Copyright 2015 Xamarin, Inc (http://www.xamarin.com)
+ * Licensed under the MIT license. See LICENSE file in the project root for full license information.
+ */
+
+//
+// Copyright (c) Microsoft. All rights reserved.
+// Licensed under the MIT license. See LICENSE file in the project root for full license information.
+//
+// Files:
+// - src/vm/comthreadpool.cpp
+// - src/vm/win32threadpoolcpp
+// - src/vm/threadpoolrequest.cpp
+// - src/vm/hillclimbing.cpp
+//
+// Ported from C++ to C and adjusted to Mono runtime
+
+#include <stdlib.h>
+#define _USE_MATH_DEFINES // needed by MSVC to define math constants
+#include <math.h>
+#include <config.h>
+#include <glib.h>
+
+#include <mono/metadata/class-internals.h>
+#include <mono/metadata/exception.h>
+#include <mono/metadata/gc-internals.h>
+#include <mono/metadata/object.h>
+#include <mono/metadata/object-internals.h>
+#include <mono/metadata/threadpool.h>
+#include <mono/metadata/threadpool-worker.h>
+#include <mono/metadata/threadpool-io.h>
+#include <mono/metadata/w32event.h>
+#include <mono/utils/atomic.h>
+#include <mono/utils/mono-compiler.h>
+#include <mono/utils/mono-complex.h>
+#include <mono/utils/mono-lazy-init.h>
+#include <mono/utils/mono-logger.h>
+#include <mono/utils/mono-logger-internals.h>
+#include <mono/utils/mono-proclib.h>
+#include <mono/utils/mono-threads.h>
+#include <mono/utils/mono-time.h>
+#include <mono/utils/refcount.h>
+
+typedef struct {
+ MonoDomain *domain;
+ /* Number of outstanding jobs */
+ gint32 outstanding_request;
+ /* Number of currently executing jobs */
+ gint32 threadpool_jobs;
+ /* Signalled when threadpool_jobs + outstanding_request is 0 */
+ /* Protected by threadpool->domains_lock */
+ MonoCoopCond cleanup_cond;
+} ThreadPoolDomain;
+
+typedef union {
+ struct {
+ gint16 starting; /* starting, but not yet in worker_callback */
+ gint16 working; /* executing worker_callback */
+ } _;
+ gint32 as_gint32;
+} ThreadPoolCounter;
+
+typedef struct {
+ MonoRefCount ref;
+
+ GPtrArray *domains; // ThreadPoolDomain* []
+ MonoCoopMutex domains_lock;
+
+ GPtrArray *threads; // MonoInternalThread* []
+ MonoCoopMutex threads_lock;
+ MonoCoopCond threads_exit_cond;
+
+ ThreadPoolCounter counters;
+
+ gint32 limit_io_min;
+ gint32 limit_io_max;
+
+ MonoThreadPoolWorker *worker;
+} ThreadPool;
+
+static mono_lazy_init_t status = MONO_LAZY_INIT_STATUS_NOT_INITIALIZED;
+
+static ThreadPool* threadpool;
+
+#define COUNTER_CHECK(counter) \
+ do { \
+ g_assert (sizeof (ThreadPoolCounter) == sizeof (gint32)); \
+ g_assert (counter._.starting >= 0); \
+ g_assert (counter._.working >= 0); \
+ } while (0)
+
+#define COUNTER_ATOMIC(threadpool,var,block) \
+ do { \
+ ThreadPoolCounter __old; \
+ do { \
+ g_assert (threadpool); \
+ __old = COUNTER_READ (threadpool); \
+ (var) = __old; \
+ { block; } \
+ COUNTER_CHECK (var); \
+ } while (InterlockedCompareExchange (&threadpool->counters.as_gint32, (var).as_gint32, __old.as_gint32) != __old.as_gint32); \
+ } while (0)
+
+static inline ThreadPoolCounter
+COUNTER_READ (ThreadPool *threadpool)
+{
+ ThreadPoolCounter counter;
+ counter.as_gint32 = InterlockedRead (&threadpool->counters.as_gint32);
+ return counter;
+}
+
+static inline void
+domains_lock (void)
+{
+ mono_coop_mutex_lock (&threadpool->domains_lock);
+}
+
+static inline void
+domains_unlock (void)
+{
+ mono_coop_mutex_unlock (&threadpool->domains_lock);
+}
+
+static void
+destroy (gpointer unused)
+{
+ g_ptr_array_free (threadpool->domains, TRUE);
+ mono_coop_mutex_destroy (&threadpool->domains_lock);
+
+ g_ptr_array_free (threadpool->threads, TRUE);
+ mono_coop_mutex_destroy (&threadpool->threads_lock);
+ mono_coop_cond_destroy (&threadpool->threads_exit_cond);
+
+ g_free (threadpool);
+}
+
+static void
+initialize (void)
+{
+ g_assert (!threadpool);
+ threadpool = g_new0 (ThreadPool, 1);
+ g_assert (threadpool);
+
+ mono_refcount_init (threadpool, destroy);
+
+ threadpool->domains = g_ptr_array_new ();
+ mono_coop_mutex_init (&threadpool->domains_lock);
+
+ threadpool->threads = g_ptr_array_new ();
+ mono_coop_mutex_init (&threadpool->threads_lock);
+ mono_coop_cond_init (&threadpool->threads_exit_cond);
+
+ threadpool->limit_io_min = mono_cpu_count ();
+ threadpool->limit_io_max = CLAMP (threadpool->limit_io_min * 100, MIN (threadpool->limit_io_min, 200), MAX (threadpool->limit_io_min, 200));
+
+ mono_threadpool_worker_init (&threadpool->worker);
+}
+
+static void
+cleanup (void)
+{
+ guint i;
+ MonoInternalThread *current;
+
+ /* we make the assumption along the code that we are
+ * cleaning up only if the runtime is shutting down */
+ g_assert (mono_runtime_is_shutting_down ());
+
+ current = mono_thread_internal_current ();
+
+ mono_coop_mutex_lock (&threadpool->threads_lock);
+
+ /* stop all threadpool->threads */
+ for (i = 0; i < threadpool->threads->len; ++i) {
+ MonoInternalThread *thread = (MonoInternalThread*) g_ptr_array_index (threadpool->threads, i);
+ if (thread != current)
+ mono_thread_internal_abort (thread);
+ }
+
+ mono_coop_mutex_unlock (&threadpool->threads_lock);
+
+ /* give a chance to the other threads to exit */
+ mono_thread_info_yield ();
+
+ mono_coop_mutex_lock (&threadpool->threads_lock);
+
+ for (;;) {
+ ThreadPoolCounter counter;
+
+ counter = COUNTER_READ (threadpool);
+ if (counter._.working == 0)
+ break;
+
+ if (counter._.working == 1) {
+ if (threadpool->threads->len == 1 && g_ptr_array_index (threadpool->threads, 0) == current) {
+ /* We are waiting on ourselves */
+ break;
+ }
+ }
+
+ mono_coop_cond_wait (&threadpool->threads_exit_cond, &threadpool->threads_lock);
+ }
+
+ mono_coop_mutex_unlock (&threadpool->threads_lock);
+
+ mono_threadpool_worker_cleanup (threadpool->worker);
+
+ mono_refcount_dec (threadpool);
+}
+
+gboolean
+mono_threadpool_enqueue_work_item (MonoDomain *domain, MonoObject *work_item, MonoError *error)
+{
+ static MonoClass *threadpool_class = NULL;
+ static MonoMethod *unsafe_queue_custom_work_item_method = NULL;
+ MonoDomain *current_domain;
+ MonoBoolean f;
+ gpointer args [2];
+
+ mono_error_init (error);
+ g_assert (work_item);
+
+ if (!threadpool_class)
+ threadpool_class = mono_class_load_from_name (mono_defaults.corlib, "System.Threading", "ThreadPool");
+
+ if (!unsafe_queue_custom_work_item_method)
+ unsafe_queue_custom_work_item_method = mono_class_get_method_from_name (threadpool_class, "UnsafeQueueCustomWorkItem", 2);
+ g_assert (unsafe_queue_custom_work_item_method);
+
+ f = FALSE;
+
+ args [0] = (gpointer) work_item;
+ args [1] = (gpointer) &f;
+
+ current_domain = mono_domain_get ();
+ if (current_domain == domain) {
+ mono_runtime_invoke_checked (unsafe_queue_custom_work_item_method, NULL, args, error);
+ return_val_if_nok (error, FALSE);
+ } else {
+ mono_thread_push_appdomain_ref (domain);
+ if (mono_domain_set (domain, FALSE)) {
+ mono_runtime_invoke_checked (unsafe_queue_custom_work_item_method, NULL, args, error);
+ if (!is_ok (error)) {
+ mono_thread_pop_appdomain_ref ();
+ return FALSE;
+ }
+ mono_domain_set (current_domain, TRUE);
+ }
+ mono_thread_pop_appdomain_ref ();
+ }
+ return TRUE;
+}
+
+/* LOCKING: domains_lock must be held */
+static void
+tpdomain_add (ThreadPoolDomain *tpdomain)
+{
+ guint i, len;
+
+ g_assert (tpdomain);
+
+ len = threadpool->domains->len;
+ for (i = 0; i < len; ++i) {
+ if (g_ptr_array_index (threadpool->domains, i) == tpdomain)
+ break;
+ }
+
+ if (i == len)
+ g_ptr_array_add (threadpool->domains, tpdomain);
+}
+
+/* LOCKING: domains_lock must be held. */
+static gboolean
+tpdomain_remove (ThreadPoolDomain *tpdomain)
+{
+ g_assert (tpdomain);
+ return g_ptr_array_remove (threadpool->domains, tpdomain);
+}
+
+/* LOCKING: domains_lock must be held */
+static ThreadPoolDomain *
+tpdomain_get (MonoDomain *domain, gboolean create)
+{
+ guint i;
+ ThreadPoolDomain *tpdomain;
+
+ g_assert (domain);
+
+ for (i = 0; i < threadpool->domains->len; ++i) {
+ ThreadPoolDomain *tpdomain;
+
+ tpdomain = (ThreadPoolDomain *)g_ptr_array_index (threadpool->domains, i);
+ if (tpdomain->domain == domain)
+ return tpdomain;
+ }
+
+ if (!create)
+ return NULL;
+
+ tpdomain = g_new0 (ThreadPoolDomain, 1);
+ tpdomain->domain = domain;
+ mono_coop_cond_init (&tpdomain->cleanup_cond);
+
+ tpdomain_add (tpdomain);
+
+ return tpdomain;
+}
+
+static void
+tpdomain_free (ThreadPoolDomain *tpdomain)
+{
+ g_free (tpdomain);
+}
+
+/* LOCKING: domains_lock must be held */
+static ThreadPoolDomain *
+tpdomain_get_next (ThreadPoolDomain *current)
+{
+ ThreadPoolDomain *tpdomain = NULL;
+ guint len;
+
+ len = threadpool->domains->len;
+ if (len > 0) {
+ guint i, current_idx = -1;
+ if (current) {
+ for (i = 0; i < len; ++i) {
+ if (current == g_ptr_array_index (threadpool->domains, i)) {
+ current_idx = i;
+ break;
+ }
+ }
+ g_assert (current_idx != (guint)-1);
+ }
+ for (i = current_idx + 1; i < len + current_idx + 1; ++i) {
+ ThreadPoolDomain *tmp = (ThreadPoolDomain *)g_ptr_array_index (threadpool->domains, i % len);
+ if (tmp->outstanding_request > 0) {
+ tpdomain = tmp;
+ break;
+ }
+ }
+ }
+
+ return tpdomain;
+}
+
+static void
+worker_callback (gpointer unused)
+{
+ MonoError error;
+ ThreadPoolDomain *tpdomain, *previous_tpdomain;
+ ThreadPoolCounter counter;
+ MonoInternalThread *thread;
+
+ thread = mono_thread_internal_current ();
+
+ COUNTER_ATOMIC (threadpool, counter, {
+ counter._.starting --;
+ counter._.working ++;
+ });
+
+ if (mono_runtime_is_shutting_down ()) {
+ COUNTER_ATOMIC (threadpool, counter, {
+ counter._.working --;
+ });
+
+ mono_refcount_dec (threadpool);
+ return;
+ }
+
+ mono_coop_mutex_lock (&threadpool->threads_lock);
+ g_ptr_array_add (threadpool->threads, thread);
+ mono_coop_mutex_unlock (&threadpool->threads_lock);
+
+ /*
+ * This is needed so there is always an lmf frame in the runtime invoke call below,
+ * so ThreadAbortExceptions are caught even if the thread is in native code.
+ */
+ mono_defaults.threadpool_perform_wait_callback_method->save_lmf = TRUE;
+
+ domains_lock ();
+
+ previous_tpdomain = NULL;
+
+ while (!mono_runtime_is_shutting_down ()) {
+ gboolean retire = FALSE;
+
+ if ((thread->state & (ThreadState_AbortRequested | ThreadState_SuspendRequested)) != 0) {
+ domains_unlock ();
+ mono_thread_interruption_checkpoint ();
+ domains_lock ();
+ }
+
+ tpdomain = tpdomain_get_next (previous_tpdomain);
+ if (!tpdomain)
+ break;
+
+ tpdomain->outstanding_request --;
+ g_assert (tpdomain->outstanding_request >= 0);
+
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] worker running in domain %p (outstanding requests %d)",
+ mono_native_thread_id_get (), tpdomain->domain, tpdomain->outstanding_request);
+
+ g_assert (tpdomain->threadpool_jobs >= 0);
+ tpdomain->threadpool_jobs ++;
+
+ domains_unlock ();
+
+ mono_thread_clr_state (thread, (MonoThreadState)~ThreadState_Background);
+ if (!mono_thread_test_state (thread , ThreadState_Background))
+ ves_icall_System_Threading_Thread_SetState (thread, ThreadState_Background);
+
+ mono_thread_push_appdomain_ref (tpdomain->domain);
+ if (mono_domain_set (tpdomain->domain, FALSE)) {
+ MonoObject *exc = NULL, *res;
+
+ res = mono_runtime_try_invoke (mono_defaults.threadpool_perform_wait_callback_method, NULL, NULL, &exc, &error);
+ if (exc || !mono_error_ok(&error)) {
+ if (exc == NULL)
+ exc = (MonoObject *) mono_error_convert_to_exception (&error);
+ else
+ mono_error_cleanup (&error);
+ mono_thread_internal_unhandled_exception (exc);
+ } else if (res && *(MonoBoolean*) mono_object_unbox (res) == FALSE) {
+ retire = TRUE;
+ }
+
+ mono_domain_set (mono_get_root_domain (), TRUE);
+ }
+ mono_thread_pop_appdomain_ref ();
+
+ domains_lock ();
+
+ tpdomain->threadpool_jobs --;
+ g_assert (tpdomain->threadpool_jobs >= 0);
+
+ if (tpdomain->outstanding_request + tpdomain->threadpool_jobs == 0 && mono_domain_is_unloading (tpdomain->domain)) {
+ gboolean removed;
+
+ removed = tpdomain_remove (tpdomain);
+ g_assert (removed);
+
+ mono_coop_cond_signal (&tpdomain->cleanup_cond);
+ tpdomain = NULL;
+ }
+
+ if (retire)
+ break;
+
+ previous_tpdomain = tpdomain;
+ }
+
+ domains_unlock ();
+
+ mono_coop_mutex_lock (&threadpool->threads_lock);
+
+ COUNTER_ATOMIC (threadpool, counter, {
+ counter._.working --;
+ });
+
+ g_ptr_array_remove_fast (threadpool->threads, thread);
+
+ mono_coop_cond_signal (&threadpool->threads_exit_cond);
+
+ mono_coop_mutex_unlock (&threadpool->threads_lock);
+
+ mono_refcount_dec (threadpool);
+}
+
+void
+mono_threadpool_cleanup (void)
+{
+#ifndef DISABLE_SOCKETS
+ mono_threadpool_io_cleanup ();
+#endif
+ mono_lazy_cleanup (&status, cleanup);
+}
+
+MonoAsyncResult *
+mono_threadpool_begin_invoke (MonoDomain *domain, MonoObject *target, MonoMethod *method, gpointer *params, MonoError *error)
+{
+ static MonoClass *async_call_klass = NULL;
+ MonoMethodMessage *message;
+ MonoAsyncResult *async_result;
+ MonoAsyncCall *async_call;
+ MonoDelegate *async_callback = NULL;
+ MonoObject *state = NULL;
+
+ if (!async_call_klass)
+ async_call_klass = mono_class_load_from_name (mono_defaults.corlib, "System", "MonoAsyncCall");
+
+ mono_lazy_initialize (&status, initialize);
+
+ mono_error_init (error);
+
+ message = mono_method_call_message_new (method, params, mono_get_delegate_invoke (method->klass), (params != NULL) ? (&async_callback) : NULL, (params != NULL) ? (&state) : NULL, error);
+ return_val_if_nok (error, NULL);
+
+ async_call = (MonoAsyncCall*) mono_object_new_checked (domain, async_call_klass, error);
+ return_val_if_nok (error, NULL);
+
+ MONO_OBJECT_SETREF (async_call, msg, message);
+ MONO_OBJECT_SETREF (async_call, state, state);
+
+ if (async_callback) {
+ MONO_OBJECT_SETREF (async_call, cb_method, mono_get_delegate_invoke (((MonoObject*) async_callback)->vtable->klass));
+ MONO_OBJECT_SETREF (async_call, cb_target, async_callback);
+ }
+
+ async_result = mono_async_result_new (domain, NULL, async_call->state, NULL, (MonoObject*) async_call, error);
+ return_val_if_nok (error, NULL);
+ MONO_OBJECT_SETREF (async_result, async_delegate, target);
+
+ mono_threadpool_enqueue_work_item (domain, (MonoObject*) async_result, error);
+ return_val_if_nok (error, NULL);
+
+ return async_result;
+}
+
+MonoObject *
+mono_threadpool_end_invoke (MonoAsyncResult *ares, MonoArray **out_args, MonoObject **exc, MonoError *error)
+{
+ MonoAsyncCall *ac;
+
+ mono_error_init (error);
+ g_assert (exc);
+ g_assert (out_args);
+
+ *exc = NULL;
+ *out_args = NULL;
+
+ /* check if already finished */
+ mono_monitor_enter ((MonoObject*) ares);
+
+ if (ares->endinvoke_called) {
+ mono_error_set_invalid_operation(error, "Delegate EndInvoke method called more than once");
+ mono_monitor_exit ((MonoObject*) ares);
+ return NULL;
+ }
+
+ ares->endinvoke_called = 1;
+
+ /* wait until we are really finished */
+ if (ares->completed) {
+ mono_monitor_exit ((MonoObject *) ares);
+ } else {
+ gpointer wait_event;
+ if (ares->handle) {
+ wait_event = mono_wait_handle_get_handle ((MonoWaitHandle*) ares->handle);
+ } else {
+ wait_event = mono_w32event_create (TRUE, FALSE);
+ g_assert(wait_event);
+ MonoWaitHandle *wait_handle = mono_wait_handle_new (mono_object_domain (ares), wait_event, error);
+ if (!is_ok (error)) {
+ CloseHandle (wait_event);
+ return NULL;
+ }
+ MONO_OBJECT_SETREF (ares, handle, (MonoObject*) wait_handle);
+ }
+ mono_monitor_exit ((MonoObject*) ares);
+ MONO_ENTER_GC_SAFE;
+#ifdef HOST_WIN32
+ WaitForSingleObjectEx (wait_event, INFINITE, TRUE);
+#else
+ mono_w32handle_wait_one (wait_event, MONO_INFINITE_WAIT, TRUE);
+#endif
+ MONO_EXIT_GC_SAFE;
+ }
+
+ ac = (MonoAsyncCall*) ares->object_data;
+ g_assert (ac);
+
+ *exc = ac->msg->exc; /* FIXME: GC add write barrier */
+ *out_args = ac->out_args;
+ return ac->res;
+}
+
+gboolean
+mono_threadpool_remove_domain_jobs (MonoDomain *domain, int timeout)
+{
+ gint64 end;
+ ThreadPoolDomain *tpdomain;
+ gboolean ret;
+
+ g_assert (domain);
+ g_assert (timeout >= -1);
+
+ g_assert (mono_domain_is_unloading (domain));
+
+ if (timeout != -1)
+ end = mono_msec_ticks () + timeout;
+
+#ifndef DISABLE_SOCKETS
+ mono_threadpool_io_remove_domain_jobs (domain);
+ if (timeout != -1) {
+ if (mono_msec_ticks () > end)
+ return FALSE;
+ }
+#endif
+
+ /*
+ * Wait for all threads which execute jobs in the domain to exit.
+ * The is_unloading () check in worker_request () ensures that
+ * no new jobs are added after we enter the lock below.
+ */
+ mono_lazy_initialize (&status, initialize);
+ domains_lock ();
+
+ tpdomain = tpdomain_get (domain, FALSE);
+ if (!tpdomain) {
+ domains_unlock ();
+ return TRUE;
+ }
+
+ ret = TRUE;
+
+ while (tpdomain->outstanding_request + tpdomain->threadpool_jobs > 0) {
+ if (timeout == -1) {
+ mono_coop_cond_wait (&tpdomain->cleanup_cond, &threadpool->domains_lock);
+ } else {
+ gint64 now;
+ gint res;
+
+ now = mono_msec_ticks();
+ if (now > end) {
+ ret = FALSE;
+ break;
+ }
+
+ res = mono_coop_cond_timedwait (&tpdomain->cleanup_cond, &threadpool->domains_lock, end - now);
+ if (res != 0) {
+ ret = FALSE;
+ break;
+ }
+ }
+ }
+
+ /* Remove from the list the worker threads look at */
+ tpdomain_remove (tpdomain);
+
+ domains_unlock ();
+
+ mono_coop_cond_destroy (&tpdomain->cleanup_cond);
+ tpdomain_free (tpdomain);
+
+ return ret;
+}
+
+void
+mono_threadpool_suspend (void)
+{
+ if (threadpool)
+ mono_threadpool_worker_set_suspended (threadpool->worker, TRUE);
+}
+
+void
+mono_threadpool_resume (void)
+{
+ if (threadpool)
+ mono_threadpool_worker_set_suspended (threadpool->worker, FALSE);
+}
+
+void
+ves_icall_System_Threading_ThreadPool_GetAvailableThreadsNative (gint32 *worker_threads, gint32 *completion_port_threads)
+{
+ ThreadPoolCounter counter;
+
+ if (!worker_threads || !completion_port_threads)
+ return;
+
+ mono_lazy_initialize (&status, initialize);
+
+ counter = COUNTER_READ (threadpool);
+
+ *worker_threads = MAX (0, mono_threadpool_worker_get_max (threadpool->worker) - counter._.working);
+ *completion_port_threads = threadpool->limit_io_max;
+}
+
+void
+ves_icall_System_Threading_ThreadPool_GetMinThreadsNative (gint32 *worker_threads, gint32 *completion_port_threads)
+{
+ if (!worker_threads || !completion_port_threads)
+ return;
+
+ mono_lazy_initialize (&status, initialize);
+
+ *worker_threads = mono_threadpool_worker_get_min (threadpool->worker);
+ *completion_port_threads = threadpool->limit_io_min;
+}
+
+void
+ves_icall_System_Threading_ThreadPool_GetMaxThreadsNative (gint32 *worker_threads, gint32 *completion_port_threads)
+{
+ if (!worker_threads || !completion_port_threads)
+ return;
+
+ mono_lazy_initialize (&status, initialize);
+
+ *worker_threads = mono_threadpool_worker_get_max (threadpool->worker);
+ *completion_port_threads = threadpool->limit_io_max;
+}
+
+MonoBoolean
+ves_icall_System_Threading_ThreadPool_SetMinThreadsNative (gint32 worker_threads, gint32 completion_port_threads)
+{
+ mono_lazy_initialize (&status, initialize);
+
+ if (completion_port_threads <= 0 || completion_port_threads > threadpool->limit_io_max)
+ return FALSE;
+
+ if (!mono_threadpool_worker_set_min (threadpool->worker, worker_threads))
+ return FALSE;
+
+ threadpool->limit_io_min = completion_port_threads;
+
+ return TRUE;
+}
+
+MonoBoolean
+ves_icall_System_Threading_ThreadPool_SetMaxThreadsNative (gint32 worker_threads, gint32 completion_port_threads)
+{
+ gint cpu_count = mono_cpu_count ();
+
+ mono_lazy_initialize (&status, initialize);
+
+ if (completion_port_threads < threadpool->limit_io_min || completion_port_threads < cpu_count)
+ return FALSE;
+
+ if (!mono_threadpool_worker_set_max (threadpool->worker, worker_threads))
+ return FALSE;
+
+ threadpool->limit_io_max = completion_port_threads;
+
+ return TRUE;
+}
+
+void
+ves_icall_System_Threading_ThreadPool_InitializeVMTp (MonoBoolean *enable_worker_tracking)
+{
+ if (enable_worker_tracking) {
+ // TODO implement some kind of switch to have the possibily to use it
+ *enable_worker_tracking = FALSE;
+ }
+
+ mono_lazy_initialize (&status, initialize);
+}
+
+MonoBoolean
+ves_icall_System_Threading_ThreadPool_NotifyWorkItemComplete (void)
+{
+ if (mono_domain_is_unloading (mono_domain_get ()) || mono_runtime_is_shutting_down ())
+ return FALSE;
+
+ return mono_threadpool_worker_notify_completed (threadpool->worker);
+}
+
+void
+ves_icall_System_Threading_ThreadPool_NotifyWorkItemProgressNative (void)
+{
+ mono_threadpool_worker_notify_completed (threadpool->worker);
+}
+
+void
+ves_icall_System_Threading_ThreadPool_ReportThreadStatus (MonoBoolean is_working)
+{
+ // TODO
+ MonoError error;
+ mono_error_set_not_implemented (&error, "");
+ mono_error_set_pending_exception (&error);
+}
+
+MonoBoolean
+ves_icall_System_Threading_ThreadPool_RequestWorkerThread (void)
+{
+ MonoDomain *domain;
+ ThreadPoolDomain *tpdomain;
+ ThreadPoolCounter counter;
+
+ domain = mono_domain_get ();
+ if (mono_domain_is_unloading (domain))
+ return FALSE;
+
+ domains_lock ();
+
+ /* synchronize with mono_threadpool_remove_domain_jobs */
+ if (mono_domain_is_unloading (domain)) {
+ domains_unlock ();
+ return FALSE;
+ }
+
+ tpdomain = tpdomain_get (domain, TRUE);
+ g_assert (tpdomain);
+
+ tpdomain->outstanding_request ++;
+ g_assert (tpdomain->outstanding_request >= 1);
+
+ mono_refcount_inc (threadpool);
+
+ COUNTER_ATOMIC (threadpool, counter, {
+ counter._.starting ++;
+ });
+
+ mono_threadpool_worker_enqueue (threadpool->worker, worker_callback, NULL);
+
+ domains_unlock ();
+
+ return TRUE;
+}
+
+MonoBoolean G_GNUC_UNUSED
+ves_icall_System_Threading_ThreadPool_PostQueuedCompletionStatus (MonoNativeOverlapped *native_overlapped)
+{
+ /* This copy the behavior of the current Mono implementation */
+ MonoError error;
+ mono_error_set_not_implemented (&error, "");
+ mono_error_set_pending_exception (&error);
+ return FALSE;
+}
+
+MonoBoolean G_GNUC_UNUSED
+ves_icall_System_Threading_ThreadPool_BindIOCompletionCallbackNative (gpointer file_handle)
+{
+ /* This copy the behavior of the current Mono implementation */
+ return TRUE;
+}
+
+MonoBoolean G_GNUC_UNUSED
+ves_icall_System_Threading_ThreadPool_IsThreadPoolHosted (void)
+{
+ return FALSE;
+}
--- /dev/null
+#ifndef _MONO_METADATA_THREADPOOL_H_
+#define _MONO_METADATA_THREADPOOL_H_
+
+#include <config.h>
+#include <glib.h>
+
+#include <mono/metadata/exception.h>
+#include <mono/metadata/object-internals.h>
+
+#define SMALL_STACK (sizeof (gpointer) * 32 * 1024)
+
+typedef struct _MonoNativeOverlapped MonoNativeOverlapped;
+
+void
+mono_threadpool_cleanup (void);
+
+MonoAsyncResult *
+mono_threadpool_begin_invoke (MonoDomain *domain, MonoObject *target, MonoMethod *method, gpointer *params, MonoError *error);
+MonoObject *
+mono_threadpool_end_invoke (MonoAsyncResult *ares, MonoArray **out_args, MonoObject **exc, MonoError *error);
+
+gboolean
+mono_threadpool_remove_domain_jobs (MonoDomain *domain, int timeout);
+
+void
+mono_threadpool_suspend (void);
+void
+mono_threadpool_resume (void);
+
+void
+ves_icall_System_Threading_ThreadPool_GetAvailableThreadsNative (gint32 *worker_threads, gint32 *completion_port_threads);
+void
+ves_icall_System_Threading_ThreadPool_GetMinThreadsNative (gint32 *worker_threads, gint32 *completion_port_threads);
+void
+ves_icall_System_Threading_ThreadPool_GetMaxThreadsNative (gint32 *worker_threads, gint32 *completion_port_threads);
+MonoBoolean
+ves_icall_System_Threading_ThreadPool_SetMinThreadsNative (gint32 worker_threads, gint32 completion_port_threads);
+MonoBoolean
+ves_icall_System_Threading_ThreadPool_SetMaxThreadsNative (gint32 worker_threads, gint32 completion_port_threads);
+void
+ves_icall_System_Threading_ThreadPool_InitializeVMTp (MonoBoolean *enable_worker_tracking);
+MonoBoolean
+ves_icall_System_Threading_ThreadPool_NotifyWorkItemComplete (void);
+void
+ves_icall_System_Threading_ThreadPool_NotifyWorkItemProgressNative (void);
+void
+ves_icall_System_Threading_ThreadPool_ReportThreadStatus (MonoBoolean is_working);
+MonoBoolean
+ves_icall_System_Threading_ThreadPool_RequestWorkerThread (void);
+
+MonoBoolean
+ves_icall_System_Threading_ThreadPool_PostQueuedCompletionStatus (MonoNativeOverlapped *native_overlapped);
+
+MonoBoolean
+ves_icall_System_Threading_ThreadPool_BindIOCompletionCallbackNative (gpointer file_handle);
+
+MonoBoolean
+ves_icall_System_Threading_ThreadPool_IsThreadPoolHosted (void);
+
+/* Internals */
+
+gboolean
+mono_threadpool_enqueue_work_item (MonoDomain *domain, MonoObject *work_item, MonoError *error);
+
+#endif // _MONO_METADATA_THREADPOOL_H_
#include <mono/metadata/image.h>
#include <mono/metadata/cil-coff.h>
#include <mono/metadata/exception.h>
-#include <mono/metadata/threadpool-ms-io.h>
+#include <mono/metadata/threadpool-io.h>
#include <mono/utils/strenc.h>
#include <mono/utils/mono-proclib.h>
#include <mono/io-layer/io-layer.h>
#include <mono/metadata/gc-internals.h>
#include <mono/metadata/environment.h>
#include <mono/metadata/threads-types.h>
-#include <mono/metadata/threadpool-ms.h>
+#include <mono/metadata/threadpool.h>
#include <mono/metadata/socket-io.h>
#include <mono/metadata/assembly.h>
#include <mono/metadata/runtime.h>
/*
* Suspend creation of new threadpool threads, since they cannot run
*/
- mono_threadpool_ms_suspend ();
+ mono_threadpool_suspend ();
mono_loader_unlock ();
}
//g_assert (err == 0);
if (suspend_count == 0)
- mono_threadpool_ms_resume ();
+ mono_threadpool_resume ();
mono_loader_unlock ();
}
/*
* These functions should be used if you want some form of lazy initialization. You can have a look at the
- * threadpool-ms for a more detailed example.
+ * threadpool for a more detailed example.
*
* The idea is that a module can be in 5 different states:
* - not initialized: it is the first state it starts in
<ClCompile Include="..\mono\metadata\mono-security.c" />\r
<ClCompile Include="..\mono\metadata\seq-points-data.c" />\r
<ClCompile Include="..\mono\metadata\sgen-mono.c" />\r
- <ClCompile Include="..\mono\metadata\threadpool-ms-io.c" />\r
- <ClCompile Include="..\mono\metadata\threadpool-ms.c" />\r
+ <ClCompile Include="..\mono\metadata\threadpool-io.c" />\r
+ <ClCompile Include="..\mono\metadata\threadpool.c" />\r
+ <ClCompile Include="..\mono\metadata\threadpool-worker-default.c" />\r
<ClCompile Include="..\mono\metadata\sgen-bridge.c" />\r
<ClCompile Include="..\mono\metadata\sgen-new-bridge.c" />\r
<ClCompile Include="..\mono\metadata\sgen-old-bridge.c" />\r
<ClInclude Include="..\mono\metadata\sgen-bridge-internals.h" />\r
<ClInclude Include="..\mono\metadata\sgen-client-mono.h" />\r
<ClInclude Include="..\mono\metadata\socket-io-windows-internals.h" />\r
- <ClInclude Include="..\mono\metadata\threadpool-ms-io.h" />\r
- <ClInclude Include="..\mono\metadata\threadpool-ms.h" />\r
+ <ClInclude Include="..\mono\metadata\threadpool-io.h" />\r
+ <ClInclude Include="..\mono\metadata\threadpool.h" />\r
+ <ClInclude Include="..\mono\metadata\threadpool-worker.h" />\r
<ClInclude Include="..\mono\sgen\gc-internal-agnostic.h" />\r
<ClInclude Include="..\mono\metadata\icall-def.h" />\r
<ClInclude Include="..\mono\metadata\image.h" />\r
<ClCompile Include="..\mono\metadata\sysmath.c">\r
<Filter>Source Files</Filter>\r
</ClCompile>\r
- <ClCompile Include="..\mono\metadata\threadpool-ms.c">\r
+ <ClCompile Include="..\mono\metadata\threadpool.c">\r
<Filter>Source Files</Filter>\r
</ClCompile>\r
- <ClCompile Include="..\mono\metadata\threadpool-ms-io.c">\r
+ <ClCompile Include="..\mono\metadata\threadpool-worker-default.c">\r
+ <Filter>Source Files</Filter>\r
+ </ClCompile>\r
+ <ClCompile Include="..\mono\metadata\threadpool-io.c">\r
<Filter>Source Files</Filter>\r
</ClCompile>\r
<ClCompile Include="..\mono\metadata\threads.c">\r
<ClInclude Include="..\mono\metadata\tabledefs.h">\r
<Filter>Header Files</Filter>\r
</ClInclude>\r
- <ClInclude Include="..\mono\metadata\threadpool-ms.h">\r
+ <ClInclude Include="..\mono\metadata\threadpool.h">\r
+ <Filter>Header Files</Filter>\r
+ </ClInclude>\r
+ <ClInclude Include="..\mono\metadata\threadpool-worker.h">\r
<Filter>Header Files</Filter>\r
</ClInclude>\r
<ClInclude Include="..\mono\metadata\wrapper-types.h">\r
<Filter>Header Files</Filter>\r
</ClInclude>\r
- <ClInclude Include="..\mono\metadata\threadpool-ms-io.h">\r
+ <ClInclude Include="..\mono\metadata\threadpool-io.h">\r
<Filter>Header Files</Filter>\r
</ClInclude>\r
<ClInclude Include="..\mono\metadata\threads-types.h">\r