Clarify ownership of generic param objects
[mono.git] / mono / metadata / threadpool-ms-io.c
index d6f8b25b2521e381534cefb1f05561bdf72accea..6ac26ed033c111604bba586cfa527ea766b8e8dd 100644 (file)
 #include <fcntl.h>
 #endif
 
-#include <mono/metadata/gc-internal.h>
+#include <mono/metadata/gc-internals.h>
 #include <mono/metadata/mono-mlist.h>
 #include <mono/metadata/threadpool-ms.h>
 #include <mono/metadata/threadpool-ms-io.h>
 #include <mono/utils/atomic.h>
 #include <mono/utils/mono-threads.h>
 #include <mono/utils/mono-lazy-init.h>
-#include <mono/utils/mono-logger-internal.h>
+#include <mono/utils/mono-logger-internals.h>
 
 typedef struct {
        gboolean (*init) (gint wakeup_pipe_fd);
@@ -37,10 +37,12 @@ typedef struct {
        gint     (*event_wait) (void (*callback) (gint fd, gint events, gpointer user_data), gpointer user_data);
 } ThreadPoolIOBackend;
 
-enum {
+/* Keep in sync with System.IOOperation in mcs/class/System/System/IOSelector.cs */
+enum MonoIOOperation {
        EVENT_IN   = 1 << 0,
        EVENT_OUT  = 1 << 1,
-} ThreadPoolIOEvent;
+       EVENT_ERR  = 1 << 2, /* not in managed */
+};
 
 #include "threadpool-ms-io-epoll.c"
 #include "threadpool-ms-io-kqueue.c"
@@ -48,24 +50,12 @@ enum {
 
 #define UPDATES_CAPACITY 128
 
-/* Keep in sync with System.Net.Sockets.Socket.SocketOperation */
-enum {
-       AIO_OP_FIRST,
-       AIO_OP_ACCEPT = 0,
-       AIO_OP_CONNECT,
-       AIO_OP_RECEIVE,
-       AIO_OP_RECEIVEFROM,
-       AIO_OP_SEND,
-       AIO_OP_SENDTO,
-       AIO_OP_RECV_JUST_CALLBACK,
-       AIO_OP_SEND_JUST_CALLBACK,
-       AIO_OP_READPIPE,
-       AIO_OP_CONSOLE2,
-       AIO_OP_DISCONNECT,
-       AIO_OP_ACCEPTRECEIVE,
-       AIO_OP_RECEIVE_BUFFERS,
-       AIO_OP_SEND_BUFFERS,
-       AIO_OP_LAST
+/* Keep in sync with System.IOSelectorJob in mcs/class/System/System/IOSelector.cs */
+struct _MonoIOSelectorJob {
+       MonoObject object;
+       gint32 operation;
+       MonoObject *callback;
+       MonoObject *state;
 };
 
 typedef enum {
@@ -77,7 +67,7 @@ typedef enum {
 
 typedef struct {
        gint fd;
-       MonoSocketAsyncResult *sockares;
+       MonoIOSelectorJob *job;
 } ThreadPoolIOUpdate_Add;
 
 typedef struct {
@@ -118,42 +108,18 @@ static gboolean io_selector_running = FALSE;
 
 static ThreadPoolIO* threadpool_io;
 
-static int
-get_events_from_sockares (MonoSocketAsyncResult *ares)
-{
-       switch (ares->operation) {
-       case AIO_OP_ACCEPT:
-       case AIO_OP_RECEIVE:
-       case AIO_OP_RECV_JUST_CALLBACK:
-       case AIO_OP_RECEIVEFROM:
-       case AIO_OP_READPIPE:
-       case AIO_OP_ACCEPTRECEIVE:
-       case AIO_OP_RECEIVE_BUFFERS:
-               return EVENT_IN;
-       case AIO_OP_SEND:
-       case AIO_OP_SEND_JUST_CALLBACK:
-       case AIO_OP_SENDTO:
-       case AIO_OP_CONNECT:
-       case AIO_OP_SEND_BUFFERS:
-       case AIO_OP_DISCONNECT:
-               return EVENT_OUT;
-       default:
-               g_assert_not_reached ();
-       }
-}
-
-static MonoSocketAsyncResult*
-get_sockares_for_event (MonoMList **list, gint event)
+static MonoIOSelectorJob*
+get_job_for_event (MonoMList **list, gint32 event)
 {
        MonoMList *current;
 
        g_assert (list);
 
        for (current = *list; current; current = mono_mlist_next (current)) {
-               MonoSocketAsyncResult *ares = (MonoSocketAsyncResult*) mono_mlist_get_data (current);
-               if (get_events_from_sockares (ares) == event) {
+               MonoIOSelectorJob *job = (MonoIOSelectorJob*) mono_mlist_get_data (current);
+               if (job->operation == event) {
                        *list = mono_mlist_remove_item (*list, current);
-                       return ares;
+                       return job;
                }
        }
 
@@ -161,15 +127,15 @@ get_sockares_for_event (MonoMList **list, gint event)
 }
 
 static gint
-get_events (MonoMList *list)
+get_operations_for_jobs (MonoMList *list)
 {
        MonoMList *current;
-       gint events = 0;
+       gint operations = 0;
 
        for (current = list; current; current = mono_mlist_next (current))
-               events |= get_events_from_sockares ((MonoSocketAsyncResult*) mono_mlist_get_data (current));
+               operations |= ((MonoIOSelectorJob*) mono_mlist_get_data (current))->operation;
 
-       return events;
+       return operations;
 }
 
 static void
@@ -234,7 +200,7 @@ typedef struct {
 } FilterSockaresForDomainData;
 
 static void
-filter_sockares_for_domain (gpointer key, gpointer value, gpointer user_data)
+filter_jobs_for_domain (gpointer key, gpointer value, gpointer user_data)
 {
        FilterSockaresForDomainData *data;
        MonoMList *list = value, *element;
@@ -247,8 +213,8 @@ filter_sockares_for_domain (gpointer key, gpointer value, gpointer user_data)
        states = data->states;
 
        for (element = list; element; element = mono_mlist_next (element)) {
-               MonoSocketAsyncResult *sockares = (MonoSocketAsyncResult*) mono_mlist_get_data (element);
-               if (mono_object_domain (sockares) == domain)
+               MonoIOSelectorJob *job = (MonoIOSelectorJob*) mono_mlist_get_data (element);
+               if (mono_object_domain (job) == domain)
                        mono_mlist_set_data (element, NULL);
        }
 
@@ -289,35 +255,46 @@ wait_callback (gint fd, gint events, gpointer user_data)
                MonoGHashTable *states;
                MonoMList *list = NULL;
                gpointer k;
+               gboolean remove_fd = FALSE;
+               gint operations;
 
                g_assert (user_data);
                states = user_data;
 
-               mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: cal fd %3d, events = %2s | %2s",
-                       fd, (events & EVENT_IN) ? "RD" : "..", (events & EVENT_OUT) ? "WR" : "..");
+               mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: cal fd %3d, events = %2s | %2s | %3s",
+                       fd, (events & EVENT_IN) ? "RD" : "..", (events & EVENT_OUT) ? "WR" : "..", (events & EVENT_ERR) ? "ERR" : "...");
 
                if (!mono_g_hash_table_lookup_extended (states, GINT_TO_POINTER (fd), &k, (gpointer*) &list))
                        g_error ("wait_callback: fd %d not found in states table", fd);
 
                if (list && (events & EVENT_IN) != 0) {
-                       MonoSocketAsyncResult *sockares = get_sockares_for_event (&list, EVENT_IN);
-                       if (sockares)
-                               mono_threadpool_ms_enqueue_work_item (((MonoObject*) sockares)->vtable->domain, (MonoObject*) sockares);
+                       MonoIOSelectorJob *job = get_job_for_event (&list, EVENT_IN);
+                       if (job)
+                               mono_threadpool_ms_enqueue_work_item (((MonoObject*) job)->vtable->domain, (MonoObject*) job);
                }
                if (list && (events & EVENT_OUT) != 0) {
-                       MonoSocketAsyncResult *sockares = get_sockares_for_event (&list, EVENT_OUT);
-                       if (sockares)
-                               mono_threadpool_ms_enqueue_work_item (((MonoObject*) sockares)->vtable->domain, (MonoObject*) sockares);
+                       MonoIOSelectorJob *job = get_job_for_event (&list, EVENT_OUT);
+                       if (job)
+                               mono_threadpool_ms_enqueue_work_item (((MonoObject*) job)->vtable->domain, (MonoObject*) job);
                }
 
-               mono_g_hash_table_replace (states, GINT_TO_POINTER (fd), list);
+               remove_fd = (events & EVENT_ERR) == EVENT_ERR;
+               if (!remove_fd) {
+                       mono_g_hash_table_replace (states, GINT_TO_POINTER (fd), list);
+
+                       operations = get_operations_for_jobs (list);
+
+                       mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: res fd %3d, events = %2s | %2s | %3s",
+                               fd, (operations & EVENT_IN) ? "RD" : "..", (operations & EVENT_OUT) ? "WR" : "..", (operations & EVENT_ERR) ? "ERR" : "...");
 
-               events = get_events (list);
+                       threadpool_io->backend.register_fd (fd, operations, FALSE);
+               } else {
+                       mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: err fd %d", fd);
 
-               mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: res fd %3d, events = %2s | %2s",
-                       fd, (events & EVENT_IN) ? "RD" : "..", (events & EVENT_OUT) ? "WR" : "..");
+                       mono_g_hash_table_remove (states, GINT_TO_POINTER (fd));
 
-               threadpool_io->backend.register_fd (fd, events, FALSE);
+                       threadpool_io->backend.remove_fd (fd);
+               }
        }
 }
 
@@ -333,7 +310,7 @@ selector_thread (gpointer data)
                return;
        }
 
-       states = mono_g_hash_table_new_type (g_direct_hash, g_direct_equal, MONO_HASH_VALUE_GC);
+       states = mono_g_hash_table_new_type (g_direct_hash, g_direct_equal, MONO_HASH_VALUE_GC, MONO_ROOT_SOURCE_THREAD_POOL, "i/o thread pool states table");
 
        for (;;) {
                gint i, j;
@@ -349,28 +326,28 @@ selector_thread (gpointer data)
                                break;
                        case UPDATE_ADD: {
                                gint fd;
-                               gint events;
+                               gint operations;
                                gpointer k;
                                gboolean exists;
                                MonoMList *list = NULL;
-                               MonoSocketAsyncResult *sockares;
+                               MonoIOSelectorJob *job;
 
                                fd = update->data.add.fd;
                                g_assert (fd >= 0);
 
-                               sockares = update->data.add.sockares;
-                               g_assert (sockares);
+                               job = update->data.add.job;
+                               g_assert (job);
 
                                exists = mono_g_hash_table_lookup_extended (states, GINT_TO_POINTER (fd), &k, (gpointer*) &list);
-                               list = mono_mlist_append (list, (MonoObject*) sockares);
-                               mono_g_hash_table_replace (states, sockares->handle, list);
+                               list = mono_mlist_append (list, (MonoObject*) job);
+                               mono_g_hash_table_replace (states, GINT_TO_POINTER (fd), list);
 
-                               events = get_events (list);
+                               operations = get_operations_for_jobs (list);
 
-                               mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: %3s fd %3d, events = %2s | %2s",
-                                       exists ? "mod" : "add", fd, (events & EVENT_IN) ? "RD" : "..", (events & EVENT_OUT) ? "WR" : "..");
+                               mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: %3s fd %3d, operations = %2s | %2s | %3s",
+                                       exists ? "mod" : "add", fd, (operations & EVENT_IN) ? "RD" : "..", (operations & EVENT_OUT) ? "WR" : "..", (operations & EVENT_ERR) ? "ERR" : "...");
 
-                               threadpool_io->backend.register_fd (fd, events, !exists);
+                               threadpool_io->backend.register_fd (fd, operations, !exists);
 
                                break;
                        }
@@ -407,11 +384,11 @@ selector_thread (gpointer data)
                                g_assert (domain);
 
                                FilterSockaresForDomainData user_data = { .domain = domain, .states = states };
-                               mono_g_hash_table_foreach (states, filter_sockares_for_domain, &user_data);
+                               mono_g_hash_table_foreach (states, filter_jobs_for_domain, &user_data);
 
                                for (j = i + 1; j < threadpool_io->updates_size; ++j) {
                                        ThreadPoolIOUpdate *update = &threadpool_io->updates [j];
-                                       if (update->type == UPDATE_ADD && mono_object_domain (update->data.add.sockares) == domain)
+                                       if (update->type == UPDATE_ADD && mono_object_domain (update->data.add.job) == domain)
                                                memset (update, 0, sizeof (ThreadPoolIOUpdate));
                                }
 
@@ -530,8 +507,8 @@ initialize (void)
        g_assert (threadpool_io);
 
        mono_mutex_init_recursive (&threadpool_io->updates_lock);
-       mono_cond_init (&threadpool_io->updates_cond, NULL);
-       mono_gc_register_root ((void*)&threadpool_io->updates [0], sizeof (threadpool_io->updates), MONO_GC_DESCRIPTOR_NULL);
+       mono_cond_init (&threadpool_io->updates_cond, 0);
+       mono_gc_register_root ((void*)&threadpool_io->updates [0], sizeof (threadpool_io->updates), MONO_GC_DESCRIPTOR_NULL, MONO_ROOT_SOURCE_THREAD_POOL, "i/o thread pool updates list");
 
        threadpool_io->updates_size = 0;
 
@@ -583,82 +560,46 @@ cleanup (void)
        g_assert (!threadpool_io);
 }
 
-static gboolean
-is_socket_async_callback (MonoImage *system_image, MonoClass *class)
-{
-       MonoClass *socket_async_callback_class = NULL;
-
-       socket_async_callback_class = mono_class_from_name (system_image, "System.Net.Sockets", "SocketAsyncCallback");
-
-       return class == socket_async_callback_class;
-}
-
-static gboolean
-is_async_read_handler (MonoImage *system_image, MonoClass *class)
-{
-       MonoClass *async_read_handler_class = NULL;
-
-       async_read_handler_class = mono_class_from_name (system_image, "System.Diagnostics", "Process/AsyncReadHandler");
-
-       return class == async_read_handler_class;
-}
-
-gboolean
-mono_threadpool_ms_is_io (MonoObject *target, MonoObject *state)
-{
-       MonoImage *system_image;
-       MonoSocketAsyncResult *sockares;
-
-       system_image = mono_image_loaded ("System");
-       if (!system_image)
-               return FALSE;
-
-       if (!is_socket_async_callback (system_image, target->vtable->klass) && !is_async_read_handler (system_image, target->vtable->klass))
-               return FALSE;
-
-       sockares = (MonoSocketAsyncResult*) state;
-       if (sockares->operation < AIO_OP_FIRST || sockares->operation >= AIO_OP_LAST)
-               return FALSE;
-
-       return TRUE;
-}
-
 void
 mono_threadpool_ms_io_cleanup (void)
 {
        mono_lazy_cleanup (&io_status, cleanup);
 }
 
-MonoAsyncResult *
-mono_threadpool_ms_io_add (MonoAsyncResult *ares, MonoSocketAsyncResult *sockares)
+void
+ves_icall_System_IOSelector_Add (gpointer handle, MonoIOSelectorJob *job)
 {
        ThreadPoolIOUpdate *update;
 
-       g_assert (ares);
-       g_assert (sockares);
+       g_assert (handle >= 0);
+
+       g_assert (job->operation == EVENT_IN ^ job->operation == EVENT_OUT);
+       g_assert (job->callback);
 
        if (mono_runtime_is_shutting_down ())
-               return NULL;
-       if (mono_domain_is_unloading (mono_object_domain (sockares)))
-               return NULL;
+               return;
+       if (mono_domain_is_unloading (mono_object_domain (job)))
+               return;
 
        mono_lazy_initialize (&io_status, initialize);
 
-       MONO_OBJECT_SETREF (sockares, ares, ares);
-
        mono_mutex_lock (&threadpool_io->updates_lock);
 
        update = update_get_new ();
        update->type = UPDATE_ADD;
-       update->data.add.fd = GPOINTER_TO_INT (sockares->handle);
-       update->data.add.sockares = sockares;
+       update->data.add.fd = GPOINTER_TO_INT (handle);
+       update->data.add.job = job;
        mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */
 
        selector_thread_wakeup ();
 
        mono_mutex_unlock (&threadpool_io->updates_lock);
+}
 
-       return ares;
+void
+ves_icall_System_IOSelector_Remove (gpointer handle)
+{
+       mono_threadpool_ms_io_remove_socket (GPOINTER_TO_INT (handle));
 }
 
 void
@@ -705,54 +646,34 @@ mono_threadpool_ms_io_remove_domain_jobs (MonoDomain *domain)
        mono_mutex_unlock (&threadpool_io->updates_lock);
 }
 
-void
-icall_append_io_job (MonoObject *target, MonoSocketAsyncResult *state)
-{
-       MonoAsyncResult *ares;
-
-       /* Don't call mono_async_result_new() to avoid capturing the context */
-       ares = (MonoAsyncResult *) mono_object_new (mono_domain_get (), mono_defaults.asyncresult_class);
-       MONO_OBJECT_SETREF (ares, async_delegate, target);
-       MONO_OBJECT_SETREF (ares, async_state, state);
-
-       mono_threadpool_ms_io_add (ares, state);
-       return;
-}
-
 #else
 
-gboolean
-mono_threadpool_ms_is_io (MonoObject *target, MonoObject *state)
-{
-       return FALSE;
-}
-
 void
-mono_threadpool_ms_io_cleanup (void)
+ves_icall_System_IOSelector_Add (gpointer handle, MonoIOSelectorJob *job)
 {
        g_assert_not_reached ();
 }
 
-MonoAsyncResult *
-mono_threadpool_ms_io_add (MonoAsyncResult *ares, MonoSocketAsyncResult *sockares)
+void
+ves_icall_System_IOSelector_Remove (gpointer handle)
 {
        g_assert_not_reached ();
 }
 
 void
-mono_threadpool_ms_io_remove_socket (int fd)
+mono_threadpool_ms_io_cleanup (void)
 {
        g_assert_not_reached ();
 }
 
 void
-mono_threadpool_ms_io_remove_domain_jobs (MonoDomain *domain)
+mono_threadpool_ms_io_remove_socket (int fd)
 {
        g_assert_not_reached ();
 }
 
 void
-icall_append_io_job (MonoObject *target, MonoSocketAsyncResult *state)
+mono_threadpool_ms_io_remove_domain_jobs (MonoDomain *domain)
 {
        g_assert_not_reached ();
 }