#include "threadpool-ms-io-kqueue.c"
#include "threadpool-ms-io-poll.c"
+#define UPDATES_CAPACITY 128
+
/* Keep in sync with System.Net.Sockets.Socket.SocketOperation */
enum {
AIO_OP_FIRST,
AIO_OP_LAST
};
+typedef enum {
+ UPDATE_EMPTY = 0,
+ UPDATE_ADD,
+ UPDATE_REMOVE_SOCKET,
+ UPDATE_REMOVE_DOMAIN,
+} ThreadPoolIOUpdateType;
+
typedef struct {
gint fd;
MonoSocketAsyncResult *sockares;
-} ThreadPoolIOUpdate;
+} ThreadPoolIOUpdate_Add;
typedef struct {
- ThreadPoolIOBackend backend;
+ gint fd;
+} ThreadPoolIOUpdate_RemoveSocket;
- mono_mutex_t lock;
+typedef struct {
+ MonoDomain *domain;
+} ThreadPoolIOUpdate_RemoveDomain;
- mono_cond_t updates_signal;
+typedef struct {
+ ThreadPoolIOUpdateType type;
+ union {
+ ThreadPoolIOUpdate_Add add;
+ ThreadPoolIOUpdate_RemoveSocket remove_socket;
+ ThreadPoolIOUpdate_RemoveDomain remove_domain;
+ } data;
+} ThreadPoolIOUpdate;
- MonoGHashTable *states;
+typedef struct {
+ ThreadPoolIOBackend backend;
- ThreadPoolIOUpdate *updates;
- guint updates_size;
- guint updates_capacity;
+ ThreadPoolIOUpdate updates [UPDATES_CAPACITY];
+ gint updates_size;
+ mono_mutex_t updates_lock;
+ mono_cond_t updates_cond;
#if !defined(HOST_WIN32)
gint wakeup_pipes [2];
MonoMList *current;
gint events = 0;
- for (current = list; current; current = mono_mlist_next (current)) {
- MonoSocketAsyncResult *ares = (MonoSocketAsyncResult*) mono_mlist_get_data (current);
- if (ares)
- events |= get_events_from_sockares (ares);
- }
+ for (current = list; current; current = mono_mlist_next (current))
+ events |= get_events_from_sockares ((MonoSocketAsyncResult*) mono_mlist_get_data (current));
return events;
}
-static void
-selector_thread_wakeup (void);
-
-/*
- * If sockares is NULL, then it means we want to delete the corresponding fd
- */
-static void
-update_add (gint fd, MonoSocketAsyncResult *sockares)
-{
- ThreadPoolIOUpdate *update;
-
- mono_mutex_lock (&threadpool_io->lock);
-
- threadpool_io->updates_size += 1;
- if (threadpool_io->updates_size > threadpool_io->updates_capacity) {
- ThreadPoolIOUpdate *updates_new, *updates_old;
- gint updates_new_capacity, updates_old_capacity;
-
- updates_old_capacity = threadpool_io->updates_capacity;
- updates_new_capacity = updates_old_capacity + 16;
-
- updates_old = threadpool_io->updates;
- updates_new = mono_gc_alloc_fixed (sizeof (ThreadPoolIOUpdate) * updates_new_capacity, MONO_GC_DESCRIPTOR_NULL);
- g_assert (updates_new);
-
- if (updates_old)
- memcpy (updates_new, updates_old, sizeof (ThreadPoolIOUpdate) * updates_old_capacity);
-
- threadpool_io->updates = updates_new;
- threadpool_io->updates_capacity = updates_new_capacity;
-
- if (updates_old)
- mono_gc_free_fixed (updates_old);
- }
-
- update = &threadpool_io->updates [threadpool_io->updates_size - 1];
- update->fd = fd;
- update->sockares = sockares;
-
- selector_thread_wakeup ();
-
- MONO_PREPARE_BLOCKING;
- mono_cond_wait (&threadpool_io->updates_signal, &threadpool_io->lock);
- MONO_FINISH_BLOCKING;
-
- mono_mutex_unlock (&threadpool_io->lock);
-}
-
-static void
-update_drain (void (*callback) (gint fd, gint events, gboolean is_new))
-{
- gint i;
-
- mono_mutex_lock (&threadpool_io->lock);
-
- for (i = 0; i < threadpool_io->updates_size; ++i) {
- ThreadPoolIOUpdate *update;
- MonoMList *list = NULL;
- gpointer k;
- gboolean is_new;
-
- update = &threadpool_io->updates [i];
-
- is_new = !mono_g_hash_table_lookup_extended (threadpool_io->states, GINT_TO_POINTER (update->fd), &k, (gpointer*) &list);
-
- if (!update->sockares) {
- callback (update->fd, 0, is_new);
- } else {
- list = mono_mlist_append (list, (MonoObject*) update->sockares);
- mono_g_hash_table_replace (threadpool_io->states, update->sockares->handle, list);
-
- callback (update->fd, get_events (list), is_new);
- }
- }
-
- mono_cond_broadcast (&threadpool_io->updates_signal);
-
- if (threadpool_io->updates_size > 0) {
- ThreadPoolIOUpdate *updates_old;
-
- threadpool_io->updates_size = 0;
- threadpool_io->updates_capacity = 16;
-
- updates_old = threadpool_io->updates;
-
- threadpool_io->updates = mono_gc_alloc_fixed (sizeof (ThreadPoolIOUpdate) * threadpool_io->updates_capacity, MONO_GC_DESCRIPTOR_NULL);
- g_assert (threadpool_io->updates);
-
- mono_gc_free_fixed (updates_old);
- }
-
- mono_mutex_unlock (&threadpool_io->lock);
-}
-
-static void
-update_remove (gboolean (*predicate) (ThreadPoolIOUpdate *update, gpointer user_data), gpointer user_data)
-{
- gint i;
-
- mono_mutex_lock (&threadpool_io->lock);
-
- for (i = 0; i < threadpool_io->updates_size; ++i) {
- if (predicate (&threadpool_io->updates [i], user_data)) {
- if (i < threadpool_io->updates_size - 1)
- memmove (threadpool_io->updates + i, threadpool_io->updates + i + 1, sizeof (ThreadPoolIOUpdate) * threadpool_io->updates_size - i - 1);
- memset (threadpool_io->updates + threadpool_io->updates_size - 1, 0, sizeof (ThreadPoolIOUpdate));
-
- threadpool_io->updates_size --;
- i --;
- }
- }
-
- mono_mutex_unlock (&threadpool_io->lock);
-}
-
static void
selector_thread_wakeup (void)
{
}
}
+typedef struct {
+ MonoDomain *domain;
+ MonoGHashTable *states;
+} FilterSockaresForDomainData;
+
+static void
+filter_sockares_for_domain (gpointer key, gpointer value, gpointer user_data)
+{
+ FilterSockaresForDomainData *data;
+ MonoMList *list = value, *element;
+ MonoDomain *domain;
+ MonoGHashTable *states;
+
+ g_assert (user_data);
+ data = user_data;
+ domain = data->domain;
+ states = data->states;
+
+ for (element = list; element; element = mono_mlist_next (element)) {
+ MonoSocketAsyncResult *sockares = (MonoSocketAsyncResult*) mono_mlist_get_data (element);
+ if (mono_object_domain (sockares) == domain)
+ mono_mlist_set_data (element, NULL);
+ }
+
+ /* we skip all the first elements which are NULL */
+ for (; list; list = mono_mlist_next (list)) {
+ if (mono_mlist_get_data (list))
+ break;
+ }
+
+ if (list) {
+ g_assert (mono_mlist_get_data (list));
+
+ /* we delete all the NULL elements after the first one */
+ for (element = list; element;) {
+ MonoMList *next;
+ if (!(next = mono_mlist_next (element)))
+ break;
+ if (mono_mlist_get_data (next))
+ element = next;
+ else
+ mono_mlist_set_next (element, mono_mlist_next (next));
+ }
+ }
+
+ mono_g_hash_table_replace (states, key, list);
+}
+
static void
selector_thread (gpointer data)
{
+ MonoGHashTable *states;
+
io_selector_running = TRUE;
if (mono_runtime_is_shutting_down ()) {
return;
}
- mono_mutex_lock (&threadpool_io->lock);
+ states = mono_g_hash_table_new_type (g_direct_hash, g_direct_equal, MONO_HASH_VALUE_GC);
for (;;) {
- guint i;
+ guint i, j;
guint max;
gint ready = 0;
- update_drain (threadpool_io->backend.register_fd);
+ mono_mutex_lock (&threadpool_io->updates_lock);
+
+ for (i = 0; i < threadpool_io->updates_size; ++i) {
+ ThreadPoolIOUpdate *update = &threadpool_io->updates [i];
- mono_mutex_unlock (&threadpool_io->lock);
+ switch (update->type) {
+ case UPDATE_EMPTY:
+ break;
+ case UPDATE_ADD: {
+ gint fd;
+ gpointer k;
+ gboolean exists;
+ MonoMList *list = NULL;
+ MonoSocketAsyncResult *sockares;
+
+ fd = update->data.add.fd;
+ g_assert (fd >= 0);
+
+ sockares = update->data.add.sockares;
+ g_assert (sockares);
+
+ exists = mono_g_hash_table_lookup_extended (states, GINT_TO_POINTER (fd), &k, (gpointer*) &list);
+ list = mono_mlist_append (list, (MonoObject*) sockares);
+ mono_g_hash_table_replace (states, sockares->handle, list);
+
+ threadpool_io->backend.register_fd (fd, get_events (list), !exists);
+
+ break;
+ }
+ case UPDATE_REMOVE_SOCKET: {
+ gint fd;
+ gpointer k;
+ MonoMList *list = NULL;
+
+ fd = update->data.remove_socket.fd;
+ g_assert (fd >= 0);
+
+ if (mono_g_hash_table_lookup_extended (states, GINT_TO_POINTER (fd), &k, (gpointer*) &list)) {
+ mono_g_hash_table_remove (states, GINT_TO_POINTER (fd));
+
+ for (j = i + 1; j < threadpool_io->updates_size; ++j) {
+ ThreadPoolIOUpdate *update = &threadpool_io->updates [j];
+ if (update->type == UPDATE_ADD && update->data.add.fd == fd)
+ memset (update, 0, sizeof (ThreadPoolIOUpdate));
+ }
+
+ for (; list; list = mono_mlist_remove_item (list, list)) {
+ MonoSocketAsyncResult *sockares = (MonoSocketAsyncResult*) mono_mlist_get_data (list);
+
+ switch (sockares->operation) {
+ case AIO_OP_RECEIVE:
+ sockares->operation = AIO_OP_RECV_JUST_CALLBACK;
+ break;
+ case AIO_OP_SEND:
+ sockares->operation = AIO_OP_SEND_JUST_CALLBACK;
+ break;
+ }
+
+ mono_threadpool_ms_enqueue_work_item (mono_object_domain (sockares), (MonoObject*) sockares);
+ }
+
+ threadpool_io->backend.register_fd (fd, 0, FALSE);
+ }
+
+ break;
+ }
+ case UPDATE_REMOVE_DOMAIN: {
+ MonoDomain *domain;
+
+ domain = update->data.remove_domain.domain;
+ g_assert (domain);
+
+ FilterSockaresForDomainData user_data = { .domain = domain, .states = states };
+ mono_g_hash_table_foreach (states, filter_sockares_for_domain, &user_data);
+
+ for (j = i + 1; j < threadpool_io->updates_size; ++j) {
+ ThreadPoolIOUpdate *update = &threadpool_io->updates [j];
+ if (update->type == UPDATE_ADD && mono_object_domain (update->data.add.sockares) == domain)
+ memset (update, 0, sizeof (ThreadPoolIOUpdate));
+ }
+
+ break;
+ }
+ default:
+ g_assert_not_reached ();
+ }
+ }
+
+ mono_cond_broadcast (&threadpool_io->updates_cond);
+
+ if (threadpool_io->updates_size > 0) {
+ threadpool_io->updates_size = 0;
+ memset (&threadpool_io->updates, 0, UPDATES_CAPACITY * sizeof (ThreadPoolIOUpdate));
+ }
+
+ mono_mutex_unlock (&threadpool_io->updates_lock);
mono_gc_set_skip_thread (TRUE);
mono_gc_set_skip_thread (FALSE);
- mono_mutex_lock (&threadpool_io->lock);
-
if (ready == -1 || mono_runtime_is_shutting_down ())
break;
MonoMList *list = NULL;
gpointer k;
- if (mono_g_hash_table_lookup_extended (threadpool_io->states, GINT_TO_POINTER (fd), &k, (gpointer*) &list)) {
+ if (mono_g_hash_table_lookup_extended (states, GINT_TO_POINTER (fd), &k, (gpointer*) &list)) {
if (list && (events & MONO_POLLIN) != 0) {
MonoSocketAsyncResult *sockares = get_sockares_for_event (&list, MONO_POLLIN);
if (sockares)
}
if (!list)
- mono_g_hash_table_remove (threadpool_io->states, GINT_TO_POINTER (fd));
+ mono_g_hash_table_remove (states, GINT_TO_POINTER (fd));
else
- mono_g_hash_table_replace (threadpool_io->states, GINT_TO_POINTER (fd), list);
+ mono_g_hash_table_replace (states, GINT_TO_POINTER (fd), list);
threadpool_io->backend.register_fd (fd, get_events (list), FALSE);
}
}
}
- mono_mutex_unlock (&threadpool_io->lock);
+ mono_g_hash_table_destroy (states);
io_selector_running = FALSE;
}
+/* Locking: threadpool_io->updates_lock must be held */
+static ThreadPoolIOUpdate*
+update_get_new (void)
+{
+ ThreadPoolIOUpdate *update = NULL;
+ g_assert (threadpool_io->updates_size <= UPDATES_CAPACITY);
+
+ while (threadpool_io->updates_size == UPDATES_CAPACITY) {
+ /* we wait for updates to be applied in the selector_thread and we loop
+ * as long as none are available. if it happends too much, then we need
+ * to increase UPDATES_CAPACITY */
+ mono_cond_wait (&threadpool_io->updates_cond, &threadpool_io->updates_lock);
+ }
+
+ g_assert (threadpool_io->updates_size < UPDATES_CAPACITY);
+
+ update = &threadpool_io->updates [threadpool_io->updates_size ++];
+
+ return update;
+}
+
static void
wakeup_pipes_init (void)
{
threadpool_io = g_new0 (ThreadPoolIO, 1);
g_assert (threadpool_io);
- mono_mutex_init_recursive (&threadpool_io->lock);
-
- mono_cond_init (&threadpool_io->updates_signal, NULL);
+ mono_mutex_init_recursive (&threadpool_io->updates_lock);
+ mono_cond_init (&threadpool_io->updates_cond, NULL);
+ mono_gc_register_root ((void*)&threadpool_io->updates [0], sizeof (threadpool_io->updates), MONO_GC_DESCRIPTOR_NULL);
- threadpool_io->states = mono_g_hash_table_new_type (g_direct_hash, g_direct_equal, MONO_HASH_VALUE_GC);
- MONO_GC_REGISTER_ROOT_FIXED (threadpool_io->states);
-
- threadpool_io->updates = NULL;
threadpool_io->updates_size = 0;
- threadpool_io->updates_capacity = 0;
threadpool_io->backend = backend_poll;
if (g_getenv ("MONO_ENABLE_AIO") != NULL) {
while (io_selector_running)
g_usleep (1000);
- mono_mutex_destroy (&threadpool_io->lock);
-
- mono_cond_destroy (&threadpool_io->updates_signal);
-
- MONO_GC_UNREGISTER_ROOT (threadpool_io->states);
- mono_g_hash_table_destroy (threadpool_io->states);
-
- if (threadpool_io->updates)
- mono_gc_free_fixed (threadpool_io->updates);
+ mono_mutex_destroy (&threadpool_io->updates_lock);
+ mono_cond_destroy (&threadpool_io->updates_cond);
threadpool_io->backend.cleanup ();
MonoAsyncResult *
mono_threadpool_ms_io_add (MonoAsyncResult *ares, MonoSocketAsyncResult *sockares)
{
+ ThreadPoolIOUpdate *update;
+
g_assert (ares);
g_assert (sockares);
if (mono_runtime_is_shutting_down ())
return NULL;
+ if (mono_domain_is_unloading (mono_object_domain (sockares)))
+ return NULL;
mono_lazy_initialize (&io_status, initialize);
MONO_OBJECT_SETREF (sockares, ares, ares);
- update_add (GPOINTER_TO_INT (sockares->handle), sockares);
+ mono_mutex_lock (&threadpool_io->updates_lock);
- return ares;
-}
+ update = update_get_new ();
+ update->type = UPDATE_ADD;
+ update->data.add.fd = GPOINTER_TO_INT (sockares->handle);
+ update->data.add.sockares = sockares;
+ mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */
-static gboolean
-remove_update_for_socket (ThreadPoolIOUpdate *update, gpointer user_data)
-{
- if (!update->sockares)
- return FALSE;
+ selector_thread_wakeup ();
+
+ mono_mutex_unlock (&threadpool_io->updates_lock);
- return GPOINTER_TO_INT (update->sockares->handle) == GPOINTER_TO_INT (user_data);
+ return ares;
}
void
mono_threadpool_ms_io_remove_socket (int fd)
{
- MonoMList *list = NULL;
- gpointer k;
+ ThreadPoolIOUpdate *update;
if (!mono_lazy_is_initialized (&io_status))
return;
- mono_mutex_lock (&threadpool_io->lock);
-
- g_assert (threadpool_io->states);
+ mono_mutex_lock (&threadpool_io->updates_lock);
- if (mono_g_hash_table_lookup_extended (threadpool_io->states, GINT_TO_POINTER (fd), &k, (gpointer*) &list))
- mono_g_hash_table_remove (threadpool_io->states, GINT_TO_POINTER (fd));
+ update = update_get_new ();
+ update->type = UPDATE_REMOVE_SOCKET;
+ update->data.add.fd = fd;
+ mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */
- update_remove (remove_update_for_socket, GINT_TO_POINTER (fd));
-
- mono_mutex_unlock (&threadpool_io->lock);
-
- for (; list; list = mono_mlist_remove_item (list, list)) {
- MonoSocketAsyncResult *sockares = (MonoSocketAsyncResult*) mono_mlist_get_data (list);
-
- if (!sockares)
- continue;
-
- switch (sockares->operation) {
- case AIO_OP_RECEIVE:
- sockares->operation = AIO_OP_RECV_JUST_CALLBACK;
- break;
- case AIO_OP_SEND:
- sockares->operation = AIO_OP_SEND_JUST_CALLBACK;
- break;
- }
-
- mono_threadpool_ms_enqueue_work_item (((MonoObject*) sockares)->vtable->domain, (MonoObject*) sockares);
- }
-
- update_add (fd, NULL);
-}
-
-static gboolean
-remove_sockstate_for_domain (gpointer key, gpointer value, gpointer user_data)
-{
- MonoMList *list;
- gboolean remove = FALSE;
-
- for (list = value; list; list = mono_mlist_next (list)) {
- MonoObject *data = mono_mlist_get_data (list);
- if (mono_object_domain (data) == user_data) {
- remove = TRUE;
- mono_mlist_set_data (list, NULL);
- }
- }
-
- //FIXME is there some sort of additional unregistration we need to perform here?
- return remove;
-}
+ selector_thread_wakeup ();
-static gboolean
-remove_update_for_domain (ThreadPoolIOUpdate *update, gpointer user_data)
-{
- if (!update->sockares)
- return FALSE;
+ mono_cond_wait (&threadpool_io->updates_cond, &threadpool_io->updates_lock);
- return mono_object_domain (update->sockares) == (MonoDomain*) user_data;
+ mono_mutex_unlock (&threadpool_io->updates_lock);
}
void
mono_threadpool_ms_io_remove_domain_jobs (MonoDomain *domain)
{
+ ThreadPoolIOUpdate *update;
+
if (!mono_lazy_is_initialized (&io_status))
return;
- mono_mutex_lock (&threadpool_io->lock);
+ mono_mutex_lock (&threadpool_io->updates_lock);
- mono_g_hash_table_foreach_remove (threadpool_io->states, remove_sockstate_for_domain, domain);
+ update = update_get_new ();
+ update->type = UPDATE_REMOVE_DOMAIN;
+ update->data.remove_domain.domain = domain;
+ mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */
+
+ selector_thread_wakeup ();
- update_remove (remove_update_for_domain, domain);
+ mono_cond_wait (&threadpool_io->updates_cond, &threadpool_io->updates_lock);
- mono_mutex_unlock (&threadpool_io->lock);
+ mono_mutex_unlock (&threadpool_io->updates_lock);
}
void