3 * Microsoft IO threadpool runtime support
6 * Ludovic Henry (ludovic.henry@xamarin.com)
8 * Copyright 2015 Xamarin, Inc (http://www.xamarin.com)
9 * Licensed under the MIT license. See LICENSE file in the project root for full license information.
14 #ifndef DISABLE_SOCKETS
18 #if defined(HOST_WIN32)
25 #include <mono/metadata/gc-internals.h>
26 #include <mono/metadata/mono-mlist.h>
27 #include <mono/metadata/threadpool.h>
28 #include <mono/metadata/threadpool-io.h>
29 #include <mono/utils/atomic.h>
30 #include <mono/utils/mono-threads.h>
31 #include <mono/utils/mono-lazy-init.h>
32 #include <mono/utils/mono-logger-internals.h>
33 #include <mono/utils/w32api.h>
36 gboolean (*init) (gint wakeup_pipe_fd);
37 void (*register_fd) (gint fd, gint events, gboolean is_new);
38 void (*remove_fd) (gint fd);
39 gint (*event_wait) (void (*callback) (gint fd, gint events, gpointer user_data), gpointer user_data);
40 } ThreadPoolIOBackend;
42 /* Keep in sync with System.IOOperation in mcs/class/System/System/IOSelector.cs */
43 enum MonoIOOperation {
46 EVENT_ERR = 1 << 2, /* not in managed */
49 #include "threadpool-io-epoll.c"
50 #include "threadpool-io-kqueue.c"
51 #include "threadpool-io-poll.c"
53 #define UPDATES_CAPACITY 128
55 /* Keep in sync with System.IOSelectorJob in mcs/class/System/System/IOSelector.cs */
56 struct _MonoIOSelectorJob {
68 } ThreadPoolIOUpdateType;
72 MonoIOSelectorJob *job;
73 } ThreadPoolIOUpdate_Add;
77 } ThreadPoolIOUpdate_RemoveSocket;
81 } ThreadPoolIOUpdate_RemoveDomain;
84 ThreadPoolIOUpdateType type;
86 ThreadPoolIOUpdate_Add add;
87 ThreadPoolIOUpdate_RemoveSocket remove_socket;
88 ThreadPoolIOUpdate_RemoveDomain remove_domain;
93 ThreadPoolIOBackend backend;
95 ThreadPoolIOUpdate updates [UPDATES_CAPACITY];
97 MonoCoopMutex updates_lock;
98 MonoCoopCond updates_cond;
100 #if !defined(HOST_WIN32)
101 gint wakeup_pipes [2];
103 SOCKET wakeup_pipes [2];
107 static mono_lazy_init_t io_status = MONO_LAZY_INIT_STATUS_NOT_INITIALIZED;
109 static gboolean io_selector_running = FALSE;
111 static ThreadPoolIO* threadpool_io;
113 static MonoIOSelectorJob*
114 get_job_for_event (MonoMList **list, gint32 event)
120 for (current = *list; current; current = mono_mlist_next (current)) {
121 MonoIOSelectorJob *job = (MonoIOSelectorJob*) mono_mlist_get_data (current);
122 if (job->operation == event) {
123 *list = mono_mlist_remove_item (*list, current);
132 get_operations_for_jobs (MonoMList *list)
137 for (current = list; current; current = mono_mlist_next (current))
138 operations |= ((MonoIOSelectorJob*) mono_mlist_get_data (current))->operation;
144 selector_thread_wakeup (void)
150 #if !defined(HOST_WIN32)
151 written = write (threadpool_io->wakeup_pipes [1], &msg, 1);
155 g_warning ("selector_thread_wakeup: write () failed, error (%d) %s\n", errno, g_strerror (errno));
159 written = send (threadpool_io->wakeup_pipes [1], &msg, 1, 0);
162 if (written == SOCKET_ERROR) {
163 g_warning ("selector_thread_wakeup: write () failed, error (%d)\n", WSAGetLastError ());
171 selector_thread_wakeup_drain_pipes (void)
177 #if !defined(HOST_WIN32)
178 received = read (threadpool_io->wakeup_pipes [0], buffer, sizeof (buffer));
181 if (received == -1) {
182 if (errno != EINTR && errno != EAGAIN)
183 g_warning ("selector_thread_wakeup_drain_pipes: read () failed, error (%d) %s\n", errno, g_strerror (errno));
187 received = recv (threadpool_io->wakeup_pipes [0], buffer, sizeof (buffer), 0);
190 if (received == SOCKET_ERROR) {
191 if (WSAGetLastError () != WSAEINTR && WSAGetLastError () != WSAEWOULDBLOCK)
192 g_warning ("selector_thread_wakeup_drain_pipes: recv () failed, error (%d) %s\n", WSAGetLastError ());
201 MonoGHashTable *states;
202 } FilterSockaresForDomainData;
205 filter_jobs_for_domain (gpointer key, gpointer value, gpointer user_data)
207 FilterSockaresForDomainData *data;
208 MonoMList *list = (MonoMList *)value, *element;
210 MonoGHashTable *states;
212 g_assert (user_data);
213 data = (FilterSockaresForDomainData *)user_data;
214 domain = data->domain;
215 states = data->states;
217 for (element = list; element; element = mono_mlist_next (element)) {
218 MonoIOSelectorJob *job = (MonoIOSelectorJob*) mono_mlist_get_data (element);
219 if (mono_object_domain (job) == domain)
220 mono_mlist_set_data (element, NULL);
223 /* we skip all the first elements which are NULL */
224 for (; list; list = mono_mlist_next (list)) {
225 if (mono_mlist_get_data (list))
230 g_assert (mono_mlist_get_data (list));
232 /* we delete all the NULL elements after the first one */
233 for (element = list; element;) {
235 if (!(next = mono_mlist_next (element)))
237 if (mono_mlist_get_data (next))
240 mono_mlist_set_next (element, mono_mlist_next (next));
244 mono_g_hash_table_replace (states, key, list);
248 wait_callback (gint fd, gint events, gpointer user_data)
252 if (mono_runtime_is_shutting_down ())
255 if (fd == threadpool_io->wakeup_pipes [0]) {
256 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: wke");
257 selector_thread_wakeup_drain_pipes ();
259 MonoGHashTable *states;
260 MonoMList *list = NULL;
262 gboolean remove_fd = FALSE;
265 g_assert (user_data);
266 states = (MonoGHashTable *)user_data;
268 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: cal fd %3d, events = %2s | %2s | %3s",
269 fd, (events & EVENT_IN) ? "RD" : "..", (events & EVENT_OUT) ? "WR" : "..", (events & EVENT_ERR) ? "ERR" : "...");
271 if (!mono_g_hash_table_lookup_extended (states, GINT_TO_POINTER (fd), &k, (gpointer*) &list))
272 g_error ("wait_callback: fd %d not found in states table", fd);
274 if (list && (events & EVENT_IN) != 0) {
275 MonoIOSelectorJob *job = get_job_for_event (&list, EVENT_IN);
277 mono_threadpool_enqueue_work_item (((MonoObject*) job)->vtable->domain, (MonoObject*) job, &error);
278 mono_error_assert_ok (&error);
282 if (list && (events & EVENT_OUT) != 0) {
283 MonoIOSelectorJob *job = get_job_for_event (&list, EVENT_OUT);
285 mono_threadpool_enqueue_work_item (((MonoObject*) job)->vtable->domain, (MonoObject*) job, &error);
286 mono_error_assert_ok (&error);
290 remove_fd = (events & EVENT_ERR) == EVENT_ERR;
292 mono_g_hash_table_replace (states, GINT_TO_POINTER (fd), list);
294 operations = get_operations_for_jobs (list);
296 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: res fd %3d, events = %2s | %2s | %3s",
297 fd, (operations & EVENT_IN) ? "RD" : "..", (operations & EVENT_OUT) ? "WR" : "..", (operations & EVENT_ERR) ? "ERR" : "...");
299 threadpool_io->backend.register_fd (fd, operations, FALSE);
301 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: err fd %d", fd);
303 mono_g_hash_table_remove (states, GINT_TO_POINTER (fd));
305 threadpool_io->backend.remove_fd (fd);
311 selector_thread_interrupt (gpointer unused)
313 selector_thread_wakeup ();
317 selector_thread (gpointer data)
320 MonoGHashTable *states;
322 if (mono_runtime_is_shutting_down ()) {
323 io_selector_running = FALSE;
327 states = mono_g_hash_table_new_type (g_direct_hash, NULL, MONO_HASH_VALUE_GC, MONO_ROOT_SOURCE_THREAD_POOL, "i/o thread pool states table");
329 while (!mono_runtime_is_shutting_down ()) {
332 gboolean interrupted = FALSE;
334 if (mono_thread_interruption_checkpoint ())
337 mono_coop_mutex_lock (&threadpool_io->updates_lock);
339 for (i = 0; i < threadpool_io->updates_size; ++i) {
340 ThreadPoolIOUpdate *update = &threadpool_io->updates [i];
342 switch (update->type) {
350 MonoMList *list = NULL;
351 MonoIOSelectorJob *job;
353 fd = update->data.add.fd;
356 job = update->data.add.job;
359 exists = mono_g_hash_table_lookup_extended (states, GINT_TO_POINTER (fd), &k, (gpointer*) &list);
360 list = mono_mlist_append_checked (list, (MonoObject*) job, &error);
361 mono_error_assert_ok (&error);
362 mono_g_hash_table_replace (states, GINT_TO_POINTER (fd), list);
364 operations = get_operations_for_jobs (list);
366 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: %3s fd %3d, operations = %2s | %2s | %3s",
367 exists ? "mod" : "add", fd, (operations & EVENT_IN) ? "RD" : "..", (operations & EVENT_OUT) ? "WR" : "..", (operations & EVENT_ERR) ? "ERR" : "...");
369 threadpool_io->backend.register_fd (fd, operations, !exists);
373 case UPDATE_REMOVE_SOCKET: {
376 MonoMList *list = NULL;
378 fd = update->data.remove_socket.fd;
381 if (mono_g_hash_table_lookup_extended (states, GINT_TO_POINTER (fd), &k, (gpointer*) &list)) {
382 mono_g_hash_table_remove (states, GINT_TO_POINTER (fd));
384 for (j = i + 1; j < threadpool_io->updates_size; ++j) {
385 ThreadPoolIOUpdate *update = &threadpool_io->updates [j];
386 if (update->type == UPDATE_ADD && update->data.add.fd == fd)
387 memset (update, 0, sizeof (ThreadPoolIOUpdate));
390 for (; list; list = mono_mlist_remove_item (list, list)) {
391 mono_threadpool_enqueue_work_item (mono_object_domain (mono_mlist_get_data (list)), mono_mlist_get_data (list), &error);
392 mono_error_assert_ok (&error);
395 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: del fd %3d", fd);
396 threadpool_io->backend.remove_fd (fd);
401 case UPDATE_REMOVE_DOMAIN: {
404 domain = update->data.remove_domain.domain;
407 FilterSockaresForDomainData user_data = { .domain = domain, .states = states };
408 mono_g_hash_table_foreach (states, filter_jobs_for_domain, &user_data);
410 for (j = i + 1; j < threadpool_io->updates_size; ++j) {
411 ThreadPoolIOUpdate *update = &threadpool_io->updates [j];
412 if (update->type == UPDATE_ADD && mono_object_domain (update->data.add.job) == domain)
413 memset (update, 0, sizeof (ThreadPoolIOUpdate));
419 g_assert_not_reached ();
423 mono_coop_cond_broadcast (&threadpool_io->updates_cond);
425 if (threadpool_io->updates_size > 0) {
426 threadpool_io->updates_size = 0;
427 memset (&threadpool_io->updates, 0, UPDATES_CAPACITY * sizeof (ThreadPoolIOUpdate));
430 mono_coop_mutex_unlock (&threadpool_io->updates_lock);
432 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: wai");
434 mono_thread_info_install_interrupt (selector_thread_interrupt, NULL, &interrupted);
438 res = threadpool_io->backend.event_wait (wait_callback, states);
442 mono_thread_info_uninstall_interrupt (&interrupted);
445 mono_g_hash_table_destroy (states);
447 mono_coop_mutex_lock (&threadpool_io->updates_lock);
449 io_selector_running = FALSE;
450 mono_coop_cond_broadcast (&threadpool_io->updates_cond);
452 mono_coop_mutex_unlock (&threadpool_io->updates_lock);
457 /* Locking: threadpool_io->updates_lock must be held */
458 static ThreadPoolIOUpdate*
459 update_get_new (void)
461 ThreadPoolIOUpdate *update = NULL;
462 g_assert (threadpool_io->updates_size <= UPDATES_CAPACITY);
464 while (threadpool_io->updates_size == UPDATES_CAPACITY) {
465 /* we wait for updates to be applied in the selector_thread and we loop
466 * as long as none are available. if it happends too much, then we need
467 * to increase UPDATES_CAPACITY */
468 mono_coop_cond_wait (&threadpool_io->updates_cond, &threadpool_io->updates_lock);
471 g_assert (threadpool_io->updates_size < UPDATES_CAPACITY);
473 update = &threadpool_io->updates [threadpool_io->updates_size ++];
479 wakeup_pipes_init (void)
481 #if !defined(HOST_WIN32)
482 if (pipe (threadpool_io->wakeup_pipes) == -1)
483 g_error ("wakeup_pipes_init: pipe () failed, error (%d) %s\n", errno, g_strerror (errno));
484 if (fcntl (threadpool_io->wakeup_pipes [0], F_SETFL, O_NONBLOCK) == -1)
485 g_error ("wakeup_pipes_init: fcntl () failed, error (%d) %s\n", errno, g_strerror (errno));
487 struct sockaddr_in client;
488 struct sockaddr_in server;
493 server_sock = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
494 g_assert (server_sock != INVALID_SOCKET);
495 threadpool_io->wakeup_pipes [1] = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
496 g_assert (threadpool_io->wakeup_pipes [1] != INVALID_SOCKET);
498 server.sin_family = AF_INET;
499 server.sin_addr.s_addr = inet_addr ("127.0.0.1");
501 if (bind (server_sock, (SOCKADDR*) &server, sizeof (server)) == SOCKET_ERROR) {
502 closesocket (server_sock);
503 g_error ("wakeup_pipes_init: bind () failed, error (%d)\n", WSAGetLastError ());
506 size = sizeof (server);
507 if (getsockname (server_sock, (SOCKADDR*) &server, &size) == SOCKET_ERROR) {
508 closesocket (server_sock);
509 g_error ("wakeup_pipes_init: getsockname () failed, error (%d)\n", WSAGetLastError ());
511 if (listen (server_sock, 1024) == SOCKET_ERROR) {
512 closesocket (server_sock);
513 g_error ("wakeup_pipes_init: listen () failed, error (%d)\n", WSAGetLastError ());
515 if (connect ((SOCKET) threadpool_io->wakeup_pipes [1], (SOCKADDR*) &server, sizeof (server)) == SOCKET_ERROR) {
516 closesocket (server_sock);
517 g_error ("wakeup_pipes_init: connect () failed, error (%d)\n", WSAGetLastError ());
520 size = sizeof (client);
521 threadpool_io->wakeup_pipes [0] = accept (server_sock, (SOCKADDR *) &client, &size);
522 g_assert (threadpool_io->wakeup_pipes [0] != INVALID_SOCKET);
525 if (ioctlsocket (threadpool_io->wakeup_pipes [0], FIONBIO, &arg) == SOCKET_ERROR) {
526 closesocket (threadpool_io->wakeup_pipes [0]);
527 closesocket (server_sock);
528 g_error ("wakeup_pipes_init: ioctlsocket () failed, error (%d)\n", WSAGetLastError ());
531 closesocket (server_sock);
538 g_assert (!threadpool_io);
539 threadpool_io = g_new0 (ThreadPoolIO, 1);
540 g_assert (threadpool_io);
542 mono_coop_mutex_init (&threadpool_io->updates_lock);
543 mono_coop_cond_init (&threadpool_io->updates_cond);
544 mono_gc_register_root ((char *)&threadpool_io->updates [0], sizeof (threadpool_io->updates), MONO_GC_DESCRIPTOR_NULL, MONO_ROOT_SOURCE_THREAD_POOL, "i/o thread pool updates list");
546 threadpool_io->updates_size = 0;
548 threadpool_io->backend = backend_poll;
549 if (g_hasenv ("MONO_ENABLE_AIO")) {
550 #if defined(HAVE_EPOLL)
551 threadpool_io->backend = backend_epoll;
552 #elif defined(HAVE_KQUEUE)
553 threadpool_io->backend = backend_kqueue;
557 wakeup_pipes_init ();
559 if (!threadpool_io->backend.init (threadpool_io->wakeup_pipes [0]))
560 g_error ("initialize: backend->init () failed");
562 mono_coop_mutex_lock (&threadpool_io->updates_lock);
564 io_selector_running = TRUE;
567 if (!mono_thread_create_internal (mono_get_root_domain (), selector_thread, NULL, MONO_THREAD_CREATE_FLAGS_THREADPOOL | MONO_THREAD_CREATE_FLAGS_SMALL_STACK, &error))
568 g_error ("initialize: mono_thread_create_internal () failed due to %s", mono_error_get_message (&error));
570 mono_coop_mutex_unlock (&threadpool_io->updates_lock);
576 // FIXME destroy everything
580 mono_threadpool_io_cleanup (void)
582 mono_lazy_cleanup (&io_status, cleanup);
586 ves_icall_System_IOSelector_Add (gpointer handle, MonoIOSelectorJob *job)
588 ThreadPoolIOUpdate *update;
592 g_assert ((job->operation == EVENT_IN) ^ (job->operation == EVENT_OUT));
593 g_assert (job->callback);
595 if (mono_runtime_is_shutting_down ())
597 if (mono_domain_is_unloading (mono_object_domain (job)))
600 mono_lazy_initialize (&io_status, initialize);
602 mono_coop_mutex_lock (&threadpool_io->updates_lock);
604 if (!io_selector_running) {
605 mono_coop_mutex_unlock (&threadpool_io->updates_lock);
609 update = update_get_new ();
610 update->type = UPDATE_ADD;
611 update->data.add.fd = GPOINTER_TO_INT (handle);
612 update->data.add.job = job;
613 mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */
615 selector_thread_wakeup ();
617 mono_coop_mutex_unlock (&threadpool_io->updates_lock);
621 ves_icall_System_IOSelector_Remove (gpointer handle)
623 mono_threadpool_io_remove_socket (GPOINTER_TO_INT (handle));
627 mono_threadpool_io_remove_socket (int fd)
629 ThreadPoolIOUpdate *update;
631 if (!mono_lazy_is_initialized (&io_status))
634 mono_coop_mutex_lock (&threadpool_io->updates_lock);
636 if (!io_selector_running) {
637 mono_coop_mutex_unlock (&threadpool_io->updates_lock);
641 update = update_get_new ();
642 update->type = UPDATE_REMOVE_SOCKET;
643 update->data.add.fd = fd;
644 mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */
646 selector_thread_wakeup ();
648 mono_coop_cond_wait (&threadpool_io->updates_cond, &threadpool_io->updates_lock);
650 mono_coop_mutex_unlock (&threadpool_io->updates_lock);
654 mono_threadpool_io_remove_domain_jobs (MonoDomain *domain)
656 ThreadPoolIOUpdate *update;
658 if (!mono_lazy_is_initialized (&io_status))
661 mono_coop_mutex_lock (&threadpool_io->updates_lock);
663 if (!io_selector_running) {
664 mono_coop_mutex_unlock (&threadpool_io->updates_lock);
668 update = update_get_new ();
669 update->type = UPDATE_REMOVE_DOMAIN;
670 update->data.remove_domain.domain = domain;
671 mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */
673 selector_thread_wakeup ();
675 mono_coop_cond_wait (&threadpool_io->updates_cond, &threadpool_io->updates_lock);
677 mono_coop_mutex_unlock (&threadpool_io->updates_lock);
683 ves_icall_System_IOSelector_Add (gpointer handle, MonoIOSelectorJob *job)
685 g_assert_not_reached ();
689 ves_icall_System_IOSelector_Remove (gpointer handle)
691 g_assert_not_reached ();
695 mono_threadpool_io_cleanup (void)
697 g_assert_not_reached ();
701 mono_threadpool_io_remove_socket (int fd)
703 g_assert_not_reached ();
707 mono_threadpool_io_remove_domain_jobs (MonoDomain *domain)
709 g_assert_not_reached ();