2 * threadpool-ms-io.c: Microsoft IO threadpool runtime support
5 * Ludovic Henry (ludovic.henry@xamarin.com)
7 * Copyright 2015 Xamarin, Inc (http://www.xamarin.com)
12 #ifndef DISABLE_SOCKETS
16 #if defined(HOST_WIN32)
23 #include <mono/metadata/gc-internal.h>
24 #include <mono/metadata/mono-mlist.h>
25 #include <mono/metadata/threadpool-ms.h>
26 #include <mono/metadata/threadpool-ms-io.h>
27 #include <mono/utils/atomic.h>
28 #include <mono/utils/mono-poll.h>
29 #include <mono/utils/mono-threads.h>
30 #include <mono/utils/mono-lazy-init.h>
33 gboolean (*init) (gint wakeup_pipe_fd);
34 void (*cleanup) (void);
35 void (*register_fd) (gint fd, gint events, gboolean is_new);
36 gint (*event_wait) (void);
37 gint (*event_get_fd_max) (void);
38 gint (*event_get_fd_at) (gint i, gint *events);
39 } ThreadPoolIOBackend;
41 #include "threadpool-ms-io-epoll.c"
42 #include "threadpool-ms-io-kqueue.c"
43 #include "threadpool-ms-io-poll.c"
45 /* Keep in sync with System.Net.Sockets.Socket.SocketOperation */
54 AIO_OP_RECV_JUST_CALLBACK,
55 AIO_OP_SEND_JUST_CALLBACK,
60 AIO_OP_RECEIVE_BUFFERS,
67 MonoSocketAsyncResult *sockares;
71 ThreadPoolIOBackend backend;
75 mono_cond_t updates_signal;
77 MonoGHashTable *states;
79 ThreadPoolIOUpdate *updates;
81 guint updates_capacity;
83 #if !defined(HOST_WIN32)
84 gint wakeup_pipes [2];
86 SOCKET wakeup_pipes [2];
90 static mono_lazy_init_t io_status = MONO_LAZY_INIT_STATUS_NOT_INITIALIZED;
92 static gboolean io_selector_running = FALSE;
94 static ThreadPoolIO* threadpool_io;
97 get_events_from_sockares (MonoSocketAsyncResult *ares)
99 switch (ares->operation) {
102 case AIO_OP_RECV_JUST_CALLBACK:
103 case AIO_OP_RECEIVEFROM:
104 case AIO_OP_READPIPE:
105 case AIO_OP_ACCEPTRECEIVE:
106 case AIO_OP_RECEIVE_BUFFERS:
109 case AIO_OP_SEND_JUST_CALLBACK:
112 case AIO_OP_SEND_BUFFERS:
113 case AIO_OP_DISCONNECT:
116 g_assert_not_reached ();
120 static MonoSocketAsyncResult*
121 get_sockares_for_event (MonoMList **list, gint event)
127 for (current = *list; current; current = mono_mlist_next (current)) {
128 MonoSocketAsyncResult *ares = (MonoSocketAsyncResult*) mono_mlist_get_data (current);
129 if (get_events_from_sockares (ares) == event) {
130 *list = mono_mlist_remove_item (*list, current);
139 get_events (MonoMList *list)
144 for (current = list; current; current = mono_mlist_next (current)) {
145 MonoSocketAsyncResult *ares = (MonoSocketAsyncResult*) mono_mlist_get_data (current);
147 events |= get_events_from_sockares (ares);
154 selector_thread_wakeup (void);
157 * If sockares is NULL, then it means we want to delete the corresponding fd
160 update_add (gint fd, MonoSocketAsyncResult *sockares)
162 ThreadPoolIOUpdate *update;
164 mono_mutex_lock (&threadpool_io->lock);
166 threadpool_io->updates_size += 1;
167 if (threadpool_io->updates_size > threadpool_io->updates_capacity) {
168 ThreadPoolIOUpdate *updates_new, *updates_old;
169 gint updates_new_capacity, updates_old_capacity;
171 updates_old_capacity = threadpool_io->updates_capacity;
172 updates_new_capacity = updates_old_capacity + 16;
174 updates_old = threadpool_io->updates;
175 updates_new = mono_gc_alloc_fixed (sizeof (ThreadPoolIOUpdate) * updates_new_capacity, MONO_GC_DESCRIPTOR_NULL);
176 g_assert (updates_new);
179 memcpy (updates_new, updates_old, sizeof (ThreadPoolIOUpdate) * updates_old_capacity);
181 threadpool_io->updates = updates_new;
182 threadpool_io->updates_capacity = updates_new_capacity;
185 mono_gc_free_fixed (updates_old);
188 update = &threadpool_io->updates [threadpool_io->updates_size - 1];
190 update->sockares = sockares;
192 selector_thread_wakeup ();
194 mono_cond_wait (&threadpool_io->updates_signal, &threadpool_io->lock);
196 mono_mutex_unlock (&threadpool_io->lock);
200 update_drain (void (*callback) (gint fd, gint events, gboolean is_new))
204 mono_mutex_lock (&threadpool_io->lock);
206 for (i = 0; i < threadpool_io->updates_size; ++i) {
207 ThreadPoolIOUpdate *update;
208 MonoMList *list = NULL;
212 update = &threadpool_io->updates [i];
214 is_new = !mono_g_hash_table_lookup_extended (threadpool_io->states, GINT_TO_POINTER (update->fd), &k, (gpointer*) &list);
216 if (!update->sockares) {
217 callback (update->fd, 0, is_new);
219 list = mono_mlist_append (list, (MonoObject*) update->sockares);
220 mono_g_hash_table_replace (threadpool_io->states, update->sockares->handle, list);
222 callback (update->fd, get_events (list), is_new);
226 mono_cond_broadcast (&threadpool_io->updates_signal);
228 if (threadpool_io->updates_size > 0) {
229 ThreadPoolIOUpdate *updates_old;
231 threadpool_io->updates_size = 0;
232 threadpool_io->updates_capacity = 16;
234 updates_old = threadpool_io->updates;
236 threadpool_io->updates = mono_gc_alloc_fixed (sizeof (ThreadPoolIOUpdate) * threadpool_io->updates_capacity, MONO_GC_DESCRIPTOR_NULL);
237 g_assert (threadpool_io->updates);
239 mono_gc_free_fixed (updates_old);
242 mono_mutex_unlock (&threadpool_io->lock);
246 update_remove (gboolean (*predicate) (ThreadPoolIOUpdate *update, gpointer user_data), gpointer user_data)
250 mono_mutex_lock (&threadpool_io->lock);
252 for (i = 0; i < threadpool_io->updates_size; ++i) {
253 if (predicate (&threadpool_io->updates [i], user_data)) {
254 if (i < threadpool_io->updates_size - 1)
255 memmove (threadpool_io->updates + i, threadpool_io->updates + i + 1, sizeof (ThreadPoolIOUpdate) * threadpool_io->updates_size - i - 1);
256 memset (threadpool_io->updates + threadpool_io->updates_size - 1, 0, sizeof (ThreadPoolIOUpdate));
258 threadpool_io->updates_size --;
263 mono_mutex_unlock (&threadpool_io->lock);
267 selector_thread_wakeup (void)
273 #if !defined(HOST_WIN32)
274 written = write (threadpool_io->wakeup_pipes [1], &msg, 1);
278 g_warning ("selector_thread_wakeup: write () failed, error (%d) %s\n", errno, g_strerror (errno));
282 written = send (threadpool_io->wakeup_pipes [1], &msg, 1, 0);
285 if (written == SOCKET_ERROR) {
286 g_warning ("selector_thread_wakeup: write () failed, error (%d)\n", WSAGetLastError ());
294 selector_thread_wakeup_drain_pipes (void)
300 #if !defined(HOST_WIN32)
301 received = read (threadpool_io->wakeup_pipes [0], buffer, sizeof (buffer));
304 if (received == -1) {
305 if (errno != EINTR && errno != EAGAIN)
306 g_warning ("selector_thread_wakeup_drain_pipes: read () failed, error (%d) %s\n", errno, g_strerror (errno));
310 received = recv (threadpool_io->wakeup_pipes [0], buffer, sizeof (buffer), 0);
313 if (received == SOCKET_ERROR) {
314 if (WSAGetLastError () != WSAEINTR && WSAGetLastError () != WSAEWOULDBLOCK)
315 g_warning ("selector_thread_wakeup_drain_pipes: recv () failed, error (%d) %s\n", WSAGetLastError ());
323 selector_thread (gpointer data)
325 io_selector_running = TRUE;
327 if (mono_runtime_is_shutting_down ()) {
328 io_selector_running = FALSE;
332 mono_mutex_lock (&threadpool_io->lock);
339 update_drain (threadpool_io->backend.register_fd);
341 mono_mutex_unlock (&threadpool_io->lock);
343 mono_gc_set_skip_thread (TRUE);
345 ready = threadpool_io->backend.event_wait ();
347 mono_gc_set_skip_thread (FALSE);
349 mono_mutex_lock (&threadpool_io->lock);
351 if (ready == -1 || mono_runtime_is_shutting_down ())
354 max = threadpool_io->backend.event_get_fd_max ();
356 for (i = 0; i < max && ready > 0; ++i) {
358 gint fd = threadpool_io->backend.event_get_fd_at (i, &events);
363 if (fd == threadpool_io->wakeup_pipes [0]) {
364 selector_thread_wakeup_drain_pipes ();
366 MonoMList *list = NULL;
369 if (mono_g_hash_table_lookup_extended (threadpool_io->states, GINT_TO_POINTER (fd), &k, (gpointer*) &list)) {
370 if (list && (events & MONO_POLLIN) != 0) {
371 MonoSocketAsyncResult *sockares = get_sockares_for_event (&list, MONO_POLLIN);
373 mono_threadpool_ms_enqueue_work_item (((MonoObject*) sockares)->vtable->domain, (MonoObject*) sockares);
375 if (list && (events & MONO_POLLOUT) != 0) {
376 MonoSocketAsyncResult *sockares = get_sockares_for_event (&list, MONO_POLLOUT);
378 mono_threadpool_ms_enqueue_work_item (((MonoObject*) sockares)->vtable->domain, (MonoObject*) sockares);
382 mono_g_hash_table_remove (threadpool_io->states, GINT_TO_POINTER (fd));
384 mono_g_hash_table_replace (threadpool_io->states, GINT_TO_POINTER (fd), list);
386 threadpool_io->backend.register_fd (fd, get_events (list), FALSE);
394 mono_mutex_unlock (&threadpool_io->lock);
396 io_selector_running = FALSE;
400 wakeup_pipes_init (void)
402 #if !defined(HOST_WIN32)
403 if (pipe (threadpool_io->wakeup_pipes) == -1)
404 g_error ("wakeup_pipes_init: pipe () failed, error (%d) %s\n", errno, g_strerror (errno));
405 if (fcntl (threadpool_io->wakeup_pipes [0], F_SETFL, O_NONBLOCK) == -1)
406 g_error ("wakeup_pipes_init: fcntl () failed, error (%d) %s\n", errno, g_strerror (errno));
408 struct sockaddr_in client;
409 struct sockaddr_in server;
414 server_sock = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
415 g_assert (server_sock != INVALID_SOCKET);
416 threadpool_io->wakeup_pipes [1] = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
417 g_assert (threadpool_io->wakeup_pipes [1] != INVALID_SOCKET);
419 server.sin_family = AF_INET;
420 server.sin_addr.s_addr = inet_addr ("127.0.0.1");
422 if (bind (server_sock, (SOCKADDR*) &server, sizeof (server)) == SOCKET_ERROR) {
423 closesocket (server_sock);
424 g_error ("wakeup_pipes_init: bind () failed, error (%d)\n", WSAGetLastError ());
427 size = sizeof (server);
428 if (getsockname (server_sock, (SOCKADDR*) &server, &size) == SOCKET_ERROR) {
429 closesocket (server_sock);
430 g_error ("wakeup_pipes_init: getsockname () failed, error (%d)\n", WSAGetLastError ());
432 if (listen (server_sock, 1024) == SOCKET_ERROR) {
433 closesocket (server_sock);
434 g_error ("wakeup_pipes_init: listen () failed, error (%d)\n", WSAGetLastError ());
436 if (connect ((SOCKET) threadpool_io->wakeup_pipes [1], (SOCKADDR*) &server, sizeof (server)) == SOCKET_ERROR) {
437 closesocket (server_sock);
438 g_error ("wakeup_pipes_init: connect () failed, error (%d)\n", WSAGetLastError ());
441 size = sizeof (client);
442 threadpool_io->wakeup_pipes [0] = accept (server_sock, (SOCKADDR *) &client, &size);
443 g_assert (threadpool_io->wakeup_pipes [0] != INVALID_SOCKET);
446 if (ioctlsocket (threadpool_io->wakeup_pipes [0], FIONBIO, &arg) == SOCKET_ERROR) {
447 closesocket (threadpool_io->wakeup_pipes [0]);
448 closesocket (server_sock);
449 g_error ("wakeup_pipes_init: ioctlsocket () failed, error (%d)\n", WSAGetLastError ());
452 closesocket (server_sock);
459 g_assert (!threadpool_io);
460 threadpool_io = g_new0 (ThreadPoolIO, 1);
461 g_assert (threadpool_io);
463 mono_mutex_init_recursive (&threadpool_io->lock);
465 mono_cond_init (&threadpool_io->updates_signal, NULL);
467 threadpool_io->states = mono_g_hash_table_new_type (g_direct_hash, g_direct_equal, MONO_HASH_VALUE_GC);
468 MONO_GC_REGISTER_ROOT_FIXED (threadpool_io->states);
470 threadpool_io->updates = NULL;
471 threadpool_io->updates_size = 0;
472 threadpool_io->updates_capacity = 0;
474 #if defined(HAVE_EPOLL)
475 threadpool_io->backend = backend_epoll;
476 #elif defined(HAVE_KQUEUE)
477 threadpool_io->backend = backend_kqueue;
479 threadpool_io->backend = backend_poll;
481 if (g_getenv ("MONO_DISABLE_AIO") != NULL)
482 threadpool_io->backend = backend_poll;
484 wakeup_pipes_init ();
486 if (!threadpool_io->backend.init (threadpool_io->wakeup_pipes [0]))
487 g_error ("initialize: backend->init () failed");
489 if (!mono_thread_create_internal (mono_get_root_domain (), selector_thread, NULL, TRUE, SMALL_STACK))
490 g_error ("initialize: mono_thread_create_internal () failed");
496 /* we make the assumption along the code that we are
497 * cleaning up only if the runtime is shutting down */
498 g_assert (mono_runtime_is_shutting_down ());
500 selector_thread_wakeup ();
501 while (io_selector_running)
504 mono_mutex_destroy (&threadpool_io->lock);
506 mono_cond_destroy (&threadpool_io->updates_signal);
508 MONO_GC_UNREGISTER_ROOT (threadpool_io->states);
509 mono_g_hash_table_destroy (threadpool_io->states);
511 if (threadpool_io->updates)
512 mono_gc_free_fixed (threadpool_io->updates);
514 threadpool_io->backend.cleanup ();
516 #if !defined(HOST_WIN32)
517 close (threadpool_io->wakeup_pipes [0]);
518 close (threadpool_io->wakeup_pipes [1]);
520 closesocket (threadpool_io->wakeup_pipes [0]);
521 closesocket (threadpool_io->wakeup_pipes [1]);
524 g_assert (threadpool_io);
525 g_free (threadpool_io);
526 threadpool_io = NULL;
527 g_assert (!threadpool_io);
531 is_socket_async_callback (MonoImage *system_image, MonoClass *class)
533 MonoClass *socket_async_callback_class = NULL;
535 socket_async_callback_class = mono_class_from_name (system_image, "System.Net.Sockets", "SocketAsyncCallback");
537 return class == socket_async_callback_class;
541 is_async_read_handler (MonoImage *system_image, MonoClass *class)
543 MonoClass *async_read_handler_class = NULL;
545 async_read_handler_class = mono_class_from_name (system_image, "System.Diagnostics", "Process/AsyncReadHandler");
547 return class == async_read_handler_class;
551 mono_threadpool_ms_is_io (MonoObject *target, MonoObject *state)
553 MonoImage *system_image;
554 MonoSocketAsyncResult *sockares;
556 system_image = mono_image_loaded ("System");
560 if (!is_socket_async_callback (system_image, target->vtable->klass) && !is_async_read_handler (system_image, target->vtable->klass))
563 sockares = (MonoSocketAsyncResult*) state;
564 if (sockares->operation < AIO_OP_FIRST || sockares->operation >= AIO_OP_LAST)
571 mono_threadpool_ms_io_cleanup (void)
573 mono_lazy_cleanup (&io_status, cleanup);
577 mono_threadpool_ms_io_add (MonoAsyncResult *ares, MonoSocketAsyncResult *sockares)
582 if (mono_runtime_is_shutting_down ())
585 mono_lazy_initialize (&io_status, initialize);
587 MONO_OBJECT_SETREF (sockares, ares, ares);
589 update_add (GPOINTER_TO_INT (sockares->handle), sockares);
595 remove_update_for_socket (ThreadPoolIOUpdate *update, gpointer user_data)
597 if (!update->sockares)
600 return GPOINTER_TO_INT (update->sockares->handle) == GPOINTER_TO_INT (user_data);
604 mono_threadpool_ms_io_remove_socket (int fd)
606 MonoMList *list = NULL;
609 if (!mono_lazy_is_initialized (&io_status))
612 mono_mutex_lock (&threadpool_io->lock);
614 g_assert (threadpool_io->states);
616 if (mono_g_hash_table_lookup_extended (threadpool_io->states, GINT_TO_POINTER (fd), &k, (gpointer*) &list))
617 mono_g_hash_table_remove (threadpool_io->states, GINT_TO_POINTER (fd));
619 update_remove (remove_update_for_socket, GINT_TO_POINTER (fd));
621 mono_mutex_unlock (&threadpool_io->lock);
623 for (; list; list = mono_mlist_remove_item (list, list)) {
624 MonoSocketAsyncResult *sockares = (MonoSocketAsyncResult*) mono_mlist_get_data (list);
629 switch (sockares->operation) {
631 sockares->operation = AIO_OP_RECV_JUST_CALLBACK;
634 sockares->operation = AIO_OP_SEND_JUST_CALLBACK;
638 mono_threadpool_ms_enqueue_work_item (((MonoObject*) sockares)->vtable->domain, (MonoObject*) sockares);
641 update_add (fd, NULL);
645 remove_sockstate_for_domain (gpointer key, gpointer value, gpointer user_data)
648 gboolean remove = FALSE;
650 for (list = value; list; list = mono_mlist_next (list)) {
651 MonoObject *data = mono_mlist_get_data (list);
652 if (mono_object_domain (data) == user_data) {
654 mono_mlist_set_data (list, NULL);
658 //FIXME is there some sort of additional unregistration we need to perform here?
663 remove_update_for_domain (ThreadPoolIOUpdate *update, gpointer user_data)
665 if (!update->sockares)
668 return mono_object_domain (update->sockares) == (MonoDomain*) user_data;
672 mono_threadpool_ms_io_remove_domain_jobs (MonoDomain *domain)
674 if (!mono_lazy_is_initialized (&io_status))
677 mono_mutex_lock (&threadpool_io->lock);
679 mono_g_hash_table_foreach_remove (threadpool_io->states, remove_sockstate_for_domain, domain);
681 update_remove (remove_update_for_domain, domain);
683 mono_mutex_unlock (&threadpool_io->lock);
687 icall_append_io_job (MonoObject *target, MonoSocketAsyncResult *state)
689 MonoAsyncResult *ares;
691 /* Don't call mono_async_result_new() to avoid capturing the context */
692 ares = (MonoAsyncResult *) mono_object_new (mono_domain_get (), mono_defaults.asyncresult_class);
693 MONO_OBJECT_SETREF (ares, async_delegate, target);
694 MONO_OBJECT_SETREF (ares, async_state, state);
696 mono_threadpool_ms_io_add (ares, state);
703 mono_threadpool_ms_is_io (MonoObject *target, MonoObject *state)
709 mono_threadpool_ms_io_cleanup (void)
711 g_assert_not_reached ();
715 mono_threadpool_ms_io_add (MonoAsyncResult *ares, MonoSocketAsyncResult *sockares)
717 g_assert_not_reached ();
721 mono_threadpool_ms_io_remove_socket (int fd)
723 g_assert_not_reached ();
727 mono_threadpool_ms_io_remove_domain_jobs (MonoDomain *domain)
729 g_assert_not_reached ();
733 icall_append_io_job (MonoObject *target, MonoSocketAsyncResult *state)
735 g_assert_not_reached ();