[io-layer] Remove GetCurrentThreadId
[mono.git] / mono / metadata / threadpool-ms-io.c
index be74d429ab22aed4ff525790a22f084a8e9ed0f4..6742fddc151c9a3f3704815c1749f03a9493fb70 100644 (file)
 
 #include <mono/metadata/gc-internal.h>
 #include <mono/metadata/mono-mlist.h>
-#include <mono/metadata/threadpool-internals.h>
 #include <mono/metadata/threadpool-ms.h>
 #include <mono/metadata/threadpool-ms-io.h>
 #include <mono/utils/atomic.h>
-#include <mono/utils/mono-poll.h>
 #include <mono/utils/mono-threads.h>
-
-/* Keep in sync with System.Net.Sockets.Socket.SocketOperation */
-enum {
-       AIO_OP_FIRST,
-       AIO_OP_ACCEPT = 0,
-       AIO_OP_CONNECT,
-       AIO_OP_RECEIVE,
-       AIO_OP_RECEIVEFROM,
-       AIO_OP_SEND,
-       AIO_OP_SENDTO,
-       AIO_OP_RECV_JUST_CALLBACK,
-       AIO_OP_SEND_JUST_CALLBACK,
-       AIO_OP_READPIPE,
-       AIO_OP_CONSOLE2,
-       AIO_OP_DISCONNECT,
-       AIO_OP_ACCEPTRECEIVE,
-       AIO_OP_RECEIVE_BUFFERS,
-       AIO_OP_SEND_BUFFERS,
-       AIO_OP_LAST
-};
-
-typedef struct {
-       gint fd;
-       gint events;
-       gboolean is_new;
-} ThreadPoolIOUpdate;
+#include <mono/utils/mono-lazy-init.h>
+#include <mono/utils/mono-logger-internal.h>
 
 typedef struct {
        gboolean (*init) (gint wakeup_pipe_fd);
        void     (*cleanup) (void);
-       void     (*update_add) (ThreadPoolIOUpdate *update);
-       gint     (*event_wait) (void);
-       gint     (*event_max) (void);
-       gint     (*event_fd_at) (guint i);
-       gboolean (*event_create_sockares_at) (guint i, gint fd, MonoMList **list);
+       void     (*register_fd) (gint fd, gint events, gboolean is_new);
+       void     (*remove_fd) (gint fd);
+       gint     (*event_wait) (void (*callback) (gint fd, gint events, gpointer user_data), gpointer user_data);
 } ThreadPoolIOBackend;
 
-static int
-get_events_from_sockares (MonoSocketAsyncResult *ares)
-{
-       switch (ares->operation) {
-       case AIO_OP_ACCEPT:
-       case AIO_OP_RECEIVE:
-       case AIO_OP_RECV_JUST_CALLBACK:
-       case AIO_OP_RECEIVEFROM:
-       case AIO_OP_READPIPE:
-       case AIO_OP_ACCEPTRECEIVE:
-       case AIO_OP_RECEIVE_BUFFERS:
-               return MONO_POLLIN;
-       case AIO_OP_SEND:
-       case AIO_OP_SEND_JUST_CALLBACK:
-       case AIO_OP_SENDTO:
-       case AIO_OP_CONNECT:
-       case AIO_OP_SEND_BUFFERS:
-       case AIO_OP_DISCONNECT:
-               return MONO_POLLOUT;
-       default:
-               g_assert_not_reached ();
-       }
-}
-
-static MonoSocketAsyncResult*
-get_sockares_for_event (MonoMList **list, gint event)
-{
-       MonoSocketAsyncResult *state = NULL;
-       MonoMList *current;
-
-       g_assert (list);
+/* Keep in sync with System.IOOperation in mcs/class/System/System/IOSelector.cs */
+enum MonoIOOperation {
+       EVENT_IN   = 1 << 0,
+       EVENT_OUT  = 1 << 1,
+       EVENT_ERR  = 1 << 2, /* not in managed */
+};
 
-       for (current = *list; current; current = mono_mlist_next (current)) {
-               state = (MonoSocketAsyncResult*) mono_mlist_get_data (current);
-               if (get_events_from_sockares ((MonoSocketAsyncResult*) state) == event)
-                       break;
-               state = NULL;
-       }
+#include "threadpool-ms-io-epoll.c"
+#include "threadpool-ms-io-kqueue.c"
+#include "threadpool-ms-io-poll.c"
 
-       if (current)
-               *list = mono_mlist_remove_item (*list, current);
+#define UPDATES_CAPACITY 128
 
-       return state;
-}
+/* Keep in sync with System.IOSelectorJob in mcs/class/System/System/IOSelector.cs */
+struct _MonoIOSelectorJob {
+       MonoObject object;
+       gint32 operation;
+       MonoObject *callback;
+       MonoObject *state;
+};
 
-static gint
-get_events (MonoMList *list)
-{
-       MonoSocketAsyncResult *ares;
-       gint events = 0;
+typedef enum {
+       UPDATE_EMPTY = 0,
+       UPDATE_ADD,
+       UPDATE_REMOVE_SOCKET,
+       UPDATE_REMOVE_DOMAIN,
+} ThreadPoolIOUpdateType;
 
-       for (; list; list = mono_mlist_next (list))
-               if ((ares = (MonoSocketAsyncResult*) mono_mlist_get_data (list)))
-                       events |= get_events_from_sockares (ares);
+typedef struct {
+       gint fd;
+       MonoIOSelectorJob *job;
+} ThreadPoolIOUpdate_Add;
 
-       return events;
-}
+typedef struct {
+       gint fd;
+} ThreadPoolIOUpdate_RemoveSocket;
 
-#include "threadpool-ms-io-epoll.c"
-#include "threadpool-ms-io-kqueue.c"
-#include "threadpool-ms-io-poll.c"
+typedef struct {
+       MonoDomain *domain;
+} ThreadPoolIOUpdate_RemoveDomain;
 
 typedef struct {
-       MonoGHashTable *states;
-       mono_mutex_t states_lock;
+       ThreadPoolIOUpdateType type;
+       union {
+               ThreadPoolIOUpdate_Add add;
+               ThreadPoolIOUpdate_RemoveSocket remove_socket;
+               ThreadPoolIOUpdate_RemoveDomain remove_domain;
+       } data;
+} ThreadPoolIOUpdate;
 
+typedef struct {
        ThreadPoolIOBackend backend;
 
-       ThreadPoolIOUpdate *updates;
-       guint updates_size;
+       ThreadPoolIOUpdate updates [UPDATES_CAPACITY];
+       gint updates_size;
        mono_mutex_t updates_lock;
+       mono_cond_t updates_cond;
 
 #if !defined(HOST_WIN32)
        gint wakeup_pipes [2];
@@ -144,11 +102,42 @@ typedef struct {
 #endif
 } ThreadPoolIO;
 
-static gint32 io_status = STATUS_NOT_INITIALIZED;
-static gint32 io_thread_status = STATUS_NOT_INITIALIZED;
+static mono_lazy_init_t io_status = MONO_LAZY_INIT_STATUS_NOT_INITIALIZED;
+
+static gboolean io_selector_running = FALSE;
 
 static ThreadPoolIO* threadpool_io;
 
+static MonoIOSelectorJob*
+get_job_for_event (MonoMList **list, gint32 event)
+{
+       MonoMList *current;
+
+       g_assert (list);
+
+       for (current = *list; current; current = mono_mlist_next (current)) {
+               MonoIOSelectorJob *job = (MonoIOSelectorJob*) mono_mlist_get_data (current);
+               if (job->operation == event) {
+                       *list = mono_mlist_remove_item (*list, current);
+                       return job;
+               }
+       }
+
+       return NULL;
+}
+
+static gint
+get_operations_for_jobs (MonoMList *list)
+{
+       MonoMList *current;
+       gint operations = 0;
+
+       for (current = list; current; current = mono_mlist_next (current))
+               operations |= ((MonoIOSelectorJob*) mono_mlist_get_data (current))->operation;
+
+       return operations;
+}
+
 static void
 selector_thread_wakeup (void)
 {
@@ -205,68 +194,252 @@ selector_thread_wakeup_drain_pipes (void)
        }
 }
 
+typedef struct {
+       MonoDomain *domain;
+       MonoGHashTable *states;
+} FilterSockaresForDomainData;
+
+static void
+filter_jobs_for_domain (gpointer key, gpointer value, gpointer user_data)
+{
+       FilterSockaresForDomainData *data;
+       MonoMList *list = value, *element;
+       MonoDomain *domain;
+       MonoGHashTable *states;
+
+       g_assert (user_data);
+       data = user_data;
+       domain = data->domain;
+       states = data->states;
+
+       for (element = list; element; element = mono_mlist_next (element)) {
+               MonoIOSelectorJob *job = (MonoIOSelectorJob*) mono_mlist_get_data (element);
+               if (mono_object_domain (job) == domain)
+                       mono_mlist_set_data (element, NULL);
+       }
+
+       /* we skip all the first elements which are NULL */
+       for (; list; list = mono_mlist_next (list)) {
+               if (mono_mlist_get_data (list))
+                       break;
+       }
+
+       if (list) {
+               g_assert (mono_mlist_get_data (list));
+
+               /* we delete all the NULL elements after the first one */
+               for (element = list; element;) {
+                       MonoMList *next;
+                       if (!(next = mono_mlist_next (element)))
+                               break;
+                       if (mono_mlist_get_data (next))
+                               element = next;
+                       else
+                               mono_mlist_set_next (element, mono_mlist_next (next));
+               }
+       }
+
+       mono_g_hash_table_replace (states, key, list);
+}
+
+static void
+wait_callback (gint fd, gint events, gpointer user_data)
+{
+       if (mono_runtime_is_shutting_down ())
+               return;
+
+       if (fd == threadpool_io->wakeup_pipes [0]) {
+               mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: wke");
+               selector_thread_wakeup_drain_pipes ();
+       } else {
+               MonoGHashTable *states;
+               MonoMList *list = NULL;
+               gpointer k;
+               gboolean remove_fd = FALSE;
+               gint operations;
+
+               g_assert (user_data);
+               states = user_data;
+
+               mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: cal fd %3d, events = %2s | %2s | %3s",
+                       fd, (events & EVENT_IN) ? "RD" : "..", (events & EVENT_OUT) ? "WR" : "..", (events & EVENT_ERR) ? "ERR" : "...");
+
+               if (!mono_g_hash_table_lookup_extended (states, GINT_TO_POINTER (fd), &k, (gpointer*) &list))
+                       g_error ("wait_callback: fd %d not found in states table", fd);
+
+               if (list && (events & EVENT_IN) != 0) {
+                       MonoIOSelectorJob *job = get_job_for_event (&list, EVENT_IN);
+                       if (job)
+                               mono_threadpool_ms_enqueue_work_item (((MonoObject*) job)->vtable->domain, (MonoObject*) job);
+               }
+               if (list && (events & EVENT_OUT) != 0) {
+                       MonoIOSelectorJob *job = get_job_for_event (&list, EVENT_OUT);
+                       if (job)
+                               mono_threadpool_ms_enqueue_work_item (((MonoObject*) job)->vtable->domain, (MonoObject*) job);
+               }
+
+               remove_fd = (events & EVENT_ERR) == EVENT_ERR;
+               if (!remove_fd) {
+                       mono_g_hash_table_replace (states, GINT_TO_POINTER (fd), list);
+
+                       operations = get_operations_for_jobs (list);
+
+                       mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: res fd %3d, events = %2s | %2s | %2s",
+                               fd, (operations & EVENT_IN) ? "RD" : "..", (operations & EVENT_OUT) ? "WR" : "..");
+
+                       threadpool_io->backend.register_fd (fd, operations, FALSE);
+               } else {
+                       mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: err fd %d", fd);
+
+                       mono_g_hash_table_remove (states, GINT_TO_POINTER (fd));
+
+                       threadpool_io->backend.remove_fd (fd);
+               }
+       }
+}
+
 static void
 selector_thread (gpointer data)
 {
-       io_thread_status = STATUS_INITIALIZED;
+       MonoGHashTable *states;
 
-       for (;;) {
-               guint i;
-               guint max;
-               gint ready = 0;
+       io_selector_running = TRUE;
+
+       if (mono_runtime_is_shutting_down ()) {
+               io_selector_running = FALSE;
+               return;
+       }
 
-               mono_gc_set_skip_thread (TRUE);
+       states = mono_g_hash_table_new_type (g_direct_hash, g_direct_equal, MONO_HASH_VALUE_GC, MONO_ROOT_SOURCE_THREAD_POOL, "i/o thread pool states table");
+
+       for (;;) {
+               gint i, j;
+               gint res;
 
                mono_mutex_lock (&threadpool_io->updates_lock);
+
                for (i = 0; i < threadpool_io->updates_size; ++i) {
-                       threadpool_io->backend.update_add (&threadpool_io->updates [i]);
-               }
-               if (threadpool_io->updates_size > 0) {
-                       threadpool_io->updates_size = 0;
-                       threadpool_io->updates = g_renew (ThreadPoolIOUpdate, threadpool_io->updates, threadpool_io->updates_size);
-               }
-               mono_mutex_unlock (&threadpool_io->updates_lock);
+                       ThreadPoolIOUpdate *update = &threadpool_io->updates [i];
 
-               ready = threadpool_io->backend.event_wait ();
+                       switch (update->type) {
+                       case UPDATE_EMPTY:
+                               break;
+                       case UPDATE_ADD: {
+                               gint fd;
+                               gint operations;
+                               gpointer k;
+                               gboolean exists;
+                               MonoMList *list = NULL;
+                               MonoIOSelectorJob *job;
 
-               mono_gc_set_skip_thread (FALSE);
+                               fd = update->data.add.fd;
+                               g_assert (fd >= 0);
 
-               if (ready == -1 || mono_runtime_is_shutting_down ())
-                       break;
+                               job = update->data.add.job;
+                               g_assert (job);
 
-               max = threadpool_io->backend.event_max ();
+                               exists = mono_g_hash_table_lookup_extended (states, GINT_TO_POINTER (fd), &k, (gpointer*) &list);
+                               list = mono_mlist_append (list, (MonoObject*) job);
+                               mono_g_hash_table_replace (states, GINT_TO_POINTER (fd), list);
 
-               mono_mutex_lock (&threadpool_io->states_lock);
-               for (i = 0; i < max && ready > 0; ++i) {
-                       MonoMList *list;
-                       gboolean valid_fd;
-                       gint fd;
+                               operations = get_operations_for_jobs (list);
 
-                       fd = threadpool_io->backend.event_fd_at (i);
+                               mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: %3s fd %3d, operations = %2s | %2s | %2s",
+                                       exists ? "mod" : "add", fd, (operations & EVENT_IN) ? "RD" : "..", (operations & EVENT_OUT) ? "WR" : "..");
 
-                       if (fd == threadpool_io->wakeup_pipes [0]) {
-                               selector_thread_wakeup_drain_pipes ();
-                               ready -= 1;
-                               continue;
+                               threadpool_io->backend.register_fd (fd, operations, !exists);
+
+                               break;
                        }
+                       case UPDATE_REMOVE_SOCKET: {
+                               gint fd;
+                               gpointer k;
+                               MonoMList *list = NULL;
 
-                       list = mono_g_hash_table_lookup (threadpool_io->states, GINT_TO_POINTER (fd));
+                               fd = update->data.remove_socket.fd;
+                               g_assert (fd >= 0);
 
-                       valid_fd = threadpool_io->backend.event_create_sockares_at (i, fd, &list);
-                       if (!valid_fd)
-                               continue;
+                               if (mono_g_hash_table_lookup_extended (states, GINT_TO_POINTER (fd), &k, (gpointer*) &list)) {
+                                       mono_g_hash_table_remove (states, GINT_TO_POINTER (fd));
 
-                       if (list)
-                               mono_g_hash_table_replace (threadpool_io->states, GINT_TO_POINTER (fd), list);
-                       else
-                               mono_g_hash_table_remove (threadpool_io->states, GINT_TO_POINTER (fd));
+                                       for (j = i + 1; j < threadpool_io->updates_size; ++j) {
+                                               ThreadPoolIOUpdate *update = &threadpool_io->updates [j];
+                                               if (update->type == UPDATE_ADD && update->data.add.fd == fd)
+                                                       memset (update, 0, sizeof (ThreadPoolIOUpdate));
+                                       }
+
+                                       for (; list; list = mono_mlist_remove_item (list, list))
+                                               mono_threadpool_ms_enqueue_work_item (mono_object_domain (mono_mlist_get_data (list)), mono_mlist_get_data (list));
+
+                                       mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: del fd %3d", fd);
+                                       threadpool_io->backend.remove_fd (fd);
+                               }
+
+                               break;
+                       }
+                       case UPDATE_REMOVE_DOMAIN: {
+                               MonoDomain *domain;
+
+                               domain = update->data.remove_domain.domain;
+                               g_assert (domain);
 
-                       ready -= 1;
+                               FilterSockaresForDomainData user_data = { .domain = domain, .states = states };
+                               mono_g_hash_table_foreach (states, filter_jobs_for_domain, &user_data);
+
+                               for (j = i + 1; j < threadpool_io->updates_size; ++j) {
+                                       ThreadPoolIOUpdate *update = &threadpool_io->updates [j];
+                                       if (update->type == UPDATE_ADD && mono_object_domain (update->data.add.job) == domain)
+                                               memset (update, 0, sizeof (ThreadPoolIOUpdate));
+                               }
+
+                               break;
+                       }
+                       default:
+                               g_assert_not_reached ();
+                       }
                }
-               mono_mutex_unlock (&threadpool_io->states_lock);
+
+               mono_cond_broadcast (&threadpool_io->updates_cond);
+
+               if (threadpool_io->updates_size > 0) {
+                       threadpool_io->updates_size = 0;
+                       memset (&threadpool_io->updates, 0, UPDATES_CAPACITY * sizeof (ThreadPoolIOUpdate));
+               }
+
+               mono_mutex_unlock (&threadpool_io->updates_lock);
+
+               mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: wai");
+
+               res = threadpool_io->backend.event_wait (wait_callback, states);
+
+               if (res == -1 || mono_runtime_is_shutting_down ())
+                       break;
+       }
+
+       mono_g_hash_table_destroy (states);
+
+       io_selector_running = FALSE;
+}
+
+/* Locking: threadpool_io->updates_lock must be held */
+static ThreadPoolIOUpdate*
+update_get_new (void)
+{
+       ThreadPoolIOUpdate *update = NULL;
+       g_assert (threadpool_io->updates_size <= UPDATES_CAPACITY);
+
+       while (threadpool_io->updates_size == UPDATES_CAPACITY) {
+               /* we wait for updates to be applied in the selector_thread and we loop
+                * as long as none are available. if it happends too much, then we need
+                * to increase UPDATES_CAPACITY */
+               mono_cond_wait (&threadpool_io->updates_cond, &threadpool_io->updates_lock);
        }
 
-       io_thread_status = STATUS_CLEANED_UP;
+       g_assert (threadpool_io->updates_size < UPDATES_CAPACITY);
+
+       update = &threadpool_io->updates [threadpool_io->updates_size ++];
+
+       return update;
 }
 
 static void
@@ -327,85 +500,49 @@ wakeup_pipes_init (void)
 }
 
 static void
-ensure_initialized (void)
+initialize (void)
 {
-       if (io_status >= STATUS_INITIALIZED)
-               return;
-       if (io_status == STATUS_INITIALIZING || InterlockedCompareExchange (&io_status, STATUS_INITIALIZING, STATUS_NOT_INITIALIZED) != STATUS_NOT_INITIALIZED) {
-               while (io_status == STATUS_INITIALIZING)
-                       mono_thread_info_yield ();
-               g_assert (io_status >= STATUS_INITIALIZED);
-               return;
-       }
-
        g_assert (!threadpool_io);
        threadpool_io = g_new0 (ThreadPoolIO, 1);
        g_assert (threadpool_io);
 
-       threadpool_io->states = mono_g_hash_table_new_type (g_direct_hash, g_direct_equal, MONO_HASH_VALUE_GC);
-       MONO_GC_REGISTER_ROOT_FIXED (threadpool_io->states);
-       mono_mutex_init (&threadpool_io->states_lock);
+       mono_mutex_init_recursive (&threadpool_io->updates_lock);
+       mono_cond_init (&threadpool_io->updates_cond, 0);
+       mono_gc_register_root ((void*)&threadpool_io->updates [0], sizeof (threadpool_io->updates), MONO_GC_DESCRIPTOR_NULL, MONO_ROOT_SOURCE_THREAD_POOL, "i/o thread pool updates list");
 
-       threadpool_io->updates = NULL;
        threadpool_io->updates_size = 0;
-       mono_mutex_init (&threadpool_io->updates_lock);
 
+       threadpool_io->backend = backend_poll;
+       if (g_getenv ("MONO_ENABLE_AIO") != NULL) {
 #if defined(HAVE_EPOLL)
-       threadpool_io->backend = backend_epoll;
+               threadpool_io->backend = backend_epoll;
 #elif defined(HAVE_KQUEUE)
-       threadpool_io->backend = backend_kqueue;
-#else
-       threadpool_io->backend = backend_poll;
+               threadpool_io->backend = backend_kqueue;
 #endif
-       if (g_getenv ("MONO_DISABLE_AIO") != NULL)
-               threadpool_io->backend = backend_poll;
+       }
 
        wakeup_pipes_init ();
 
        if (!threadpool_io->backend.init (threadpool_io->wakeup_pipes [0]))
-               g_error ("ensure_initialized: backend->init () failed");
+               g_error ("initialize: backend->init () failed");
 
        if (!mono_thread_create_internal (mono_get_root_domain (), selector_thread, NULL, TRUE, SMALL_STACK))
-               g_error ("ensure_initialized: mono_thread_create_internal () failed");
-
-       io_thread_status = STATUS_INITIALIZING;
-       mono_memory_write_barrier ();
-
-       io_status = STATUS_INITIALIZED;
+               g_error ("initialize: mono_thread_create_internal () failed");
 }
 
 static void
-ensure_cleanedup (void)
+cleanup (void)
 {
-       if (io_status == STATUS_NOT_INITIALIZED && InterlockedCompareExchange (&io_status, STATUS_CLEANED_UP, STATUS_NOT_INITIALIZED) == STATUS_NOT_INITIALIZED)
-               return;
-       if (io_status == STATUS_INITIALIZING) {
-               while (io_status == STATUS_INITIALIZING)
-                       mono_thread_info_yield ();
-       }
-       if (io_status == STATUS_CLEANED_UP)
-               return;
-       if (io_status == STATUS_CLEANING_UP || InterlockedCompareExchange (&io_status, STATUS_CLEANING_UP, STATUS_INITIALIZED) != STATUS_INITIALIZED) {
-               while (io_status == STATUS_CLEANING_UP)
-                       mono_thread_info_yield ();
-               g_assert (io_status == STATUS_CLEANED_UP);
-               return;
-       }
-
        /* we make the assumption along the code that we are
         * cleaning up only if the runtime is shutting down */
        g_assert (mono_runtime_is_shutting_down ());
 
        selector_thread_wakeup ();
-       while (io_thread_status != STATUS_CLEANED_UP)
-               usleep (1000);
-
-       MONO_GC_UNREGISTER_ROOT (threadpool_io->states);
-       mono_g_hash_table_destroy (threadpool_io->states);
-       mono_mutex_destroy (&threadpool_io->states_lock);
+       while (io_selector_running)
+               g_usleep (1000);
 
-       g_free (threadpool_io->updates);
        mono_mutex_destroy (&threadpool_io->updates_lock);
+       mono_cond_destroy (&threadpool_io->updates_cond);
 
        threadpool_io->backend.cleanup ();
 
@@ -421,187 +558,110 @@ ensure_cleanedup (void)
        g_free (threadpool_io);
        threadpool_io = NULL;
        g_assert (!threadpool_io);
-
-       io_status = STATUS_CLEANED_UP;
 }
 
-static gboolean
-is_socket_async_callback (MonoImage *system_image, MonoClass *class)
+void
+mono_threadpool_ms_io_cleanup (void)
 {
-       MonoClass *socket_async_callback_class = NULL;
-
-       socket_async_callback_class = mono_class_from_name (system_image, "System.Net.Sockets", "SocketAsyncCallback");
-       g_assert (socket_async_callback_class);
-
-       return class == socket_async_callback_class;
+       mono_lazy_cleanup (&io_status, cleanup);
 }
 
-static gboolean
-is_async_read_handler (MonoImage *system_image, MonoClass *class)
+void
+ves_icall_System_IOSelector_Add (gpointer handle, MonoIOSelectorJob *job)
 {
-       MonoClass *process_class = NULL;
+       ThreadPoolIOUpdate *update;
 
-       process_class = mono_class_from_name (system_image, "System.Diagnostics", "Process");
-       g_assert (process_class);
+       g_assert (handle >= 0);
 
-       return class->nested_in && class->nested_in == process_class && strcmp (class->name, "AsyncReadHandler") == 0;
-}
+       g_assert (job->operation == EVENT_IN ^ job->operation == EVENT_OUT);
+       g_assert (job->callback);
 
-gboolean
-mono_threadpool_ms_is_io (MonoObject *target, MonoObject *state)
-{
-       MonoImage *system_image;
-       MonoSocketAsyncResult *sockares;
+       if (mono_runtime_is_shutting_down ())
+               return;
+       if (mono_domain_is_unloading (mono_object_domain (job)))
+               return;
+
+       mono_lazy_initialize (&io_status, initialize);
 
-       system_image = mono_image_loaded ("System");
-       if (!system_image)
-               return FALSE;
+       mono_mutex_lock (&threadpool_io->updates_lock);
 
-       if (!is_socket_async_callback (system_image, target->vtable->klass) && !is_async_read_handler (system_image, target->vtable->klass))
-               return FALSE;
+       update = update_get_new ();
+       update->type = UPDATE_ADD;
+       update->data.add.fd = GPOINTER_TO_INT (handle);
+       update->data.add.job = job;
+       mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */
 
-       sockares = (MonoSocketAsyncResult*) state;
-       if (sockares->operation < AIO_OP_FIRST || sockares->operation >= AIO_OP_LAST)
-               return FALSE;
+       selector_thread_wakeup ();
 
-       return TRUE;
+       mono_mutex_unlock (&threadpool_io->updates_lock);
 }
 
 void
-mono_threadpool_ms_io_cleanup (void)
+ves_icall_System_IOSelector_Remove (gpointer handle)
 {
-       ensure_cleanedup ();
+       mono_threadpool_ms_io_remove_socket (GPOINTER_TO_INT (handle));
 }
 
-MonoAsyncResult *
-mono_threadpool_ms_io_add (MonoAsyncResult *ares, MonoSocketAsyncResult *sockares)
+void
+mono_threadpool_ms_io_remove_socket (int fd)
 {
        ThreadPoolIOUpdate *update;
-       MonoMList *list;
-       gboolean is_new;
-       gint events;
-       gint fd;
-
-       g_assert (ares);
-       g_assert (sockares);
 
-       if (mono_runtime_is_shutting_down ())
-               return NULL;
-
-       ensure_initialized ();
-
-       MONO_OBJECT_SETREF (sockares, ares, ares);
-
-       fd = GPOINTER_TO_INT (sockares->handle);
-
-       mono_mutex_lock (&threadpool_io->states_lock);
-       g_assert (threadpool_io->states);
-
-       list = mono_g_hash_table_lookup (threadpool_io->states, GINT_TO_POINTER (fd));
-       is_new = list == NULL;
-       list = mono_mlist_append (list, (MonoObject*) sockares);
-       mono_g_hash_table_replace (threadpool_io->states, sockares->handle, list);
-
-       events = get_events (list);
+       if (!mono_lazy_is_initialized (&io_status))
+               return;
 
        mono_mutex_lock (&threadpool_io->updates_lock);
-       threadpool_io->updates_size += 1;
-       threadpool_io->updates = g_renew (ThreadPoolIOUpdate, threadpool_io->updates, threadpool_io->updates_size);
-
-       update = &threadpool_io->updates [threadpool_io->updates_size - 1];
-       update->fd = fd;
-       update->events = events;
-       update->is_new = is_new;
-       mono_mutex_unlock (&threadpool_io->updates_lock);
 
-       mono_mutex_unlock (&threadpool_io->states_lock);
+       update = update_get_new ();
+       update->type = UPDATE_REMOVE_SOCKET;
+       update->data.add.fd = fd;
+       mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */
 
        selector_thread_wakeup ();
 
-       return ares;
+       mono_cond_wait (&threadpool_io->updates_cond, &threadpool_io->updates_lock);
+
+       mono_mutex_unlock (&threadpool_io->updates_lock);
 }
 
 void
-mono_threadpool_ms_io_remove_socket (int fd)
+mono_threadpool_ms_io_remove_domain_jobs (MonoDomain *domain)
 {
-       MonoMList *list;
+       ThreadPoolIOUpdate *update;
 
-       if (io_status != STATUS_INITIALIZED)
+       if (!mono_lazy_is_initialized (&io_status))
                return;
 
-       mono_mutex_lock (&threadpool_io->states_lock);
-       g_assert (threadpool_io->states);
-       list = mono_g_hash_table_lookup (threadpool_io->states, GINT_TO_POINTER (fd));
-       if (list)
-               mono_g_hash_table_remove (threadpool_io->states, GINT_TO_POINTER (fd));
-       mono_mutex_unlock (&threadpool_io->states_lock);
-
-       while (list) {
-               MonoSocketAsyncResult *sockares, *sockares2;
-
-               sockares = (MonoSocketAsyncResult*) mono_mlist_get_data (list);
-               if (sockares->operation == AIO_OP_RECEIVE)
-                       sockares->operation = AIO_OP_RECV_JUST_CALLBACK;
-               else if (sockares->operation == AIO_OP_SEND)
-                       sockares->operation = AIO_OP_SEND_JUST_CALLBACK;
-
-               sockares2 = get_sockares_for_event (&list, MONO_POLLIN);
-               if (sockares2)
-                       mono_threadpool_ms_enqueue_work_item (((MonoObject*) sockares2)->vtable->domain, (MonoObject*) sockares2);
-
-               if (!list)
-                       break;
+       mono_mutex_lock (&threadpool_io->updates_lock);
 
-               sockares2 = get_sockares_for_event (&list, MONO_POLLOUT);
-               if (sockares2)
-                       mono_threadpool_ms_enqueue_work_item (((MonoObject*) sockares2)->vtable->domain, (MonoObject*) sockares2);
-       }
-}
+       update = update_get_new ();
+       update->type = UPDATE_REMOVE_DOMAIN;
+       update->data.remove_domain.domain = domain;
+       mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */
 
-static gboolean
-remove_sockstate_for_domain (gpointer key, gpointer value, gpointer user_data)
-{
-       MonoMList *list;
-       gboolean remove = FALSE;
-
-       for (list = value; list; list = mono_mlist_next (list)) {
-               MonoObject *data = mono_mlist_get_data (list);
-               if (mono_object_domain (data) == user_data) {
-                       remove = TRUE;
-                       mono_mlist_set_data (list, NULL);
-               }
-       }
+       selector_thread_wakeup ();
 
-       //FIXME is there some sort of additional unregistration we need to perform here?
-       return remove;
-}
+       mono_cond_wait (&threadpool_io->updates_cond, &threadpool_io->updates_lock);
 
-void
-mono_threadpool_ms_io_remove_domain_jobs (MonoDomain *domain)
-{
-       if (io_status == STATUS_INITIALIZED) {
-               mono_mutex_lock (&threadpool_io->states_lock);
-               mono_g_hash_table_foreach_remove (threadpool_io->states, remove_sockstate_for_domain, domain);
-               mono_mutex_unlock (&threadpool_io->states_lock);
-       }
+       mono_mutex_unlock (&threadpool_io->updates_lock);
 }
 
 #else
 
-gboolean
-mono_threadpool_ms_is_io (MonoObject *target, MonoObject *state)
+void
+ves_icall_System_IOSelector_Add (gpointer handle, MonoIOSelectorJob *job)
 {
-       return FALSE;
+       g_assert_not_reached ();
 }
 
 void
-mono_threadpool_ms_io_cleanup (void)
+ves_icall_System_IOSelector_Remove (gpointer handle)
 {
        g_assert_not_reached ();
 }
 
-MonoAsyncResult *
-mono_threadpool_ms_io_add (MonoAsyncResult *ares, MonoSocketAsyncResult *sockares)
+void
+mono_threadpool_ms_io_cleanup (void)
 {
        g_assert_not_reached ();
 }
@@ -618,4 +678,4 @@ mono_threadpool_ms_io_remove_domain_jobs (MonoDomain *domain)
        g_assert_not_reached ();
 }
 
-#endif
\ No newline at end of file
+#endif