[threadpool-io] Move the states table to the selector thread exclusively
authorLudovic Henry <ludovic@xamarin.com>
Wed, 12 Aug 2015 15:24:12 +0000 (12:24 -0300)
committerRodrigo Kumpera <kumpera@gmail.com>
Mon, 24 Aug 2015 20:03:52 +0000 (16:03 -0400)
This greatly simplify the model of the states as we do not need to worry about modifications in different threads.

mono/metadata/threadpool-ms-io.c

index 6969e90a6761d631c5b0938ee86ef8b5b9390c84..2471872cd617336c86a24d5a40fc3571b4c62702 100644 (file)
@@ -42,6 +42,8 @@ typedef struct {
 #include "threadpool-ms-io-kqueue.c"
 #include "threadpool-ms-io-poll.c"
 
+#define UPDATES_CAPACITY 128
+
 /* Keep in sync with System.Net.Sockets.Socket.SocketOperation */
 enum {
        AIO_OP_FIRST,
@@ -62,23 +64,42 @@ enum {
        AIO_OP_LAST
 };
 
+typedef enum {
+       UPDATE_EMPTY = 0,
+       UPDATE_ADD,
+       UPDATE_REMOVE_SOCKET,
+       UPDATE_REMOVE_DOMAIN,
+} ThreadPoolIOUpdateType;
+
 typedef struct {
        gint fd;
        MonoSocketAsyncResult *sockares;
-} ThreadPoolIOUpdate;
+} ThreadPoolIOUpdate_Add;
 
 typedef struct {
-       ThreadPoolIOBackend backend;
+       gint fd;
+} ThreadPoolIOUpdate_RemoveSocket;
 
-       mono_mutex_t lock;
+typedef struct {
+       MonoDomain *domain;
+} ThreadPoolIOUpdate_RemoveDomain;
 
-       mono_cond_t updates_signal;
+typedef struct {
+       ThreadPoolIOUpdateType type;
+       union {
+               ThreadPoolIOUpdate_Add add;
+               ThreadPoolIOUpdate_RemoveSocket remove_socket;
+               ThreadPoolIOUpdate_RemoveDomain remove_domain;
+       } data;
+} ThreadPoolIOUpdate;
 
-       MonoGHashTable *states;
+typedef struct {
+       ThreadPoolIOBackend backend;
 
-       ThreadPoolIOUpdate *updates;
-       guint updates_size;
-       guint updates_capacity;
+       ThreadPoolIOUpdate updates [UPDATES_CAPACITY];
+       gint updates_size;
+       mono_mutex_t updates_lock;
+       mono_cond_t updates_cond;
 
 #if !defined(HOST_WIN32)
        gint wakeup_pipes [2];
@@ -141,130 +162,12 @@ get_events (MonoMList *list)
        MonoMList *current;
        gint events = 0;
 
-       for (current = list; current; current = mono_mlist_next (current)) {
-               MonoSocketAsyncResult *ares = (MonoSocketAsyncResult*) mono_mlist_get_data (current);
-               if (ares)
-                       events |= get_events_from_sockares (ares);
-       }
+       for (current = list; current; current = mono_mlist_next (current))
+               events |= get_events_from_sockares ((MonoSocketAsyncResult*) mono_mlist_get_data (current));
 
        return events;
 }
 
-static void
-selector_thread_wakeup (void);
-
-/*
- * If sockares is NULL, then it means we want to delete the corresponding fd
- */
-static void
-update_add (gint fd, MonoSocketAsyncResult *sockares)
-{
-       ThreadPoolIOUpdate *update;
-
-       mono_mutex_lock (&threadpool_io->lock);
-
-       threadpool_io->updates_size += 1;
-       if (threadpool_io->updates_size > threadpool_io->updates_capacity) {
-               ThreadPoolIOUpdate *updates_new, *updates_old;
-               gint updates_new_capacity, updates_old_capacity;
-
-               updates_old_capacity = threadpool_io->updates_capacity;
-               updates_new_capacity = updates_old_capacity + 16;
-
-               updates_old = threadpool_io->updates;
-               updates_new = mono_gc_alloc_fixed (sizeof (ThreadPoolIOUpdate) * updates_new_capacity, MONO_GC_DESCRIPTOR_NULL);
-               g_assert (updates_new);
-
-               if (updates_old)
-                       memcpy (updates_new, updates_old, sizeof (ThreadPoolIOUpdate) * updates_old_capacity);
-
-               threadpool_io->updates = updates_new;
-               threadpool_io->updates_capacity = updates_new_capacity;
-
-               if (updates_old)
-                       mono_gc_free_fixed (updates_old);
-       }
-
-       update = &threadpool_io->updates [threadpool_io->updates_size - 1];
-       update->fd = fd;
-       update->sockares = sockares;
-
-       selector_thread_wakeup ();
-
-       MONO_PREPARE_BLOCKING;
-       mono_cond_wait (&threadpool_io->updates_signal, &threadpool_io->lock);
-       MONO_FINISH_BLOCKING;
-
-       mono_mutex_unlock (&threadpool_io->lock);
-}
-
-static void
-update_drain (void (*callback) (gint fd, gint events, gboolean is_new))
-{
-       gint i;
-
-       mono_mutex_lock (&threadpool_io->lock);
-
-       for (i = 0; i < threadpool_io->updates_size; ++i) {
-               ThreadPoolIOUpdate *update;
-               MonoMList *list = NULL;
-               gpointer k;
-               gboolean is_new;
-
-               update = &threadpool_io->updates [i];
-
-               is_new = !mono_g_hash_table_lookup_extended (threadpool_io->states, GINT_TO_POINTER (update->fd), &k, (gpointer*) &list);
-
-               if (!update->sockares) {
-                       callback (update->fd, 0, is_new);
-               } else {
-                       list = mono_mlist_append (list, (MonoObject*) update->sockares);
-                       mono_g_hash_table_replace (threadpool_io->states, update->sockares->handle, list);
-
-                       callback (update->fd, get_events (list), is_new);
-               }
-       }
-
-       mono_cond_broadcast (&threadpool_io->updates_signal);
-
-       if (threadpool_io->updates_size > 0) {
-               ThreadPoolIOUpdate *updates_old;
-
-               threadpool_io->updates_size = 0;
-               threadpool_io->updates_capacity = 16;
-
-               updates_old = threadpool_io->updates;
-
-               threadpool_io->updates = mono_gc_alloc_fixed (sizeof (ThreadPoolIOUpdate) * threadpool_io->updates_capacity, MONO_GC_DESCRIPTOR_NULL);
-               g_assert (threadpool_io->updates);
-
-               mono_gc_free_fixed (updates_old);
-       }
-
-       mono_mutex_unlock (&threadpool_io->lock);
-}
-
-static void
-update_remove (gboolean (*predicate) (ThreadPoolIOUpdate *update, gpointer user_data), gpointer user_data)
-{
-       gint i;
-
-       mono_mutex_lock (&threadpool_io->lock);
-
-       for (i = 0; i < threadpool_io->updates_size; ++i) {
-               if (predicate (&threadpool_io->updates [i], user_data)) {
-                       if (i < threadpool_io->updates_size - 1)
-                               memmove (threadpool_io->updates + i, threadpool_io->updates + i + 1, sizeof (ThreadPoolIOUpdate) * threadpool_io->updates_size - i - 1);
-                       memset (threadpool_io->updates + threadpool_io->updates_size - 1, 0, sizeof (ThreadPoolIOUpdate));
-
-                       threadpool_io->updates_size --;
-                       i --;
-               }
-       }
-
-       mono_mutex_unlock (&threadpool_io->lock);
-}
-
 static void
 selector_thread_wakeup (void)
 {
@@ -321,9 +224,59 @@ selector_thread_wakeup_drain_pipes (void)
        }
 }
 
+typedef struct {
+       MonoDomain *domain;
+       MonoGHashTable *states;
+} FilterSockaresForDomainData;
+
+static void
+filter_sockares_for_domain (gpointer key, gpointer value, gpointer user_data)
+{
+       FilterSockaresForDomainData *data;
+       MonoMList *list = value, *element;
+       MonoDomain *domain;
+       MonoGHashTable *states;
+
+       g_assert (user_data);
+       data = user_data;
+       domain = data->domain;
+       states = data->states;
+
+       for (element = list; element; element = mono_mlist_next (element)) {
+               MonoSocketAsyncResult *sockares = (MonoSocketAsyncResult*) mono_mlist_get_data (element);
+               if (mono_object_domain (sockares) == domain)
+                       mono_mlist_set_data (element, NULL);
+       }
+
+       /* we skip all the first elements which are NULL */
+       for (; list; list = mono_mlist_next (list)) {
+               if (mono_mlist_get_data (list))
+                       break;
+       }
+
+       if (list) {
+               g_assert (mono_mlist_get_data (list));
+
+               /* we delete all the NULL elements after the first one */
+               for (element = list; element;) {
+                       MonoMList *next;
+                       if (!(next = mono_mlist_next (element)))
+                               break;
+                       if (mono_mlist_get_data (next))
+                               element = next;
+                       else
+                               mono_mlist_set_next (element, mono_mlist_next (next));
+               }
+       }
+
+       mono_g_hash_table_replace (states, key, list);
+}
+
 static void
 selector_thread (gpointer data)
 {
+       MonoGHashTable *states;
+
        io_selector_running = TRUE;
 
        if (mono_runtime_is_shutting_down ()) {
@@ -331,16 +284,109 @@ selector_thread (gpointer data)
                return;
        }
 
-       mono_mutex_lock (&threadpool_io->lock);
+       states = mono_g_hash_table_new_type (g_direct_hash, g_direct_equal, MONO_HASH_VALUE_GC);
 
        for (;;) {
-               guint i;
+               guint i, j;
                guint max;
                gint ready = 0;
 
-               update_drain (threadpool_io->backend.register_fd);
+               mono_mutex_lock (&threadpool_io->updates_lock);
+
+               for (i = 0; i < threadpool_io->updates_size; ++i) {
+                       ThreadPoolIOUpdate *update = &threadpool_io->updates [i];
 
-               mono_mutex_unlock (&threadpool_io->lock);
+                       switch (update->type) {
+                       case UPDATE_EMPTY:
+                               break;
+                       case UPDATE_ADD: {
+                               gint fd;
+                               gpointer k;
+                               gboolean exists;
+                               MonoMList *list = NULL;
+                               MonoSocketAsyncResult *sockares;
+
+                               fd = update->data.add.fd;
+                               g_assert (fd >= 0);
+
+                               sockares = update->data.add.sockares;
+                               g_assert (sockares);
+
+                               exists = mono_g_hash_table_lookup_extended (states, GINT_TO_POINTER (fd), &k, (gpointer*) &list);
+                               list = mono_mlist_append (list, (MonoObject*) sockares);
+                               mono_g_hash_table_replace (states, sockares->handle, list);
+
+                               threadpool_io->backend.register_fd (fd, get_events (list), !exists);
+
+                               break;
+                       }
+                       case UPDATE_REMOVE_SOCKET: {
+                               gint fd;
+                               gpointer k;
+                               MonoMList *list = NULL;
+
+                               fd = update->data.remove_socket.fd;
+                               g_assert (fd >= 0);
+
+                               if (mono_g_hash_table_lookup_extended (states, GINT_TO_POINTER (fd), &k, (gpointer*) &list)) {
+                                       mono_g_hash_table_remove (states, GINT_TO_POINTER (fd));
+
+                                       for (j = i + 1; j < threadpool_io->updates_size; ++j) {
+                                               ThreadPoolIOUpdate *update = &threadpool_io->updates [j];
+                                               if (update->type == UPDATE_ADD && update->data.add.fd == fd)
+                                                       memset (update, 0, sizeof (ThreadPoolIOUpdate));
+                                       }
+
+                                       for (; list; list = mono_mlist_remove_item (list, list)) {
+                                               MonoSocketAsyncResult *sockares = (MonoSocketAsyncResult*) mono_mlist_get_data (list);
+
+                                               switch (sockares->operation) {
+                                               case AIO_OP_RECEIVE:
+                                                       sockares->operation = AIO_OP_RECV_JUST_CALLBACK;
+                                                       break;
+                                               case AIO_OP_SEND:
+                                                       sockares->operation = AIO_OP_SEND_JUST_CALLBACK;
+                                                       break;
+                                               }
+
+                                               mono_threadpool_ms_enqueue_work_item (mono_object_domain (sockares), (MonoObject*) sockares);
+                                       }
+
+                                       threadpool_io->backend.register_fd (fd, 0, FALSE);
+                               }
+
+                               break;
+                       }
+                       case UPDATE_REMOVE_DOMAIN: {
+                               MonoDomain *domain;
+
+                               domain = update->data.remove_domain.domain;
+                               g_assert (domain);
+
+                               FilterSockaresForDomainData user_data = { .domain = domain, .states = states };
+                               mono_g_hash_table_foreach (states, filter_sockares_for_domain, &user_data);
+
+                               for (j = i + 1; j < threadpool_io->updates_size; ++j) {
+                                       ThreadPoolIOUpdate *update = &threadpool_io->updates [j];
+                                       if (update->type == UPDATE_ADD && mono_object_domain (update->data.add.sockares) == domain)
+                                               memset (update, 0, sizeof (ThreadPoolIOUpdate));
+                               }
+
+                               break;
+                       }
+                       default:
+                               g_assert_not_reached ();
+                       }
+               }
+
+               mono_cond_broadcast (&threadpool_io->updates_cond);
+
+               if (threadpool_io->updates_size > 0) {
+                       threadpool_io->updates_size = 0;
+                       memset (&threadpool_io->updates, 0, UPDATES_CAPACITY * sizeof (ThreadPoolIOUpdate));
+               }
+
+               mono_mutex_unlock (&threadpool_io->updates_lock);
 
                mono_gc_set_skip_thread (TRUE);
 
@@ -348,8 +394,6 @@ selector_thread (gpointer data)
 
                mono_gc_set_skip_thread (FALSE);
 
-               mono_mutex_lock (&threadpool_io->lock);
-
                if (ready == -1 || mono_runtime_is_shutting_down ())
                        break;
 
@@ -368,7 +412,7 @@ selector_thread (gpointer data)
                                MonoMList *list = NULL;
                                gpointer k;
 
-                               if (mono_g_hash_table_lookup_extended (threadpool_io->states, GINT_TO_POINTER (fd), &k, (gpointer*) &list)) {
+                               if (mono_g_hash_table_lookup_extended (states, GINT_TO_POINTER (fd), &k, (gpointer*) &list)) {
                                        if (list && (events & MONO_POLLIN) != 0) {
                                                MonoSocketAsyncResult *sockares = get_sockares_for_event (&list, MONO_POLLIN);
                                                if (sockares)
@@ -381,9 +425,9 @@ selector_thread (gpointer data)
                                        }
 
                                        if (!list)
-                                               mono_g_hash_table_remove (threadpool_io->states, GINT_TO_POINTER (fd));
+                                               mono_g_hash_table_remove (states, GINT_TO_POINTER (fd));
                                        else
-                                               mono_g_hash_table_replace (threadpool_io->states, GINT_TO_POINTER (fd), list);
+                                               mono_g_hash_table_replace (states, GINT_TO_POINTER (fd), list);
 
                                        threadpool_io->backend.register_fd (fd, get_events (list), FALSE);
                                }
@@ -393,11 +437,32 @@ selector_thread (gpointer data)
                }
        }
 
-       mono_mutex_unlock (&threadpool_io->lock);
+       mono_g_hash_table_destroy (states);
 
        io_selector_running = FALSE;
 }
 
+/* Locking: threadpool_io->updates_lock must be held */
+static ThreadPoolIOUpdate*
+update_get_new (void)
+{
+       ThreadPoolIOUpdate *update = NULL;
+       g_assert (threadpool_io->updates_size <= UPDATES_CAPACITY);
+
+       while (threadpool_io->updates_size == UPDATES_CAPACITY) {
+               /* we wait for updates to be applied in the selector_thread and we loop
+                * as long as none are available. if it happends too much, then we need
+                * to increase UPDATES_CAPACITY */
+               mono_cond_wait (&threadpool_io->updates_cond, &threadpool_io->updates_lock);
+       }
+
+       g_assert (threadpool_io->updates_size < UPDATES_CAPACITY);
+
+       update = &threadpool_io->updates [threadpool_io->updates_size ++];
+
+       return update;
+}
+
 static void
 wakeup_pipes_init (void)
 {
@@ -462,16 +527,11 @@ initialize (void)
        threadpool_io = g_new0 (ThreadPoolIO, 1);
        g_assert (threadpool_io);
 
-       mono_mutex_init_recursive (&threadpool_io->lock);
-
-       mono_cond_init (&threadpool_io->updates_signal, NULL);
+       mono_mutex_init_recursive (&threadpool_io->updates_lock);
+       mono_cond_init (&threadpool_io->updates_cond, NULL);
+       mono_gc_register_root ((void*)&threadpool_io->updates [0], sizeof (threadpool_io->updates), MONO_GC_DESCRIPTOR_NULL);
 
-       threadpool_io->states = mono_g_hash_table_new_type (g_direct_hash, g_direct_equal, MONO_HASH_VALUE_GC);
-       MONO_GC_REGISTER_ROOT_FIXED (threadpool_io->states);
-
-       threadpool_io->updates = NULL;
        threadpool_io->updates_size = 0;
-       threadpool_io->updates_capacity = 0;
 
        threadpool_io->backend = backend_poll;
        if (g_getenv ("MONO_ENABLE_AIO") != NULL) {
@@ -502,15 +562,8 @@ cleanup (void)
        while (io_selector_running)
                g_usleep (1000);
 
-       mono_mutex_destroy (&threadpool_io->lock);
-
-       mono_cond_destroy (&threadpool_io->updates_signal);
-
-       MONO_GC_UNREGISTER_ROOT (threadpool_io->states);
-       mono_g_hash_table_destroy (threadpool_io->states);
-
-       if (threadpool_io->updates)
-               mono_gc_free_fixed (threadpool_io->updates);
+       mono_mutex_destroy (&threadpool_io->updates_lock);
+       mono_cond_destroy (&threadpool_io->updates_cond);
 
        threadpool_io->backend.cleanup ();
 
@@ -577,111 +630,77 @@ mono_threadpool_ms_io_cleanup (void)
 MonoAsyncResult *
 mono_threadpool_ms_io_add (MonoAsyncResult *ares, MonoSocketAsyncResult *sockares)
 {
+       ThreadPoolIOUpdate *update;
+
        g_assert (ares);
        g_assert (sockares);
 
        if (mono_runtime_is_shutting_down ())
                return NULL;
+       if (mono_domain_is_unloading (mono_object_domain (sockares)))
+               return NULL;
 
        mono_lazy_initialize (&io_status, initialize);
 
        MONO_OBJECT_SETREF (sockares, ares, ares);
 
-       update_add (GPOINTER_TO_INT (sockares->handle), sockares);
+       mono_mutex_lock (&threadpool_io->updates_lock);
 
-       return ares;
-}
+       update = update_get_new ();
+       update->type = UPDATE_ADD;
+       update->data.add.fd = GPOINTER_TO_INT (sockares->handle);
+       update->data.add.sockares = sockares;
+       mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */
 
-static gboolean
-remove_update_for_socket (ThreadPoolIOUpdate *update, gpointer user_data)
-{
-       if (!update->sockares)
-               return FALSE;
+       selector_thread_wakeup ();
+
+       mono_mutex_unlock (&threadpool_io->updates_lock);
 
-       return GPOINTER_TO_INT (update->sockares->handle) == GPOINTER_TO_INT (user_data);
+       return ares;
 }
 
 void
 mono_threadpool_ms_io_remove_socket (int fd)
 {
-       MonoMList *list = NULL;
-       gpointer k;
+       ThreadPoolIOUpdate *update;
 
        if (!mono_lazy_is_initialized (&io_status))
                return;
 
-       mono_mutex_lock (&threadpool_io->lock);
-
-       g_assert (threadpool_io->states);
+       mono_mutex_lock (&threadpool_io->updates_lock);
 
-       if (mono_g_hash_table_lookup_extended (threadpool_io->states, GINT_TO_POINTER (fd), &k, (gpointer*) &list))
-               mono_g_hash_table_remove (threadpool_io->states, GINT_TO_POINTER (fd));
+       update = update_get_new ();
+       update->type = UPDATE_REMOVE_SOCKET;
+       update->data.add.fd = fd;
+       mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */
 
-       update_remove (remove_update_for_socket, GINT_TO_POINTER (fd));
-
-       mono_mutex_unlock (&threadpool_io->lock);
-
-       for (; list; list = mono_mlist_remove_item (list, list)) {
-               MonoSocketAsyncResult *sockares = (MonoSocketAsyncResult*) mono_mlist_get_data (list);
-
-               if (!sockares)
-                       continue;
-
-               switch (sockares->operation) {
-               case AIO_OP_RECEIVE:
-                       sockares->operation = AIO_OP_RECV_JUST_CALLBACK;
-                       break;
-               case AIO_OP_SEND:
-                       sockares->operation = AIO_OP_SEND_JUST_CALLBACK;
-                       break;
-               }
-
-               mono_threadpool_ms_enqueue_work_item (((MonoObject*) sockares)->vtable->domain, (MonoObject*) sockares);
-       }
-
-       update_add (fd, NULL);
-}
-
-static gboolean
-remove_sockstate_for_domain (gpointer key, gpointer value, gpointer user_data)
-{
-       MonoMList *list;
-       gboolean remove = FALSE;
-
-       for (list = value; list; list = mono_mlist_next (list)) {
-               MonoObject *data = mono_mlist_get_data (list);
-               if (mono_object_domain (data) == user_data) {
-                       remove = TRUE;
-                       mono_mlist_set_data (list, NULL);
-               }
-       }
-
-       //FIXME is there some sort of additional unregistration we need to perform here?
-       return remove;
-}
+       selector_thread_wakeup ();
 
-static gboolean
-remove_update_for_domain (ThreadPoolIOUpdate *update, gpointer user_data)
-{
-       if (!update->sockares)
-               return FALSE;
+       mono_cond_wait (&threadpool_io->updates_cond, &threadpool_io->updates_lock);
 
-       return mono_object_domain (update->sockares) == (MonoDomain*) user_data;
+       mono_mutex_unlock (&threadpool_io->updates_lock);
 }
 
 void
 mono_threadpool_ms_io_remove_domain_jobs (MonoDomain *domain)
 {
+       ThreadPoolIOUpdate *update;
+
        if (!mono_lazy_is_initialized (&io_status))
                return;
 
-       mono_mutex_lock (&threadpool_io->lock);
+       mono_mutex_lock (&threadpool_io->updates_lock);
 
-       mono_g_hash_table_foreach_remove (threadpool_io->states, remove_sockstate_for_domain, domain);
+       update = update_get_new ();
+       update->type = UPDATE_REMOVE_DOMAIN;
+       update->data.remove_domain.domain = domain;
+       mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */
+
+       selector_thread_wakeup ();
 
-       update_remove (remove_update_for_domain, domain);
+       mono_cond_wait (&threadpool_io->updates_cond, &threadpool_io->updates_lock);
 
-       mono_mutex_unlock (&threadpool_io->lock);
+       mono_mutex_unlock (&threadpool_io->updates_lock);
 }
 
 void