2 * threadpool-ms-io.c: Microsoft IO threadpool runtime support
5 * Ludovic Henry (ludovic.henry@xamarin.com)
7 * Copyright 2015 Xamarin, Inc (http://www.xamarin.com)
12 #ifndef DISABLE_SOCKETS
16 #if defined(HOST_WIN32)
23 #include <mono/metadata/gc-internal.h>
24 #include <mono/metadata/mono-mlist.h>
25 #include <mono/metadata/threadpool-internals.h>
26 #include <mono/metadata/threadpool-ms.h>
27 #include <mono/metadata/threadpool-ms-io.h>
28 #include <mono/utils/atomic.h>
29 #include <mono/utils/mono-poll.h>
30 #include <mono/utils/mono-threads.h>
32 /* Keep in sync with System.Net.Sockets.Socket.SocketOperation */
41 AIO_OP_RECV_JUST_CALLBACK,
42 AIO_OP_SEND_JUST_CALLBACK,
47 AIO_OP_RECEIVE_BUFFERS,
59 gboolean (*init) (gint wakeup_pipe_fd);
60 void (*cleanup) (void);
61 void (*update_add) (ThreadPoolIOUpdate *update);
62 gint (*event_wait) (void);
63 gint (*event_max) (void);
64 gint (*event_fd_at) (guint i);
65 gboolean (*event_create_sockares_at) (guint i, gint fd, MonoMList **list);
66 } ThreadPoolIOBackend;
69 get_events_from_sockares (MonoSocketAsyncResult *ares)
71 switch (ares->operation) {
74 case AIO_OP_RECV_JUST_CALLBACK:
75 case AIO_OP_RECEIVEFROM:
77 case AIO_OP_ACCEPTRECEIVE:
78 case AIO_OP_RECEIVE_BUFFERS:
81 case AIO_OP_SEND_JUST_CALLBACK:
84 case AIO_OP_SEND_BUFFERS:
85 case AIO_OP_DISCONNECT:
88 g_assert_not_reached ();
92 static MonoSocketAsyncResult*
93 get_sockares_for_event (MonoMList **list, gint event)
95 MonoSocketAsyncResult *state = NULL;
100 for (current = *list; current; current = mono_mlist_next (current)) {
101 state = (MonoSocketAsyncResult*) mono_mlist_get_data (current);
102 if (get_events_from_sockares ((MonoSocketAsyncResult*) state) == event)
108 *list = mono_mlist_remove_item (*list, current);
114 get_events (MonoMList *list)
116 MonoSocketAsyncResult *ares;
119 for (; list; list = mono_mlist_next (list))
120 if ((ares = (MonoSocketAsyncResult*) mono_mlist_get_data (list)))
121 events |= get_events_from_sockares (ares);
126 #include "threadpool-ms-io-epoll.c"
127 #include "threadpool-ms-io-kqueue.c"
128 #include "threadpool-ms-io-poll.c"
131 MonoGHashTable *states;
132 mono_mutex_t states_lock;
134 ThreadPoolIOBackend backend;
136 ThreadPoolIOUpdate *updates;
138 mono_mutex_t updates_lock;
140 #if !defined(HOST_WIN32)
141 gint wakeup_pipes [2];
143 SOCKET wakeup_pipes [2];
147 static gint32 io_status = STATUS_NOT_INITIALIZED;
148 static gint32 io_thread_status = STATUS_NOT_INITIALIZED;
150 static ThreadPoolIO* threadpool_io;
153 selector_thread_wakeup (void)
159 #if !defined(HOST_WIN32)
160 written = write (threadpool_io->wakeup_pipes [1], &msg, 1);
164 g_warning ("selector_thread_wakeup: write () failed, error (%d) %s\n", errno, g_strerror (errno));
168 written = send (threadpool_io->wakeup_pipes [1], &msg, 1, 0);
171 if (written == SOCKET_ERROR) {
172 g_warning ("selector_thread_wakeup: write () failed, error (%d)\n", WSAGetLastError ());
180 selector_thread_wakeup_drain_pipes (void)
186 #if !defined(HOST_WIN32)
187 received = read (threadpool_io->wakeup_pipes [0], buffer, sizeof (buffer));
190 if (received == -1) {
191 if (errno != EINTR && errno != EAGAIN)
192 g_warning ("selector_thread_wakeup_drain_pipes: read () failed, error (%d) %s\n", errno, g_strerror (errno));
196 received = recv (threadpool_io->wakeup_pipes [0], buffer, sizeof (buffer), 0);
199 if (received == SOCKET_ERROR) {
200 if (WSAGetLastError () != WSAEINTR && WSAGetLastError () != WSAEWOULDBLOCK)
201 g_warning ("selector_thread_wakeup_drain_pipes: recv () failed, error (%d) %s\n", WSAGetLastError ());
209 selector_thread (gpointer data)
211 io_thread_status = STATUS_INITIALIZED;
218 mono_gc_set_skip_thread (TRUE);
220 mono_mutex_lock (&threadpool_io->updates_lock);
221 for (i = 0; i < threadpool_io->updates_size; ++i) {
222 threadpool_io->backend.update_add (&threadpool_io->updates [i]);
224 if (threadpool_io->updates_size > 0) {
225 threadpool_io->updates_size = 0;
226 threadpool_io->updates = g_renew (ThreadPoolIOUpdate, threadpool_io->updates, threadpool_io->updates_size);
228 mono_mutex_unlock (&threadpool_io->updates_lock);
230 ready = threadpool_io->backend.event_wait ();
232 mono_gc_set_skip_thread (FALSE);
234 if (ready == -1 || mono_runtime_is_shutting_down ())
237 max = threadpool_io->backend.event_max ();
239 mono_mutex_lock (&threadpool_io->states_lock);
240 for (i = 0; i < max && ready > 0; ++i) {
245 fd = threadpool_io->backend.event_fd_at (i);
247 if (fd == threadpool_io->wakeup_pipes [0]) {
248 selector_thread_wakeup_drain_pipes ();
253 list = mono_g_hash_table_lookup (threadpool_io->states, GINT_TO_POINTER (fd));
255 valid_fd = threadpool_io->backend.event_create_sockares_at (i, fd, &list);
260 mono_g_hash_table_replace (threadpool_io->states, GINT_TO_POINTER (fd), list);
262 mono_g_hash_table_remove (threadpool_io->states, GINT_TO_POINTER (fd));
266 mono_mutex_unlock (&threadpool_io->states_lock);
269 io_thread_status = STATUS_CLEANED_UP;
273 wakeup_pipes_init (void)
275 #if !defined(HOST_WIN32)
276 if (pipe (threadpool_io->wakeup_pipes) == -1)
277 g_error ("wakeup_pipes_init: pipe () failed, error (%d) %s\n", errno, g_strerror (errno));
278 if (fcntl (threadpool_io->wakeup_pipes [0], F_SETFL, O_NONBLOCK) == -1)
279 g_error ("wakeup_pipes_init: fcntl () failed, error (%d) %s\n", errno, g_strerror (errno));
281 struct sockaddr_in client;
282 struct sockaddr_in server;
287 server_sock = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
288 g_assert (server_sock != INVALID_SOCKET);
289 threadpool_io->wakeup_pipes [1] = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
290 g_assert (threadpool_io->wakeup_pipes [1] != INVALID_SOCKET);
292 server.sin_family = AF_INET;
293 server.sin_addr.s_addr = inet_addr ("127.0.0.1");
295 if (bind (server_sock, (SOCKADDR*) &server, sizeof (server)) == SOCKET_ERROR) {
296 closesocket (server_sock);
297 g_error ("wakeup_pipes_init: bind () failed, error (%d)\n", WSAGetLastError ());
300 size = sizeof (server);
301 if (getsockname (server_sock, (SOCKADDR*) &server, &size) == SOCKET_ERROR) {
302 closesocket (server_sock);
303 g_error ("wakeup_pipes_init: getsockname () failed, error (%d)\n", WSAGetLastError ());
305 if (listen (server_sock, 1024) == SOCKET_ERROR) {
306 closesocket (server_sock);
307 g_error ("wakeup_pipes_init: listen () failed, error (%d)\n", WSAGetLastError ());
309 if (connect ((SOCKET) threadpool_io->wakeup_pipes [1], (SOCKADDR*) &server, sizeof (server)) == SOCKET_ERROR) {
310 closesocket (server_sock);
311 g_error ("wakeup_pipes_init: connect () failed, error (%d)\n", WSAGetLastError ());
314 size = sizeof (client);
315 threadpool_io->wakeup_pipes [0] = accept (server_sock, (SOCKADDR *) &client, &size);
316 g_assert (threadpool_io->wakeup_pipes [0] != INVALID_SOCKET);
319 if (ioctlsocket (threadpool_io->wakeup_pipes [0], FIONBIO, &arg) == SOCKET_ERROR) {
320 closesocket (threadpool_io->wakeup_pipes [0]);
321 closesocket (server_sock);
322 g_error ("wakeup_pipes_init: ioctlsocket () failed, error (%d)\n", WSAGetLastError ());
325 closesocket (server_sock);
330 ensure_initialized (void)
332 if (io_status >= STATUS_INITIALIZED)
334 if (io_status == STATUS_INITIALIZING || InterlockedCompareExchange (&io_status, STATUS_INITIALIZING, STATUS_NOT_INITIALIZED) != STATUS_NOT_INITIALIZED) {
335 while (io_status == STATUS_INITIALIZING)
336 mono_thread_info_yield ();
337 g_assert (io_status >= STATUS_INITIALIZED);
341 g_assert (!threadpool_io);
342 threadpool_io = g_new0 (ThreadPoolIO, 1);
343 g_assert (threadpool_io);
345 threadpool_io->states = mono_g_hash_table_new_type (g_direct_hash, g_direct_equal, MONO_HASH_VALUE_GC);
346 MONO_GC_REGISTER_ROOT_FIXED (threadpool_io->states);
347 mono_mutex_init (&threadpool_io->states_lock);
349 threadpool_io->updates = NULL;
350 threadpool_io->updates_size = 0;
351 mono_mutex_init (&threadpool_io->updates_lock);
353 #if defined(HAVE_EPOLL)
354 threadpool_io->backend = backend_epoll;
355 #elif defined(HAVE_KQUEUE)
356 threadpool_io->backend = backend_kqueue;
358 threadpool_io->backend = backend_poll;
360 if (g_getenv ("MONO_DISABLE_AIO") != NULL)
361 threadpool_io->backend = backend_poll;
363 wakeup_pipes_init ();
365 if (!threadpool_io->backend.init (threadpool_io->wakeup_pipes [0]))
366 g_error ("ensure_initialized: backend->init () failed");
368 if (!mono_thread_create_internal (mono_get_root_domain (), selector_thread, NULL, TRUE, SMALL_STACK))
369 g_error ("ensure_initialized: mono_thread_create_internal () failed");
371 io_thread_status = STATUS_INITIALIZING;
372 mono_memory_write_barrier ();
374 io_status = STATUS_INITIALIZED;
378 ensure_cleanedup (void)
380 if (io_status == STATUS_NOT_INITIALIZED && InterlockedCompareExchange (&io_status, STATUS_CLEANED_UP, STATUS_NOT_INITIALIZED) == STATUS_NOT_INITIALIZED)
382 if (io_status == STATUS_INITIALIZING) {
383 while (io_status == STATUS_INITIALIZING)
384 mono_thread_info_yield ();
386 if (io_status == STATUS_CLEANED_UP)
388 if (io_status == STATUS_CLEANING_UP || InterlockedCompareExchange (&io_status, STATUS_CLEANING_UP, STATUS_INITIALIZED) != STATUS_INITIALIZED) {
389 while (io_status == STATUS_CLEANING_UP)
390 mono_thread_info_yield ();
391 g_assert (io_status == STATUS_CLEANED_UP);
395 /* we make the assumption along the code that we are
396 * cleaning up only if the runtime is shutting down */
397 g_assert (mono_runtime_is_shutting_down ());
399 selector_thread_wakeup ();
400 while (io_thread_status != STATUS_CLEANED_UP)
403 MONO_GC_UNREGISTER_ROOT (threadpool_io->states);
404 mono_g_hash_table_destroy (threadpool_io->states);
405 mono_mutex_destroy (&threadpool_io->states_lock);
407 g_free (threadpool_io->updates);
408 mono_mutex_destroy (&threadpool_io->updates_lock);
410 threadpool_io->backend.cleanup ();
412 #if !defined(HOST_WIN32)
413 close (threadpool_io->wakeup_pipes [0]);
414 close (threadpool_io->wakeup_pipes [1]);
416 closesocket (threadpool_io->wakeup_pipes [0]);
417 closesocket (threadpool_io->wakeup_pipes [1]);
420 g_assert (threadpool_io);
421 g_free (threadpool_io);
422 threadpool_io = NULL;
423 g_assert (!threadpool_io);
425 io_status = STATUS_CLEANED_UP;
429 is_socket_async_callback (MonoImage *system_image, MonoClass *class)
431 MonoClass *socket_async_callback_class = NULL;
433 socket_async_callback_class = mono_class_from_name (system_image, "System.Net.Sockets", "SocketAsyncCallback");
434 g_assert (socket_async_callback_class);
436 return class == socket_async_callback_class;
440 is_async_read_handler (MonoImage *system_image, MonoClass *class)
442 MonoClass *process_class = NULL;
444 process_class = mono_class_from_name (system_image, "System.Diagnostics", "Process");
445 g_assert (process_class);
447 return class->nested_in && class->nested_in == process_class && strcmp (class->name, "AsyncReadHandler") == 0;
451 mono_threadpool_ms_is_io (MonoObject *target, MonoObject *state)
453 MonoImage *system_image;
454 MonoSocketAsyncResult *sockares;
456 system_image = mono_image_loaded ("System");
460 if (!is_socket_async_callback (system_image, target->vtable->klass) && !is_async_read_handler (system_image, target->vtable->klass))
463 sockares = (MonoSocketAsyncResult*) state;
464 if (sockares->operation < AIO_OP_FIRST || sockares->operation >= AIO_OP_LAST)
471 mono_threadpool_ms_io_cleanup (void)
477 mono_threadpool_ms_io_add (MonoAsyncResult *ares, MonoSocketAsyncResult *sockares)
479 ThreadPoolIOUpdate *update;
488 if (mono_runtime_is_shutting_down ())
491 ensure_initialized ();
493 MONO_OBJECT_SETREF (sockares, ares, ares);
495 fd = GPOINTER_TO_INT (sockares->handle);
497 mono_mutex_lock (&threadpool_io->states_lock);
498 g_assert (threadpool_io->states);
500 list = mono_g_hash_table_lookup (threadpool_io->states, GINT_TO_POINTER (fd));
501 is_new = list == NULL;
502 list = mono_mlist_append (list, (MonoObject*) sockares);
503 mono_g_hash_table_replace (threadpool_io->states, sockares->handle, list);
505 events = get_events (list);
507 mono_mutex_lock (&threadpool_io->updates_lock);
508 threadpool_io->updates_size += 1;
509 threadpool_io->updates = g_renew (ThreadPoolIOUpdate, threadpool_io->updates, threadpool_io->updates_size);
511 update = &threadpool_io->updates [threadpool_io->updates_size - 1];
513 update->events = events;
514 update->is_new = is_new;
515 mono_mutex_unlock (&threadpool_io->updates_lock);
517 mono_mutex_unlock (&threadpool_io->states_lock);
519 selector_thread_wakeup ();
525 mono_threadpool_ms_io_remove_socket (int fd)
529 if (io_status != STATUS_INITIALIZED)
532 mono_mutex_lock (&threadpool_io->states_lock);
533 g_assert (threadpool_io->states);
534 list = mono_g_hash_table_lookup (threadpool_io->states, GINT_TO_POINTER (fd));
536 mono_g_hash_table_remove (threadpool_io->states, GINT_TO_POINTER (fd));
537 mono_mutex_unlock (&threadpool_io->states_lock);
540 MonoSocketAsyncResult *sockares, *sockares2;
542 sockares = (MonoSocketAsyncResult*) mono_mlist_get_data (list);
543 if (sockares->operation == AIO_OP_RECEIVE)
544 sockares->operation = AIO_OP_RECV_JUST_CALLBACK;
545 else if (sockares->operation == AIO_OP_SEND)
546 sockares->operation = AIO_OP_SEND_JUST_CALLBACK;
548 sockares2 = get_sockares_for_event (&list, MONO_POLLIN);
550 mono_threadpool_ms_enqueue_work_item (((MonoObject*) sockares2)->vtable->domain, (MonoObject*) sockares2);
555 sockares2 = get_sockares_for_event (&list, MONO_POLLOUT);
557 mono_threadpool_ms_enqueue_work_item (((MonoObject*) sockares2)->vtable->domain, (MonoObject*) sockares2);
562 remove_sockstate_for_domain (gpointer key, gpointer value, gpointer user_data)
565 gboolean remove = FALSE;
567 for (list = value; list; list = mono_mlist_next (list)) {
568 MonoObject *data = mono_mlist_get_data (list);
569 if (mono_object_domain (data) == user_data) {
571 mono_mlist_set_data (list, NULL);
575 //FIXME is there some sort of additional unregistration we need to perform here?
580 mono_threadpool_ms_io_remove_domain_jobs (MonoDomain *domain)
582 if (io_status == STATUS_INITIALIZED) {
583 mono_mutex_lock (&threadpool_io->states_lock);
584 mono_g_hash_table_foreach_remove (threadpool_io->states, remove_sockstate_for_domain, domain);
585 mono_mutex_unlock (&threadpool_io->states_lock);
592 mono_threadpool_ms_is_io (MonoObject *target, MonoObject *state)
598 mono_threadpool_ms_io_cleanup (void)
600 g_assert_not_reached ();
604 mono_threadpool_ms_io_add (MonoAsyncResult *ares, MonoSocketAsyncResult *sockares)
606 g_assert_not_reached ();
610 mono_threadpool_ms_io_remove_socket (int fd)
612 g_assert_not_reached ();
616 mono_threadpool_ms_io_remove_domain_jobs (MonoDomain *domain)
618 g_assert_not_reached ();