X-Git-Url: http://wien.tomnetworks.com/gitweb/?a=blobdiff_plain;f=mono%2Fmetadata%2Fthreadpool-ms-io.c;h=d6f8b25b2521e381534cefb1f05561bdf72accea;hb=8ae7b6d6b950e5ee5fcd4a6625e0f467340dddee;hp=71f5bac5f3ef31acf01ed310eba3b0e9dd2c5f7c;hpb=6efe705614f87d6fc57b05470b48c5518d73fdcf;p=mono.git diff --git a/mono/metadata/threadpool-ms-io.c b/mono/metadata/threadpool-ms-io.c index 71f5bac5f3e..d6f8b25b252 100644 --- a/mono/metadata/threadpool-ms-io.c +++ b/mono/metadata/threadpool-ms-io.c @@ -25,23 +25,29 @@ #include #include #include -#include #include +#include +#include typedef struct { gboolean (*init) (gint wakeup_pipe_fd); void (*cleanup) (void); - void (*update_add) (gint fd, gint events, gboolean is_new); - gint (*event_wait) (void); - gint (*event_get_fd_max) (void); - gint (*event_get_fd_at) (gint i, gint *events); - void (*event_reset_fd_at) (gint i, gint events); + void (*register_fd) (gint fd, gint events, gboolean is_new); + void (*remove_fd) (gint fd); + gint (*event_wait) (void (*callback) (gint fd, gint events, gpointer user_data), gpointer user_data); } ThreadPoolIOBackend; +enum { + EVENT_IN = 1 << 0, + EVENT_OUT = 1 << 1, +} ThreadPoolIOEvent; + #include "threadpool-ms-io-epoll.c" #include "threadpool-ms-io-kqueue.c" #include "threadpool-ms-io-poll.c" +#define UPDATES_CAPACITY 128 + /* Keep in sync with System.Net.Sockets.Socket.SocketOperation */ enum { AIO_OP_FIRST, @@ -62,20 +68,42 @@ enum { AIO_OP_LAST }; +typedef enum { + UPDATE_EMPTY = 0, + UPDATE_ADD, + UPDATE_REMOVE_SOCKET, + UPDATE_REMOVE_DOMAIN, +} ThreadPoolIOUpdateType; + typedef struct { + gint fd; MonoSocketAsyncResult *sockares; -} ThreadPoolIOUpdate; +} ThreadPoolIOUpdate_Add; typedef struct { - MonoGHashTable *states; - mono_mutex_t states_lock; + gint fd; +} ThreadPoolIOUpdate_RemoveSocket; + +typedef struct { + MonoDomain *domain; +} ThreadPoolIOUpdate_RemoveDomain; +typedef struct { + ThreadPoolIOUpdateType type; + union { + ThreadPoolIOUpdate_Add add; + ThreadPoolIOUpdate_RemoveSocket remove_socket; + ThreadPoolIOUpdate_RemoveDomain remove_domain; + } data; +} ThreadPoolIOUpdate; + +typedef struct { ThreadPoolIOBackend backend; - ThreadPoolIOUpdate *updates; - guint updates_size; - guint updates_capacity; + ThreadPoolIOUpdate updates [UPDATES_CAPACITY]; + gint updates_size; mono_mutex_t updates_lock; + mono_cond_t updates_cond; #if !defined(HOST_WIN32) gint wakeup_pipes [2]; @@ -84,8 +112,9 @@ typedef struct { #endif } ThreadPoolIO; -static gint32 io_status = STATUS_NOT_INITIALIZED; -static gint32 io_thread_status = STATUS_NOT_INITIALIZED; +static mono_lazy_init_t io_status = MONO_LAZY_INIT_STATUS_NOT_INITIALIZED; + +static gboolean io_selector_running = FALSE; static ThreadPoolIO* threadpool_io; @@ -100,14 +129,14 @@ get_events_from_sockares (MonoSocketAsyncResult *ares) case AIO_OP_READPIPE: case AIO_OP_ACCEPTRECEIVE: case AIO_OP_RECEIVE_BUFFERS: - return MONO_POLLIN; + return EVENT_IN; case AIO_OP_SEND: case AIO_OP_SEND_JUST_CALLBACK: case AIO_OP_SENDTO: case AIO_OP_CONNECT: case AIO_OP_SEND_BUFFERS: case AIO_OP_DISCONNECT: - return MONO_POLLOUT; + return EVENT_OUT; default: g_assert_not_reached (); } @@ -116,33 +145,29 @@ get_events_from_sockares (MonoSocketAsyncResult *ares) static MonoSocketAsyncResult* get_sockares_for_event (MonoMList **list, gint event) { - MonoSocketAsyncResult *state = NULL; MonoMList *current; g_assert (list); for (current = *list; current; current = mono_mlist_next (current)) { - state = (MonoSocketAsyncResult*) mono_mlist_get_data (current); - if (get_events_from_sockares ((MonoSocketAsyncResult*) state) == event) - break; - state = NULL; + MonoSocketAsyncResult *ares = (MonoSocketAsyncResult*) mono_mlist_get_data (current); + if (get_events_from_sockares (ares) == event) { + *list = mono_mlist_remove_item (*list, current); + return ares; + } } - if (current) - *list = mono_mlist_remove_item (*list, current); - - return state; + return NULL; } static gint get_events (MonoMList *list) { - MonoSocketAsyncResult *ares; + MonoMList *current; gint events = 0; - for (; list; list = mono_mlist_next (list)) - if ((ares = (MonoSocketAsyncResult*) mono_mlist_get_data (list))) - events |= get_events_from_sockares (ares); + for (current = list; current; current = mono_mlist_next (current)) + events |= get_events_from_sockares ((MonoSocketAsyncResult*) mono_mlist_get_data (current)); return events; } @@ -203,101 +228,241 @@ selector_thread_wakeup_drain_pipes (void) } } +typedef struct { + MonoDomain *domain; + MonoGHashTable *states; +} FilterSockaresForDomainData; + static void -selector_thread (gpointer data) +filter_sockares_for_domain (gpointer key, gpointer value, gpointer user_data) { - io_thread_status = STATUS_INITIALIZED; + FilterSockaresForDomainData *data; + MonoMList *list = value, *element; + MonoDomain *domain; + MonoGHashTable *states; - for (;;) { - guint i; - guint max; - gint ready = 0; + g_assert (user_data); + data = user_data; + domain = data->domain; + states = data->states; - mono_mutex_lock (&threadpool_io->states_lock); - mono_mutex_lock (&threadpool_io->updates_lock); + for (element = list; element; element = mono_mlist_next (element)) { + MonoSocketAsyncResult *sockares = (MonoSocketAsyncResult*) mono_mlist_get_data (element); + if (mono_object_domain (sockares) == domain) + mono_mlist_set_data (element, NULL); + } - for (i = 0; i < threadpool_io->updates_size; ++i) { - ThreadPoolIOUpdate *update; - MonoMList *list; + /* we skip all the first elements which are NULL */ + for (; list; list = mono_mlist_next (list)) { + if (mono_mlist_get_data (list)) + break; + } - update = &threadpool_io->updates [i]; + if (list) { + g_assert (mono_mlist_get_data (list)); + + /* we delete all the NULL elements after the first one */ + for (element = list; element;) { + MonoMList *next; + if (!(next = mono_mlist_next (element))) + break; + if (mono_mlist_get_data (next)) + element = next; + else + mono_mlist_set_next (element, mono_mlist_next (next)); + } + } - g_assert (update->sockares); + mono_g_hash_table_replace (states, key, list); +} - list = mono_g_hash_table_lookup (threadpool_io->states, update->sockares->handle); - list = mono_mlist_append (list, (MonoObject*) update->sockares); - mono_g_hash_table_replace (threadpool_io->states, update->sockares->handle, list); +static void +wait_callback (gint fd, gint events, gpointer user_data) +{ + if (mono_runtime_is_shutting_down ()) + return; - threadpool_io->backend.update_add (GPOINTER_TO_INT (update->sockares->handle), get_events (list), mono_mlist_next (list) == NULL); - } - if (threadpool_io->updates_size > 0) { - ThreadPoolIOUpdate *updates_old; + if (fd == threadpool_io->wakeup_pipes [0]) { + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: wke"); + selector_thread_wakeup_drain_pipes (); + } else { + MonoGHashTable *states; + MonoMList *list = NULL; + gpointer k; - threadpool_io->updates_size = 0; - threadpool_io->updates_capacity = 128; + g_assert (user_data); + states = user_data; - updates_old = threadpool_io->updates; + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: cal fd %3d, events = %2s | %2s", + fd, (events & EVENT_IN) ? "RD" : "..", (events & EVENT_OUT) ? "WR" : ".."); - threadpool_io->updates = mono_gc_alloc_fixed (sizeof (ThreadPoolIOUpdate) * threadpool_io->updates_capacity, MONO_GC_DESCRIPTOR_NULL); - g_assert (threadpool_io->updates); + if (!mono_g_hash_table_lookup_extended (states, GINT_TO_POINTER (fd), &k, (gpointer*) &list)) + g_error ("wait_callback: fd %d not found in states table", fd); - mono_gc_free_fixed (updates_old); + if (list && (events & EVENT_IN) != 0) { + MonoSocketAsyncResult *sockares = get_sockares_for_event (&list, EVENT_IN); + if (sockares) + mono_threadpool_ms_enqueue_work_item (((MonoObject*) sockares)->vtable->domain, (MonoObject*) sockares); + } + if (list && (events & EVENT_OUT) != 0) { + MonoSocketAsyncResult *sockares = get_sockares_for_event (&list, EVENT_OUT); + if (sockares) + mono_threadpool_ms_enqueue_work_item (((MonoObject*) sockares)->vtable->domain, (MonoObject*) sockares); } - mono_mutex_unlock (&threadpool_io->updates_lock); - mono_mutex_unlock (&threadpool_io->states_lock); + mono_g_hash_table_replace (states, GINT_TO_POINTER (fd), list); - mono_gc_set_skip_thread (TRUE); + events = get_events (list); - ready = threadpool_io->backend.event_wait (); + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: res fd %3d, events = %2s | %2s", + fd, (events & EVENT_IN) ? "RD" : "..", (events & EVENT_OUT) ? "WR" : ".."); - mono_gc_set_skip_thread (FALSE); + threadpool_io->backend.register_fd (fd, events, FALSE); + } +} - if (ready == -1 || mono_runtime_is_shutting_down ()) - break; +static void +selector_thread (gpointer data) +{ + MonoGHashTable *states; - max = threadpool_io->backend.event_get_fd_max (); + io_selector_running = TRUE; - mono_mutex_lock (&threadpool_io->states_lock); + if (mono_runtime_is_shutting_down ()) { + io_selector_running = FALSE; + return; + } - for (i = 0; i < max && ready > 0; ++i) { - gint events; - gint fd = threadpool_io->backend.event_get_fd_at (i, &events); + states = mono_g_hash_table_new_type (g_direct_hash, g_direct_equal, MONO_HASH_VALUE_GC); - if (fd == -1) - continue; + for (;;) { + gint i, j; + gint res; - if (fd == threadpool_io->wakeup_pipes [0]) { - selector_thread_wakeup_drain_pipes (); - } else { - MonoMList *list = mono_g_hash_table_lookup (threadpool_io->states, GINT_TO_POINTER (fd)); + mono_mutex_lock (&threadpool_io->updates_lock); - if (list && (events & MONO_POLLIN) != 0) { - MonoSocketAsyncResult *sockares = get_sockares_for_event (&list, MONO_POLLIN); - if (sockares) - mono_threadpool_ms_enqueue_work_item (((MonoObject*) sockares)->vtable->domain, (MonoObject*) sockares); - } - if (list && (events & MONO_POLLOUT) != 0) { - MonoSocketAsyncResult *sockares = get_sockares_for_event (&list, MONO_POLLOUT); - if (sockares) - mono_threadpool_ms_enqueue_work_item (((MonoObject*) sockares)->vtable->domain, (MonoObject*) sockares); + for (i = 0; i < threadpool_io->updates_size; ++i) { + ThreadPoolIOUpdate *update = &threadpool_io->updates [i]; + + switch (update->type) { + case UPDATE_EMPTY: + break; + case UPDATE_ADD: { + gint fd; + gint events; + gpointer k; + gboolean exists; + MonoMList *list = NULL; + MonoSocketAsyncResult *sockares; + + fd = update->data.add.fd; + g_assert (fd >= 0); + + sockares = update->data.add.sockares; + g_assert (sockares); + + exists = mono_g_hash_table_lookup_extended (states, GINT_TO_POINTER (fd), &k, (gpointer*) &list); + list = mono_mlist_append (list, (MonoObject*) sockares); + mono_g_hash_table_replace (states, sockares->handle, list); + + events = get_events (list); + + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: %3s fd %3d, events = %2s | %2s", + exists ? "mod" : "add", fd, (events & EVENT_IN) ? "RD" : "..", (events & EVENT_OUT) ? "WR" : ".."); + + threadpool_io->backend.register_fd (fd, events, !exists); + + break; + } + case UPDATE_REMOVE_SOCKET: { + gint fd; + gpointer k; + MonoMList *list = NULL; + + fd = update->data.remove_socket.fd; + g_assert (fd >= 0); + + if (mono_g_hash_table_lookup_extended (states, GINT_TO_POINTER (fd), &k, (gpointer*) &list)) { + mono_g_hash_table_remove (states, GINT_TO_POINTER (fd)); + + for (j = i + 1; j < threadpool_io->updates_size; ++j) { + ThreadPoolIOUpdate *update = &threadpool_io->updates [j]; + if (update->type == UPDATE_ADD && update->data.add.fd == fd) + memset (update, 0, sizeof (ThreadPoolIOUpdate)); + } + + for (; list; list = mono_mlist_remove_item (list, list)) + mono_threadpool_ms_enqueue_work_item (mono_object_domain (mono_mlist_get_data (list)), mono_mlist_get_data (list)); + + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: del fd %3d", fd); + threadpool_io->backend.remove_fd (fd); } - if (!list) - mono_g_hash_table_remove (threadpool_io->states, GINT_TO_POINTER (fd)); - else - mono_g_hash_table_replace (threadpool_io->states, GINT_TO_POINTER (fd), list); + break; + } + case UPDATE_REMOVE_DOMAIN: { + MonoDomain *domain; + + domain = update->data.remove_domain.domain; + g_assert (domain); + + FilterSockaresForDomainData user_data = { .domain = domain, .states = states }; + mono_g_hash_table_foreach (states, filter_sockares_for_domain, &user_data); - threadpool_io->backend.event_reset_fd_at (i, get_events (list)); + for (j = i + 1; j < threadpool_io->updates_size; ++j) { + ThreadPoolIOUpdate *update = &threadpool_io->updates [j]; + if (update->type == UPDATE_ADD && mono_object_domain (update->data.add.sockares) == domain) + memset (update, 0, sizeof (ThreadPoolIOUpdate)); + } + + break; } + default: + g_assert_not_reached (); + } + } - ready -= 1; + mono_cond_broadcast (&threadpool_io->updates_cond); + + if (threadpool_io->updates_size > 0) { + threadpool_io->updates_size = 0; + memset (&threadpool_io->updates, 0, UPDATES_CAPACITY * sizeof (ThreadPoolIOUpdate)); } - mono_mutex_unlock (&threadpool_io->states_lock); + mono_mutex_unlock (&threadpool_io->updates_lock); + + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: wai"); + + res = threadpool_io->backend.event_wait (wait_callback, states); + + if (res == -1 || mono_runtime_is_shutting_down ()) + break; } - io_thread_status = STATUS_CLEANED_UP; + mono_g_hash_table_destroy (states); + + io_selector_running = FALSE; +} + +/* Locking: threadpool_io->updates_lock must be held */ +static ThreadPoolIOUpdate* +update_get_new (void) +{ + ThreadPoolIOUpdate *update = NULL; + g_assert (threadpool_io->updates_size <= UPDATES_CAPACITY); + + while (threadpool_io->updates_size == UPDATES_CAPACITY) { + /* we wait for updates to be applied in the selector_thread and we loop + * as long as none are available. if it happends too much, then we need + * to increase UPDATES_CAPACITY */ + mono_cond_wait (&threadpool_io->updates_cond, &threadpool_io->updates_lock); + } + + g_assert (threadpool_io->updates_size < UPDATES_CAPACITY); + + update = &threadpool_io->updates [threadpool_io->updates_size ++]; + + return update; } static void @@ -358,87 +523,49 @@ wakeup_pipes_init (void) } static void -ensure_initialized (void) +initialize (void) { - if (io_status >= STATUS_INITIALIZED) - return; - if (io_status == STATUS_INITIALIZING || InterlockedCompareExchange (&io_status, STATUS_INITIALIZING, STATUS_NOT_INITIALIZED) != STATUS_NOT_INITIALIZED) { - while (io_status == STATUS_INITIALIZING) - mono_thread_info_yield (); - g_assert (io_status >= STATUS_INITIALIZED); - return; - } - g_assert (!threadpool_io); threadpool_io = g_new0 (ThreadPoolIO, 1); g_assert (threadpool_io); - threadpool_io->states = mono_g_hash_table_new_type (g_direct_hash, g_direct_equal, MONO_HASH_VALUE_GC); - MONO_GC_REGISTER_ROOT_FIXED (threadpool_io->states); - mono_mutex_init (&threadpool_io->states_lock); + mono_mutex_init_recursive (&threadpool_io->updates_lock); + mono_cond_init (&threadpool_io->updates_cond, NULL); + mono_gc_register_root ((void*)&threadpool_io->updates [0], sizeof (threadpool_io->updates), MONO_GC_DESCRIPTOR_NULL); - threadpool_io->updates = NULL; threadpool_io->updates_size = 0; - threadpool_io->updates_capacity = 0; - mono_mutex_init (&threadpool_io->updates_lock); + threadpool_io->backend = backend_poll; + if (g_getenv ("MONO_ENABLE_AIO") != NULL) { #if defined(HAVE_EPOLL) - threadpool_io->backend = backend_epoll; + threadpool_io->backend = backend_epoll; #elif defined(HAVE_KQUEUE) - threadpool_io->backend = backend_kqueue; -#else - threadpool_io->backend = backend_poll; + threadpool_io->backend = backend_kqueue; #endif - if (g_getenv ("MONO_DISABLE_AIO") != NULL) - threadpool_io->backend = backend_poll; + } wakeup_pipes_init (); if (!threadpool_io->backend.init (threadpool_io->wakeup_pipes [0])) - g_error ("ensure_initialized: backend->init () failed"); + g_error ("initialize: backend->init () failed"); if (!mono_thread_create_internal (mono_get_root_domain (), selector_thread, NULL, TRUE, SMALL_STACK)) - g_error ("ensure_initialized: mono_thread_create_internal () failed"); - - io_thread_status = STATUS_INITIALIZING; - mono_memory_write_barrier (); - - io_status = STATUS_INITIALIZED; + g_error ("initialize: mono_thread_create_internal () failed"); } static void -ensure_cleanedup (void) +cleanup (void) { - if (io_status == STATUS_NOT_INITIALIZED && InterlockedCompareExchange (&io_status, STATUS_CLEANED_UP, STATUS_NOT_INITIALIZED) == STATUS_NOT_INITIALIZED) - return; - if (io_status == STATUS_INITIALIZING) { - while (io_status == STATUS_INITIALIZING) - mono_thread_info_yield (); - } - if (io_status == STATUS_CLEANED_UP) - return; - if (io_status == STATUS_CLEANING_UP || InterlockedCompareExchange (&io_status, STATUS_CLEANING_UP, STATUS_INITIALIZED) != STATUS_INITIALIZED) { - while (io_status == STATUS_CLEANING_UP) - mono_thread_info_yield (); - g_assert (io_status == STATUS_CLEANED_UP); - return; - } - /* we make the assumption along the code that we are * cleaning up only if the runtime is shutting down */ g_assert (mono_runtime_is_shutting_down ()); selector_thread_wakeup (); - while (io_thread_status != STATUS_CLEANED_UP) + while (io_selector_running) g_usleep (1000); - MONO_GC_UNREGISTER_ROOT (threadpool_io->states); - mono_g_hash_table_destroy (threadpool_io->states); - mono_mutex_destroy (&threadpool_io->states_lock); - - if (threadpool_io->updates) - mono_gc_free_fixed (threadpool_io->updates); mono_mutex_destroy (&threadpool_io->updates_lock); + mono_cond_destroy (&threadpool_io->updates_cond); threadpool_io->backend.cleanup (); @@ -454,8 +581,6 @@ ensure_cleanedup (void) g_free (threadpool_io); threadpool_io = NULL; g_assert (!threadpool_io); - - io_status = STATUS_CLEANED_UP; } static gboolean @@ -464,7 +589,6 @@ is_socket_async_callback (MonoImage *system_image, MonoClass *class) MonoClass *socket_async_callback_class = NULL; socket_async_callback_class = mono_class_from_name (system_image, "System.Net.Sockets", "SocketAsyncCallback"); - g_assert (socket_async_callback_class); return class == socket_async_callback_class; } @@ -475,7 +599,6 @@ is_async_read_handler (MonoImage *system_image, MonoClass *class) MonoClass *async_read_handler_class = NULL; async_read_handler_class = mono_class_from_name (system_image, "System.Diagnostics", "Process/AsyncReadHandler"); - g_assert (async_read_handler_class); return class == async_read_handler_class; } @@ -503,7 +626,7 @@ mono_threadpool_ms_is_io (MonoObject *target, MonoObject *state) void mono_threadpool_ms_io_cleanup (void) { - ensure_cleanedup (); + mono_lazy_cleanup (&io_status, cleanup); } MonoAsyncResult * @@ -516,146 +639,70 @@ mono_threadpool_ms_io_add (MonoAsyncResult *ares, MonoSocketAsyncResult *sockare if (mono_runtime_is_shutting_down ()) return NULL; + if (mono_domain_is_unloading (mono_object_domain (sockares))) + return NULL; - ensure_initialized (); + mono_lazy_initialize (&io_status, initialize); MONO_OBJECT_SETREF (sockares, ares, ares); mono_mutex_lock (&threadpool_io->updates_lock); - threadpool_io->updates_size += 1; - if (threadpool_io->updates_size > threadpool_io->updates_capacity) { - ThreadPoolIOUpdate *updates_new, *updates_old; - gint updates_new_capacity, updates_old_capacity; - - updates_old_capacity = threadpool_io->updates_capacity; - updates_new_capacity = updates_old_capacity + 128; - - updates_old = threadpool_io->updates; - updates_new = mono_gc_alloc_fixed (sizeof (ThreadPoolIOUpdate) * updates_new_capacity, MONO_GC_DESCRIPTOR_NULL); - g_assert (updates_new); + update = update_get_new (); + update->type = UPDATE_ADD; + update->data.add.fd = GPOINTER_TO_INT (sockares->handle); + update->data.add.sockares = sockares; + mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */ - if (updates_old) - memcpy (updates_new, updates_old, sizeof (ThreadPoolIOUpdate) * updates_old_capacity); - - threadpool_io->updates = updates_new; - threadpool_io->updates_capacity = updates_new_capacity; - - if (updates_old) - mono_gc_free_fixed (updates_old); - } - - update = &threadpool_io->updates [threadpool_io->updates_size - 1]; - update->sockares = sockares; + selector_thread_wakeup (); mono_mutex_unlock (&threadpool_io->updates_lock); - selector_thread_wakeup (); - return ares; } void mono_threadpool_ms_io_remove_socket (int fd) { - MonoMList *list; - gint i; + ThreadPoolIOUpdate *update; - if (io_status != STATUS_INITIALIZED) + if (!mono_lazy_is_initialized (&io_status)) return; - mono_mutex_lock (&threadpool_io->states_lock); mono_mutex_lock (&threadpool_io->updates_lock); - g_assert (threadpool_io->states); - - list = mono_g_hash_table_lookup (threadpool_io->states, GINT_TO_POINTER (fd)); - if (list) - mono_g_hash_table_remove (threadpool_io->states, GINT_TO_POINTER (fd)); + update = update_get_new (); + update->type = UPDATE_REMOVE_SOCKET; + update->data.add.fd = fd; + mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */ - for (i = 0; i < threadpool_io->updates_size; ++i) { - ThreadPoolIOUpdate *update = &threadpool_io->updates [i]; - - g_assert (update->sockares); - - if (GPOINTER_TO_INT (update->sockares->handle) == fd) { - if (i < threadpool_io->updates_size - 1) - memmove (threadpool_io->updates + i, threadpool_io->updates + i + 1, sizeof (ThreadPoolIOUpdate) * threadpool_io->updates_size - i - 1); - memset (threadpool_io->updates + threadpool_io->updates_size - 1, 0, sizeof (ThreadPoolIOUpdate)); + selector_thread_wakeup (); - threadpool_io->updates_size -= 1; - } - } + mono_cond_wait (&threadpool_io->updates_cond, &threadpool_io->updates_lock); mono_mutex_unlock (&threadpool_io->updates_lock); - mono_mutex_unlock (&threadpool_io->states_lock); - - for (; list; list = mono_mlist_remove_item (list, list)) { - MonoSocketAsyncResult *sockares = (MonoSocketAsyncResult*) mono_mlist_get_data (list); - - if (!sockares) - continue; - - switch (sockares->operation) { - case AIO_OP_RECEIVE: - sockares->operation = AIO_OP_RECV_JUST_CALLBACK; - break; - case AIO_OP_SEND: - sockares->operation = AIO_OP_SEND_JUST_CALLBACK; - break; - } - - mono_threadpool_ms_enqueue_work_item (((MonoObject*) sockares)->vtable->domain, (MonoObject*) sockares); - } -} - -static gboolean -remove_sockstate_for_domain (gpointer key, gpointer value, gpointer user_data) -{ - MonoMList *list; - gboolean remove = FALSE; - - for (list = value; list; list = mono_mlist_next (list)) { - MonoObject *data = mono_mlist_get_data (list); - if (mono_object_domain (data) == user_data) { - remove = TRUE; - mono_mlist_set_data (list, NULL); - } - } - - //FIXME is there some sort of additional unregistration we need to perform here? - return remove; } void mono_threadpool_ms_io_remove_domain_jobs (MonoDomain *domain) { - gint i; + ThreadPoolIOUpdate *update; - if (io_status != STATUS_INITIALIZED) + if (!mono_lazy_is_initialized (&io_status)) return; - mono_mutex_lock (&threadpool_io->states_lock); mono_mutex_lock (&threadpool_io->updates_lock); - mono_g_hash_table_foreach_remove (threadpool_io->states, remove_sockstate_for_domain, domain); - - for (i = 0; i < threadpool_io->updates_size; ++i) { - ThreadPoolIOUpdate *update = &threadpool_io->updates [i]; - - g_assert (update->sockares); + update = update_get_new (); + update->type = UPDATE_REMOVE_DOMAIN; + update->data.remove_domain.domain = domain; + mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */ - if (mono_object_domain (update->sockares) == domain) { - if (i < threadpool_io->updates_size - 1) - memmove (threadpool_io->updates + i, threadpool_io->updates + i + 1, sizeof (ThreadPoolIOUpdate) * threadpool_io->updates_size - i - 1); - memset (threadpool_io->updates + threadpool_io->updates_size - 1, 0, sizeof (ThreadPoolIOUpdate)); + selector_thread_wakeup (); - threadpool_io->updates_size -= 1; - } - } + mono_cond_wait (&threadpool_io->updates_cond, &threadpool_io->updates_lock); mono_mutex_unlock (&threadpool_io->updates_lock); - mono_mutex_unlock (&threadpool_io->states_lock); } void @@ -710,4 +757,4 @@ icall_append_io_job (MonoObject *target, MonoSocketAsyncResult *state) g_assert_not_reached (); } -#endif \ No newline at end of file +#endif