2 * threadpool-ms-io.c: Microsoft IO threadpool runtime support
5 * Ludovic Henry (ludovic.henry@xamarin.com)
7 * Copyright 2015 Xamarin, Inc (http://www.xamarin.com)
12 #ifndef DISABLE_SOCKETS
16 #if defined(HOST_WIN32)
23 #include <mono/metadata/gc-internals.h>
24 #include <mono/metadata/mono-mlist.h>
25 #include <mono/metadata/threadpool-ms.h>
26 #include <mono/metadata/threadpool-ms-io.h>
27 #include <mono/utils/atomic.h>
28 #include <mono/utils/mono-threads.h>
29 #include <mono/utils/mono-lazy-init.h>
30 #include <mono/utils/mono-logger-internals.h>
33 gboolean (*init) (gint wakeup_pipe_fd);
34 void (*cleanup) (void);
35 void (*register_fd) (gint fd, gint events, gboolean is_new);
36 void (*remove_fd) (gint fd);
37 gint (*event_wait) (void (*callback) (gint fd, gint events, gpointer user_data), gpointer user_data);
38 } ThreadPoolIOBackend;
40 /* Keep in sync with System.IOOperation in mcs/class/System/System/IOSelector.cs */
41 enum MonoIOOperation {
44 EVENT_ERR = 1 << 2, /* not in managed */
47 #include "threadpool-ms-io-epoll.c"
48 #include "threadpool-ms-io-kqueue.c"
49 #include "threadpool-ms-io-poll.c"
51 #define UPDATES_CAPACITY 128
53 /* Keep in sync with System.IOSelectorJob in mcs/class/System/System/IOSelector.cs */
54 struct _MonoIOSelectorJob {
66 } ThreadPoolIOUpdateType;
70 MonoIOSelectorJob *job;
71 } ThreadPoolIOUpdate_Add;
75 } ThreadPoolIOUpdate_RemoveSocket;
79 } ThreadPoolIOUpdate_RemoveDomain;
82 ThreadPoolIOUpdateType type;
84 ThreadPoolIOUpdate_Add add;
85 ThreadPoolIOUpdate_RemoveSocket remove_socket;
86 ThreadPoolIOUpdate_RemoveDomain remove_domain;
91 ThreadPoolIOBackend backend;
93 ThreadPoolIOUpdate updates [UPDATES_CAPACITY];
95 MonoCoopMutex updates_lock;
96 MonoCoopCond updates_cond;
98 #if !defined(HOST_WIN32)
99 gint wakeup_pipes [2];
101 SOCKET wakeup_pipes [2];
105 static mono_lazy_init_t io_status = MONO_LAZY_INIT_STATUS_NOT_INITIALIZED;
107 static gboolean io_selector_running = FALSE;
109 static ThreadPoolIO* threadpool_io;
111 static MonoIOSelectorJob*
112 get_job_for_event (MonoMList **list, gint32 event)
118 for (current = *list; current; current = mono_mlist_next (current)) {
119 MonoIOSelectorJob *job = (MonoIOSelectorJob*) mono_mlist_get_data (current);
120 if (job->operation == event) {
121 *list = mono_mlist_remove_item (*list, current);
130 get_operations_for_jobs (MonoMList *list)
135 for (current = list; current; current = mono_mlist_next (current))
136 operations |= ((MonoIOSelectorJob*) mono_mlist_get_data (current))->operation;
142 selector_thread_wakeup (void)
148 #if !defined(HOST_WIN32)
149 written = write (threadpool_io->wakeup_pipes [1], &msg, 1);
153 g_warning ("selector_thread_wakeup: write () failed, error (%d) %s\n", errno, g_strerror (errno));
157 written = send (threadpool_io->wakeup_pipes [1], &msg, 1, 0);
160 if (written == SOCKET_ERROR) {
161 g_warning ("selector_thread_wakeup: write () failed, error (%d)\n", WSAGetLastError ());
169 selector_thread_wakeup_drain_pipes (void)
175 #if !defined(HOST_WIN32)
176 received = read (threadpool_io->wakeup_pipes [0], buffer, sizeof (buffer));
179 if (received == -1) {
180 if (errno != EINTR && errno != EAGAIN)
181 g_warning ("selector_thread_wakeup_drain_pipes: read () failed, error (%d) %s\n", errno, g_strerror (errno));
185 received = recv (threadpool_io->wakeup_pipes [0], buffer, sizeof (buffer), 0);
188 if (received == SOCKET_ERROR) {
189 if (WSAGetLastError () != WSAEINTR && WSAGetLastError () != WSAEWOULDBLOCK)
190 g_warning ("selector_thread_wakeup_drain_pipes: recv () failed, error (%d) %s\n", WSAGetLastError ());
199 MonoGHashTable *states;
200 } FilterSockaresForDomainData;
203 filter_jobs_for_domain (gpointer key, gpointer value, gpointer user_data)
205 FilterSockaresForDomainData *data;
206 MonoMList *list = (MonoMList *)value, *element;
208 MonoGHashTable *states;
210 g_assert (user_data);
211 data = (FilterSockaresForDomainData *)user_data;
212 domain = data->domain;
213 states = data->states;
215 for (element = list; element; element = mono_mlist_next (element)) {
216 MonoIOSelectorJob *job = (MonoIOSelectorJob*) mono_mlist_get_data (element);
217 if (mono_object_domain (job) == domain)
218 mono_mlist_set_data (element, NULL);
221 /* we skip all the first elements which are NULL */
222 for (; list; list = mono_mlist_next (list)) {
223 if (mono_mlist_get_data (list))
228 g_assert (mono_mlist_get_data (list));
230 /* we delete all the NULL elements after the first one */
231 for (element = list; element;) {
233 if (!(next = mono_mlist_next (element)))
235 if (mono_mlist_get_data (next))
238 mono_mlist_set_next (element, mono_mlist_next (next));
242 mono_g_hash_table_replace (states, key, list);
246 wait_callback (gint fd, gint events, gpointer user_data)
250 if (mono_runtime_is_shutting_down ())
253 if (fd == threadpool_io->wakeup_pipes [0]) {
254 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: wke");
255 selector_thread_wakeup_drain_pipes ();
257 MonoGHashTable *states;
258 MonoMList *list = NULL;
260 gboolean remove_fd = FALSE;
263 g_assert (user_data);
264 states = (MonoGHashTable *)user_data;
266 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: cal fd %3d, events = %2s | %2s | %3s",
267 fd, (events & EVENT_IN) ? "RD" : "..", (events & EVENT_OUT) ? "WR" : "..", (events & EVENT_ERR) ? "ERR" : "...");
269 if (!mono_g_hash_table_lookup_extended (states, GINT_TO_POINTER (fd), &k, (gpointer*) &list))
270 g_error ("wait_callback: fd %d not found in states table", fd);
272 if (list && (events & EVENT_IN) != 0) {
273 MonoIOSelectorJob *job = get_job_for_event (&list, EVENT_IN);
275 mono_threadpool_ms_enqueue_work_item (((MonoObject*) job)->vtable->domain, (MonoObject*) job, &error);
276 mono_error_raise_exception (&error); /* FIXME don't raise here */
280 if (list && (events & EVENT_OUT) != 0) {
281 MonoIOSelectorJob *job = get_job_for_event (&list, EVENT_OUT);
283 mono_threadpool_ms_enqueue_work_item (((MonoObject*) job)->vtable->domain, (MonoObject*) job, &error);
284 mono_error_raise_exception (&error); /* FIXME don't raise here */
288 remove_fd = (events & EVENT_ERR) == EVENT_ERR;
290 mono_g_hash_table_replace (states, GINT_TO_POINTER (fd), list);
292 operations = get_operations_for_jobs (list);
294 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: res fd %3d, events = %2s | %2s | %3s",
295 fd, (operations & EVENT_IN) ? "RD" : "..", (operations & EVENT_OUT) ? "WR" : "..", (operations & EVENT_ERR) ? "ERR" : "...");
297 threadpool_io->backend.register_fd (fd, operations, FALSE);
299 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: err fd %d", fd);
301 mono_g_hash_table_remove (states, GINT_TO_POINTER (fd));
303 threadpool_io->backend.remove_fd (fd);
309 selector_thread (gpointer data)
312 MonoGHashTable *states;
314 io_selector_running = TRUE;
316 if (mono_runtime_is_shutting_down ()) {
317 io_selector_running = FALSE;
321 states = mono_g_hash_table_new_type (g_direct_hash, g_direct_equal, MONO_HASH_VALUE_GC, MONO_ROOT_SOURCE_THREAD_POOL, "i/o thread pool states table");
327 mono_coop_mutex_lock (&threadpool_io->updates_lock);
329 for (i = 0; i < threadpool_io->updates_size; ++i) {
330 ThreadPoolIOUpdate *update = &threadpool_io->updates [i];
332 switch (update->type) {
340 MonoMList *list = NULL;
341 MonoIOSelectorJob *job;
343 fd = update->data.add.fd;
346 job = update->data.add.job;
349 exists = mono_g_hash_table_lookup_extended (states, GINT_TO_POINTER (fd), &k, (gpointer*) &list);
350 list = mono_mlist_append (list, (MonoObject*) job);
351 mono_g_hash_table_replace (states, GINT_TO_POINTER (fd), list);
353 operations = get_operations_for_jobs (list);
355 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: %3s fd %3d, operations = %2s | %2s | %3s",
356 exists ? "mod" : "add", fd, (operations & EVENT_IN) ? "RD" : "..", (operations & EVENT_OUT) ? "WR" : "..", (operations & EVENT_ERR) ? "ERR" : "...");
358 threadpool_io->backend.register_fd (fd, operations, !exists);
362 case UPDATE_REMOVE_SOCKET: {
365 MonoMList *list = NULL;
367 fd = update->data.remove_socket.fd;
370 if (mono_g_hash_table_lookup_extended (states, GINT_TO_POINTER (fd), &k, (gpointer*) &list)) {
371 mono_g_hash_table_remove (states, GINT_TO_POINTER (fd));
373 for (j = i + 1; j < threadpool_io->updates_size; ++j) {
374 ThreadPoolIOUpdate *update = &threadpool_io->updates [j];
375 if (update->type == UPDATE_ADD && update->data.add.fd == fd)
376 memset (update, 0, sizeof (ThreadPoolIOUpdate));
379 for (; list; list = mono_mlist_remove_item (list, list)) {
380 mono_threadpool_ms_enqueue_work_item (mono_object_domain (mono_mlist_get_data (list)), mono_mlist_get_data (list), &error);
381 mono_error_raise_exception (&error); /* FIXME don't raise here */
384 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: del fd %3d", fd);
385 threadpool_io->backend.remove_fd (fd);
390 case UPDATE_REMOVE_DOMAIN: {
393 domain = update->data.remove_domain.domain;
396 FilterSockaresForDomainData user_data = { .domain = domain, .states = states };
397 mono_g_hash_table_foreach (states, filter_jobs_for_domain, &user_data);
399 for (j = i + 1; j < threadpool_io->updates_size; ++j) {
400 ThreadPoolIOUpdate *update = &threadpool_io->updates [j];
401 if (update->type == UPDATE_ADD && mono_object_domain (update->data.add.job) == domain)
402 memset (update, 0, sizeof (ThreadPoolIOUpdate));
408 g_assert_not_reached ();
412 mono_coop_cond_broadcast (&threadpool_io->updates_cond);
414 if (threadpool_io->updates_size > 0) {
415 threadpool_io->updates_size = 0;
416 memset (&threadpool_io->updates, 0, UPDATES_CAPACITY * sizeof (ThreadPoolIOUpdate));
419 mono_coop_mutex_unlock (&threadpool_io->updates_lock);
421 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: wai");
423 res = threadpool_io->backend.event_wait (wait_callback, states);
425 if (res == -1 || mono_runtime_is_shutting_down ())
429 mono_g_hash_table_destroy (states);
431 io_selector_running = FALSE;
434 /* Locking: threadpool_io->updates_lock must be held */
435 static ThreadPoolIOUpdate*
436 update_get_new (void)
438 ThreadPoolIOUpdate *update = NULL;
439 g_assert (threadpool_io->updates_size <= UPDATES_CAPACITY);
441 while (threadpool_io->updates_size == UPDATES_CAPACITY) {
442 /* we wait for updates to be applied in the selector_thread and we loop
443 * as long as none are available. if it happends too much, then we need
444 * to increase UPDATES_CAPACITY */
445 mono_coop_cond_wait (&threadpool_io->updates_cond, &threadpool_io->updates_lock);
448 g_assert (threadpool_io->updates_size < UPDATES_CAPACITY);
450 update = &threadpool_io->updates [threadpool_io->updates_size ++];
456 wakeup_pipes_init (void)
458 #if !defined(HOST_WIN32)
459 if (pipe (threadpool_io->wakeup_pipes) == -1)
460 g_error ("wakeup_pipes_init: pipe () failed, error (%d) %s\n", errno, g_strerror (errno));
461 if (fcntl (threadpool_io->wakeup_pipes [0], F_SETFL, O_NONBLOCK) == -1)
462 g_error ("wakeup_pipes_init: fcntl () failed, error (%d) %s\n", errno, g_strerror (errno));
464 struct sockaddr_in client;
465 struct sockaddr_in server;
470 server_sock = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
471 g_assert (server_sock != INVALID_SOCKET);
472 threadpool_io->wakeup_pipes [1] = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
473 g_assert (threadpool_io->wakeup_pipes [1] != INVALID_SOCKET);
475 server.sin_family = AF_INET;
476 server.sin_addr.s_addr = inet_addr ("127.0.0.1");
478 if (bind (server_sock, (SOCKADDR*) &server, sizeof (server)) == SOCKET_ERROR) {
479 closesocket (server_sock);
480 g_error ("wakeup_pipes_init: bind () failed, error (%d)\n", WSAGetLastError ());
483 size = sizeof (server);
484 if (getsockname (server_sock, (SOCKADDR*) &server, &size) == SOCKET_ERROR) {
485 closesocket (server_sock);
486 g_error ("wakeup_pipes_init: getsockname () failed, error (%d)\n", WSAGetLastError ());
488 if (listen (server_sock, 1024) == SOCKET_ERROR) {
489 closesocket (server_sock);
490 g_error ("wakeup_pipes_init: listen () failed, error (%d)\n", WSAGetLastError ());
492 if (connect ((SOCKET) threadpool_io->wakeup_pipes [1], (SOCKADDR*) &server, sizeof (server)) == SOCKET_ERROR) {
493 closesocket (server_sock);
494 g_error ("wakeup_pipes_init: connect () failed, error (%d)\n", WSAGetLastError ());
497 size = sizeof (client);
498 threadpool_io->wakeup_pipes [0] = accept (server_sock, (SOCKADDR *) &client, &size);
499 g_assert (threadpool_io->wakeup_pipes [0] != INVALID_SOCKET);
502 if (ioctlsocket (threadpool_io->wakeup_pipes [0], FIONBIO, &arg) == SOCKET_ERROR) {
503 closesocket (threadpool_io->wakeup_pipes [0]);
504 closesocket (server_sock);
505 g_error ("wakeup_pipes_init: ioctlsocket () failed, error (%d)\n", WSAGetLastError ());
508 closesocket (server_sock);
515 g_assert (!threadpool_io);
516 threadpool_io = g_new0 (ThreadPoolIO, 1);
517 g_assert (threadpool_io);
519 mono_coop_mutex_init (&threadpool_io->updates_lock);
520 mono_coop_cond_init (&threadpool_io->updates_cond);
521 mono_gc_register_root ((char *)&threadpool_io->updates [0], sizeof (threadpool_io->updates), MONO_GC_DESCRIPTOR_NULL, MONO_ROOT_SOURCE_THREAD_POOL, "i/o thread pool updates list");
523 threadpool_io->updates_size = 0;
525 threadpool_io->backend = backend_poll;
526 if (g_getenv ("MONO_ENABLE_AIO") != NULL) {
527 #if defined(HAVE_EPOLL)
528 threadpool_io->backend = backend_epoll;
529 #elif defined(HAVE_KQUEUE)
530 threadpool_io->backend = backend_kqueue;
534 wakeup_pipes_init ();
536 if (!threadpool_io->backend.init (threadpool_io->wakeup_pipes [0]))
537 g_error ("initialize: backend->init () failed");
539 if (!mono_thread_create_internal (mono_get_root_domain (), selector_thread, NULL, TRUE, SMALL_STACK))
540 g_error ("initialize: mono_thread_create_internal () failed");
546 /* we make the assumption along the code that we are
547 * cleaning up only if the runtime is shutting down */
548 g_assert (mono_runtime_is_shutting_down ());
550 selector_thread_wakeup ();
551 while (io_selector_running)
552 mono_thread_info_usleep (1000);
554 mono_coop_mutex_destroy (&threadpool_io->updates_lock);
555 mono_coop_cond_destroy (&threadpool_io->updates_cond);
557 threadpool_io->backend.cleanup ();
559 #if !defined(HOST_WIN32)
560 close (threadpool_io->wakeup_pipes [0]);
561 close (threadpool_io->wakeup_pipes [1]);
563 closesocket (threadpool_io->wakeup_pipes [0]);
564 closesocket (threadpool_io->wakeup_pipes [1]);
567 g_assert (threadpool_io);
568 g_free (threadpool_io);
569 threadpool_io = NULL;
570 g_assert (!threadpool_io);
574 mono_threadpool_ms_io_cleanup (void)
576 mono_lazy_cleanup (&io_status, cleanup);
580 ves_icall_System_IOSelector_Add (gpointer handle, MonoIOSelectorJob *job)
582 ThreadPoolIOUpdate *update;
584 g_assert (handle >= 0);
586 g_assert (job->operation == EVENT_IN ^ job->operation == EVENT_OUT);
587 g_assert (job->callback);
589 if (mono_runtime_is_shutting_down ())
591 if (mono_domain_is_unloading (mono_object_domain (job)))
594 mono_lazy_initialize (&io_status, initialize);
596 mono_coop_mutex_lock (&threadpool_io->updates_lock);
598 update = update_get_new ();
599 update->type = UPDATE_ADD;
600 update->data.add.fd = GPOINTER_TO_INT (handle);
601 update->data.add.job = job;
602 mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */
604 selector_thread_wakeup ();
606 mono_coop_mutex_unlock (&threadpool_io->updates_lock);
610 ves_icall_System_IOSelector_Remove (gpointer handle)
612 mono_threadpool_ms_io_remove_socket (GPOINTER_TO_INT (handle));
616 mono_threadpool_ms_io_remove_socket (int fd)
618 ThreadPoolIOUpdate *update;
620 if (!mono_lazy_is_initialized (&io_status))
623 mono_coop_mutex_lock (&threadpool_io->updates_lock);
625 update = update_get_new ();
626 update->type = UPDATE_REMOVE_SOCKET;
627 update->data.add.fd = fd;
628 mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */
630 selector_thread_wakeup ();
632 mono_coop_cond_wait (&threadpool_io->updates_cond, &threadpool_io->updates_lock);
634 mono_coop_mutex_unlock (&threadpool_io->updates_lock);
638 mono_threadpool_ms_io_remove_domain_jobs (MonoDomain *domain)
640 ThreadPoolIOUpdate *update;
642 if (!mono_lazy_is_initialized (&io_status))
645 mono_coop_mutex_lock (&threadpool_io->updates_lock);
647 update = update_get_new ();
648 update->type = UPDATE_REMOVE_DOMAIN;
649 update->data.remove_domain.domain = domain;
650 mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */
652 selector_thread_wakeup ();
654 mono_coop_cond_wait (&threadpool_io->updates_cond, &threadpool_io->updates_lock);
656 mono_coop_mutex_unlock (&threadpool_io->updates_lock);
662 ves_icall_System_IOSelector_Add (gpointer handle, MonoIOSelectorJob *job)
664 g_assert_not_reached ();
668 ves_icall_System_IOSelector_Remove (gpointer handle)
670 g_assert_not_reached ();
674 mono_threadpool_ms_io_cleanup (void)
676 g_assert_not_reached ();
680 mono_threadpool_ms_io_remove_socket (int fd)
682 g_assert_not_reached ();
686 mono_threadpool_ms_io_remove_domain_jobs (MonoDomain *domain)
688 g_assert_not_reached ();