2 * threadpool-ms-io.c: Microsoft IO threadpool runtime support
5 * Ludovic Henry (ludovic.henry@xamarin.com)
7 * Copyright 2015 Xamarin, Inc (http://www.xamarin.com)
12 #ifndef DISABLE_SOCKETS
16 #if defined(HOST_WIN32)
23 #if defined(HAVE_EPOLL)
24 #include <sys/epoll.h>
25 #elif defined(HAVE_KQUEUE)
26 #include <sys/types.h>
27 #include <sys/event.h>
31 #include <mono/metadata/gc-internal.h>
32 #include <mono/metadata/mono-mlist.h>
33 #include <mono/metadata/threadpool-internals.h>
34 #include <mono/metadata/threadpool-ms.h>
35 #include <mono/metadata/threadpool-ms-io.h>
36 #include <mono/utils/atomic.h>
37 #include <mono/utils/mono-poll.h>
38 #include <mono/utils/mono-threads.h>
40 /* Keep in sync with System.Net.Sockets.MonoSocketRuntimeWorkItem */
41 struct _MonoSocketRuntimeWorkItem {
/* Managed-visible work item that carries a socket async result into the
 * managed threadpool; field layout must mirror the managed class exactly. */
43 MonoSocketAsyncResult *socket_async_result;
46 /* Keep in sync with System.Net.Sockets.Socket.SocketOperation */
/* NOTE(review): extraction is incomplete here — only a subset of the
 * AIO_OP_* enumeration values is visible; do not assume these are adjacent. */
55 AIO_OP_RECV_JUST_CALLBACK,
56 AIO_OP_SEND_JUST_CALLBACK,
61 AIO_OP_RECEIVE_BUFFERS,
/* Selector for the platform I/O multiplexing backend (epoll/kqueue/poll). */
70 } ThreadPoolIOBackend;
/* Per-backend payload of a pending fd-registration update.
 * NOTE(review): several struct/union wrapper lines are missing from this
 * extraction; the fields below belong to ThreadPoolIOUpdate/ThreadPoolIO. */
76 #if defined(HAVE_EPOLL)
78 struct epoll_event *event;
80 #elif defined(HAVE_KQUEUE)
/* map of fd -> MonoMList of pending MonoSocketAsyncResult, GC-tracked */
93 MonoGHashTable *states;
94 mono_mutex_t states_lock;
/* backend chosen at init time (see ensure_initialized) */
96 ThreadPoolIOBackend backend;
/* growable array of updates consumed by the polling thread */
98 ThreadPoolIOUpdate *updates;
100 mono_mutex_t updates_lock;
/* self-pipe (POSIX) or loopback socket pair (Windows) used to wake the
 * polling thread; [0] is the read end, [1] the write end */
102 #if !defined(HOST_WIN32)
103 gint wakeup_pipes [2];
105 SOCKET wakeup_pipes [2];
109 #if defined(HAVE_EPOLL)
112 struct epoll_event *events;
114 #elif defined(HAVE_KQUEUE)
117 struct kevent *events;
/* lifecycle state of the io subsystem and of its polling thread */
128 static gint32 io_status = STATUS_NOT_INITIALIZED;
129 static gint32 io_thread_status = STATUS_NOT_INITIALIZED;
/* singleton instance; NULL until ensure_initialized, NULL again after cleanup */
131 static ThreadPoolIO* threadpool_io;
/* Map a socket async operation to the poll event mask (MONO_POLLIN /
 * MONO_POLLOUT) it should wait for. NOTE(review): interior lines (return
 * statements, remaining case labels) are missing from this extraction. */
134 get_events_from_state (MonoSocketAsyncResult *ares)
136 switch (ares->operation) {
139 case AIO_OP_RECV_JUST_CALLBACK:
140 case AIO_OP_RECEIVEFROM:
141 case AIO_OP_READPIPE:
142 case AIO_OP_ACCEPTRECEIVE:
143 case AIO_OP_RECEIVE_BUFFERS:
146 case AIO_OP_SEND_JUST_CALLBACK:
149 case AIO_OP_SEND_BUFFERS:
150 case AIO_OP_DISCONNECT:
/* unknown operation value: hard failure */
153 g_assert_not_reached ();
/* Find and remove from *list the first async result waiting for `event`;
 * returns it, or (presumably) NULL when none matches — TODO confirm, the
 * return path is not visible here. */
157 static MonoSocketAsyncResult*
158 get_state (MonoMList **list, gint event)
160 MonoSocketAsyncResult *state = NULL;
165 for (current = *list; current; current = mono_mlist_next (current)) {
166 state = (MonoSocketAsyncResult*) mono_mlist_get_data (current);
167 if (get_events_from_state ((MonoSocketAsyncResult*) state) == event)
/* detach the matched node from the list */
173 *list = mono_mlist_remove_item (*list, current);
/* OR together the event masks of every async result still queued on `list`. */
179 get_events (MonoMList *list)
181 MonoSocketAsyncResult *ares;
184 for (; list; list = mono_mlist_next (list))
185 if ((ares = (MonoSocketAsyncResult*) mono_mlist_get_data (list)))
186 events |= get_events_from_state (ares);
/* Wake the polling thread by writing one byte to the wakeup pipe/socket. */
192 polling_thread_wakeup (void)
198 #if !defined(HOST_WIN32)
199 written = write (threadpool_io->wakeup_pipes [1], &msg, 1);
203 g_warning ("polling_thread_wakeup: write () failed, error (%d) %s\n", errno, g_strerror (errno));
/* Windows: the "pipe" is a loopback TCP socket, so use send() */
207 written = send (threadpool_io->wakeup_pipes [1], &msg, 1, 0);
210 if (written == SOCKET_ERROR) {
211 g_warning ("polling_thread_wakeup: write () failed, error (%d)\n", WSAGetLastError ());
/* Drain all pending wakeup bytes so the pipe does not stay readable.
 * The read end is non-blocking (see wakeup_pipes_init), so this loops
 * until EAGAIN/EWOULDBLOCK. */
219 polling_thread_drain_wakeup_pipes (void)
225 #if !defined(HOST_WIN32)
226 received = read (threadpool_io->wakeup_pipes [0], buffer, sizeof (buffer));
229 if (received == -1) {
/* EINTR/EAGAIN are expected on a non-blocking read; only warn otherwise */
230 if (errno != EINTR && errno != EAGAIN)
231 g_warning ("poll_thread: read () failed, error (%d) %s\n", errno, g_strerror (errno));
235 received = recv (threadpool_io->wakeup_pipes [0], buffer, sizeof (buffer), 0);
238 if (received == SOCKET_ERROR) {
239 if (WSAGetLastError () != WSAEINTR && WSAGetLastError () != WSAEWOULDBLOCK)
/* BUG(review): format string has a trailing %s with no matching argument
 * (WSAGetLastError returns only the code) — undefined behavior if hit. */
240 g_warning ("poll_thread: recv () failed, error (%d) %s\n", WSAGetLastError ());
/* ---------------- epoll backend (Linux) ---------------- */
247 #if defined(HAVE_EPOLL)
249 #if defined(HOST_WIN32)
250 /* We assume that epoll is not available on windows */
/* size of the per-wait event buffer handed to epoll_wait */
254 #define EPOLL_NEVENTS 128
/* Create the epoll instance, preferring epoll_create1(EPOLL_CLOEXEC) and
 * falling back to the legacy API + fcntl(FD_CLOEXEC) on older kernels. */
260 threadpool_io->epoll.fd = epoll_create1 (EPOLL_CLOEXEC);
/* NOTE(review): legacy path normally calls epoll_create (256); this line
 * shows epoll_create1 (256), which looks like extraction garbling — verify
 * against upstream before trusting it. */
262 threadpool_io->epoll.fd = epoll_create1 (256);
263 fcntl (threadpool_io->epoll.fd, F_SETFD, FD_CLOEXEC);
266 if (threadpool_io->epoll.fd == -1) {
268 g_warning ("epoll_init: epoll (EPOLL_CLOEXEC) failed, error (%d) %s\n", errno, g_strerror (errno));
270 g_warning ("epoll_init: epoll (256) failed, error (%d) %s\n", errno, g_strerror (errno));
/* register the wakeup pipe read end so the polling thread can be woken.
 * NOTE(review): epoll_ctl's 4th argument must be a struct epoll_event *;
 * passing EPOLLIN here looks like garbling of the original event setup. */
275 if (epoll_ctl (threadpool_io->epoll.fd, EPOLL_CTL_ADD, threadpool_io->wakeup_pipes [0], EPOLLIN) == -1) {
276 g_warning ("epoll_init: epoll_ctl () failed, error (%d) %s", errno, g_strerror (errno));
277 close (threadpool_io->epoll.fd);
281 threadpool_io->epoll.events = g_new0 (struct epoll_event, EPOLL_NEVENTS);
/* epoll_cleanup: release the event buffer and the epoll fd */
289 g_free (threadpool_io->epoll.events);
291 close (threadpool_io->epoll.fd);
/* Queue an ADD/MOD registration for `fd`; executed later on the polling
 * thread (epoll_thread_add_update). The heap-allocated event is freed there. */
295 epoll_update (gint fd, gint events, gboolean is_new)
297 ThreadPoolIOUpdate *update;
298 struct epoll_event *event;
301 event = g_new0 (struct epoll_event, 1);
303 if ((events & MONO_POLLIN) != 0)
304 event->events |= EPOLLIN;
305 if ((events & MONO_POLLOUT) != 0)
306 event->events |= EPOLLOUT;
/* append to the shared updates array under the updates lock */
308 mono_mutex_lock (&threadpool_io->updates_lock);
309 threadpool_io->updates_size += 1;
310 threadpool_io->updates = g_renew (ThreadPoolIOUpdate, threadpool_io->updates, threadpool_io->updates_size);
312 update = &threadpool_io->updates [threadpool_io->updates_size - 1];
314 update->epoll.event = event;
315 update->epoll.op = is_new ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
316 mono_mutex_unlock (&threadpool_io->updates_lock);
/* nudge the polling thread so it picks the update up */
318 polling_thread_wakeup ();
/* Apply one queued registration on the polling thread; always frees the
 * event allocated by epoll_update, even on failure. */
322 epoll_thread_add_update (ThreadPoolIOUpdate *update)
324 if (epoll_ctl (threadpool_io->epoll.fd, update->epoll.op, update->fd, update->epoll.event) == -1)
325 g_warning ("epoll_thread_add_update: epoll_ctl(%s) failed, error (%d) %s", update->epoll.op == EPOLL_CTL_ADD ? "EPOLL_CTL_ADD" : "EPOLL_CTL_MOD", errno, g_strerror (errno));
326 g_free (update->epoll.event);
/* Block in epoll_wait (infinite timeout); on EINTR, check for runtime
 * interruption and retry — the retry/return lines are not visible here. */
330 epoll_thread_wait_for_event (void)
334 ready = epoll_wait (threadpool_io->epoll.fd, threadpool_io->epoll.events, EPOLL_NEVENTS, -1);
338 check_for_interruption_critical ();
342 g_warning ("epoll_thread_wait_for_event: epoll_wait () failed, error (%d) %s", errno, g_strerror (errno));
/* fd associated with the i-th ready event from the last epoll_wait */
351 epoll_thread_get_fd_at (guint i)
353 return threadpool_io->epoll.events [i].data.fd;
/* Dispatch ready events for `fd`: dequeue the matching async result(s) from
 * *list and enqueue them on the managed threadpool, then either deregister
 * the fd (list empty) or re-arm it with the remaining events. */
357 epoll_thread_create_socket_async_results (gint fd, struct epoll_event *epoll_event, MonoMList **list)
359 g_assert (epoll_event);
/* no waiters left: remove fd from the epoll set */
363 epoll_ctl (threadpool_io->epoll.fd, EPOLL_CTL_DEL, fd, epoll_event);
/* error/hangup wakes both the read and write waiters */
367 if ((epoll_event->events & (EPOLLIN | EPOLLERR | EPOLLHUP)) != 0) {
368 MonoSocketAsyncResult *io_event = get_state (list, MONO_POLLIN);
370 mono_threadpool_io_enqueue_socket_async_result (((MonoObject*) io_event)->vtable->domain, io_event);
372 if ((epoll_event->events & (EPOLLOUT | EPOLLERR | EPOLLHUP)) != 0) {
373 MonoSocketAsyncResult *io_event = get_state (list, MONO_POLLOUT);
375 mono_threadpool_io_enqueue_socket_async_result (((MonoObject*) io_event)->vtable->domain, io_event);
/* re-arm with whatever the remaining queued operations still need */
378 events = get_events (*list);
379 epoll_event->events = ((events & MONO_POLLOUT) ? EPOLLOUT : 0) | ((events & MONO_POLLIN) ? EPOLLIN : 0);
/* MOD first; if the fd was deleted above, fall back to ADD */
380 if (epoll_ctl (threadpool_io->epoll.fd, EPOLL_CTL_MOD, fd, epoll_event) == -1) {
381 if (epoll_ctl (threadpool_io->epoll.fd, EPOLL_CTL_ADD, fd, epoll_event) == -1)
382 g_warning ("epoll_thread_create_socket_async_results: epoll_ctl () failed, error (%d) %s", errno, g_strerror (errno));
/* ---------------- kqueue backend (BSD / macOS) ---------------- */
389 #elif defined(HAVE_KQUEUE)
391 #if defined(HOST_WIN32)
392 /* We assume that kqueue is not available on windows */
/* size of the per-wait kevent buffer */
396 #define KQUEUE_NEVENTS 128
/* Create the kqueue and register the wakeup pipe read end for EVFILT_READ. */
403 threadpool_io->kqueue.fd = kqueue ();
404 if (threadpool_io->kqueue.fd == -1) {
405 g_warning ("kqueue_init: kqueue () failed, error (%d) %s", errno, g_strerror (errno));
409 EV_SET (&event, threadpool_io->wakeup_pipes [0], EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, 0);
410 if (kevent (threadpool_io->kqueue.fd, &event, 1, NULL, 0, NULL) == -1) {
411 g_warning ("kqueue_init: kevent () failed, error (%d) %s", errno, g_strerror (errno));
412 close (threadpool_io->kqueue.fd);
416 threadpool_io->kqueue.events = g_new0 (struct kevent, KQUEUE_NEVENTS);
/* Release the kevent buffer and the kqueue fd. */
422 kqueue_cleanup (void)
424 g_free (threadpool_io->kqueue.events);
426 close (threadpool_io->kqueue.fd);
/* Queue a one-shot EVFILT_READ/EVFILT_WRITE registration for `fd`;
 * applied later by kqueue_thread_add_update on the polling thread.
 * NOTE(review): with only one kevent allocated, MONO_POLLOUT overwrites a
 * simultaneous MONO_POLLIN setup — confirm against upstream whether both
 * can be requested at once here. */
430 kqueue_update (gint fd, gint events, gboolean is_new)
432 ThreadPoolIOUpdate *update;
433 struct kevent *event;
435 event = g_new0 (struct kevent, 1);
436 if ((events & MONO_POLLIN) != 0)
437 EV_SET (event, fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_ONESHOT, 0, 0, 0);
438 if ((events & MONO_POLLOUT) != 0)
439 EV_SET (event, fd, EVFILT_WRITE, EV_ADD | EV_ENABLE | EV_ONESHOT, 0, 0, 0);
/* append to the shared updates array under the updates lock */
441 mono_mutex_lock (&threadpool_io->updates_lock);
442 threadpool_io->updates_size += 1;
443 threadpool_io->updates = g_renew (ThreadPoolIOUpdate, threadpool_io->updates, threadpool_io->updates_size);
445 update = &threadpool_io->updates [threadpool_io->updates_size - 1];
447 update->kqueue.event = event;
448 mono_mutex_unlock (&threadpool_io->updates_lock);
450 polling_thread_wakeup ();
/* Apply one queued registration; frees the kevent allocated by kqueue_update. */
454 kqueue_thread_add_update (ThreadPoolIOUpdate *update)
456 if (kevent (threadpool_io->kqueue.fd, update->kqueue.event, 1, NULL, 0, NULL) == -1)
457 g_warning ("kqueue_thread_add_update: kevent(update) failed, error (%d) %s", errno, g_strerror (errno));
458 g_free (update->kqueue.event);
/* Block in kevent (NULL timeout = forever); EINTR handling mirrors epoll. */
462 kqueue_thread_wait_for_event (void)
466 ready = kevent (threadpool_io->kqueue.fd, NULL, 0, threadpool_io->kqueue.events, KQUEUE_NEVENTS, NULL);
470 check_for_interruption_critical ();
474 g_warning ("kqueue_thread_wait_for_event: kevent () failed, error (%d) %s", errno, g_strerror (errno));
/* fd (ident) of the i-th ready kevent from the last wait */
483 kqueue_thread_get_fd_at (guint i)
485 return threadpool_io->kqueue.events [i].ident;
/* Dispatch a ready kevent for `fd`: enqueue matching waiters on the managed
 * threadpool, then re-arm one-shot filters still needed by remaining waiters. */
489 kqueue_thread_create_socket_async_results (gint fd, struct kevent *kqueue_event, MonoMList **list)
491 g_assert (kqueue_event);
/* EV_ERROR wakes both directions, like EPOLLERR/EPOLLHUP in the epoll path */
497 if (kqueue_event->filter == EVFILT_READ || (kqueue_event->flags & EV_ERROR) != 0) {
498 MonoSocketAsyncResult *io_event = get_state (list, MONO_POLLIN);
500 mono_threadpool_io_enqueue_socket_async_result (((MonoObject*) io_event)->vtable->domain, io_event);
502 if (kqueue_event->filter == EVFILT_WRITE || (kqueue_event->flags & EV_ERROR) != 0) {
503 MonoSocketAsyncResult *io_event = get_state (list, MONO_POLLOUT);
505 mono_threadpool_io_enqueue_socket_async_result (((MonoObject*) io_event)->vtable->domain, io_event);
/* filters are EV_ONESHOT, so re-add them for the remaining waiters */
508 events = get_events (*list);
509 if (kqueue_event->filter == EVFILT_READ && (events & MONO_POLLIN) != 0) {
510 EV_SET (kqueue_event, fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_ONESHOT, 0, 0, 0);
511 if (kevent (threadpool_io->kqueue.fd, kqueue_event, 1, NULL, 0, NULL) == -1)
512 g_warning ("kqueue_thread_create_socket_async_results: kevent (read) failed, error (%d) %s", errno, g_strerror (errno));
514 if (kqueue_event->filter == EVFILT_WRITE && (events & MONO_POLLOUT) != 0) {
515 EV_SET (kqueue_event, fd, EVFILT_WRITE, EV_ADD | EV_ENABLE | EV_ONESHOT, 0, 0, 0);
516 if (kevent (threadpool_io->kqueue.fd, kqueue_event, 1, NULL, 0, NULL) == -1)
517 g_warning ("kqueue_thread_create_socket_async_results: kevent (write) failed, error (%d) %s", errno, g_strerror (errno));
/* ---------------- poll backend (portable fallback) ---------------- */
/* initial size (and growth increment) of the pollfd array */
526 #define POLL_NEVENTS 1024
/* Initialize one pollfd slot; fd == -1 marks the slot as free. */
529 POLL_INIT_FD (mono_pollfd *poll_fd, gint fd, gint events)
532 poll_fd->events = events;
533 poll_fd->revents = 0;
/* poll_init: slot 0 is reserved for the wakeup pipe; all other slots start
 * free (fd == -1). fds_max tracks the highest used slot + 1. */
541 threadpool_io->poll.fds_max = 1;
542 threadpool_io->poll.fds_size = POLL_NEVENTS;
543 threadpool_io->poll.fds = g_new0 (mono_pollfd, threadpool_io->poll.fds_size);
545 POLL_INIT_FD (threadpool_io->poll.fds, threadpool_io->wakeup_pipes [0], MONO_POLLIN);
546 for (i = threadpool_io->poll.fds_max; i < threadpool_io->poll.fds_size; ++i)
547 POLL_INIT_FD (threadpool_io->poll.fds + i, -1, 0);
/* poll_cleanup: release the pollfd array */
555 g_free (threadpool_io->poll.fds);
/* Queue a registration for `fd`; picked up by poll_thread_add_update.
 * is_new is unused here — poll re-scans the slot table on every update. */
559 poll_update (gint fd, gint events, gboolean is_new)
561 ThreadPoolIOUpdate *update;
563 mono_mutex_lock (&threadpool_io->updates_lock);
564 threadpool_io->updates_size += 1;
565 threadpool_io->updates = g_renew (ThreadPoolIOUpdate, threadpool_io->updates, threadpool_io->updates_size);
567 update = &threadpool_io->updates [threadpool_io->updates_size - 1];
569 POLL_INIT_FD (&update->poll.fd, fd, events);
570 mono_mutex_unlock (&threadpool_io->updates_lock);
572 polling_thread_wakeup ();
/* After poll() reports EBADF, probe each fd individually (0 timeout) and
 * mark the dead ones with MONO_POLLNVAL so they get dispatched and dropped. */
576 poll_mark_bad_fds (mono_pollfd *poll_fds, gint poll_fds_size)
581 mono_pollfd *poll_fd;
583 for (i = 0; i < poll_fds_size; i++) {
584 poll_fd = poll_fds + i;
585 if (poll_fd->fd == -1)
588 ret = mono_poll (poll_fd, 1, 0);
592 #if !defined(HOST_WIN32)
595 if (WSAGetLastError () == WSAEBADF)
598 poll_fd->revents |= MONO_POLLNVAL;
/* Merge one queued update into the slot table: reuse the fd's existing slot
 * if present, else take the first free slot, growing the table by
 * POLL_NEVENTS when full; keep fds_max in sync. */
608 poll_thread_add_update (ThreadPoolIOUpdate *update)
610 gboolean found = FALSE;
/* pass 1: is this fd already registered? (slot 0 is the wakeup pipe) */
613 for (j = 1; j < threadpool_io->poll.fds_size; ++j) {
614 mono_pollfd *poll_fd = threadpool_io->poll.fds + j;
615 if (poll_fd->fd == update->poll.fd.fd) {
/* pass 2: find a free slot */
622 for (j = 1; j < threadpool_io->poll.fds_size; ++j) {
623 mono_pollfd *poll_fd = threadpool_io->poll.fds + j;
624 if (poll_fd->fd == -1)
/* no free slot: grow the table and mark the new tail slots free */
629 if (j == threadpool_io->poll.fds_size) {
630 threadpool_io->poll.fds_size += POLL_NEVENTS;
631 threadpool_io->poll.fds = g_renew (mono_pollfd, threadpool_io->poll.fds, threadpool_io->poll.fds_size);
632 for (k = j; k < threadpool_io->poll.fds_size; ++k)
633 POLL_INIT_FD (threadpool_io->poll.fds + k, -1, 0);
636 POLL_INIT_FD (threadpool_io->poll.fds + j, update->poll.fd.fd, update->poll.fd.events);
638 if (j >= threadpool_io->poll.fds_max)
639 threadpool_io->poll.fds_max = j + 1;
/* Block in mono_poll (infinite timeout) and classify failures. */
643 poll_thread_wait_for_event (void)
647 ready = mono_poll (threadpool_io->poll.fds, threadpool_io->poll.fds_max, -1);
650 * Apart from EINTR, we only check EBADF, for the rest:
651 * EINVAL: mono_poll() 'protects' us from descriptor
652 * numbers above the limit if using select() by marking
653 * then as MONO_POLLERR. If a system poll() is being
654 * used, the number of descriptor we're passing will not
655 * be over sysconf(_SC_OPEN_MAX), as the error would have
656 * happened when opening.
658 * EFAULT: we own the memory pointed by pfds.
659 * ENOMEM: we're doomed anyway
662 #if !defined(HOST_WIN32)
665 switch (WSAGetLastError ())
/* EINTR: give the runtime a chance to interrupt this thread, then retry */
668 #if !defined(HOST_WIN32)
673 check_for_interruption_critical ();
/* EBADF: find and flag the bad descriptor(s) instead of failing the wait */
676 #if !defined(HOST_WIN32)
681 ready = poll_mark_bad_fds (threadpool_io->poll.fds, threadpool_io->poll.fds_max);
684 #if !defined(HOST_WIN32)
685 g_warning ("poll_thread_wait_for_event: mono_poll () failed, error (%d) %s", errno, g_strerror (errno));
687 g_warning ("poll_thread_wait_for_event: mono_poll () failed, error (%d)\n", WSAGetLastError ());
/* fd stored in the i-th slot of the pollfd table */
697 poll_thread_get_fd_at (guint i)
699 return threadpool_io->poll.fds [i].fd;
/* Dispatch a ready pollfd: free the slot, enqueue matching waiters on the
 * managed threadpool, then (if waiters remain) record the still-needed
 * events back into the slot. Skips free or quiet slots. */
703 poll_thread_create_socket_async_results (gint fd, mono_pollfd *poll_fd, MonoMList **list)
708 if (fd == -1 || poll_fd->revents == 0)
712 POLL_INIT_FD (poll_fd, -1, 0);
/* error/hangup/invalid wake both the read and write waiters */
714 if ((poll_fd->revents & (MONO_POLLIN | MONO_POLLERR | MONO_POLLHUP | MONO_POLLNVAL)) != 0) {
715 MonoSocketAsyncResult *io_event = get_state (list, MONO_POLLIN);
717 mono_threadpool_io_enqueue_socket_async_result (((MonoObject*) io_event)->vtable->domain, io_event);
719 if ((poll_fd->revents & (MONO_POLLOUT | MONO_POLLERR | MONO_POLLHUP | MONO_POLLNVAL)) != 0) {
720 MonoSocketAsyncResult *io_event = get_state (list, MONO_POLLOUT);
722 mono_threadpool_io_enqueue_socket_async_result (((MonoObject*) io_event)->vtable->domain, io_event);
725 poll_fd->events = get_events (*list);
/* Main loop of the dedicated I/O polling thread: apply queued fd updates,
 * block in the backend's wait call, then dispatch ready fds to the managed
 * threadpool. Runs until wait fails or the runtime shuts down.
 * NOTE(review): loop braces and several case labels are missing from this
 * extraction; the structure below is the visible skeleton only. */
732 polling_thread (gpointer data)
734 io_thread_status = STATUS_INITIALIZED;
/* the GC may skip this thread while it blocks in the backend wait */
741 mono_gc_set_skip_thread (TRUE);
/* phase 1: drain the updates array under the updates lock */
743 mono_mutex_lock (&threadpool_io->updates_lock);
744 for (i = 0; i < threadpool_io->updates_size; ++i) {
745 switch (threadpool_io->backend) {
746 #if defined(HAVE_EPOLL)
748 epoll_thread_add_update (&threadpool_io->updates [i]);
750 #elif defined(HAVE_KQUEUE)
752 kqueue_thread_add_update (&threadpool_io->updates [i]);
756 poll_thread_add_update (&threadpool_io->updates [i]);
759 g_assert_not_reached ();
/* shrink the array back to empty once every update is applied */
763 if (threadpool_io->updates_size > 0) {
764 threadpool_io->updates_size = 0;
765 threadpool_io->updates = g_renew (ThreadPoolIOUpdate, threadpool_io->updates, threadpool_io->updates_size);
767 mono_mutex_unlock (&threadpool_io->updates_lock);
/* phase 2: block until at least one fd is ready (or interruption) */
769 switch (threadpool_io->backend) {
770 #if defined(HAVE_EPOLL)
772 ready = epoll_thread_wait_for_event ();
774 #elif defined(HAVE_KQUEUE)
776 ready = kqueue_thread_wait_for_event ();
780 ready = poll_thread_wait_for_event ();
783 g_assert_not_reached ();
786 mono_gc_set_skip_thread (FALSE);
788 if (ready == -1 || mono_runtime_is_shutting_down ())
/* phase 3: scan the backend's ready set; `max` bounds the scan */
791 switch (threadpool_io->backend) {
792 #if defined(HAVE_EPOLL)
796 #elif defined(HAVE_KQUEUE)
798 max = KQUEUE_NEVENTS;
802 max = threadpool_io->poll.fds_max;
805 g_assert_not_reached ();
806 mono_mutex_lock (&threadpool_io->states_lock);
809 for (i = 0; i < max && ready > 0; ++i) {
814 switch (threadpool_io->backend) {
815 #if defined(HAVE_EPOLL)
817 fd = epoll_thread_get_fd_at (i);
819 #elif defined(HAVE_KQUEUE)
821 fd = kqueue_thread_get_fd_at (i);
825 fd = poll_thread_get_fd_at (i);
828 g_assert_not_reached ();
/* wakeup byte: drain and continue, this is not a user fd */
831 if (fd == threadpool_io->wakeup_pipes [0]) {
832 polling_thread_drain_wakeup_pipes ();
837 list = mono_g_hash_table_lookup (threadpool_io->states, GINT_TO_POINTER (fd));
839 switch (threadpool_io->backend) {
840 #if defined(HAVE_EPOLL)
842 created = epoll_thread_create_socket_async_results (fd, &threadpool_io->epoll.events [i], &list);
844 #elif defined(HAVE_KQUEUE)
846 created = kqueue_thread_create_socket_async_results (fd, &threadpool_io->kqueue.events [i], &list);
850 created = poll_thread_create_socket_async_results (fd, &threadpool_io->poll.fds [i], &list);
853 g_assert_not_reached ();
/* keep or drop the fd's state entry depending on remaining waiters */
860 mono_g_hash_table_replace (threadpool_io->states, GINT_TO_POINTER (fd), list);
862 mono_g_hash_table_remove (threadpool_io->states, GINT_TO_POINTER (fd));
866 mono_mutex_unlock (&threadpool_io->states_lock);
/* signal ensure_cleanedup that this thread has exited */
869 io_thread_status = STATUS_CLEANED_UP;
/* Create the wakeup channel: a non-blocking pipe on POSIX, or a connected
 * loopback TCP socket pair on Windows (where pipes cannot be select()ed).
 * Any failure here is fatal (g_error aborts). */
873 wakeup_pipes_init (void)
875 #if !defined(HOST_WIN32)
876 if (pipe (threadpool_io->wakeup_pipes) == -1)
877 g_error ("wakeup_pipes_init: pipe () failed, error (%d) %s\n", errno, g_strerror (errno));
/* only the read end needs to be non-blocking (see drain loop) */
878 if (fcntl (threadpool_io->wakeup_pipes [0], F_SETFL, O_NONBLOCK) == -1)
879 g_error ("wakeup_pipes_init: fcntl () failed, error (%d) %s\n", errno, g_strerror (errno));
881 struct sockaddr_in client;
882 struct sockaddr_in server;
/* Windows: build a loopback socket pair by bind/listen/connect/accept */
887 server_sock = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
888 g_assert (server_sock != INVALID_SOCKET);
889 threadpool_io->wakeup_pipes [1] = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
890 g_assert (threadpool_io->wakeup_pipes [1] != INVALID_SOCKET);
892 server.sin_family = AF_INET;
893 server.sin_addr.s_addr = inet_addr ("127.0.0.1");
/* bind to an ephemeral port, then read the assigned port back */
895 if (bind (server_sock, (SOCKADDR*) &server, sizeof (server)) == SOCKET_ERROR) {
896 closesocket (server_sock);
897 g_error ("wakeup_pipes_init: bind () failed, error (%d)\n", WSAGetLastError ());
900 size = sizeof (server);
901 if (getsockname (server_sock, (SOCKADDR*) &server, &size) == SOCKET_ERROR) {
902 closesocket (server_sock);
903 g_error ("wakeup_pipes_init: getsockname () failed, error (%d)\n", WSAGetLastError ());
905 if (listen (server_sock, 1024) == SOCKET_ERROR) {
906 closesocket (server_sock);
907 g_error ("wakeup_pipes_init: listen () failed, error (%d)\n", WSAGetLastError ());
909 if (connect ((SOCKET) threadpool_io->wakeup_pipes [1], (SOCKADDR*) &server, sizeof (server)) == SOCKET_ERROR) {
910 closesocket (server_sock);
911 g_error ("wakeup_pipes_init: connect () failed, error (%d)\n", WSAGetLastError ());
/* accept the loopback connection: this becomes the "read end" */
914 size = sizeof (client);
915 threadpool_io->wakeup_pipes [0] = accept (server_sock, (SOCKADDR *) &client, &size);
916 g_assert (threadpool_io->wakeup_pipes [0] != INVALID_SOCKET);
/* make the read end non-blocking, mirroring the POSIX path */
919 if (ioctlsocket (threadpool_io->wakeup_pipes [0], FIONBIO, &arg) == SOCKET_ERROR) {
920 closesocket (threadpool_io->wakeup_pipes [0]);
921 closesocket (server_sock);
922 g_error ("wakeup_pipes_init: ioctlsocket () failed, error (%d)\n", WSAGetLastError ());
/* the listening socket is no longer needed once the pair is connected */
925 closesocket (server_sock);
/* Lazily initialize the io threadpool exactly once. Uses a CAS on io_status
 * so only one thread performs initialization; losers spin-yield until it is
 * done. Picks the best available backend and falls back to poll if the
 * preferred backend fails to initialize. */
930 ensure_initialized (void)
932 if (io_status >= STATUS_INITIALIZED)
934 if (io_status == STATUS_INITIALIZING || InterlockedCompareExchange (&io_status, STATUS_INITIALIZING, STATUS_NOT_INITIALIZED) != STATUS_NOT_INITIALIZED) {
935 while (io_status == STATUS_INITIALIZING)
936 mono_thread_info_yield ();
937 g_assert (io_status >= STATUS_INITIALIZED);
/* we won the CAS: build the singleton */
941 g_assert (!threadpool_io);
942 threadpool_io = g_new0 (ThreadPoolIO, 1);
943 g_assert (threadpool_io);
/* states maps fd -> MonoMList of pending async results; values are managed
 * objects, so the table is registered as a GC root */
945 threadpool_io->states = mono_g_hash_table_new_type (g_direct_hash, g_direct_equal, MONO_HASH_VALUE_GC);
946 MONO_GC_REGISTER_ROOT_FIXED (threadpool_io->states);
947 mono_mutex_init (&threadpool_io->states_lock);
949 threadpool_io->updates = NULL;
950 threadpool_io->updates_size = 0;
951 mono_mutex_init (&threadpool_io->updates_lock);
/* prefer epoll, then kqueue, else poll; MONO_DISABLE_AIO forces poll */
953 #if defined(HAVE_EPOLL)
954 threadpool_io->backend = BACKEND_EPOLL;
955 #elif defined(HAVE_KQUEUE)
956 threadpool_io->backend = BACKEND_KQUEUE;
958 threadpool_io->backend = BACKEND_POLL;
960 if (g_getenv ("MONO_DISABLE_AIO") != NULL)
961 threadpool_io->backend = BACKEND_POLL;
963 wakeup_pipes_init ();
/* backend init may fail at runtime (e.g. old kernel): downgrade to poll */
966 switch (threadpool_io->backend) {
967 #if defined(HAVE_EPOLL)
969 if (!epoll_init ()) {
970 threadpool_io->backend = BACKEND_POLL;
971 goto retry_init_backend;
974 #elif defined(HAVE_KQUEUE)
976 if (!kqueue_init ()) {
977 threadpool_io->backend = BACKEND_POLL;
978 goto retry_init_backend;
/* poll is the last resort: its failure is fatal */
984 g_error ("ensure_initialized: poll_init () failed");
987 g_assert_not_reached ();
/* start the dedicated polling thread */
990 if (!mono_thread_create_internal (mono_get_root_domain (), polling_thread, NULL, TRUE, SMALL_STACK))
991 g_error ("ensure_initialized: mono_thread_create_internal () failed");
993 io_thread_status = STATUS_INITIALIZING;
/* publish all writes above before flipping io_status for other threads */
994 mono_memory_write_barrier ();
996 io_status = STATUS_INITIALIZED;
/* Tear down the io threadpool exactly once, mirroring ensure_initialized:
 * CAS-elect a single cleaner, wait for the polling thread to exit, then free
 * every resource. Only runs at shutdown (asserted below). */
1000 ensure_cleanedup (void)
1002 if (io_status == STATUS_NOT_INITIALIZED && InterlockedCompareExchange (&io_status, STATUS_CLEANED_UP, STATUS_NOT_INITIALIZED) == STATUS_NOT_INITIALIZED)
1004 if (io_status == STATUS_INITIALIZING) {
1005 while (io_status == STATUS_INITIALIZING)
1006 mono_thread_info_yield ();
1008 if (io_status == STATUS_CLEANED_UP)
1010 if (io_status == STATUS_CLEANING_UP || InterlockedCompareExchange (&io_status, STATUS_CLEANING_UP, STATUS_INITIALIZED) != STATUS_INITIALIZED) {
1011 while (io_status == STATUS_CLEANING_UP)
1012 mono_thread_info_yield ();
1013 g_assert (io_status == STATUS_CLEANED_UP);
1017 /* we make the assumption along the code that we are
1018 * cleaning up only if the runtime is shutting down */
1019 g_assert (mono_runtime_is_shutting_down ());
/* wake the polling thread so it notices shutdown and exits */
1021 polling_thread_wakeup ();
1022 while (io_thread_status != STATUS_CLEANED_UP)
/* release the GC-rooted fd->state table */
1025 MONO_GC_UNREGISTER_ROOT (threadpool_io->states);
1026 mono_g_hash_table_destroy (threadpool_io->states);
1027 mono_mutex_destroy (&threadpool_io->states_lock);
1029 g_free (threadpool_io->updates);
1030 mono_mutex_destroy (&threadpool_io->updates_lock);
/* backend-specific teardown (epoll_cleanup / kqueue_cleanup / poll_cleanup;
 * the case bodies are missing from this extraction) */
1032 switch (threadpool_io->backend) {
1033 #if defined(HAVE_EPOLL)
1037 #elif defined(HAVE_KQUEUE)
1038 case BACKEND_KQUEUE:
1046 g_assert_not_reached ();
/* close both ends of the wakeup channel */
1049 #if !defined(HOST_WIN32)
1050 close (threadpool_io->wakeup_pipes [0]);
1051 close (threadpool_io->wakeup_pipes [1]);
1053 closesocket (threadpool_io->wakeup_pipes [0]);
1054 closesocket (threadpool_io->wakeup_pipes [1]);
1057 g_assert (threadpool_io);
1058 g_free (threadpool_io);
1059 threadpool_io = NULL;
1060 g_assert (!threadpool_io);
1062 io_status = STATUS_CLEANED_UP;
/* Decide whether a work item targets the io threadpool: true when the
 * delegate's class is Socket.SocketAsyncCall or Process.AsyncReadHandler
 * and the state is a socket async result with a valid AIO_OP_* operation.
 * Class lookups are cached in function-local statics. */
1066 mono_threadpool_ms_is_io (MonoObject *target, MonoObject *state)
1068 static MonoClass *socket_class = NULL;
1069 static MonoClass *socket_async_class = NULL;
1070 static MonoClass *process_class = NULL;
1071 static MonoClass *async_read_handler_class = NULL;
1073 MonoSocketAsyncResult *sockares;
/* the System assembly may not be loaded yet; bail out if so */
1075 if (!mono_defaults.system)
1076 mono_defaults.system = mono_image_loaded ("System");
1077 if (!mono_defaults.system)
1079 g_assert (mono_defaults.system);
1082 socket_class = mono_class_from_name (mono_defaults.system, "System.Net.Sockets", "Socket");
1083 g_assert (socket_class);
1086 process_class = mono_class_from_name (mono_defaults.system, "System.Diagnostics", "Process");
1087 g_assert (process_class);
1089 class = target->vtable->klass;
/* the interesting delegate types are private nested classes, so they are
 * matched by nesting parent + name rather than direct lookup */
1091 if (!socket_async_class) {
1092 if (class->nested_in && class->nested_in == socket_class && strcmp (class->name, "SocketAsyncCall") == 0)
1093 socket_async_class = class;
1096 if (!async_read_handler_class) {
1097 if (class->nested_in && class->nested_in == process_class && strcmp (class->name, "AsyncReadHandler") == 0)
1098 async_read_handler_class = class;
1101 if (class != socket_async_class && class != async_read_handler_class)
/* finally, validate the operation code carried by the state object */
1104 sockares = (MonoSocketAsyncResult*) state;
1105 if (sockares->operation < AIO_OP_FIRST || sockares->operation >= AIO_OP_LAST)
/* Public shutdown entry point: delegates to ensure_cleanedup. */
1112 mono_threadpool_ms_io_cleanup (void)
1114 ensure_cleanedup ();
/* Register a socket async operation: append it to the fd's waiter list in
 * the states table and push a registration update to the active backend. */
1118 mono_threadpool_ms_io_add (MonoAsyncResult *ares, MonoSocketAsyncResult *sockares)
1126 g_assert (sockares);
1128 if (mono_runtime_is_shutting_down ())
1131 ensure_initialized ();
/* write barrier: sockares->ares is a managed reference */
1133 MONO_OBJECT_SETREF (sockares, ares, ares);
1135 fd = GPOINTER_TO_INT (sockares->handle);
1137 mono_mutex_lock (&threadpool_io->states_lock);
1138 g_assert (threadpool_io->states);
/* is_new tells the backend whether to ADD or MOD the registration */
1140 list = mono_g_hash_table_lookup (threadpool_io->states, GINT_TO_POINTER (fd));
1141 is_new = list == NULL;
1142 list = mono_mlist_append (list, (MonoObject*) sockares);
1143 mono_g_hash_table_replace (threadpool_io->states, sockares->handle, list);
/* combined event mask over every waiter for this fd */
1145 events = get_events (list);
1147 switch (threadpool_io->backend) {
1148 #if defined(HAVE_EPOLL)
1149 case BACKEND_EPOLL: {
1150 epoll_update (fd, events, is_new);
1153 #elif defined(HAVE_KQUEUE)
1154 case BACKEND_KQUEUE: {
1155 kqueue_update (fd, events, is_new);
1159 case BACKEND_POLL: {
1160 poll_update (fd, events, is_new);
1164 g_assert_not_reached ();
1167 mono_mutex_unlock (&threadpool_io->states_lock);
/* Called when a socket is closed: flush its pending waiters. Receive/send
 * operations are downgraded to *_JUST_CALLBACK so the managed callbacks run
 * without touching the (now dead) socket. */
1173 mono_threadpool_ms_io_remove_socket (int fd)
1177 if (io_status != STATUS_INITIALIZED)
1180 mono_mutex_lock (&threadpool_io->states_lock);
1181 g_assert (threadpool_io->states);
1182 list = mono_g_hash_table_lookup (threadpool_io->states, GINT_TO_POINTER (fd));
1184 mono_g_hash_table_remove (threadpool_io->states, GINT_TO_POINTER (fd));
1185 mono_mutex_unlock (&threadpool_io->states_lock);
1188 MonoSocketAsyncResult *sockares, *sockares2;
1190 sockares = (MonoSocketAsyncResult*) mono_mlist_get_data (list);
1191 if (sockares->operation == AIO_OP_RECEIVE)
1192 sockares->operation = AIO_OP_RECV_JUST_CALLBACK;
1193 else if (sockares->operation == AIO_OP_SEND)
1194 sockares->operation = AIO_OP_SEND_JUST_CALLBACK;
/* drain read waiters, then write waiters, enqueueing each callback */
1196 sockares2 = get_state (&list, MONO_POLLIN);
1198 mono_threadpool_io_enqueue_socket_async_result (((MonoObject*) sockares2)->vtable->domain, sockares2);
1203 sockares2 = get_state (&list, MONO_POLLOUT);
1205 mono_threadpool_io_enqueue_socket_async_result (((MonoObject*) sockares2)->vtable->domain, sockares2);
/* GHashTable foreach-remove callback: null out every waiter that belongs to
 * the unloading domain (user_data) so its objects can be collected. */
1210 remove_sockstate_for_domain (gpointer key, gpointer value, gpointer user_data)
1213 gboolean remove = FALSE;
1215 for (list = value; list; list = mono_mlist_next (list)) {
1216 MonoObject *data = mono_mlist_get_data (list);
1217 if (mono_object_domain (data) == user_data) {
1219 mono_mlist_set_data (list, NULL);
1223 //FIXME is there some sort of additional unregistration we need to perform here?
/* Called on domain unload: purge that domain's pending io state. */
1228 mono_threadpool_ms_io_remove_domain_jobs (MonoDomain *domain)
1230 if (io_status == STATUS_INITIALIZED) {
1231 mono_mutex_lock (&threadpool_io->states_lock);
1232 mono_g_hash_table_foreach_remove (threadpool_io->states, remove_sockstate_for_domain, domain);
1233 mono_mutex_unlock (&threadpool_io->states_lock);
/* Wrap a completed socket async result in a MonoSocketRuntimeWorkItem and
 * enqueue it on the managed threadpool of the given domain. */
1238 mono_threadpool_io_enqueue_socket_async_result (MonoDomain *domain, MonoSocketAsyncResult *sockares)
1240 static MonoClass *socket_runtime_work_item_class = NULL;
1241 MonoSocketRuntimeWorkItem *srwi;
1243 g_assert (sockares);
1245 if (!mono_defaults.system)
1246 mono_defaults.system = mono_image_loaded ("System");
1247 g_assert (mono_defaults.system);
/* cache the managed work-item class on first use */
1249 if (!socket_runtime_work_item_class)
1250 socket_runtime_work_item_class = mono_class_from_name (mono_defaults.system, "System.Net.Sockets", "MonoSocketRuntimeWorkItem");
1251 g_assert (socket_runtime_work_item_class);
1253 srwi = (MonoSocketRuntimeWorkItem*) mono_object_new (domain, socket_runtime_work_item_class);
1254 MONO_OBJECT_SETREF (srwi, socket_async_result, sockares);
1256 mono_threadpool_ms_enqueue_work_item (domain, (MonoObject*) srwi);
/* icall: execute one socket work item on a threadpool worker. Performs the
 * actual receive/send for the downgraded operations, invokes the async
 * result, then schedules the user callback (if any) and rethrows any
 * exception raised by the invocation. */
1260 ves_icall_System_Net_Sockets_MonoSocketRuntimeWorkItem_ExecuteWorkItem (MonoSocketRuntimeWorkItem *rwi)
1262 MonoSocketAsyncResult *sockares;
1263 MonoAsyncResult *ares;
1264 MonoObject *exc = NULL;
1268 sockares = rwi->socket_async_result;
1269 g_assert (sockares);
1270 g_assert (sockares->ares);
/* for plain receive/send the io is done here, synchronously, now that the
 * polling thread has reported the fd ready */
1272 switch (sockares->operation) {
1273 case AIO_OP_RECEIVE:
1274 sockares->total = ves_icall_System_Net_Sockets_Socket_Receive_internal ((SOCKET) (gssize) sockares->handle, sockares->buffer, sockares->offset,
1275 sockares->size, sockares->socket_flags, &sockares->error);
1278 sockares->total = ves_icall_System_Net_Sockets_Socket_Send_internal ((SOCKET) (gssize) sockares->handle, sockares->buffer, sockares->offset,
1279 sockares->size, sockares->socket_flags, &sockares->error);
1283 ares = sockares->ares;
/* run the async result; exc receives any managed exception thrown */
1286 mono_async_result_invoke (ares, &exc);
1288 if (sockares->completed && sockares->callback) {
1289 MonoAsyncResult *cb_ares;
1291 /* Don't call mono_async_result_new() to avoid capturing the context */
1292 cb_ares = (MonoAsyncResult*) mono_object_new (mono_domain_get (), mono_defaults.asyncresult_class);
1293 MONO_OBJECT_SETREF (cb_ares, async_delegate, sockares->callback);
1294 MONO_OBJECT_SETREF (cb_ares, async_state, (MonoObject*) sockares);
1296 mono_threadpool_ms_enqueue_async_result (mono_domain_get (), cb_ares);
/* propagate the managed exception, if any, to the caller */
1300 mono_raise_exception ((MonoException*) exc);
/* ---------------- DISABLE_SOCKETS stubs ---------------- */
/* When the runtime is built without socket support, is_io always reports
 * "not io" (body not visible here) and every other entry point is
 * unreachable by construction, hence the asserts. */
1306 mono_threadpool_ms_is_io (MonoObject *target, MonoObject *state)
1312 mono_threadpool_ms_io_cleanup (void)
1314 g_assert_not_reached ();
1318 mono_threadpool_ms_io_add (MonoAsyncResult *ares, MonoSocketAsyncResult *sockares)
1320 g_assert_not_reached ();
1324 mono_threadpool_ms_io_remove_socket (int fd)
1326 g_assert_not_reached ();
1330 mono_threadpool_ms_io_remove_domain_jobs (MonoDomain *domain)
1332 g_assert_not_reached ();
1336 mono_threadpool_io_enqueue_socket_async_result (MonoDomain *domain, MonoSocketAsyncResult *sockares)
1338 g_assert_not_reached ();
1342 ves_icall_System_Net_Sockets_MonoSocketRuntimeWorkItem_ExecuteWorkItem (MonoSocketRuntimeWorkItem *rwi)
1344 g_assert_not_reached ();