2 * threadpool-ms-io.c: Microsoft IO threadpool runtime support
5 * Ludovic Henry (ludovic.henry@xamarin.com)
7 * Copyright 2015 Xamarin, Inc (http://www.xamarin.com)
12 #ifndef DISABLE_SOCKETS
16 #if defined(HOST_WIN32)
23 #include <mono/metadata/gc-internal.h>
24 #include <mono/metadata/mono-mlist.h>
25 #include <mono/metadata/threadpool-ms.h>
26 #include <mono/metadata/threadpool-ms-io.h>
27 #include <mono/utils/atomic.h>
28 #include <mono/utils/mono-threads.h>
29 #include <mono/utils/mono-lazy-init.h>
30 #include <mono/utils/mono-logger-internal.h>
33 gboolean (*init) (gint wakeup_pipe_fd);
34 void (*cleanup) (void);
35 void (*register_fd) (gint fd, gint events, gboolean is_new);
36 void (*remove_fd) (gint fd);
37 gint (*event_wait) (void (*callback) (gint fd, gint events, gpointer user_data), gpointer user_data);
38 } ThreadPoolIOBackend;
45 #include "threadpool-ms-io-epoll.c"
46 #include "threadpool-ms-io-kqueue.c"
47 #include "threadpool-ms-io-poll.c"
49 #define UPDATES_CAPACITY 128
51 /* Keep in sync with System.Net.Sockets.Socket.SocketOperation */
60 AIO_OP_RECV_JUST_CALLBACK,
61 AIO_OP_SEND_JUST_CALLBACK,
66 AIO_OP_RECEIVE_BUFFERS,
76 } ThreadPoolIOUpdateType;
80 MonoSocketAsyncResult *sockares;
81 } ThreadPoolIOUpdate_Add;
85 } ThreadPoolIOUpdate_RemoveSocket;
89 } ThreadPoolIOUpdate_RemoveDomain;
92 ThreadPoolIOUpdateType type;
94 ThreadPoolIOUpdate_Add add;
95 ThreadPoolIOUpdate_RemoveSocket remove_socket;
96 ThreadPoolIOUpdate_RemoveDomain remove_domain;
101 ThreadPoolIOBackend backend;
103 ThreadPoolIOUpdate updates [UPDATES_CAPACITY];
105 mono_mutex_t updates_lock;
106 mono_cond_t updates_cond;
108 #if !defined(HOST_WIN32)
109 gint wakeup_pipes [2];
111 SOCKET wakeup_pipes [2];
115 static mono_lazy_init_t io_status = MONO_LAZY_INIT_STATUS_NOT_INITIALIZED;
117 static gboolean io_selector_running = FALSE;
119 static ThreadPoolIO* threadpool_io;
122 get_events_from_sockares (MonoSocketAsyncResult *ares)
124 switch (ares->operation) {
127 case AIO_OP_RECV_JUST_CALLBACK:
128 case AIO_OP_RECEIVEFROM:
129 case AIO_OP_READPIPE:
130 case AIO_OP_ACCEPTRECEIVE:
131 case AIO_OP_RECEIVE_BUFFERS:
134 case AIO_OP_SEND_JUST_CALLBACK:
137 case AIO_OP_SEND_BUFFERS:
138 case AIO_OP_DISCONNECT:
141 g_assert_not_reached ();
145 static MonoSocketAsyncResult*
146 get_sockares_for_event (MonoMList **list, gint event)
152 for (current = *list; current; current = mono_mlist_next (current)) {
153 MonoSocketAsyncResult *ares = (MonoSocketAsyncResult*) mono_mlist_get_data (current);
154 if (get_events_from_sockares (ares) == event) {
155 *list = mono_mlist_remove_item (*list, current);
164 get_events (MonoMList *list)
169 for (current = list; current; current = mono_mlist_next (current))
170 events |= get_events_from_sockares ((MonoSocketAsyncResult*) mono_mlist_get_data (current));
176 selector_thread_wakeup (void)
182 #if !defined(HOST_WIN32)
183 written = write (threadpool_io->wakeup_pipes [1], &msg, 1);
187 g_warning ("selector_thread_wakeup: write () failed, error (%d) %s\n", errno, g_strerror (errno));
191 written = send (threadpool_io->wakeup_pipes [1], &msg, 1, 0);
194 if (written == SOCKET_ERROR) {
195 g_warning ("selector_thread_wakeup: write () failed, error (%d)\n", WSAGetLastError ());
203 selector_thread_wakeup_drain_pipes (void)
209 #if !defined(HOST_WIN32)
210 received = read (threadpool_io->wakeup_pipes [0], buffer, sizeof (buffer));
213 if (received == -1) {
214 if (errno != EINTR && errno != EAGAIN)
215 g_warning ("selector_thread_wakeup_drain_pipes: read () failed, error (%d) %s\n", errno, g_strerror (errno));
219 received = recv (threadpool_io->wakeup_pipes [0], buffer, sizeof (buffer), 0);
222 if (received == SOCKET_ERROR) {
223 if (WSAGetLastError () != WSAEINTR && WSAGetLastError () != WSAEWOULDBLOCK)
224 g_warning ("selector_thread_wakeup_drain_pipes: recv () failed, error (%d) %s\n", WSAGetLastError ());
233 MonoGHashTable *states;
234 } FilterSockaresForDomainData;
237 filter_sockares_for_domain (gpointer key, gpointer value, gpointer user_data)
239 FilterSockaresForDomainData *data;
240 MonoMList *list = value, *element;
242 MonoGHashTable *states;
244 g_assert (user_data);
246 domain = data->domain;
247 states = data->states;
249 for (element = list; element; element = mono_mlist_next (element)) {
250 MonoSocketAsyncResult *sockares = (MonoSocketAsyncResult*) mono_mlist_get_data (element);
251 if (mono_object_domain (sockares) == domain)
252 mono_mlist_set_data (element, NULL);
255 /* we skip all the first elements which are NULL */
256 for (; list; list = mono_mlist_next (list)) {
257 if (mono_mlist_get_data (list))
262 g_assert (mono_mlist_get_data (list));
264 /* we delete all the NULL elements after the first one */
265 for (element = list; element;) {
267 if (!(next = mono_mlist_next (element)))
269 if (mono_mlist_get_data (next))
272 mono_mlist_set_next (element, mono_mlist_next (next));
276 mono_g_hash_table_replace (states, key, list);
280 wait_callback (gint fd, gint events, gpointer user_data)
282 if (mono_runtime_is_shutting_down ())
285 if (fd == threadpool_io->wakeup_pipes [0]) {
286 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: wke");
287 selector_thread_wakeup_drain_pipes ();
289 MonoGHashTable *states;
290 MonoMList *list = NULL;
293 g_assert (user_data);
296 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: cal fd %3d, events = %2s | %2s",
297 fd, (events & EVENT_IN) ? "RD" : "..", (events & EVENT_OUT) ? "WR" : "..");
299 if (!mono_g_hash_table_lookup_extended (states, GINT_TO_POINTER (fd), &k, (gpointer*) &list))
300 g_error ("wait_callback: fd %d not found in states table", fd);
302 if (list && (events & EVENT_IN) != 0) {
303 MonoSocketAsyncResult *sockares = get_sockares_for_event (&list, EVENT_IN);
305 mono_threadpool_ms_enqueue_work_item (((MonoObject*) sockares)->vtable->domain, (MonoObject*) sockares);
307 if (list && (events & EVENT_OUT) != 0) {
308 MonoSocketAsyncResult *sockares = get_sockares_for_event (&list, EVENT_OUT);
310 mono_threadpool_ms_enqueue_work_item (((MonoObject*) sockares)->vtable->domain, (MonoObject*) sockares);
313 mono_g_hash_table_replace (states, GINT_TO_POINTER (fd), list);
315 events = get_events (list);
317 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: res fd %3d, events = %2s | %2s",
318 fd, (events & EVENT_IN) ? "RD" : "..", (events & EVENT_OUT) ? "WR" : "..");
320 threadpool_io->backend.register_fd (fd, events, FALSE);
325 selector_thread (gpointer data)
327 MonoGHashTable *states;
329 io_selector_running = TRUE;
331 if (mono_runtime_is_shutting_down ()) {
332 io_selector_running = FALSE;
336 states = mono_g_hash_table_new_type (g_direct_hash, g_direct_equal, MONO_HASH_VALUE_GC);
342 mono_mutex_lock (&threadpool_io->updates_lock);
344 for (i = 0; i < threadpool_io->updates_size; ++i) {
345 ThreadPoolIOUpdate *update = &threadpool_io->updates [i];
347 switch (update->type) {
355 MonoMList *list = NULL;
356 MonoSocketAsyncResult *sockares;
358 fd = update->data.add.fd;
361 sockares = update->data.add.sockares;
364 exists = mono_g_hash_table_lookup_extended (states, GINT_TO_POINTER (fd), &k, (gpointer*) &list);
365 list = mono_mlist_append (list, (MonoObject*) sockares);
366 mono_g_hash_table_replace (states, sockares->handle, list);
368 events = get_events (list);
370 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: %3s fd %3d, events = %2s | %2s",
371 exists ? "mod" : "add", fd, (events & EVENT_IN) ? "RD" : "..", (events & EVENT_OUT) ? "WR" : "..");
373 threadpool_io->backend.register_fd (fd, events, !exists);
377 case UPDATE_REMOVE_SOCKET: {
380 MonoMList *list = NULL;
382 fd = update->data.remove_socket.fd;
385 if (mono_g_hash_table_lookup_extended (states, GINT_TO_POINTER (fd), &k, (gpointer*) &list)) {
386 mono_g_hash_table_remove (states, GINT_TO_POINTER (fd));
388 for (j = i + 1; j < threadpool_io->updates_size; ++j) {
389 ThreadPoolIOUpdate *update = &threadpool_io->updates [j];
390 if (update->type == UPDATE_ADD && update->data.add.fd == fd)
391 memset (update, 0, sizeof (ThreadPoolIOUpdate));
394 for (; list; list = mono_mlist_remove_item (list, list))
395 mono_threadpool_ms_enqueue_work_item (mono_object_domain (mono_mlist_get_data (list)), mono_mlist_get_data (list));
397 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: del fd %3d", fd);
398 threadpool_io->backend.remove_fd (fd);
403 case UPDATE_REMOVE_DOMAIN: {
406 domain = update->data.remove_domain.domain;
409 FilterSockaresForDomainData user_data = { .domain = domain, .states = states };
410 mono_g_hash_table_foreach (states, filter_sockares_for_domain, &user_data);
412 for (j = i + 1; j < threadpool_io->updates_size; ++j) {
413 ThreadPoolIOUpdate *update = &threadpool_io->updates [j];
414 if (update->type == UPDATE_ADD && mono_object_domain (update->data.add.sockares) == domain)
415 memset (update, 0, sizeof (ThreadPoolIOUpdate));
421 g_assert_not_reached ();
425 mono_cond_broadcast (&threadpool_io->updates_cond);
427 if (threadpool_io->updates_size > 0) {
428 threadpool_io->updates_size = 0;
429 memset (&threadpool_io->updates, 0, UPDATES_CAPACITY * sizeof (ThreadPoolIOUpdate));
432 mono_mutex_unlock (&threadpool_io->updates_lock);
434 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: wai");
436 res = threadpool_io->backend.event_wait (wait_callback, states);
438 if (res == -1 || mono_runtime_is_shutting_down ())
442 mono_g_hash_table_destroy (states);
444 io_selector_running = FALSE;
447 /* Locking: threadpool_io->updates_lock must be held */
448 static ThreadPoolIOUpdate*
449 update_get_new (void)
451 ThreadPoolIOUpdate *update = NULL;
452 g_assert (threadpool_io->updates_size <= UPDATES_CAPACITY);
454 while (threadpool_io->updates_size == UPDATES_CAPACITY) {
455 /* we wait for updates to be applied in the selector_thread and we loop
456 * as long as none are available. if it happends too much, then we need
457 * to increase UPDATES_CAPACITY */
458 mono_cond_wait (&threadpool_io->updates_cond, &threadpool_io->updates_lock);
461 g_assert (threadpool_io->updates_size < UPDATES_CAPACITY);
463 update = &threadpool_io->updates [threadpool_io->updates_size ++];
469 wakeup_pipes_init (void)
471 #if !defined(HOST_WIN32)
472 if (pipe (threadpool_io->wakeup_pipes) == -1)
473 g_error ("wakeup_pipes_init: pipe () failed, error (%d) %s\n", errno, g_strerror (errno));
474 if (fcntl (threadpool_io->wakeup_pipes [0], F_SETFL, O_NONBLOCK) == -1)
475 g_error ("wakeup_pipes_init: fcntl () failed, error (%d) %s\n", errno, g_strerror (errno));
477 struct sockaddr_in client;
478 struct sockaddr_in server;
483 server_sock = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
484 g_assert (server_sock != INVALID_SOCKET);
485 threadpool_io->wakeup_pipes [1] = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
486 g_assert (threadpool_io->wakeup_pipes [1] != INVALID_SOCKET);
488 server.sin_family = AF_INET;
489 server.sin_addr.s_addr = inet_addr ("127.0.0.1");
491 if (bind (server_sock, (SOCKADDR*) &server, sizeof (server)) == SOCKET_ERROR) {
492 closesocket (server_sock);
493 g_error ("wakeup_pipes_init: bind () failed, error (%d)\n", WSAGetLastError ());
496 size = sizeof (server);
497 if (getsockname (server_sock, (SOCKADDR*) &server, &size) == SOCKET_ERROR) {
498 closesocket (server_sock);
499 g_error ("wakeup_pipes_init: getsockname () failed, error (%d)\n", WSAGetLastError ());
501 if (listen (server_sock, 1024) == SOCKET_ERROR) {
502 closesocket (server_sock);
503 g_error ("wakeup_pipes_init: listen () failed, error (%d)\n", WSAGetLastError ());
505 if (connect ((SOCKET) threadpool_io->wakeup_pipes [1], (SOCKADDR*) &server, sizeof (server)) == SOCKET_ERROR) {
506 closesocket (server_sock);
507 g_error ("wakeup_pipes_init: connect () failed, error (%d)\n", WSAGetLastError ());
510 size = sizeof (client);
511 threadpool_io->wakeup_pipes [0] = accept (server_sock, (SOCKADDR *) &client, &size);
512 g_assert (threadpool_io->wakeup_pipes [0] != INVALID_SOCKET);
515 if (ioctlsocket (threadpool_io->wakeup_pipes [0], FIONBIO, &arg) == SOCKET_ERROR) {
516 closesocket (threadpool_io->wakeup_pipes [0]);
517 closesocket (server_sock);
518 g_error ("wakeup_pipes_init: ioctlsocket () failed, error (%d)\n", WSAGetLastError ());
521 closesocket (server_sock);
528 g_assert (!threadpool_io);
529 threadpool_io = g_new0 (ThreadPoolIO, 1);
530 g_assert (threadpool_io);
532 mono_mutex_init_recursive (&threadpool_io->updates_lock);
533 mono_cond_init (&threadpool_io->updates_cond, NULL);
534 mono_gc_register_root ((void*)&threadpool_io->updates [0], sizeof (threadpool_io->updates), MONO_GC_DESCRIPTOR_NULL);
536 threadpool_io->updates_size = 0;
538 threadpool_io->backend = backend_poll;
539 if (g_getenv ("MONO_ENABLE_AIO") != NULL) {
540 #if defined(HAVE_EPOLL)
541 threadpool_io->backend = backend_epoll;
542 #elif defined(HAVE_KQUEUE)
543 threadpool_io->backend = backend_kqueue;
547 wakeup_pipes_init ();
549 if (!threadpool_io->backend.init (threadpool_io->wakeup_pipes [0]))
550 g_error ("initialize: backend->init () failed");
552 if (!mono_thread_create_internal (mono_get_root_domain (), selector_thread, NULL, TRUE, SMALL_STACK))
553 g_error ("initialize: mono_thread_create_internal () failed");
559 /* we make the assumption along the code that we are
560 * cleaning up only if the runtime is shutting down */
561 g_assert (mono_runtime_is_shutting_down ());
563 selector_thread_wakeup ();
564 while (io_selector_running)
567 mono_mutex_destroy (&threadpool_io->updates_lock);
568 mono_cond_destroy (&threadpool_io->updates_cond);
570 threadpool_io->backend.cleanup ();
572 #if !defined(HOST_WIN32)
573 close (threadpool_io->wakeup_pipes [0]);
574 close (threadpool_io->wakeup_pipes [1]);
576 closesocket (threadpool_io->wakeup_pipes [0]);
577 closesocket (threadpool_io->wakeup_pipes [1]);
580 g_assert (threadpool_io);
581 g_free (threadpool_io);
582 threadpool_io = NULL;
583 g_assert (!threadpool_io);
587 is_socket_async_callback (MonoImage *system_image, MonoClass *class)
589 MonoClass *socket_async_callback_class = NULL;
591 socket_async_callback_class = mono_class_from_name (system_image, "System.Net.Sockets", "SocketAsyncCallback");
593 return class == socket_async_callback_class;
597 is_async_read_handler (MonoImage *system_image, MonoClass *class)
599 MonoClass *async_read_handler_class = NULL;
601 async_read_handler_class = mono_class_from_name (system_image, "System.Diagnostics", "Process/AsyncReadHandler");
603 return class == async_read_handler_class;
607 mono_threadpool_ms_is_io (MonoObject *target, MonoObject *state)
609 MonoImage *system_image;
610 MonoSocketAsyncResult *sockares;
612 system_image = mono_image_loaded ("System");
616 if (!is_socket_async_callback (system_image, target->vtable->klass) && !is_async_read_handler (system_image, target->vtable->klass))
619 sockares = (MonoSocketAsyncResult*) state;
620 if (sockares->operation < AIO_OP_FIRST || sockares->operation >= AIO_OP_LAST)
627 mono_threadpool_ms_io_cleanup (void)
629 mono_lazy_cleanup (&io_status, cleanup);
633 mono_threadpool_ms_io_add (MonoAsyncResult *ares, MonoSocketAsyncResult *sockares)
635 ThreadPoolIOUpdate *update;
640 if (mono_runtime_is_shutting_down ())
642 if (mono_domain_is_unloading (mono_object_domain (sockares)))
645 mono_lazy_initialize (&io_status, initialize);
647 MONO_OBJECT_SETREF (sockares, ares, ares);
649 mono_mutex_lock (&threadpool_io->updates_lock);
651 update = update_get_new ();
652 update->type = UPDATE_ADD;
653 update->data.add.fd = GPOINTER_TO_INT (sockares->handle);
654 update->data.add.sockares = sockares;
655 mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */
657 selector_thread_wakeup ();
659 mono_mutex_unlock (&threadpool_io->updates_lock);
665 mono_threadpool_ms_io_remove_socket (int fd)
667 ThreadPoolIOUpdate *update;
669 if (!mono_lazy_is_initialized (&io_status))
672 mono_mutex_lock (&threadpool_io->updates_lock);
674 update = update_get_new ();
675 update->type = UPDATE_REMOVE_SOCKET;
676 update->data.add.fd = fd;
677 mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */
679 selector_thread_wakeup ();
681 mono_cond_wait (&threadpool_io->updates_cond, &threadpool_io->updates_lock);
683 mono_mutex_unlock (&threadpool_io->updates_lock);
687 mono_threadpool_ms_io_remove_domain_jobs (MonoDomain *domain)
689 ThreadPoolIOUpdate *update;
691 if (!mono_lazy_is_initialized (&io_status))
694 mono_mutex_lock (&threadpool_io->updates_lock);
696 update = update_get_new ();
697 update->type = UPDATE_REMOVE_DOMAIN;
698 update->data.remove_domain.domain = domain;
699 mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */
701 selector_thread_wakeup ();
703 mono_cond_wait (&threadpool_io->updates_cond, &threadpool_io->updates_lock);
705 mono_mutex_unlock (&threadpool_io->updates_lock);
709 icall_append_io_job (MonoObject *target, MonoSocketAsyncResult *state)
711 MonoAsyncResult *ares;
713 /* Don't call mono_async_result_new() to avoid capturing the context */
714 ares = (MonoAsyncResult *) mono_object_new (mono_domain_get (), mono_defaults.asyncresult_class);
715 MONO_OBJECT_SETREF (ares, async_delegate, target);
716 MONO_OBJECT_SETREF (ares, async_state, state);
718 mono_threadpool_ms_io_add (ares, state);
725 mono_threadpool_ms_is_io (MonoObject *target, MonoObject *state)
731 mono_threadpool_ms_io_cleanup (void)
733 g_assert_not_reached ();
737 mono_threadpool_ms_io_add (MonoAsyncResult *ares, MonoSocketAsyncResult *sockares)
739 g_assert_not_reached ();
743 mono_threadpool_ms_io_remove_socket (int fd)
745 g_assert_not_reached ();
749 mono_threadpool_ms_io_remove_domain_jobs (MonoDomain *domain)
751 g_assert_not_reached ();
755 icall_append_io_job (MonoObject *target, MonoSocketAsyncResult *state)
757 g_assert_not_reached ();