#include <fcntl.h>
#endif
-#include <mono/metadata/gc-internal.h>
+#include <mono/metadata/gc-internals.h>
#include <mono/metadata/mono-mlist.h>
#include <mono/metadata/threadpool-ms.h>
#include <mono/metadata/threadpool-ms-io.h>
#include <mono/utils/atomic.h>
#include <mono/utils/mono-threads.h>
#include <mono/utils/mono-lazy-init.h>
-#include <mono/utils/mono-logger-internal.h>
+#include <mono/utils/mono-logger-internals.h>
typedef struct {
gboolean (*init) (gint wakeup_pipe_fd);
gint (*event_wait) (void (*callback) (gint fd, gint events, gpointer user_data), gpointer user_data);
} ThreadPoolIOBackend;
-enum {
+/* Keep in sync with System.IOOperation in mcs/class/System/System/IOSelector.cs */
+enum MonoIOOperation {
EVENT_IN = 1 << 0,
EVENT_OUT = 1 << 1,
-} ThreadPoolIOEvent;
+ EVENT_ERR = 1 << 2, /* not in managed */
+};
#include "threadpool-ms-io-epoll.c"
#include "threadpool-ms-io-kqueue.c"
#define UPDATES_CAPACITY 128
-/* Keep in sync with System.Net.Sockets.Socket.SocketOperation */
-enum {
- AIO_OP_FIRST,
- AIO_OP_ACCEPT = 0,
- AIO_OP_CONNECT,
- AIO_OP_RECEIVE,
- AIO_OP_RECEIVEFROM,
- AIO_OP_SEND,
- AIO_OP_SENDTO,
- AIO_OP_RECV_JUST_CALLBACK,
- AIO_OP_SEND_JUST_CALLBACK,
- AIO_OP_READPIPE,
- AIO_OP_CONSOLE2,
- AIO_OP_DISCONNECT,
- AIO_OP_ACCEPTRECEIVE,
- AIO_OP_RECEIVE_BUFFERS,
- AIO_OP_SEND_BUFFERS,
- AIO_OP_LAST
+/* Keep in sync with System.IOSelectorJob in mcs/class/System/System/IOSelector.cs */
+struct _MonoIOSelectorJob {
+ MonoObject object;
+ gint32 operation;
+ MonoObject *callback;
+ MonoObject *state;
};
typedef enum {
typedef struct {
gint fd;
- MonoSocketAsyncResult *sockares;
+ MonoIOSelectorJob *job;
} ThreadPoolIOUpdate_Add;
typedef struct {
static ThreadPoolIO* threadpool_io;
-static int
-get_events_from_sockares (MonoSocketAsyncResult *ares)
-{
- switch (ares->operation) {
- case AIO_OP_ACCEPT:
- case AIO_OP_RECEIVE:
- case AIO_OP_RECV_JUST_CALLBACK:
- case AIO_OP_RECEIVEFROM:
- case AIO_OP_READPIPE:
- case AIO_OP_ACCEPTRECEIVE:
- case AIO_OP_RECEIVE_BUFFERS:
- return EVENT_IN;
- case AIO_OP_SEND:
- case AIO_OP_SEND_JUST_CALLBACK:
- case AIO_OP_SENDTO:
- case AIO_OP_CONNECT:
- case AIO_OP_SEND_BUFFERS:
- case AIO_OP_DISCONNECT:
- return EVENT_OUT;
- default:
- g_assert_not_reached ();
- }
-}
-
-static MonoSocketAsyncResult*
-get_sockares_for_event (MonoMList **list, gint event)
+static MonoIOSelectorJob*
+get_job_for_event (MonoMList **list, gint32 event)
{
MonoMList *current;
g_assert (list);
for (current = *list; current; current = mono_mlist_next (current)) {
- MonoSocketAsyncResult *ares = (MonoSocketAsyncResult*) mono_mlist_get_data (current);
- if (get_events_from_sockares (ares) == event) {
+ MonoIOSelectorJob *job = (MonoIOSelectorJob*) mono_mlist_get_data (current);
+ if (job->operation == event) {
*list = mono_mlist_remove_item (*list, current);
- return ares;
+ return job;
}
}
}
static gint
-get_events (MonoMList *list)
+get_operations_for_jobs (MonoMList *list)
{
MonoMList *current;
- gint events = 0;
+ gint operations = 0;
for (current = list; current; current = mono_mlist_next (current))
- events |= get_events_from_sockares ((MonoSocketAsyncResult*) mono_mlist_get_data (current));
+ operations |= ((MonoIOSelectorJob*) mono_mlist_get_data (current))->operation;
- return events;
+ return operations;
}
static void
} FilterSockaresForDomainData;
static void
-filter_sockares_for_domain (gpointer key, gpointer value, gpointer user_data)
+filter_jobs_for_domain (gpointer key, gpointer value, gpointer user_data)
{
FilterSockaresForDomainData *data;
MonoMList *list = value, *element;
states = data->states;
for (element = list; element; element = mono_mlist_next (element)) {
- MonoSocketAsyncResult *sockares = (MonoSocketAsyncResult*) mono_mlist_get_data (element);
- if (mono_object_domain (sockares) == domain)
+ MonoIOSelectorJob *job = (MonoIOSelectorJob*) mono_mlist_get_data (element);
+ if (mono_object_domain (job) == domain)
mono_mlist_set_data (element, NULL);
}
MonoGHashTable *states;
MonoMList *list = NULL;
gpointer k;
+ gboolean remove_fd = FALSE;
+ gint operations;
g_assert (user_data);
states = user_data;
- mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: cal fd %3d, events = %2s | %2s",
- fd, (events & EVENT_IN) ? "RD" : "..", (events & EVENT_OUT) ? "WR" : "..");
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: cal fd %3d, events = %2s | %2s | %3s",
+ fd, (events & EVENT_IN) ? "RD" : "..", (events & EVENT_OUT) ? "WR" : "..", (events & EVENT_ERR) ? "ERR" : "...");
if (!mono_g_hash_table_lookup_extended (states, GINT_TO_POINTER (fd), &k, (gpointer*) &list))
g_error ("wait_callback: fd %d not found in states table", fd);
if (list && (events & EVENT_IN) != 0) {
- MonoSocketAsyncResult *sockares = get_sockares_for_event (&list, EVENT_IN);
- if (sockares)
- mono_threadpool_ms_enqueue_work_item (((MonoObject*) sockares)->vtable->domain, (MonoObject*) sockares);
+ MonoIOSelectorJob *job = get_job_for_event (&list, EVENT_IN);
+ if (job)
+ mono_threadpool_ms_enqueue_work_item (((MonoObject*) job)->vtable->domain, (MonoObject*) job);
}
if (list && (events & EVENT_OUT) != 0) {
- MonoSocketAsyncResult *sockares = get_sockares_for_event (&list, EVENT_OUT);
- if (sockares)
- mono_threadpool_ms_enqueue_work_item (((MonoObject*) sockares)->vtable->domain, (MonoObject*) sockares);
+ MonoIOSelectorJob *job = get_job_for_event (&list, EVENT_OUT);
+ if (job)
+ mono_threadpool_ms_enqueue_work_item (((MonoObject*) job)->vtable->domain, (MonoObject*) job);
}
- mono_g_hash_table_replace (states, GINT_TO_POINTER (fd), list);
+ remove_fd = (events & EVENT_ERR) == EVENT_ERR;
+ if (!remove_fd) {
+ mono_g_hash_table_replace (states, GINT_TO_POINTER (fd), list);
+
+ operations = get_operations_for_jobs (list);
+
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: res fd %3d, events = %2s | %2s | %3s",
+ fd, (operations & EVENT_IN) ? "RD" : "..", (operations & EVENT_OUT) ? "WR" : "..", (operations & EVENT_ERR) ? "ERR" : "...");
- events = get_events (list);
+ threadpool_io->backend.register_fd (fd, operations, FALSE);
+ } else {
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: err fd %d", fd);
- mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: res fd %3d, events = %2s | %2s",
- fd, (events & EVENT_IN) ? "RD" : "..", (events & EVENT_OUT) ? "WR" : "..");
+ mono_g_hash_table_remove (states, GINT_TO_POINTER (fd));
- threadpool_io->backend.register_fd (fd, events, FALSE);
+ threadpool_io->backend.remove_fd (fd);
+ }
}
}
return;
}
- states = mono_g_hash_table_new_type (g_direct_hash, g_direct_equal, MONO_HASH_VALUE_GC);
+ states = mono_g_hash_table_new_type (g_direct_hash, g_direct_equal, MONO_HASH_VALUE_GC, MONO_ROOT_SOURCE_THREAD_POOL, "i/o thread pool states table");
for (;;) {
gint i, j;
break;
case UPDATE_ADD: {
gint fd;
- gint events;
+ gint operations;
gpointer k;
gboolean exists;
MonoMList *list = NULL;
- MonoSocketAsyncResult *sockares;
+ MonoIOSelectorJob *job;
fd = update->data.add.fd;
g_assert (fd >= 0);
- sockares = update->data.add.sockares;
- g_assert (sockares);
+ job = update->data.add.job;
+ g_assert (job);
exists = mono_g_hash_table_lookup_extended (states, GINT_TO_POINTER (fd), &k, (gpointer*) &list);
- list = mono_mlist_append (list, (MonoObject*) sockares);
- mono_g_hash_table_replace (states, sockares->handle, list);
+ list = mono_mlist_append (list, (MonoObject*) job);
+ mono_g_hash_table_replace (states, GINT_TO_POINTER (fd), list);
- events = get_events (list);
+ operations = get_operations_for_jobs (list);
- mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: %3s fd %3d, events = %2s | %2s",
- exists ? "mod" : "add", fd, (events & EVENT_IN) ? "RD" : "..", (events & EVENT_OUT) ? "WR" : "..");
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: %3s fd %3d, operations = %2s | %2s | %3s",
+ exists ? "mod" : "add", fd, (operations & EVENT_IN) ? "RD" : "..", (operations & EVENT_OUT) ? "WR" : "..", (operations & EVENT_ERR) ? "ERR" : "...");
- threadpool_io->backend.register_fd (fd, events, !exists);
+ threadpool_io->backend.register_fd (fd, operations, !exists);
break;
}
g_assert (domain);
FilterSockaresForDomainData user_data = { .domain = domain, .states = states };
- mono_g_hash_table_foreach (states, filter_sockares_for_domain, &user_data);
+ mono_g_hash_table_foreach (states, filter_jobs_for_domain, &user_data);
for (j = i + 1; j < threadpool_io->updates_size; ++j) {
ThreadPoolIOUpdate *update = &threadpool_io->updates [j];
- if (update->type == UPDATE_ADD && mono_object_domain (update->data.add.sockares) == domain)
+ if (update->type == UPDATE_ADD && mono_object_domain (update->data.add.job) == domain)
memset (update, 0, sizeof (ThreadPoolIOUpdate));
}
g_assert (threadpool_io);
mono_mutex_init_recursive (&threadpool_io->updates_lock);
- mono_cond_init (&threadpool_io->updates_cond, NULL);
- mono_gc_register_root ((void*)&threadpool_io->updates [0], sizeof (threadpool_io->updates), MONO_GC_DESCRIPTOR_NULL);
+ mono_cond_init (&threadpool_io->updates_cond, 0);
+ mono_gc_register_root ((void*)&threadpool_io->updates [0], sizeof (threadpool_io->updates), MONO_GC_DESCRIPTOR_NULL, MONO_ROOT_SOURCE_THREAD_POOL, "i/o thread pool updates list");
threadpool_io->updates_size = 0;
g_assert (!threadpool_io);
}
-static gboolean
-is_socket_async_callback (MonoImage *system_image, MonoClass *class)
-{
- MonoClass *socket_async_callback_class = NULL;
-
- socket_async_callback_class = mono_class_from_name (system_image, "System.Net.Sockets", "SocketAsyncCallback");
-
- return class == socket_async_callback_class;
-}
-
-static gboolean
-is_async_read_handler (MonoImage *system_image, MonoClass *class)
-{
- MonoClass *async_read_handler_class = NULL;
-
- async_read_handler_class = mono_class_from_name (system_image, "System.Diagnostics", "Process/AsyncReadHandler");
-
- return class == async_read_handler_class;
-}
-
-gboolean
-mono_threadpool_ms_is_io (MonoObject *target, MonoObject *state)
-{
- MonoImage *system_image;
- MonoSocketAsyncResult *sockares;
-
- system_image = mono_image_loaded ("System");
- if (!system_image)
- return FALSE;
-
- if (!is_socket_async_callback (system_image, target->vtable->klass) && !is_async_read_handler (system_image, target->vtable->klass))
- return FALSE;
-
- sockares = (MonoSocketAsyncResult*) state;
- if (sockares->operation < AIO_OP_FIRST || sockares->operation >= AIO_OP_LAST)
- return FALSE;
-
- return TRUE;
-}
-
void
mono_threadpool_ms_io_cleanup (void)
{
mono_lazy_cleanup (&io_status, cleanup);
}
-MonoAsyncResult *
-mono_threadpool_ms_io_add (MonoAsyncResult *ares, MonoSocketAsyncResult *sockares)
+void
+ves_icall_System_IOSelector_Add (gpointer handle, MonoIOSelectorJob *job)
{
ThreadPoolIOUpdate *update;
- g_assert (ares);
- g_assert (sockares);
+ g_assert (handle >= 0);
+
+ g_assert (job->operation == EVENT_IN ^ job->operation == EVENT_OUT);
+ g_assert (job->callback);
if (mono_runtime_is_shutting_down ())
- return NULL;
- if (mono_domain_is_unloading (mono_object_domain (sockares)))
- return NULL;
+ return;
+ if (mono_domain_is_unloading (mono_object_domain (job)))
+ return;
mono_lazy_initialize (&io_status, initialize);
- MONO_OBJECT_SETREF (sockares, ares, ares);
-
mono_mutex_lock (&threadpool_io->updates_lock);
update = update_get_new ();
update->type = UPDATE_ADD;
- update->data.add.fd = GPOINTER_TO_INT (sockares->handle);
- update->data.add.sockares = sockares;
+ update->data.add.fd = GPOINTER_TO_INT (handle);
+ update->data.add.job = job;
mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */
selector_thread_wakeup ();
mono_mutex_unlock (&threadpool_io->updates_lock);
+}
- return ares;
+void
+ves_icall_System_IOSelector_Remove (gpointer handle)
+{
+ mono_threadpool_ms_io_remove_socket (GPOINTER_TO_INT (handle));
}
void
mono_mutex_unlock (&threadpool_io->updates_lock);
}
-void
-icall_append_io_job (MonoObject *target, MonoSocketAsyncResult *state)
-{
- MonoAsyncResult *ares;
-
- /* Don't call mono_async_result_new() to avoid capturing the context */
- ares = (MonoAsyncResult *) mono_object_new (mono_domain_get (), mono_defaults.asyncresult_class);
- MONO_OBJECT_SETREF (ares, async_delegate, target);
- MONO_OBJECT_SETREF (ares, async_state, state);
-
- mono_threadpool_ms_io_add (ares, state);
- return;
-}
-
#else
-gboolean
-mono_threadpool_ms_is_io (MonoObject *target, MonoObject *state)
-{
- return FALSE;
-}
-
void
-mono_threadpool_ms_io_cleanup (void)
+ves_icall_System_IOSelector_Add (gpointer handle, MonoIOSelectorJob *job)
{
g_assert_not_reached ();
}
-MonoAsyncResult *
-mono_threadpool_ms_io_add (MonoAsyncResult *ares, MonoSocketAsyncResult *sockares)
+void
+ves_icall_System_IOSelector_Remove (gpointer handle)
{
g_assert_not_reached ();
}
void
-mono_threadpool_ms_io_remove_socket (int fd)
+mono_threadpool_ms_io_cleanup (void)
{
g_assert_not_reached ();
}
void
-mono_threadpool_ms_io_remove_domain_jobs (MonoDomain *domain)
+mono_threadpool_ms_io_remove_socket (int fd)
{
g_assert_not_reached ();
}
void
-icall_append_io_job (MonoObject *target, MonoSocketAsyncResult *state)
+mono_threadpool_ms_io_remove_domain_jobs (MonoDomain *domain)
{
g_assert_not_reached ();
}