* Gonzalo Paniagua Javier (gonzalo@ximian.com)
*
* Copyright 2001-2003 Ximian, Inc (http://www.ximian.com)
- * Copyright 2004-2009 Novell, Inc (http://www.novell.com)
+ * Copyright 2004-2010 Novell, Inc (http://www.novell.com)
*/
#include <config.h>
#include <glib.h>
-#define THREADS_PER_CPU 10 /* 20 + THREADS_PER_CPU * number of CPUs */
-#define THREAD_EXIT_TIMEOUT 1000
-
-#include <mono/metadata/domain-internals.h>
-#include <mono/metadata/tabledefs.h>
+#include <mono/metadata/profiler-private.h>
#include <mono/metadata/threads.h>
#include <mono/metadata/threads-types.h>
#include <mono/metadata/threadpool-internals.h>
#include <mono/metadata/exception.h>
-#include <mono/metadata/file-io.h>
-#include <mono/metadata/monitor.h>
#include <mono/metadata/mono-mlist.h>
-#include <mono/metadata/marshal.h>
+#include <mono/metadata/mono-perfcounters.h>
#include <mono/metadata/socket-io.h>
+#include <mono/metadata/mono-cq.h>
+#include <mono/metadata/mono-wsq.h>
#include <mono/io-layer/io-layer.h>
-#include <mono/metadata/gc-internal.h>
#include <mono/utils/mono-time.h>
#include <mono/utils/mono-proclib.h>
+#include <mono/utils/mono-semaphore.h>
#include <errno.h>
#ifdef HAVE_SYS_TIME_H
#include <sys/time.h>
#ifdef HAVE_EPOLL
#include <sys/epoll.h>
#endif
+#ifdef HAVE_KQUEUE
+#include <sys/event.h>
+#endif
+
#ifndef DISABLE_SOCKETS
#include "mono/io-layer/socket-wrappers.h"
#define THREAD_WANTS_A_BREAK(t) ((t->state & (ThreadState_StopRequested | \
ThreadState_SuspendRequested)) != 0)
-#undef EPOLL_DEBUG
+#define SPIN_TRYLOCK(i) (InterlockedCompareExchange (&(i), 1, 0) == 0)
+#define SPIN_LOCK(i) do { \
+ if (SPIN_TRYLOCK (i)) \
+ break; \
+ } while (1)
-/* maximum number of worker threads */
-static int mono_max_worker_threads;
-static int mono_min_worker_threads;
-static int mono_io_max_worker_threads;
-static int mono_io_min_worker_threads;
+#define SPIN_UNLOCK(i) i = 0
+#define SMALL_STACK (128 * (sizeof (gpointer) / 4) * 1024)
-/* current number of worker threads */
-static int mono_worker_threads = 0;
-static int io_worker_threads = 0;
-
-/* current number of busy threads */
-static int busy_worker_threads = 0;
-static int busy_io_worker_threads;
+/* DEBUG: prints tp data every 2s */
+#undef DEBUG
/* mono_thread_pool_init called */
-static int tp_inited;
-/* started idle threads */
-static int tp_idle_started;
-
-
-/* we use this to store a reference to the AsyncResult to avoid GC */
-static MonoGHashTable *ares_htable = NULL;
+static volatile int tp_inited;
-static CRITICAL_SECTION ares_lock;
-static CRITICAL_SECTION io_queue_lock;
-static int pending_io_items;
+enum {
+ POLL_BACKEND,
+ EPOLL_BACKEND,
+ KQUEUE_BACKEND
+};
typedef struct {
CRITICAL_SECTION io_lock; /* access to sock_to_state */
- int inited;
- int pipe [2];
+ int inited; // 0 -> not initialized , 1->initializing, 2->initialized, 3->cleaned up
MonoGHashTable *sock_to_state;
- HANDLE new_sem; /* access to newpfd and write side of the pipe */
- mono_pollfd *newpfd;
- gboolean epoll_disabled;
-#ifdef HAVE_EPOLL
- int epollfd;
-#endif
+ gint event_system;
+ gpointer event_data;
+ void (*modify) (gpointer event_data, int fd, int operation, int events, gboolean is_new);
+ void (*wait) (gpointer sock_data);
+ void (*shutdown) (gpointer event_data);
} SocketIOData;
static SocketIOData socket_io_data;
-/* we append a job */
-static HANDLE job_added;
-static HANDLE io_job_added;
-
/* Keep in sync with the System.MonoAsyncCall class which provides GC tracking */
typedef struct {
MonoObject object;
MonoObject *state;
MonoObject *res;
MonoArray *out_args;
- /* This is a HANDLE, we use guint64 so the managed object layout remains constant */
- guint64 wait_event;
} ASyncCall;
typedef struct {
- MonoArray *array;
- int first_elem;
- int next_elem;
-} TPQueue;
+ MonoSemType lock;
+ MonoCQ *queue; /* GC root */
+ MonoSemType new_job;
+ volatile gint waiting; /* threads waiting for a work item */
+
+ /**/
+ volatile gint pool_status; /* 0 -> not initialized, 1 -> initialized, 2 -> cleaning up */
+ /* min, max, n and busy -> Interlocked */
+ volatile gint min_threads;
+ volatile gint max_threads;
+ volatile gint nthreads;
+ volatile gint busy_threads;
+
+ void (*async_invoke) (gpointer data);
+ void *pc_nitems; /* Performance counter for total number of items in added */
+ void *pc_nthreads; /* Performance counter for total number of active threads */
+ /**/
+ volatile gint destroy_thread;
+ volatile gint ignore_times; /* Used when there's a thread being created or destroyed */
+ volatile gint sp_lock; /* spin lock used to protect ignore_times */
+ volatile gint64 last_check;
+ volatile gint64 time_sum;
+ volatile gint n_sum;
+ gint64 averages [2];
+ gboolean is_io;
+} ThreadPool;
+
+static ThreadPool async_tp;
+static ThreadPool async_io_tp;
static void async_invoke_thread (gpointer data);
-static void append_job (CRITICAL_SECTION *cs, TPQueue *list, MonoObject *ar);
-static void start_thread_or_queue (MonoAsyncResult *ares);
-static void start_tpthread (MonoAsyncResult *ares);
-static void mono_async_invoke (MonoAsyncResult *ares);
-static MonoObject* dequeue_job (CRITICAL_SECTION *cs, TPQueue *list);
-static void free_queue (TPQueue *list);
-
-static TPQueue async_call_queue = {NULL, 0, 0};
-static TPQueue async_io_queue = {NULL, 0, 0};
+static MonoObject *mono_async_invoke (ThreadPool *tp, MonoAsyncResult *ares);
+static void threadpool_free_queue (ThreadPool *tp);
+static void threadpool_append_job (ThreadPool *tp, MonoObject *ar);
+static void threadpool_append_jobs (ThreadPool *tp, MonoObject **jobs, gint njobs);
+static void threadpool_init (ThreadPool *tp, int min_threads, int max_threads, void (*async_invoke) (gpointer));
+static void threadpool_start_idle_threads (ThreadPool *tp);
+static void threadpool_kill_idle_threads (ThreadPool *tp);
+static gboolean threadpool_start_thread (ThreadPool *tp);
+static void monitor_thread (gpointer data);
+static void socket_io_cleanup (SocketIOData *data);
+static MonoObject *get_io_event (MonoMList **list, gint event);
+static int get_events_from_list (MonoMList *list);
+static int get_event_from_state (MonoSocketAsyncResult *state);
static MonoClass *async_call_klass;
static MonoClass *socket_async_call_klass;
static MonoClass *process_async_call_klass;
-#define INIT_POLLFD(a, b, c) {(a)->fd = b; (a)->events = c; (a)->revents = 0;}
+static GPtrArray *wsqs;
+CRITICAL_SECTION wsqs_lock;
+
+/* Hooks */
+static MonoThreadPoolFunc tp_start_func;
+static MonoThreadPoolFunc tp_finish_func;
+static gpointer tp_hooks_user_data;
+static MonoThreadPoolItemFunc tp_item_begin_func;
+static MonoThreadPoolItemFunc tp_item_end_func;
+static gpointer tp_item_user_data;
+
enum {
AIO_OP_FIRST,
AIO_OP_ACCEPT = 0,
AIO_OP_RECV_JUST_CALLBACK,
AIO_OP_SEND_JUST_CALLBACK,
AIO_OP_READPIPE,
+ AIO_OP_CONSOLE2,
+ AIO_OP_DISCONNECT,
+ AIO_OP_ACCEPTRECEIVE,
+ AIO_OP_RECEIVE_BUFFERS,
+ AIO_OP_SEND_BUFFERS,
AIO_OP_LAST
};
+#include <mono/metadata/tpool-poll.c>
+#ifdef HAVE_EPOLL
+#include <mono/metadata/tpool-epoll.c>
+#elif defined(HAVE_KQUEUE)
+#include <mono/metadata/tpool-kqueue.c>
+#endif
+/*
+ * Functions to check whenever a class is given system class. We need to cache things in MonoDomain since some of the
+ * assemblies can be unloaded.
+ */
+
+static gboolean
+is_system_type (MonoDomain *domain, MonoClass *klass)
+{
+ if (domain->system_image == NULL)
+ domain->system_image = mono_image_loaded ("System");
+
+ return klass->image == domain->system_image;
+}
+
+static gboolean
+is_corlib_type (MonoDomain *domain, MonoClass *klass)
+{
+ return klass->image == mono_defaults.corlib;
+}
+
+/*
+ * Note that we call it is_socket_type() where 'socket' refers to the image
+ * that contains the System.Net.Sockets.Socket type.
+ * For moonlight there is a System.Net.Sockets.Socket class in both System.dll and System.Net.dll.
+*/
+static gboolean
+is_socket_type (MonoDomain *domain, MonoClass *klass)
+{
+ static const char *version = NULL;
+ static gboolean moonlight;
+
+ if (is_system_type (domain, klass))
+ return TRUE;
+
+ /* If moonlight, check if the type is in System.Net.dll too */
+ if (version == NULL) {
+ version = mono_get_runtime_info ()->framework_version;
+ moonlight = !strcmp (version, "2.1");
+ }
+
+ if (!moonlight)
+ return FALSE;
+
+ if (domain->system_net_dll == NULL)
+ domain->system_net_dll = mono_image_loaded ("System.Net");
+
+ return klass->image == domain->system_net_dll;
+}
+
+#define check_type_cached(domain, ASSEMBLY, _class, _namespace, _name, loc) do { \
+ if (*loc) \
+ return *loc == _class; \
+ if (is_##ASSEMBLY##_type (domain, _class) && !strcmp (_name, _class->name) && !strcmp (_namespace, _class->name_space)) { \
+ *loc = _class; \
+ return TRUE; \
+ } \
+ return FALSE; \
+} while (0) \
+
+#define check_corlib_type_cached(domain, _class, _namespace, _name, loc) check_type_cached (domain, corlib, _class, _namespace, _name, loc)
+
+#define check_socket_type_cached(domain, _class, _namespace, _name, loc) check_type_cached (domain, socket, _class, _namespace, _name, loc)
+
+#define check_system_type_cached(domain, _class, _namespace, _name, loc) check_type_cached (domain, system, _class, _namespace, _name, loc)
+
+static gboolean
+is_corlib_asyncresult (MonoDomain *domain, MonoClass *klass)
+{
+ check_corlib_type_cached (domain, klass, "System.Runtime.Remoting.Messaging", "AsyncResult", &domain->corlib_asyncresult_class);
+}
+
+static gboolean
+is_socket (MonoDomain *domain, MonoClass *klass)
+{
+ check_socket_type_cached (domain, klass, "System.Net.Sockets", "Socket", &domain->socket_class);
+}
+
+static gboolean
+is_socketasyncresult (MonoDomain *domain, MonoClass *klass)
+{
+ return (klass->nested_in &&
+ is_socket (domain, klass->nested_in) &&
+ !strcmp (klass->name, "SocketAsyncResult"));
+}
+
+static gboolean
+is_socketasynccall (MonoDomain *domain, MonoClass *klass)
+{
+ return (klass->nested_in &&
+ is_socket (domain, klass->nested_in) &&
+ !strcmp (klass->name, "SocketAsyncCall"));
+}
+
+static gboolean
+is_appdomainunloaded_exception (MonoDomain *domain, MonoClass *klass)
+{
+ check_corlib_type_cached (domain, klass, "System", "AppDomainUnloadedException", &domain->ad_unloaded_ex_class);
+}
+
+static gboolean
+is_sd_process (MonoDomain *domain, MonoClass *klass)
+{
+ check_system_type_cached (domain, klass, "System.Diagnostics", "Process", &domain->process_class);
+}
+
+static gboolean
+is_sdp_asyncreadhandler (MonoDomain *domain, MonoClass *klass)
+{
+
+ return (klass->nested_in &&
+ is_sd_process (domain, klass->nested_in) &&
+ !strcmp (klass->name, "AsyncReadHandler"));
+}
+
+
#ifdef DISABLE_SOCKETS
+
#define socket_io_cleanup(x)
+
+static int
+get_event_from_state (MonoSocketAsyncResult *state)
+{
+ g_assert_not_reached ();
+ return -1;
+}
+
+static int
+get_events_from_list (MonoMList *list)
+{
+ return 0;
+}
+
#else
+
static void
socket_io_cleanup (SocketIOData *data)
{
- gint release;
-
- if (data->inited == 0)
- return;
-
EnterCriticalSection (&data->io_lock);
- data->inited = 0;
-#ifdef PLATFORM_WIN32
- closesocket (data->pipe [0]);
- closesocket (data->pipe [1]);
-#else
- close (data->pipe [0]);
- close (data->pipe [1]);
-#endif
- data->pipe [0] = -1;
- data->pipe [1] = -1;
- if (data->new_sem)
- CloseHandle (data->new_sem);
- data->new_sem = NULL;
- mono_g_hash_table_destroy (data->sock_to_state);
- data->sock_to_state = NULL;
- free_queue (&async_io_queue);
- release = (gint) InterlockedCompareExchange (&io_worker_threads, 0, -1);
- if (io_job_added)
- ReleaseSemaphore (io_job_added, release, NULL);
- g_free (data->newpfd);
- data->newpfd = NULL;
-#ifdef HAVE_EPOLL
- if (FALSE == data->epoll_disabled)
- close (data->epollfd);
-#endif
+ if (data->inited != 2) {
+ LeaveCriticalSection (&data->io_lock);
+ return;
+ }
+ data->inited = 3;
+ data->shutdown (data->event_data);
LeaveCriticalSection (&data->io_lock);
}
case AIO_OP_RECV_JUST_CALLBACK:
case AIO_OP_RECEIVEFROM:
case AIO_OP_READPIPE:
+ case AIO_OP_ACCEPTRECEIVE:
+ case AIO_OP_RECEIVE_BUFFERS:
return MONO_POLLIN;
case AIO_OP_SEND:
case AIO_OP_SEND_JUST_CALLBACK:
case AIO_OP_SENDTO:
case AIO_OP_CONNECT:
+ case AIO_OP_SEND_BUFFERS:
+ case AIO_OP_DISCONNECT:
return MONO_POLLOUT;
default: /* Should never happen */
- g_print ("get_event_from_state: unknown value in switch!!!\n");
+ g_message ("get_event_from_state: unknown value in switch!!!");
return 0;
}
}
#endif /* !DISABLE_SOCKETS */
-static void
-unregister_job (MonoAsyncResult *obj)
-{
- EnterCriticalSection (&ares_lock);
- mono_g_hash_table_remove (ares_htable, obj);
- LeaveCriticalSection (&ares_lock);
-}
-
static void
threadpool_jobs_inc (MonoObject *obj)
{
return FALSE;
}
-#ifndef DISABLE_SOCKETS
-static void
-async_invoke_io_thread (gpointer data)
-{
- MonoDomain *domain;
- MonoThread *thread;
- const gchar *version;
-
- thread = mono_thread_current ();
-
- version = mono_get_runtime_info ()->framework_version;
- for (;;) {
- MonoSocketAsyncResult *state;
- MonoAsyncResult *ar;
-
- state = (MonoSocketAsyncResult *) data;
- if (state) {
- InterlockedDecrement (&pending_io_items);
- ar = state->ares;
- switch (state->operation) {
- case AIO_OP_RECEIVE:
- state->total = ICALL_RECV (state);
- break;
- case AIO_OP_SEND:
- state->total = ICALL_SEND (state);
- break;
- }
-
- /* worker threads invokes methods in different domains,
- * so we need to set the right domain here */
- domain = ((MonoObject *)ar)->vtable->domain;
-
- g_assert (domain);
-
- if (domain->state == MONO_APPDOMAIN_UNLOADED || domain->state == MONO_APPDOMAIN_UNLOADING) {
- threadpool_jobs_dec ((MonoObject *)ar);
- unregister_job (ar);
- data = NULL;
- } else {
- mono_thread_push_appdomain_ref (domain);
- if (threadpool_jobs_dec ((MonoObject *)ar)) {
- unregister_job (ar);
- data = NULL;
- mono_thread_pop_appdomain_ref ();
- continue;
- }
- if (mono_domain_set (domain, FALSE)) {
- ASyncCall *ac;
-
- mono_async_invoke (ar);
- ac = (ASyncCall *) ar->object_data;
- /*
- if (ac->msg->exc != NULL)
- mono_unhandled_exception (ac->msg->exc);
- */
- mono_domain_set (mono_get_root_domain (), TRUE);
- }
- mono_thread_pop_appdomain_ref ();
- InterlockedDecrement (&busy_io_worker_threads);
- /* If the callee changes the background status, set it back to TRUE */
- if (*version != '1' && !mono_thread_test_state (thread , ThreadState_Background))
- ves_icall_System_Threading_Thread_SetState (thread, ThreadState_Background);
- }
- }
-
- data = dequeue_job (&io_queue_lock, &async_io_queue);
-
- if (!data) {
- guint32 wr;
- int timeout = THREAD_EXIT_TIMEOUT;
- guint32 start_time = mono_msec_ticks ();
-
- do {
- wr = WaitForSingleObjectEx (io_job_added, (guint32)timeout, TRUE);
- if (THREAD_WANTS_A_BREAK (thread))
- mono_thread_interruption_checkpoint ();
-
- timeout -= mono_msec_ticks () - start_time;
-
- if (wr != WAIT_TIMEOUT && wr != WAIT_IO_COMPLETION)
- data = dequeue_job (&io_queue_lock, &async_io_queue);
- }
- while (!data && timeout > 0);
- }
-
- if (!data) {
- if (InterlockedDecrement (&io_worker_threads) < 2) {
- /* If we have pending items, keep the thread alive */
- if (InterlockedCompareExchange (&pending_io_items, 0, 0) != 0) {
- InterlockedIncrement (&io_worker_threads);
- continue;
- }
- }
- return;
- }
-
- InterlockedIncrement (&busy_io_worker_threads);
- }
-
- g_assert_not_reached ();
-}
-
-static void
-start_io_thread_or_queue (MonoSocketAsyncResult *ares)
+static MonoObject *
+get_io_event (MonoMList **list, gint event)
{
- int busy, worker;
-
- busy = (int) InterlockedCompareExchange (&busy_io_worker_threads, 0, -1);
- worker = (int) InterlockedCompareExchange (&io_worker_threads, 0, -1);
- if (worker <= ++busy &&
- worker < mono_io_max_worker_threads) {
- InterlockedIncrement (&busy_io_worker_threads);
- InterlockedIncrement (&io_worker_threads);
- threadpool_jobs_inc ((MonoObject *)ares);
- mono_thread_create_internal (mono_get_root_domain (), async_invoke_io_thread, ares, TRUE);
- } else {
- append_job (&io_queue_lock, &async_io_queue, (MonoObject*)ares);
- ReleaseSemaphore (io_job_added, 1, NULL);
- }
-}
-
-static MonoMList *
-process_io_event (MonoMList *list, int event)
-{
- MonoSocketAsyncResult *state;
- MonoMList *oldlist;
+ MonoObject *state;
+ MonoMList *current;
+ MonoMList *prev;
- oldlist = list;
+ current = *list;
+ prev = NULL;
state = NULL;
- while (list) {
- state = (MonoSocketAsyncResult *) mono_mlist_get_data (list);
- if (get_event_from_state (state) == event)
+ while (current) {
+ state = mono_mlist_get_data (current);
+ if (get_event_from_state ((MonoSocketAsyncResult *) state) == event)
break;
-
- list = mono_mlist_next (list);
- }
- if (list != NULL) {
- oldlist = mono_mlist_remove_item (oldlist, list);
-#ifdef EPOLL_DEBUG
- g_print ("Dispatching event %d on socket %d\n", event, state->handle);
-#endif
- InterlockedIncrement (&pending_io_items);
- start_io_thread_or_queue (state);
+ state = NULL;
+ prev = current;
+ current = mono_mlist_next (current);
}
- return oldlist;
-}
-
-static int
-mark_bad_fds (mono_pollfd *pfds, int nfds)
-{
- int i, ret;
- mono_pollfd *pfd;
- int count = 0;
-
- for (i = 0; i < nfds; i++) {
- pfd = &pfds [i];
- if (pfd->fd == -1)
- continue;
-
- ret = mono_poll (pfd, 1, 0);
- if (ret == -1 && errno == EBADF) {
- pfd->revents |= MONO_POLLNVAL;
- count++;
- } else if (ret == 1) {
- count++;
- }
- }
-
- return count;
-}
-
-static void
-socket_io_poll_main (gpointer p)
-{
-#define INITIAL_POLLFD_SIZE 1024
-#define POLL_ERRORS (MONO_POLLERR | MONO_POLLHUP | MONO_POLLNVAL)
- SocketIOData *data = p;
- mono_pollfd *pfds;
- gint maxfd = 1;
- gint allocated;
- gint i;
- MonoThread *thread;
-
- thread = mono_thread_current ();
-
- allocated = INITIAL_POLLFD_SIZE;
- pfds = g_new0 (mono_pollfd, allocated);
- INIT_POLLFD (pfds, data->pipe [0], MONO_POLLIN);
- for (i = 1; i < allocated; i++)
- INIT_POLLFD (&pfds [i], -1, 0);
-
- while (1) {
- int nsock = 0;
- mono_pollfd *pfd;
- char one [1];
- MonoMList *list;
-
- do {
- if (nsock == -1) {
- if (THREAD_WANTS_A_BREAK (thread))
- mono_thread_interruption_checkpoint ();
- }
-
- nsock = mono_poll (pfds, maxfd, -1);
- } while (nsock == -1 && errno == EINTR);
-
- /*
- * Apart from EINTR, we only check EBADF, for the rest:
- * EINVAL: mono_poll() 'protects' us from descriptor
- * numbers above the limit if using select() by marking
- * then as MONO_POLLERR. If a system poll() is being
- * used, the number of descriptor we're passing will not
- * be over sysconf(_SC_OPEN_MAX), as the error would have
- * happened when opening.
- *
- * EFAULT: we own the memory pointed by pfds.
- * ENOMEM: we're doomed anyway
- *
- */
-
- if (nsock == -1 && errno == EBADF) {
- pfds->revents = 0; /* Just in case... */
- nsock = mark_bad_fds (pfds, maxfd);
- }
-
- if ((pfds->revents & POLL_ERRORS) != 0) {
- /* We're supposed to die now, as the pipe has been closed */
- g_free (pfds);
- socket_io_cleanup (data);
- return;
- }
-
- /* Got a new socket */
- if ((pfds->revents & MONO_POLLIN) != 0) {
- int nread;
-
- for (i = 1; i < allocated; i++) {
- pfd = &pfds [i];
- if (pfd->fd == -1 || pfd->fd == data->newpfd->fd)
- break;
- }
-
- if (i == allocated) {
- mono_pollfd *oldfd;
-
- oldfd = pfds;
- i = allocated;
- allocated = allocated * 2;
- pfds = g_renew (mono_pollfd, oldfd, allocated);
- g_free (oldfd);
- for (; i < allocated; i++)
- INIT_POLLFD (&pfds [i], -1, 0);
- }
-#ifndef PLATFORM_WIN32
- nread = read (data->pipe [0], one, 1);
-#else
- nread = recv ((SOCKET) data->pipe [0], one, 1, 0);
-#endif
- if (nread <= 0) {
- g_free (pfds);
- return; /* we're closed */
- }
-
- INIT_POLLFD (&pfds [i], data->newpfd->fd, data->newpfd->events);
- ReleaseSemaphore (data->new_sem, 1, NULL);
- if (i >= maxfd)
- maxfd = i + 1;
- nsock--;
- }
-
- if (nsock == 0)
- continue;
-
- EnterCriticalSection (&data->io_lock);
- if (data->inited == 0) {
- g_free (pfds);
- LeaveCriticalSection (&data->io_lock);
- return; /* cleanup called */
- }
-
- for (i = 1; i < maxfd && nsock > 0; i++) {
- pfd = &pfds [i];
- if (pfd->fd == -1 || pfd->revents == 0)
- continue;
-
- nsock--;
- list = mono_g_hash_table_lookup (data->sock_to_state, GINT_TO_POINTER (pfd->fd));
- if (list != NULL && (pfd->revents & (MONO_POLLIN | POLL_ERRORS)) != 0) {
- list = process_io_event (list, MONO_POLLIN);
- }
-
- if (list != NULL && (pfd->revents & (MONO_POLLOUT | POLL_ERRORS)) != 0) {
- list = process_io_event (list, MONO_POLLOUT);
- }
-
- if (list != NULL) {
- mono_g_hash_table_replace (data->sock_to_state, GINT_TO_POINTER (pfd->fd), list);
- pfd->events = get_events_from_list (list);
- } else {
- mono_g_hash_table_remove (data->sock_to_state, GINT_TO_POINTER (pfd->fd));
- pfd->fd = -1;
- if (i == maxfd - 1)
- maxfd--;
- }
+ if (current) {
+ if (prev) {
+ mono_mlist_set_next (prev, mono_mlist_next (current));
+ } else {
+ *list = mono_mlist_next (*list);
}
- LeaveCriticalSection (&data->io_lock);
}
-}
-
-#ifdef HAVE_EPOLL
-#define EPOLL_ERRORS (EPOLLERR | EPOLLHUP)
-static void
-socket_io_epoll_main (gpointer p)
-{
- SocketIOData *data;
- int epollfd;
- MonoThread *thread;
- struct epoll_event *events, *evt;
- const int nevents = 512;
- int ready = 0, i;
-
- data = p;
- epollfd = data->epollfd;
- thread = mono_thread_current ();
- events = g_new0 (struct epoll_event, nevents);
-
- while (1) {
- do {
- if (ready == -1) {
- if (THREAD_WANTS_A_BREAK (thread))
- mono_thread_interruption_checkpoint ();
- }
-#ifdef EPOLL_DEBUG
- g_print ("epoll_wait init\n");
-#endif
- ready = epoll_wait (epollfd, events, nevents, -1);
-#ifdef EPOLL_DEBUG
- {
- int err = errno;
- g_print ("epoll_wait end with %d ready sockets (%d %s).\n", ready, err, (err) ? g_strerror (err) : "");
- errno = err;
- }
-#endif
- } while (ready == -1 && errno == EINTR);
-
- if (ready == -1) {
- int err = errno;
- g_free (events);
- if (err != EBADF)
- g_warning ("epoll_wait: %d %s\n", err, g_strerror (err));
-
- close (epollfd);
- return;
- }
-
- EnterCriticalSection (&data->io_lock);
- if (data->inited == 0) {
-#ifdef EPOLL_DEBUG
- g_print ("data->inited == 0\n");
-#endif
- g_free (events);
- close (epollfd);
- return; /* cleanup called */
- }
-
- for (i = 0; i < ready; i++) {
- int fd;
- MonoMList *list;
- evt = &events [i];
- fd = evt->data.fd;
- list = mono_g_hash_table_lookup (data->sock_to_state, GINT_TO_POINTER (fd));
-#ifdef EPOLL_DEBUG
- g_print ("Event %d on %d list length: %d\n", evt->events, fd, mono_mlist_length (list));
-#endif
- if (list != NULL && (evt->events & (EPOLLIN | EPOLL_ERRORS)) != 0) {
- list = process_io_event (list, MONO_POLLIN);
- }
-
- if (list != NULL && (evt->events & (EPOLLOUT | EPOLL_ERRORS)) != 0) {
- list = process_io_event (list, MONO_POLLOUT);
- }
-
- if (list != NULL) {
- mono_g_hash_table_replace (data->sock_to_state, GINT_TO_POINTER (fd), list);
- evt->events = get_events_from_list (list);
-#ifdef EPOLL_DEBUG
- g_print ("MOD %d to %d\n", fd, evt->events);
-#endif
- if (epoll_ctl (epollfd, EPOLL_CTL_MOD, fd, evt)) {
- if (epoll_ctl (epollfd, EPOLL_CTL_ADD, fd, evt) == -1) {
-#ifdef EPOLL_DEBUG
- int err = errno;
- g_message ("epoll_ctl(MOD): %d %s fd: %d events: %d", err, g_strerror (err), fd, evt->events);
- errno = err;
-#endif
- }
- }
- } else {
- mono_g_hash_table_remove (data->sock_to_state, GINT_TO_POINTER (fd));
-#ifdef EPOLL_DEBUG
- g_print ("DEL %d\n", fd);
-#endif
- epoll_ctl (epollfd, EPOLL_CTL_DEL, fd, evt);
- }
- }
- LeaveCriticalSection (&data->io_lock);
- }
+ return state;
}
-#endif
/*
* select/poll wake up when a socket is closed, but epoll just removes
void
mono_thread_pool_remove_socket (int sock)
{
- MonoMList *list, *next;
+ MonoMList *list;
MonoSocketAsyncResult *state;
+ MonoObject *ares;
- if (socket_io_data.inited == FALSE)
+ if (socket_io_data.inited == 0)
return;
EnterCriticalSection (&socket_io_data.io_lock);
+ if (socket_io_data.sock_to_state == NULL) {
+ LeaveCriticalSection (&socket_io_data.io_lock);
+ return;
+ }
list = mono_g_hash_table_lookup (socket_io_data.sock_to_state, GINT_TO_POINTER (sock));
- if (list) {
+ if (list)
mono_g_hash_table_remove (socket_io_data.sock_to_state, GINT_TO_POINTER (sock));
- }
LeaveCriticalSection (&socket_io_data.io_lock);
while (list) {
else if (state->operation == AIO_OP_SEND)
state->operation = AIO_OP_SEND_JUST_CALLBACK;
- next = mono_mlist_remove_item (list, list);
- list = process_io_event (list, MONO_POLLIN);
- if (list)
- process_io_event (list, MONO_POLLOUT);
-
- list = next;
+ ares = get_io_event (&list, MONO_POLLIN);
+ threadpool_append_job (&async_io_tp, ares);
+ if (list) {
+ ares = get_io_event (&list, MONO_POLLOUT);
+ threadpool_append_job (&async_io_tp, ares);
+ }
}
}
-#ifdef PLATFORM_WIN32
static void
-connect_hack (gpointer x)
+init_event_system (SocketIOData *data)
{
- struct sockaddr_in *addr = (struct sockaddr_in *) x;
- int count = 0;
-
- while (connect ((SOCKET) socket_io_data.pipe [1], (SOCKADDR *) addr, sizeof (struct sockaddr_in))) {
- Sleep (500);
- if (++count > 3) {
- g_warning ("Error initializing async. sockets %d.\n", WSAGetLastError ());
- g_assert (WSAGetLastError ());
- }
- }
-}
+#ifdef HAVE_EPOLL
+ if (data->event_system == EPOLL_BACKEND)
+ data->event_data = tp_epoll_init (data);
+#elif defined(HAVE_KQUEUE)
+ if (data->event_system == KQUEUE_BACKEND)
+ data->event_data = tp_kqueue_init (data);
#endif
+ if (data->event_system == POLL_BACKEND)
+ data->event_data = tp_poll_init (data);
+}
static void
socket_io_init (SocketIOData *data)
{
-#ifdef PLATFORM_WIN32
- struct sockaddr_in server;
- struct sockaddr_in client;
- SOCKET srv;
- int len;
-#endif
int inited;
- inited = InterlockedCompareExchange (&data->inited, -1, -1);
- if (inited == 1)
+ if (data->inited >= 2) // 2 -> initialized, 3-> cleaned up
return;
- EnterCriticalSection (&data->io_lock);
- inited = InterlockedCompareExchange (&data->inited, -1, -1);
- if (inited == 1) {
- LeaveCriticalSection (&data->io_lock);
- return;
- }
-
-#ifdef HAVE_EPOLL
- data->epoll_disabled = (g_getenv ("MONO_DISABLE_AIO") != NULL);
- if (FALSE == data->epoll_disabled) {
- data->epollfd = epoll_create (256);
- data->epoll_disabled = (data->epollfd == -1);
- if (data->epoll_disabled && g_getenv ("MONO_DEBUG"))
- g_message ("epoll_create() failed. Using plain poll().");
- } else {
- data->epollfd = -1;
- }
-#else
- data->epoll_disabled = TRUE;
-#endif
-
-#ifndef PLATFORM_WIN32
- if (data->epoll_disabled) {
- if (pipe (data->pipe) != 0) {
- int err = errno;
- perror ("mono");
- g_assert (err);
+ inited = InterlockedCompareExchange (&data->inited, 1, 0);
+ if (inited >= 1) {
+ while (TRUE) {
+ if (data->inited >= 2)
+ return;
+ SleepEx (1, FALSE);
}
- } else {
- data->pipe [0] = -1;
- data->pipe [1] = -1;
- }
-#else
- srv = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
- g_assert (srv != INVALID_SOCKET);
- data->pipe [1] = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
- g_assert (data->pipe [1] != INVALID_SOCKET);
-
- server.sin_family = AF_INET;
- server.sin_addr.s_addr = inet_addr ("127.0.0.1");
- server.sin_port = 0;
- if (bind (srv, (SOCKADDR *) &server, sizeof (server))) {
- g_print ("%d\n", WSAGetLastError ());
- g_assert (1 != 0);
}
- len = sizeof (server);
- getsockname (srv, (SOCKADDR *) &server, &len);
- listen (srv, 1);
- mono_thread_create (mono_get_root_domain (), connect_hack, &server);
- len = sizeof (server);
- data->pipe [0] = accept (srv, (SOCKADDR *) &client, &len);
- g_assert (data->pipe [0] != INVALID_SOCKET);
- closesocket (srv);
-#endif
- mono_io_max_worker_threads = mono_max_worker_threads / 2;
- if (mono_io_max_worker_threads < 10)
- mono_io_max_worker_threads = 10;
-
+ EnterCriticalSection (&data->io_lock);
data->sock_to_state = mono_g_hash_table_new_type (g_direct_hash, g_direct_equal, MONO_HASH_VALUE_GC);
-
- if (data->epoll_disabled) {
- data->new_sem = CreateSemaphore (NULL, 1, 1, NULL);
- g_assert (data->new_sem != NULL);
- }
- io_job_added = CreateSemaphore (NULL, 0, 0x7fffffff, NULL);
- g_assert (io_job_added != NULL);
- if (data->epoll_disabled) {
- mono_thread_create_internal (mono_get_root_domain (), socket_io_poll_main, data, TRUE);
- }
#ifdef HAVE_EPOLL
- else {
- mono_thread_create_internal (mono_get_root_domain (), socket_io_epoll_main, data, TRUE);
- }
+ data->event_system = EPOLL_BACKEND;
+#elif defined(HAVE_KQUEUE)
+ data->event_system = KQUEUE_BACKEND;
+#else
+ data->event_system = POLL_BACKEND;
#endif
- InterlockedCompareExchange (&data->inited, 1, 0);
+ if (g_getenv ("MONO_DISABLE_AIO") != NULL)
+ data->event_system = POLL_BACKEND;
+
+ init_event_system (data);
+ mono_thread_create_internal (mono_get_root_domain (), data->wait, data, TRUE, SMALL_STACK);
LeaveCriticalSection (&data->io_lock);
+ data->inited = 2;
+ mono_thread_create_internal (mono_get_root_domain (), threadpool_start_idle_threads, &async_io_tp, TRUE, SMALL_STACK);
}
static void
-socket_io_add_poll (MonoSocketAsyncResult *state)
+socket_io_add (MonoAsyncResult *ares, MonoSocketAsyncResult *state)
{
- int events;
- char msg [1];
MonoMList *list;
SocketIOData *data = &socket_io_data;
+ int fd;
+ gboolean is_new;
+ int ievt;
-#if defined(PLATFORM_MACOSX) || defined(PLATFORM_BSD6) || defined(PLATFORM_WIN32) || defined(PLATFORM_SOLARIS)
- /* select() for connect() does not work well on the Mac. Bug #75436. */
- /* Bug #77637 for the BSD 6 case */
- /* Bug #78888 for the Windows case */
- if (state->operation == AIO_OP_CONNECT && state->blocking == TRUE) {
- start_io_thread_or_queue (state);
+ socket_io_init (&socket_io_data);
+ if (mono_runtime_is_shutting_down () || data->inited == 3 || data->sock_to_state == NULL)
+ return;
+ if (async_tp.pool_status == 2)
return;
- }
-#endif
- WaitForSingleObject (data->new_sem, INFINITE);
- if (data->newpfd == NULL)
- data->newpfd = g_new0 (mono_pollfd, 1);
-
- EnterCriticalSection (&data->io_lock);
- /* FIXME: 64 bit issue: handle can be a pointer on windows? */
- list = mono_g_hash_table_lookup (data->sock_to_state, GINT_TO_POINTER (state->handle));
- if (list == NULL) {
- list = mono_mlist_alloc ((MonoObject*)state);
- } else {
- list = mono_mlist_append (list, (MonoObject*)state);
- }
-
- events = get_events_from_list (list);
- INIT_POLLFD (data->newpfd, GPOINTER_TO_INT (state->handle), events);
- mono_g_hash_table_replace (data->sock_to_state, GINT_TO_POINTER (state->handle), list);
- LeaveCriticalSection (&data->io_lock);
- *msg = (char) state->operation;
-#ifndef PLATFORM_WIN32
- write (data->pipe [1], msg, 1);
-#else
- send ((SOCKET) data->pipe [1], msg, 1, 0);
-#endif
-}
-#ifdef HAVE_EPOLL
-static gboolean
-socket_io_add_epoll (MonoSocketAsyncResult *state)
-{
- MonoMList *list;
- SocketIOData *data = &socket_io_data;
- struct epoll_event event;
- int epoll_op, ievt;
- int fd;
+ MONO_OBJECT_SETREF (state, ares, ares);
- memset (&event, 0, sizeof (struct epoll_event));
fd = GPOINTER_TO_INT (state->handle);
EnterCriticalSection (&data->io_lock);
- list = mono_g_hash_table_lookup (data->sock_to_state, GINT_TO_POINTER (fd));
- if (list == NULL) {
- list = mono_mlist_alloc ((MonoObject*)state);
- epoll_op = EPOLL_CTL_ADD;
- } else {
- list = mono_mlist_append (list, (MonoObject*)state);
- epoll_op = EPOLL_CTL_MOD;
- }
-
- ievt = get_events_from_list (list);
- if ((ievt & MONO_POLLIN) != 0)
- event.events |= EPOLLIN;
- if ((ievt & MONO_POLLOUT) != 0)
- event.events |= EPOLLOUT;
-
- mono_g_hash_table_replace (data->sock_to_state, state->handle, list);
- event.data.fd = fd;
-#ifdef EPOLL_DEBUG
- g_print ("%s %d with %d\n", epoll_op == EPOLL_CTL_ADD ? "ADD" : "MOD", fd, event.events);
-#endif
- if (epoll_ctl (data->epollfd, epoll_op, fd, &event) == -1) {
- int err = errno;
- if (epoll_op == EPOLL_CTL_ADD && err == EEXIST) {
- epoll_op = EPOLL_CTL_MOD;
- if (epoll_ctl (data->epollfd, epoll_op, fd, &event) == -1) {
- g_message ("epoll_ctl(MOD): %d %s\n", err, g_strerror (err));
- }
- }
+ if (data->sock_to_state == NULL) {
+ LeaveCriticalSection (&data->io_lock);
+ return;
}
-
- LeaveCriticalSection (&data->io_lock);
- return TRUE;
-}
-#endif
-
-static void
-socket_io_add (MonoAsyncResult *ares, MonoSocketAsyncResult *state)
-{
- socket_io_init (&socket_io_data);
- MONO_OBJECT_SETREF (state, ares, ares);
-#ifdef HAVE_EPOLL
- if (socket_io_data.epoll_disabled == FALSE) {
- if (socket_io_add_epoll (state))
- return;
+ list = mono_g_hash_table_lookup (data->sock_to_state, GINT_TO_POINTER (fd));
+ if (list == NULL) {
+ list = mono_mlist_alloc ((MonoObject*)state);
+ is_new = TRUE;
+ } else {
+ list = mono_mlist_append (list, (MonoObject*)state);
+ is_new = FALSE;
}
-#endif
- socket_io_add_poll (state);
+
+ mono_g_hash_table_replace (data->sock_to_state, state->handle, list);
+ ievt = get_events_from_list (list);
+ LeaveCriticalSection (&data->io_lock);
+ data->modify (data->event_data, fd, state->operation, ievt, is_new);
}
+#ifndef DISABLE_SOCKETS
static gboolean
socket_io_filter (MonoObject *target, MonoObject *state)
{
gint op;
- MonoSocketAsyncResult *sock_res = (MonoSocketAsyncResult *) state;
+ MonoSocketAsyncResult *sock_res;
MonoClass *klass;
+ MonoDomain *domain;
if (target == NULL || state == NULL)
return FALSE;
- if (socket_async_call_klass == NULL) {
- klass = target->vtable->klass;
- /* Check if it's SocketAsyncCall in System.Net.Sockets
- * FIXME: check the assembly is signed correctly for extra care
- */
- if (klass->name [0] == 'S' && strcmp (klass->name, "SocketAsyncCall") == 0
- && strcmp (mono_image_get_name (klass->image), "System") == 0
- && klass->nested_in && strcmp (klass->nested_in->name, "Socket") == 0)
- socket_async_call_klass = klass;
- }
+ domain = target->vtable->domain;
+ klass = target->vtable->klass;
+ if (socket_async_call_klass == NULL && is_socketasynccall (domain, klass))
+ socket_async_call_klass = klass;
- if (process_async_call_klass == NULL) {
- klass = target->vtable->klass;
- /* Check if it's AsyncReadHandler in System.Diagnostics.Process
- * FIXME: check the assembly is signed correctly for extra care
- */
- if (klass->name [0] == 'A' && strcmp (klass->name, "AsyncReadHandler") == 0
- && strcmp (mono_image_get_name (klass->image), "System") == 0
- && klass->nested_in && strcmp (klass->nested_in->name, "Process") == 0)
- process_async_call_klass = klass;
- }
- /* return both when socket_async_call_klass has not been seen yet and when
- * the object is not an instance of the class.
- */
- if (target->vtable->klass != socket_async_call_klass && target->vtable->klass != process_async_call_klass)
+ if (process_async_call_klass == NULL && is_sdp_asyncreadhandler (domain, klass))
+ process_async_call_klass = klass;
+
+ if (klass != socket_async_call_klass && klass != process_async_call_klass)
return FALSE;
+ sock_res = (MonoSocketAsyncResult *) state;
op = sock_res->operation;
if (op < AIO_OP_FIRST || op >= AIO_OP_LAST)
return FALSE;
}
#endif /* !DISABLE_SOCKETS */
-static void
-mono_async_invoke (MonoAsyncResult *ares)
+/* Returns the exception thrown when invoking, if any */
+static MonoObject *
+mono_async_invoke (ThreadPool *tp, MonoAsyncResult *ares)
{
ASyncCall *ac = (ASyncCall *)ares->object_data;
- MonoThread *thread = NULL;
MonoObject *res, *exc = NULL;
MonoArray *out_args = NULL;
+ HANDLE wait_event = NULL;
if (ares->execution_context) {
/* use captured ExecutionContext (if available) */
- thread = mono_thread_current ();
- MONO_OBJECT_SETREF (ares, original_context, thread->execution_context);
- MONO_OBJECT_SETREF (thread, execution_context, ares->execution_context);
+ MONO_OBJECT_SETREF (ares, original_context, mono_thread_get_execution_context ());
+ mono_thread_set_execution_context (ares->execution_context);
} else {
ares->original_context = NULL;
}
- ac->msg->exc = NULL;
- res = mono_message_invoke (ares->async_delegate, ac->msg, &exc, &out_args);
- MONO_OBJECT_SETREF (ac, res, res);
- MONO_OBJECT_SETREF (ac, msg->exc, exc);
- MONO_OBJECT_SETREF (ac, out_args, out_args);
-
- ares->completed = 1;
-
- /* call async callback if cb_method != null*/
- if (ac->cb_method) {
- MonoObject *exc = NULL;
- void *pa = &ares;
- mono_runtime_invoke (ac->cb_method, ac->cb_target, pa, &exc);
- /* 'exc' will be the previous ac->msg->exc if not NULL and not
- * catched. If catched, this will be set to NULL and the
- * exception will not be printed. */
- MONO_OBJECT_SETREF (ac->msg, exc, exc);
+ if (ac == NULL) {
+ /* Fast path from ThreadPool.*QueueUserWorkItem */
+ void *pa = ares->async_state;
+ mono_runtime_delegate_invoke (ares->async_delegate, &pa, &exc);
+ } else {
+ MonoObject *cb_exc = NULL;
+
+ ac->msg->exc = NULL;
+ res = mono_message_invoke (ares->async_delegate, ac->msg, &exc, &out_args);
+ MONO_OBJECT_SETREF (ac, res, res);
+ MONO_OBJECT_SETREF (ac, msg->exc, exc);
+ MONO_OBJECT_SETREF (ac, out_args, out_args);
+
+ mono_monitor_enter ((MonoObject *) ares);
+ ares->completed = 1;
+ if (ares->handle != NULL)
+ wait_event = mono_wait_handle_get_handle ((MonoWaitHandle *) ares->handle);
+ mono_monitor_exit ((MonoObject *) ares);
+ /* notify listeners */
+ if (wait_event != NULL)
+ SetEvent (wait_event);
+
+ /* call async callback if cb_method != null*/
+ if (ac != NULL && ac->cb_method) {
+ void *pa = &ares;
+ cb_exc = NULL;
+ mono_runtime_invoke (ac->cb_method, ac->cb_target, pa, &cb_exc);
+ MONO_OBJECT_SETREF (ac->msg, exc, cb_exc);
+ exc = cb_exc;
+ } else {
+ exc = NULL;
+ }
}
/* restore original thread execution context if flow isn't suppressed, i.e. non null */
if (ares->original_context) {
- MONO_OBJECT_SETREF (thread, execution_context, ares->original_context);
+ mono_thread_set_execution_context (ares->original_context);
ares->original_context = NULL;
}
-
- /* notify listeners */
- mono_monitor_enter ((MonoObject *) ares);
- if (ares->handle != NULL) {
- ac->wait_event = (gsize) mono_wait_handle_get_handle ((MonoWaitHandle *) ares->handle);
- SetEvent ((gpointer)(gsize)ac->wait_event);
- }
- mono_monitor_exit ((MonoObject *) ares);
-
- EnterCriticalSection (&ares_lock);
- mono_g_hash_table_remove (ares_htable, ares);
- LeaveCriticalSection (&ares_lock);
+ return exc;
}
static void
-start_idle_threads (MonoAsyncResult *data)
+threadpool_start_idle_threads (ThreadPool *tp)
{
- int needed;
- int existing;
+ int n;
+ guint32 stack_size;
- needed = (int) InterlockedCompareExchange (&mono_min_worker_threads, 0, -1);
+ stack_size = (!tp->is_io) ? 0 : SMALL_STACK;
do {
- existing = (int) InterlockedCompareExchange (&mono_worker_threads, 0, -1);
- if ((needed - existing) > 0) {
- start_tpthread (data);
- if (data)
- threadpool_jobs_dec ((MonoObject*)data);
- data = NULL;
- Sleep (500);
+ while (1) {
+ n = tp->nthreads;
+ if (n >= tp->min_threads)
+ return;
+ if (InterlockedCompareExchange (&tp->nthreads, n + 1, n) == n)
+ break;
}
- } while ((needed - existing) > 0);
+ mono_perfcounter_update_value (tp->pc_nthreads, TRUE, 1);
+ mono_thread_create_internal (mono_get_root_domain (), tp->async_invoke, tp, TRUE, stack_size);
+ SleepEx (100, TRUE);
+ } while (1);
+}
+
+static void
+threadpool_init (ThreadPool *tp, int min_threads, int max_threads, void (*async_invoke) (gpointer))
+{
+ memset (tp, 0, sizeof (ThreadPool));
+ tp->min_threads = min_threads;
+ tp->max_threads = max_threads;
+ tp->async_invoke = async_invoke;
+ tp->queue = mono_cq_create ();
+ MONO_SEM_INIT (&tp->new_job, 0);
+}
+
+static void *
+init_perf_counter (const char *category, const char *counter)
+{
+ MonoString *category_str;
+ MonoString *counter_str;
+ MonoString *machine;
+ MonoDomain *root;
+ MonoBoolean custom;
+ int type;
+
+ if (category == NULL || counter == NULL)
+ return NULL;
+ root = mono_get_root_domain ();
+ category_str = mono_string_new (root, category);
+ counter_str = mono_string_new (root, counter);
+ machine = mono_string_new (root, ".");
+ return mono_perfcounter_get_impl (category_str, counter_str, NULL, machine, &type, &custom);
+}
+
+#ifdef DEBUG
+static void
+print_pool_info (ThreadPool *tp)
+{
- /* If we don't start any thread here, make sure 'data' is processed. */
- if (data != NULL) {
- start_thread_or_queue (data);
- threadpool_jobs_dec ((MonoObject*)data);
+// if (tp->tail - tp->head == 0)
+// return;
+
+ g_print ("Pool status? %d\n", InterlockedCompareExchange (&tp->pool_status, 0, 0));
+ g_print ("Min. threads: %d\n", InterlockedCompareExchange (&tp->min_threads, 0, 0));
+ g_print ("Max. threads: %d\n", InterlockedCompareExchange (&tp->max_threads, 0, 0));
+ g_print ("nthreads: %d\n", InterlockedCompareExchange (&tp->nthreads, 0, 0));
+ g_print ("busy threads: %d\n", InterlockedCompareExchange (&tp->busy_threads, 0, 0));
+ g_print ("Waiting: %d\n", InterlockedCompareExchange (&tp->waiting, 0, 0));
+ g_print ("Queued: %d\n", (tp->tail - tp->head));
+ if (tp == &async_tp) {
+ int i;
+ EnterCriticalSection (&wsqs_lock);
+ for (i = 0; i < wsqs->len; i++) {
+ g_print ("\tWSQ %d: %d\n", i, mono_wsq_count (g_ptr_array_index (wsqs, i)));
+ }
+ LeaveCriticalSection (&wsqs_lock);
+ } else {
+ g_print ("\tSockets: %d\n", mono_g_hash_table_size (socket_io_data.sock_to_state));
}
+ g_print ("-------------\n");
}
static void
-start_tpthread (MonoAsyncResult *data)
+signal_handler (int signo)
{
- InterlockedIncrement (&mono_worker_threads);
- InterlockedIncrement (&busy_worker_threads);
- threadpool_jobs_inc ((MonoObject *)data);
- mono_thread_create_internal (mono_get_root_domain (), async_invoke_thread, data, TRUE);
+ ThreadPool *tp;
+
+ tp = &async_tp;
+ g_print ("\n-----Non-IO-----\n");
+ print_pool_info (tp);
+ tp = &async_io_tp;
+ g_print ("\n-----IO-----\n");
+ print_pool_info (tp);
+ alarm (2);
+}
+#endif
+
+static void
+monitor_thread (gpointer data)
+{
+ ThreadPool *tp;
+ MonoInternalThread *thread;
+ guint32 ms;
+ gboolean need_one;
+ int i;
+
+ tp = data;
+ thread = mono_thread_internal_current ();
+ ves_icall_System_Threading_Thread_SetName_internal (thread, mono_string_new (mono_domain_get (), "Threadpool monitor"));
+ while (1) {
+ ms = 500;
+ do {
+ guint32 ts;
+ ts = mono_msec_ticks ();
+ if (SleepEx (ms, TRUE) == 0)
+ break;
+ ms -= (mono_msec_ticks () - ts);
+ if (mono_runtime_is_shutting_down ())
+ break;
+ if (THREAD_WANTS_A_BREAK (thread))
+ mono_thread_interruption_checkpoint ();
+ } while (ms > 0);
+
+ if (mono_runtime_is_shutting_down ())
+ break;
+ if (tp->waiting > 0)
+ continue;
+ need_one = (mono_cq_count (tp->queue) > 0);
+ if (!need_one) {
+ EnterCriticalSection (&wsqs_lock);
+ for (i = 0; wsqs != NULL && i < wsqs->len; i++) {
+ MonoWSQ *wsq;
+ wsq = g_ptr_array_index (wsqs, i);
+ if (mono_wsq_count (wsq) > 0) {
+ need_one = TRUE;
+ break;
+ }
+ }
+ LeaveCriticalSection (&wsqs_lock);
+ }
+ if (need_one)
+ threadpool_start_thread (tp);
+ }
}
void
mono_thread_pool_init ()
{
- int threads_per_cpu = THREADS_PER_CPU;
- int cpu_count;
+ gint threads_per_cpu = 1;
+ gint thread_count;
+ gint cpu_count = mono_cpu_count ();
+ int result;
- if ((int) InterlockedCompareExchange (&tp_inited, 1, 0) == 1)
+ if (tp_inited == 2)
return;
- MONO_GC_REGISTER_ROOT (ares_htable);
- MONO_GC_REGISTER_ROOT (socket_io_data.sock_to_state);
+ result = InterlockedCompareExchange (&tp_inited, 1, 0);
+ if (result == 1) {
+ while (1) {
+ SleepEx (1, FALSE);
+ if (tp_inited == 2)
+ return;
+ }
+ }
+
+ MONO_GC_REGISTER_ROOT_FIXED (socket_io_data.sock_to_state);
InitializeCriticalSection (&socket_io_data.io_lock);
- InitializeCriticalSection (&ares_lock);
- InitializeCriticalSection (&io_queue_lock);
- ares_htable = mono_g_hash_table_new_type (NULL, NULL, MONO_HASH_KEY_VALUE_GC);
- job_added = CreateSemaphore (NULL, 0, 0x7fffffff, NULL);
- g_assert (job_added != NULL);
if (g_getenv ("MONO_THREADS_PER_CPU") != NULL) {
threads_per_cpu = atoi (g_getenv ("MONO_THREADS_PER_CPU"));
- if (threads_per_cpu <= 0)
- threads_per_cpu = THREADS_PER_CPU;
+ if (threads_per_cpu < 1)
+ threads_per_cpu = 1;
}
- cpu_count = mono_cpu_count ();
- mono_max_worker_threads = 20 + threads_per_cpu * cpu_count;
- mono_min_worker_threads = cpu_count; /* 1 idle thread per cpu */
+ thread_count = MIN (cpu_count * threads_per_cpu, 100 * cpu_count);
+ threadpool_init (&async_tp, thread_count, MAX (100 * cpu_count, thread_count), async_invoke_thread);
+ threadpool_init (&async_io_tp, cpu_count * 2, cpu_count * 4, async_invoke_thread);
+ async_io_tp.is_io = TRUE;
async_call_klass = mono_class_from_name (mono_defaults.corlib, "System", "MonoAsyncCall");
g_assert (async_call_klass);
+
+ InitializeCriticalSection (&wsqs_lock);
+ wsqs = g_ptr_array_sized_new (MAX (100 * cpu_count, thread_count));
+ mono_wsq_init ();
+
+ async_tp.pc_nitems = init_perf_counter ("Mono Threadpool", "Work Items Added");
+ g_assert (async_tp.pc_nitems);
+
+ async_io_tp.pc_nitems = init_perf_counter ("Mono Threadpool", "IO Work Items Added");
+ g_assert (async_io_tp.pc_nitems);
+
+ async_tp.pc_nthreads = init_perf_counter ("Mono Threadpool", "# of Threads");
+ g_assert (async_tp.pc_nthreads);
+
+ async_io_tp.pc_nthreads = init_perf_counter ("Mono Threadpool", "# of IO Threads");
+ g_assert (async_io_tp.pc_nthreads);
+ tp_inited = 2;
+#ifdef DEBUG
+ signal (SIGALRM, signal_handler);
+ alarm (2);
+#endif
+}
+
+static MonoAsyncResult *
+create_simple_asyncresult (MonoObject *target, MonoObject *state)
+{
+ MonoDomain *domain = mono_domain_get ();
+ MonoAsyncResult *ares;
+
+ /* Don't call mono_async_result_new() to avoid capturing the context */
+ ares = (MonoAsyncResult *) mono_object_new (domain, mono_defaults.asyncresult_class);
+ MONO_OBJECT_SETREF (ares, async_delegate, target);
+ MONO_OBJECT_SETREF (ares, async_state, state);
+ return ares;
+}
+
+void
+icall_append_io_job (MonoObject *target, MonoSocketAsyncResult *state)
+{
+ MonoAsyncResult *ares;
+
+ ares = create_simple_asyncresult (target, (MonoObject *) state);
+ socket_io_add (ares, state);
}
MonoAsyncResult *
MonoAsyncResult *ares;
ASyncCall *ac;
- ac = (ASyncCall*)mono_object_new (mono_domain_get (), async_call_klass);
+ ac = (ASyncCall*)mono_object_new (domain, async_call_klass);
MONO_OBJECT_SETREF (ac, msg, msg);
MONO_OBJECT_SETREF (ac, state, state);
ares = mono_async_result_new (domain, NULL, ac->state, NULL, (MonoObject*)ac);
MONO_OBJECT_SETREF (ares, async_delegate, target);
- EnterCriticalSection (&ares_lock);
- mono_g_hash_table_insert (ares_htable, ares, ares);
- LeaveCriticalSection (&ares_lock);
-
#ifndef DISABLE_SOCKETS
if (socket_io_filter (target, state)) {
socket_io_add (ares, (MonoSocketAsyncResult *) state);
return ares;
}
#endif
-
- start_thread_or_queue (ares);
+ threadpool_append_job (&async_tp, (MonoObject *) ares);
return ares;
}
-static void
-start_thread_or_queue (MonoAsyncResult *ares)
-{
- int busy, worker;
-
- if ((int) InterlockedCompareExchange (&tp_idle_started, 1, 0) == 0) {
- threadpool_jobs_inc ((MonoObject*)ares);
- mono_thread_create_internal (mono_get_root_domain (), start_idle_threads, ares, TRUE);
- return;
- }
-
- busy = (int) InterlockedCompareExchange (&busy_worker_threads, 0, -1);
- worker = (int) InterlockedCompareExchange (&mono_worker_threads, 0, -1);
- if (worker <= ++busy &&
- worker < mono_max_worker_threads) {
- start_tpthread (ares);
- } else {
- append_job (&mono_delegate_section, &async_call_queue, (MonoObject*)ares);
- ReleaseSemaphore (job_added, 1, NULL);
- }
-}
-
MonoObject *
mono_thread_pool_finish (MonoAsyncResult *ares, MonoArray **out_args, MonoObject **exc)
{
ASyncCall *ac;
+ HANDLE wait_event;
*exc = NULL;
*out_args = NULL;
mono_monitor_enter ((MonoObject *) ares);
if (ares->endinvoke_called) {
- *exc = (MonoObject *)mono_exception_from_name (mono_defaults.corlib, "System",
- "InvalidOperationException");
+ *exc = (MonoObject *) mono_get_exception_invalid_operation (NULL);
mono_monitor_exit ((MonoObject *) ares);
return NULL;
}
ares->endinvoke_called = 1;
- ac = (ASyncCall *)ares->object_data;
-
- g_assert (ac != NULL);
-
/* wait until we are really finished */
if (!ares->completed) {
if (ares->handle == NULL) {
- ac->wait_event = (gsize)CreateEvent (NULL, TRUE, FALSE, NULL);
- g_assert(ac->wait_event != 0);
- MONO_OBJECT_SETREF (ares, handle, (MonoObject *) mono_wait_handle_new (mono_object_domain (ares), (gpointer)(gsize)ac->wait_event));
+ wait_event = CreateEvent (NULL, TRUE, FALSE, NULL);
+ g_assert(wait_event != 0);
+ MONO_OBJECT_SETREF (ares, handle, (MonoObject *) mono_wait_handle_new (mono_object_domain (ares), wait_event));
+ } else {
+ wait_event = mono_wait_handle_get_handle ((MonoWaitHandle *) ares->handle);
}
mono_monitor_exit ((MonoObject *) ares);
- WaitForSingleObjectEx ((gpointer)(gsize)ac->wait_event, INFINITE, TRUE);
+ WaitForSingleObjectEx (wait_event, INFINITE, TRUE);
} else {
mono_monitor_exit ((MonoObject *) ares);
}
+ ac = (ASyncCall *) ares->object_data;
+ g_assert (ac != NULL);
*exc = ac->msg->exc; /* FIXME: GC add write barrier */
*out_args = ac->out_args;
return ac->res;
}
+static void
+threadpool_kill_idle_threads (ThreadPool *tp)
+{
+ gint n;
+
+ n = (gint) InterlockedCompareExchange (&tp->max_threads, 0, -1);
+ while (n) {
+ n--;
+ MONO_SEM_POST (&tp->new_job);
+ }
+}
+
void
mono_thread_pool_cleanup (void)
{
- gint release;
+ if (!(async_tp.pool_status == 0 || async_tp.pool_status == 2)) {
+ if (!(async_tp.pool_status == 1 && InterlockedCompareExchange (&async_tp.pool_status, 2, 1) == 2)) {
+ InterlockedExchange (&async_io_tp.pool_status, 2);
+ threadpool_free_queue (&async_tp);
+ threadpool_kill_idle_threads (&async_tp);
+
+ socket_io_cleanup (&socket_io_data); /* Empty when DISABLE_SOCKETS is defined */
+ threadpool_free_queue (&async_io_tp);
+ threadpool_kill_idle_threads (&async_io_tp);
+ MONO_SEM_DESTROY (&async_io_tp.new_job);
+ }
+ }
+
+ EnterCriticalSection (&wsqs_lock);
+ mono_wsq_cleanup ();
+ if (wsqs)
+ g_ptr_array_free (wsqs, TRUE);
+ wsqs = NULL;
+ LeaveCriticalSection (&wsqs_lock);
+ MONO_SEM_DESTROY (&async_tp.new_job);
+}
+
+static gboolean
+threadpool_start_thread (ThreadPool *tp)
+{
+ gint n;
+ guint32 stack_size;
+
+ stack_size = (!tp->is_io) ? 0 : SMALL_STACK;
+ while (!mono_runtime_is_shutting_down () && (n = tp->nthreads) < tp->max_threads) {
+ if (InterlockedCompareExchange (&tp->nthreads, n + 1, n) == n) {
+ mono_perfcounter_update_value (tp->pc_nthreads, TRUE, 1);
+ mono_thread_create_internal (mono_get_root_domain (), tp->async_invoke, tp, TRUE, stack_size);
+ return TRUE;
+ }
+ }
+
+ return FALSE;
+}
+
+static void
+pulse_on_new_job (ThreadPool *tp)
+{
+ if (tp->waiting)
+ MONO_SEM_POST (&tp->new_job);
+}
- EnterCriticalSection (&mono_delegate_section);
- free_queue (&async_call_queue);
- release = (gint) InterlockedCompareExchange (&mono_worker_threads, 0, -1);
- LeaveCriticalSection (&mono_delegate_section);
- if (job_added)
- ReleaseSemaphore (job_added, release, NULL);
+void
+icall_append_job (MonoObject *ar)
+{
+ threadpool_append_job (&async_tp, ar);
+}
- socket_io_cleanup (&socket_io_data);
+static void
+threadpool_append_job (ThreadPool *tp, MonoObject *ar)
+{
+ threadpool_append_jobs (tp, &ar, 1);
}
static void
-append_job (CRITICAL_SECTION *cs, TPQueue *list, MonoObject *ar)
+threadpool_append_jobs (ThreadPool *tp, MonoObject **jobs, gint njobs)
{
- threadpool_jobs_inc (ar);
+ static int job_counter;
+ MonoObject *ar;
+ gint i;
- EnterCriticalSection (cs);
- if (list->array && (list->next_elem < mono_array_length (list->array))) {
- mono_array_setref (list->array, list->next_elem, ar);
- list->next_elem++;
- LeaveCriticalSection (cs);
+ if (mono_runtime_is_shutting_down ())
return;
+
+ if (tp->pool_status == 0 && InterlockedCompareExchange (&tp->pool_status, 1, 0) == 0) {
+ if (!tp->is_io)
+ mono_thread_create_internal (mono_get_root_domain (), monitor_thread, tp, TRUE, SMALL_STACK);
+ /* Create on demand up to min_threads to avoid startup penalty for apps that don't use
+ * the threadpool that much
+ * mono_thread_create_internal (mono_get_root_domain (), threadpool_start_idle_threads, tp, TRUE, SMALL_STACK);
+ */
}
- if (!list->array) {
- MONO_GC_REGISTER_ROOT (list->array);
- list->array = mono_array_new (mono_get_root_domain (), mono_defaults.object_class, 16);
- } else {
- int count = list->next_elem - list->first_elem;
- /* slide the array or create a larger one if it's full */
- if (list->first_elem) {
- mono_array_memcpy_refs (list->array, 0, list->array, list->first_elem, count);
- } else {
- MonoArray *newa = mono_array_new (mono_get_root_domain (), mono_defaults.object_class, mono_array_length (list->array) * 2);
- mono_array_memcpy_refs (newa, 0, list->array, list->first_elem, count);
- list->array = newa;
+
+ for (i = 0; i < njobs; i++) {
+ ar = jobs [i];
+ if (ar == NULL || mono_domain_is_unloading (ar->vtable->domain))
+ continue; /* Might happen when cleaning domain jobs */
+ if (!tp->is_io && (InterlockedIncrement (&job_counter) % 10) == 0) {
+ MonoAsyncResult *o = (MonoAsyncResult *) ar;
+ o->add_time = mono_100ns_ticks ();
}
- list->first_elem = 0;
- list->next_elem = count;
+ threadpool_jobs_inc (ar);
+ mono_perfcounter_update_value (tp->pc_nitems, TRUE, 1);
+ if (!tp->is_io && mono_wsq_local_push (ar))
+ continue;
+
+ mono_cq_enqueue (tp->queue, ar);
}
- mono_array_setref (list->array, list->next_elem, ar);
- list->next_elem++;
- LeaveCriticalSection (cs);
-}
+ for (i = 0; i < MIN(njobs, tp->max_threads); i++)
+ pulse_on_new_job (tp);
+}
static void
-clear_queue (CRITICAL_SECTION *cs, TPQueue *list, MonoDomain *domain)
+threadpool_clear_queue (ThreadPool *tp, MonoDomain *domain)
{
- int i, count = 0;
- EnterCriticalSection (cs);
- /*remove*/
- for (i = list->first_elem; i < list->next_elem; ++i) {
- MonoObject *obj = mono_array_get (list->array, MonoObject*, i);
- if (obj->vtable->domain == domain) {
- unregister_job ((MonoAsyncResult*)obj);
-
- mono_array_set (list->array, MonoObject*, i, NULL);
- InterlockedDecrement (&domain->threadpool_jobs);
- ++count;
+ MonoObject *obj;
+ MonoMList *other;
+ int domain_count;
+
+ other = NULL;
+ domain_count = 0;
+ while (mono_cq_dequeue (tp->queue, &obj)) {
+ if (obj != NULL && obj->vtable->domain == domain) {
+ domain_count++;
+ threadpool_jobs_dec (obj);
+ } else if (obj != NULL) {
+ other = mono_mlist_prepend (other, obj);
}
}
- /*compact*/
- if (count) {
- int idx = 0;
- for (i = list->first_elem; i < list->next_elem; ++i) {
- MonoObject *obj = mono_array_get (list->array, MonoObject*, i);
- if (obj)
- mono_array_set (list->array, MonoObject*, idx++, obj);
- }
- list->first_elem = 0;
- list->next_elem = count;
+
+ while (other) {
+ threadpool_append_job (tp, (MonoObject *) mono_mlist_get_data (other));
+ other = mono_mlist_next (other);
}
- LeaveCriticalSection (cs);
}
/*
int result = TRUE;
guint32 start_time = 0;
- clear_queue (&mono_delegate_section, &async_call_queue, domain);
- clear_queue (&io_queue_lock, &async_io_queue, domain);
+ g_assert (domain->state == MONO_APPDOMAIN_UNLOADING);
+
+ threadpool_clear_queue (&async_tp, domain);
+ threadpool_clear_queue (&async_io_tp, domain);
/*
* There might be some threads out that could be about to execute stuff from the given domain.
* We avoid that by setting up a semaphore to be pulsed by the thread that reaches zero.
*/
sem_handle = CreateSemaphore (NULL, 0, 1, NULL);
-
+
domain->cleanup_semaphore = sem_handle;
/*
* The memory barrier here is required to have global ordering between assigning to cleanup_semaphone
return result;
}
+static void
+threadpool_free_queue (ThreadPool *tp)
+{
+ mono_cq_destroy (tp->queue);
+ tp->queue = NULL;
+}
+
+gboolean
+mono_thread_pool_is_queue_array (MonoArray *o)
+{
+ // gpointer obj = o;
+
+ // FIXME: need some fix in sgen code.
+ return FALSE;
+}
-static MonoObject*
-dequeue_job (CRITICAL_SECTION *cs, TPQueue *list)
+static MonoWSQ *
+add_wsq (void)
{
- MonoObject *ar;
- int count;
+ int i;
+ MonoWSQ *wsq;
- EnterCriticalSection (cs);
- if (!list->array || list->first_elem == list->next_elem) {
- LeaveCriticalSection (cs);
+ EnterCriticalSection (&wsqs_lock);
+ wsq = mono_wsq_create ();
+ if (wsqs == NULL) {
+ LeaveCriticalSection (&wsqs_lock);
return NULL;
}
- ar = mono_array_get (list->array, MonoObject*, list->first_elem);
- list->first_elem++;
- count = list->next_elem - list->first_elem;
- /* reduce the size of the array if it's mostly empty */
- if (mono_array_length (list->array) > 16 && count < (mono_array_length (list->array) / 3)) {
- MonoArray *newa = mono_array_new (mono_get_root_domain (), mono_defaults.object_class, mono_array_length (list->array) / 2);
- mono_array_memcpy_refs (newa, 0, list->array, list->first_elem, count);
- list->array = newa;
- list->first_elem = 0;
- list->next_elem = count;
+ for (i = 0; i < wsqs->len; i++) {
+ if (g_ptr_array_index (wsqs, i) == NULL) {
+ wsqs->pdata [i] = wsq;
+ LeaveCriticalSection (&wsqs_lock);
+ return wsq;
+ }
+ }
+ g_ptr_array_add (wsqs, wsq);
+ LeaveCriticalSection (&wsqs_lock);
+ return wsq;
+}
+
+static void
+remove_wsq (MonoWSQ *wsq)
+{
+ gpointer data;
+
+ if (wsq == NULL)
+ return;
+
+ EnterCriticalSection (&wsqs_lock);
+ if (wsqs == NULL) {
+ LeaveCriticalSection (&wsqs_lock);
+ return;
+ }
+ g_ptr_array_remove_fast (wsqs, wsq);
+ data = NULL;
+ /*
+ * Only clean this up when shutting down, any other case will error out
+ * if we're removing a queue that still has work items.
+ */
+ if (mono_runtime_is_shutting_down ()) {
+ while (mono_wsq_local_pop (&data)) {
+ threadpool_jobs_dec (data);
+ data = NULL;
+ }
}
- LeaveCriticalSection (cs);
+ mono_wsq_destroy (wsq);
+ LeaveCriticalSection (&wsqs_lock);
+}
+
+static void
+try_steal (gpointer *data, gboolean retry)
+{
+ int i;
+ int ms;
+
+ if (wsqs == NULL || data == NULL || *data != NULL)
+ return;
+
+ ms = 0;
+ do {
+ if (mono_runtime_is_shutting_down ())
+ return;
+ for (i = 0; wsqs != NULL && i < wsqs->len; i++) {
+ if (mono_runtime_is_shutting_down ()) {
+ return;
+ }
+ mono_wsq_try_steal (wsqs->pdata [i], data, ms);
+ if (*data != NULL) {
+ return;
+ }
+ }
+ ms += 10;
+ } while (retry && ms < 11);
+}
- return ar;
+static gboolean
+dequeue_or_steal (ThreadPool *tp, gpointer *data)
+{
+ if (mono_runtime_is_shutting_down ())
+ return FALSE;
+ mono_cq_dequeue (tp->queue, (MonoObject **) data);
+ if (!tp->is_io && !*data)
+ try_steal (data, FALSE);
+ return (*data != NULL);
}
static void
-free_queue (TPQueue *list)
+process_idle_times (ThreadPool *tp, gint64 t)
+{
+ gint64 ticks;
+ gint64 avg;
+ gboolean compute_avg;
+ gint new_threads;
+ gint64 per1;
+
+ if (tp->ignore_times || t <= 0)
+ return;
+
+ compute_avg = FALSE;
+ ticks = mono_100ns_ticks ();
+ t = ticks - t;
+ SPIN_LOCK (tp->sp_lock);
+ if (tp->ignore_times) {
+ SPIN_UNLOCK (tp->sp_lock);
+ return;
+ }
+ tp->time_sum += t;
+ tp->n_sum++;
+ if (tp->last_check == 0)
+ tp->last_check = ticks;
+ else if (tp->last_check > 0 && (ticks - tp->last_check) > 5000000) {
+ tp->ignore_times = 1;
+ compute_avg = TRUE;
+ }
+ SPIN_UNLOCK (tp->sp_lock);
+
+ if (!compute_avg)
+ return;
+
+ //printf ("Items: %d Time elapsed: %.3fs\n", tp->n_sum, (ticks - tp->last_check) / 10000.0);
+ tp->last_check = ticks;
+ new_threads = 0;
+ avg = tp->time_sum / tp->n_sum;
+ if (tp->averages [1] == 0) {
+ tp->averages [1] = avg;
+ } else {
+ per1 = ((100 * (ABS (avg - tp->averages [1]))) / tp->averages [1]);
+ if (per1 > 5) {
+ if (avg > tp->averages [1]) {
+ if (tp->averages [1] < tp->averages [0]) {
+ new_threads = -1;
+ } else {
+ new_threads = 1;
+ }
+ } else if (avg < tp->averages [1] && tp->averages [1] < tp->averages [0]) {
+ new_threads = 1;
+ }
+ } else {
+ int min, n;
+ min = tp->min_threads;
+ n = tp->nthreads;
+ if ((n - min) < min && tp->busy_threads == n)
+ new_threads = 1;
+ }
+ /*
+ if (new_threads != 0) {
+ printf ("n: %d per1: %lld avg=%lld avg1=%lld avg0=%lld\n", new_threads, per1, avg, tp->averages [1], tp->averages [0]);
+ }
+ */
+ }
+
+ tp->time_sum = 0;
+ tp->n_sum = 0;
+
+ tp->averages [0] = tp->averages [1];
+ tp->averages [1] = avg;
+ tp->ignore_times = 0;
+
+ if (new_threads == -1) {
+ if (tp->destroy_thread == 0 && InterlockedCompareExchange (&tp->destroy_thread, 1, 0) == 0)
+ pulse_on_new_job (tp);
+ }
+}
+
+static gboolean
+should_i_die (ThreadPool *tp)
{
- list->array = NULL;
- list->first_elem = list->next_elem = 0;
+ gboolean result = FALSE;
+ if (tp->destroy_thread == 1 && InterlockedCompareExchange (&tp->destroy_thread, 0, 1) == 1)
+ result = (tp->nthreads > tp->min_threads);
+ return result;
}
static void
async_invoke_thread (gpointer data)
{
MonoDomain *domain;
- MonoThread *thread;
- int workers, min;
- const gchar *version;
-
- thread = mono_thread_current ();
- version = mono_get_runtime_info ()->framework_version;
+ MonoInternalThread *thread;
+ MonoWSQ *wsq;
+ ThreadPool *tp;
+ gboolean must_die;
+ const gchar *name;
+
+ tp = data;
+ wsq = NULL;
+ if (!tp->is_io)
+ wsq = add_wsq ();
+
+ thread = mono_thread_internal_current ();
+
+ mono_profiler_thread_start (thread->tid);
+ name = (tp->is_io) ? "IO Threadpool worker" : "Threadpool worker";
+ ves_icall_System_Threading_Thread_SetName_internal (thread, mono_string_new (mono_domain_get (), name));
+
+ if (tp_start_func)
+ tp_start_func (tp_hooks_user_data);
+
+ data = NULL;
for (;;) {
MonoAsyncResult *ar;
+ MonoClass *klass;
+ gboolean is_io_task;
+ gboolean is_socket;
+ int n_naps = 0;
+ is_io_task = FALSE;
ar = (MonoAsyncResult *) data;
if (ar) {
+ InterlockedIncrement (&tp->busy_threads);
+ domain = ((MonoObject *)ar)->vtable->domain;
+#ifndef DISABLE_SOCKETS
+ klass = ((MonoObject *) data)->vtable->klass;
+ is_io_task = !is_corlib_asyncresult (domain, klass);
+ is_socket = FALSE;
+ if (is_io_task) {
+ MonoSocketAsyncResult *state = (MonoSocketAsyncResult *) data;
+ is_socket = is_socketasyncresult (domain, klass);
+ ar = state->ares;
+ switch (state->operation) {
+ case AIO_OP_RECEIVE:
+ state->total = ICALL_RECV (state);
+ break;
+ case AIO_OP_SEND:
+ state->total = ICALL_SEND (state);
+ break;
+ }
+ }
+#endif
/* worker threads invokes methods in different domains,
* so we need to set the right domain here */
- domain = ((MonoObject *)ar)->vtable->domain;
-
g_assert (domain);
- if (domain->state == MONO_APPDOMAIN_UNLOADED || domain->state == MONO_APPDOMAIN_UNLOADING) {
+ if (mono_domain_is_unloading (domain) || mono_runtime_is_shutting_down ()) {
threadpool_jobs_dec ((MonoObject *)ar);
- unregister_job (ar);
data = NULL;
+ ar = NULL;
+ InterlockedDecrement (&tp->busy_threads);
} else {
mono_thread_push_appdomain_ref (domain);
if (threadpool_jobs_dec ((MonoObject *)ar)) {
- unregister_job (ar);
data = NULL;
+ ar = NULL;
mono_thread_pop_appdomain_ref ();
+ InterlockedDecrement (&tp->busy_threads);
continue;
}
if (mono_domain_set (domain, FALSE)) {
- ASyncCall *ac;
-
- mono_async_invoke (ar);
- ac = (ASyncCall *) ar->object_data;
- /*
- if (ac->msg->exc != NULL)
- mono_unhandled_exception (ac->msg->exc);
- */
+ MonoObject *exc;
+
+ if (tp_item_begin_func)
+ tp_item_begin_func (tp_item_user_data);
+
+ if (!is_io_task && ar->add_time > 0)
+ process_idle_times (tp, ar->add_time);
+ exc = mono_async_invoke (tp, ar);
+ if (tp_item_end_func)
+ tp_item_end_func (tp_item_user_data);
+ if (exc && mono_runtime_unhandled_exception_policy_get () == MONO_UNHANDLED_POLICY_CURRENT) {
+ gboolean unloaded;
+ MonoClass *klass;
+
+ klass = exc->vtable->klass;
+ unloaded = is_appdomainunloaded_exception (exc->vtable->domain, klass);
+ if (!unloaded && klass != mono_defaults.threadabortexception_class) {
+ mono_unhandled_exception (exc);
+ exit (255);
+ }
+ if (klass == mono_defaults.threadabortexception_class)
+ mono_thread_internal_reset_abort (thread);
+ }
+ if (is_socket && tp->is_io) {
+ MonoSocketAsyncResult *state = (MonoSocketAsyncResult *) data;
+
+ if (state->completed && state->callback) {
+ MonoAsyncResult *cb_ares;
+ cb_ares = create_simple_asyncresult ((MonoObject *) state->callback,
+ (MonoObject *) state);
+ icall_append_job ((MonoObject *) cb_ares);
+ }
+ }
mono_domain_set (mono_get_root_domain (), TRUE);
}
mono_thread_pop_appdomain_ref ();
- InterlockedDecrement (&busy_worker_threads);
+ InterlockedDecrement (&tp->busy_threads);
/* If the callee changes the background status, set it back to TRUE */
- if (*version != '1' && !mono_thread_test_state (thread , ThreadState_Background))
+ mono_thread_clr_state (thread , ~ThreadState_Background);
+ if (!mono_thread_test_state (thread , ThreadState_Background))
ves_icall_System_Threading_Thread_SetState (thread, ThreadState_Background);
}
}
- data = dequeue_job (&mono_delegate_section, &async_call_queue);
+ ar = NULL;
+ data = NULL;
+ must_die = should_i_die (tp);
+ if (!must_die && (tp->is_io || !mono_wsq_local_pop (&data)))
+ dequeue_or_steal (tp, &data);
- if (!data) {
- guint32 wr;
- int timeout = THREAD_EXIT_TIMEOUT;
- guint32 start_time = mono_msec_ticks ();
-
- do {
- wr = WaitForSingleObjectEx (job_added, (guint32)timeout, TRUE);
+ n_naps = 0;
+ while (!must_die && !data && n_naps < 4) {
+ gboolean res;
+
+ InterlockedIncrement (&tp->waiting);
+#if defined(__OpenBSD__)
+ while ((res = mono_sem_wait (&tp->new_job, TRUE)) == -1) {// && errno == EINTR) {
+#else
+ while ((res = mono_sem_timedwait (&tp->new_job, 2000, TRUE)) == -1) {// && errno == EINTR) {
+#endif
+ if (mono_runtime_is_shutting_down ())
+ break;
if (THREAD_WANTS_A_BREAK (thread))
mono_thread_interruption_checkpoint ();
-
- timeout -= mono_msec_ticks () - start_time;
-
- if (wr != WAIT_TIMEOUT && wr != WAIT_IO_COMPLETION)
- data = dequeue_job (&mono_delegate_section, &async_call_queue);
}
- while (!data && timeout > 0);
+ InterlockedDecrement (&tp->waiting);
+ if (mono_runtime_is_shutting_down ())
+ break;
+ must_die = should_i_die (tp);
+ dequeue_or_steal (tp, &data);
+ n_naps++;
}
- if (!data) {
- workers = (int) InterlockedCompareExchange (&mono_worker_threads, 0, -1);
- min = (int) InterlockedCompareExchange (&mono_min_worker_threads, 0, -1);
-
- while (!data && workers <= min) {
- WaitForSingleObjectEx (job_added, INFINITE, TRUE);
- if (THREAD_WANTS_A_BREAK (thread))
- mono_thread_interruption_checkpoint ();
-
- data = dequeue_job (&mono_delegate_section, &async_call_queue);
- workers = (int) InterlockedCompareExchange (&mono_worker_threads, 0, -1);
- min = (int) InterlockedCompareExchange (&mono_min_worker_threads, 0, -1);
+ if (!data && !tp->is_io && !mono_runtime_is_shutting_down ()) {
+ mono_wsq_local_pop (&data);
+ if (data && must_die) {
+ InterlockedCompareExchange (&tp->destroy_thread, 1, 0);
+ pulse_on_new_job (tp);
}
}
-
+
if (!data) {
- InterlockedDecrement (&mono_worker_threads);
- return;
+ gint nt;
+ gboolean down;
+ while (1) {
+ nt = tp->nthreads;
+ down = mono_runtime_is_shutting_down ();
+ if (!down && nt <= tp->min_threads)
+ break;
+ if (down || InterlockedCompareExchange (&tp->nthreads, nt - 1, nt) == nt) {
+ mono_perfcounter_update_value (tp->pc_nthreads, TRUE, -1);
+ if (!tp->is_io) {
+ remove_wsq (wsq);
+ }
+
+ mono_profiler_thread_end (thread->tid);
+
+ if (tp_finish_func)
+ tp_finish_func (tp_hooks_user_data);
+ return;
+ }
+ }
}
-
- InterlockedIncrement (&busy_worker_threads);
}
g_assert_not_reached ();
void
ves_icall_System_Threading_ThreadPool_GetAvailableThreads (gint *workerThreads, gint *completionPortThreads)
{
- gint busy, busy_io;
-
- MONO_ARCH_SAVE_REGS;
-
- busy = (gint) InterlockedCompareExchange (&busy_worker_threads, 0, -1);
- busy_io = (gint) InterlockedCompareExchange (&busy_io_worker_threads, 0, -1);
- *workerThreads = mono_max_worker_threads - busy;
- *completionPortThreads = mono_io_max_worker_threads - busy_io;
+ *workerThreads = async_tp.max_threads - async_tp.busy_threads;
+ *completionPortThreads = async_io_tp.max_threads - async_io_tp.busy_threads;
}
void
ves_icall_System_Threading_ThreadPool_GetMaxThreads (gint *workerThreads, gint *completionPortThreads)
{
- MONO_ARCH_SAVE_REGS;
-
- *workerThreads = mono_max_worker_threads;
- *completionPortThreads = mono_io_max_worker_threads;
+ *workerThreads = async_tp.max_threads;
+ *completionPortThreads = async_io_tp.max_threads;
}
void
ves_icall_System_Threading_ThreadPool_GetMinThreads (gint *workerThreads, gint *completionPortThreads)
{
- gint workers, workers_io;
-
- MONO_ARCH_SAVE_REGS;
-
- workers = (gint) InterlockedCompareExchange (&mono_min_worker_threads, 0, -1);
- workers_io = (gint) InterlockedCompareExchange (&mono_io_min_worker_threads, 0, -1);
-
- *workerThreads = workers;
- *completionPortThreads = workers_io;
+ *workerThreads = async_tp.min_threads;
+ *completionPortThreads = async_io_tp.min_threads;
}
MonoBoolean
ves_icall_System_Threading_ThreadPool_SetMinThreads (gint workerThreads, gint completionPortThreads)
{
- MONO_ARCH_SAVE_REGS;
+ gint max_threads;
+ gint max_io_threads;
- if (workerThreads < 0 || workerThreads > mono_max_worker_threads)
+ max_threads = async_tp.max_threads;
+ if (workerThreads <= 0 || workerThreads > max_threads)
return FALSE;
- if (completionPortThreads < 0 || completionPortThreads > mono_io_max_worker_threads)
+ max_io_threads = async_io_tp.max_threads;
+ if (completionPortThreads <= 0 || completionPortThreads > max_io_threads)
return FALSE;
- InterlockedExchange (&mono_min_worker_threads, workerThreads);
- InterlockedExchange (&mono_io_min_worker_threads, completionPortThreads);
- mono_thread_create_internal (mono_get_root_domain (), start_idle_threads, NULL, TRUE);
+ InterlockedExchange (&async_tp.min_threads, workerThreads);
+ InterlockedExchange (&async_io_tp.min_threads, completionPortThreads);
+ if (workerThreads > async_tp.nthreads)
+ mono_thread_create_internal (mono_get_root_domain (), threadpool_start_idle_threads, &async_tp, TRUE, SMALL_STACK);
+ if (completionPortThreads > async_io_tp.nthreads)
+ mono_thread_create_internal (mono_get_root_domain (), threadpool_start_idle_threads, &async_io_tp, TRUE, SMALL_STACK);
return TRUE;
}
MonoBoolean
ves_icall_System_Threading_ThreadPool_SetMaxThreads (gint workerThreads, gint completionPortThreads)
{
- MONO_ARCH_SAVE_REGS;
+ gint min_threads;
+ gint min_io_threads;
+ gint cpu_count;
- if (workerThreads < mono_max_worker_threads)
+ cpu_count = mono_cpu_count ();
+ min_threads = async_tp.min_threads;
+ if (workerThreads < min_threads || workerThreads < cpu_count)
return FALSE;
- if (completionPortThreads < mono_io_max_worker_threads)
+ /* We don't really have the concept of completion ports. Do we care here? */
+ min_io_threads = async_io_tp.min_threads;
+ if (completionPortThreads < min_io_threads || completionPortThreads < cpu_count)
return FALSE;
- InterlockedExchange (&mono_max_worker_threads, workerThreads);
- InterlockedExchange (&mono_io_max_worker_threads, completionPortThreads);
+ InterlockedExchange (&async_tp.max_threads, workerThreads);
+ InterlockedExchange (&async_io_tp.max_threads, completionPortThreads);
return TRUE;
}
+/**
+ * mono_install_threadpool_thread_hooks
+ * @start_func: the function to be called right after a new threadpool thread is created. Can be NULL.
+ * @finish_func: the function to be called right before a thredpool thread is exiting. Can be NULL.
+ * @user_data: argument passed to @start_func and @finish_func.
+ *
+ * @start_fun will be called right after a threadpool thread is created and @finish_func right before a threadpool thread exits.
+ * The calls will be made from the thread itself.
+ */
+void
+mono_install_threadpool_thread_hooks (MonoThreadPoolFunc start_func, MonoThreadPoolFunc finish_func, gpointer user_data)
+{
+ tp_start_func = start_func;
+ tp_finish_func = finish_func;
+ tp_hooks_user_data = user_data;
+}
+
+/**
+ * mono_install_threadpool_item_hooks
+ * @begin_func: the function to be called before a threadpool work item processing starts.
+ * @end_func: the function to be called after a threadpool work item is finished.
+ * @user_data: argument passed to @begin_func and @end_func.
+ *
+ * The calls will be made from the thread itself and from the same AppDomain
+ * where the work item was executed.
+ *
+ */
+void
+mono_install_threadpool_item_hooks (MonoThreadPoolItemFunc begin_func, MonoThreadPoolItemFunc end_func, gpointer user_data)
+{
+ tp_item_begin_func = begin_func;
+ tp_item_end_func = end_func;
+ tp_item_user_data = user_data;
+}
+