[eglib] Prefer <langinfo.h> to <localcharset.h>
[mono.git] / mono / metadata / threadpool-ms-io.c
1 /*
2  * threadpool-ms-io.c: Microsoft IO threadpool runtime support
3  *
4  * Author:
5  *      Ludovic Henry (ludovic.henry@xamarin.com)
6  *
7  * Copyright 2015 Xamarin, Inc (http://www.xamarin.com)
8  */
9
10 #include <config.h>
11
12 #ifndef DISABLE_SOCKETS
13
14 #include <glib.h>
15
16 #if defined(HOST_WIN32)
17 #include <windows.h>
18 #else
19 #include <errno.h>
20 #include <fcntl.h>
21 #endif
22
23 #include <mono/metadata/gc-internal.h>
24 #include <mono/metadata/mono-mlist.h>
25 #include <mono/metadata/threadpool-internals.h>
26 #include <mono/metadata/threadpool-ms.h>
27 #include <mono/metadata/threadpool-ms-io.h>
28 #include <mono/utils/atomic.h>
29 #include <mono/utils/mono-poll.h>
30 #include <mono/utils/mono-threads.h>
31
32 /* Keep in sync with System.Net.Sockets.Socket.SocketOperation */
33 enum {
34         AIO_OP_FIRST,
35         AIO_OP_ACCEPT = 0,
36         AIO_OP_CONNECT,
37         AIO_OP_RECEIVE,
38         AIO_OP_RECEIVEFROM,
39         AIO_OP_SEND,
40         AIO_OP_SENDTO,
41         AIO_OP_RECV_JUST_CALLBACK,
42         AIO_OP_SEND_JUST_CALLBACK,
43         AIO_OP_READPIPE,
44         AIO_OP_CONSOLE2,
45         AIO_OP_DISCONNECT,
46         AIO_OP_ACCEPTRECEIVE,
47         AIO_OP_RECEIVE_BUFFERS,
48         AIO_OP_SEND_BUFFERS,
49         AIO_OP_LAST
50 };
51
52 typedef struct {
53         gint fd;
54         gint events;
55         gboolean is_new;
56 } ThreadPoolIOUpdate;
57
58 typedef struct {
59         gboolean (*init) (gint wakeup_pipe_fd);
60         void     (*cleanup) (void);
61         void     (*update_add) (ThreadPoolIOUpdate *update);
62         gint     (*event_wait) (void);
63         gint     (*event_max) (void);
64         gint     (*event_fd_at) (guint i);
65         gboolean (*event_create_sockares_at) (guint i, gint fd, MonoMList **list);
66 } ThreadPoolIOBackend;
67
68 static int
69 get_events_from_sockares (MonoSocketAsyncResult *ares)
70 {
71         switch (ares->operation) {
72         case AIO_OP_ACCEPT:
73         case AIO_OP_RECEIVE:
74         case AIO_OP_RECV_JUST_CALLBACK:
75         case AIO_OP_RECEIVEFROM:
76         case AIO_OP_READPIPE:
77         case AIO_OP_ACCEPTRECEIVE:
78         case AIO_OP_RECEIVE_BUFFERS:
79                 return MONO_POLLIN;
80         case AIO_OP_SEND:
81         case AIO_OP_SEND_JUST_CALLBACK:
82         case AIO_OP_SENDTO:
83         case AIO_OP_CONNECT:
84         case AIO_OP_SEND_BUFFERS:
85         case AIO_OP_DISCONNECT:
86                 return MONO_POLLOUT;
87         default:
88                 g_assert_not_reached ();
89         }
90 }
91
92 static MonoSocketAsyncResult*
93 get_sockares_for_event (MonoMList **list, gint event)
94 {
95         MonoSocketAsyncResult *state = NULL;
96         MonoMList *current;
97
98         g_assert (list);
99
100         for (current = *list; current; current = mono_mlist_next (current)) {
101                 state = (MonoSocketAsyncResult*) mono_mlist_get_data (current);
102                 if (get_events_from_sockares ((MonoSocketAsyncResult*) state) == event)
103                         break;
104                 state = NULL;
105         }
106
107         if (current)
108                 *list = mono_mlist_remove_item (*list, current);
109
110         return state;
111 }
112
113 static gint
114 get_events (MonoMList *list)
115 {
116         MonoSocketAsyncResult *ares;
117         gint events = 0;
118
119         for (; list; list = mono_mlist_next (list))
120                 if ((ares = (MonoSocketAsyncResult*) mono_mlist_get_data (list)))
121                         events |= get_events_from_sockares (ares);
122
123         return events;
124 }
125
126 #include "threadpool-ms-io-epoll.c"
127 #include "threadpool-ms-io-kqueue.c"
128 #include "threadpool-ms-io-poll.c"
129
130 typedef struct {
131         MonoGHashTable *states;
132         mono_mutex_t states_lock;
133
134         ThreadPoolIOBackend backend;
135
136         ThreadPoolIOUpdate *updates;
137         guint updates_size;
138         mono_mutex_t updates_lock;
139
140 #if !defined(HOST_WIN32)
141         gint wakeup_pipes [2];
142 #else
143         SOCKET wakeup_pipes [2];
144 #endif
145 } ThreadPoolIO;
146
147 static gint32 io_status = STATUS_NOT_INITIALIZED;
148 static gint32 io_thread_status = STATUS_NOT_INITIALIZED;
149
150 static ThreadPoolIO* threadpool_io;
151
152 static void
153 selector_thread_wakeup (void)
154 {
155         gchar msg = 'c';
156         gint written;
157
158         for (;;) {
159 #if !defined(HOST_WIN32)
160                 written = write (threadpool_io->wakeup_pipes [1], &msg, 1);
161                 if (written == 1)
162                         break;
163                 if (written == -1) {
164                         g_warning ("selector_thread_wakeup: write () failed, error (%d) %s\n", errno, g_strerror (errno));
165                         break;
166                 }
167 #else
168                 written = send (threadpool_io->wakeup_pipes [1], &msg, 1, 0);
169                 if (written == 1)
170                         break;
171                 if (written == SOCKET_ERROR) {
172                         g_warning ("selector_thread_wakeup: write () failed, error (%d)\n", WSAGetLastError ());
173                         break;
174                 }
175 #endif
176         }
177 }
178
179 static void
180 selector_thread_wakeup_drain_pipes (void)
181 {
182         gchar buffer [128];
183         gint received;
184
185         for (;;) {
186 #if !defined(HOST_WIN32)
187                 received = read (threadpool_io->wakeup_pipes [0], buffer, sizeof (buffer));
188                 if (received == 0)
189                         break;
190                 if (received == -1) {
191                         if (errno != EINTR && errno != EAGAIN)
192                                 g_warning ("selector_thread_wakeup_drain_pipes: read () failed, error (%d) %s\n", errno, g_strerror (errno));
193                         break;
194                 }
195 #else
196                 received = recv (threadpool_io->wakeup_pipes [0], buffer, sizeof (buffer), 0);
197                 if (received == 0)
198                         break;
199                 if (received == SOCKET_ERROR) {
200                         if (WSAGetLastError () != WSAEINTR && WSAGetLastError () != WSAEWOULDBLOCK)
201                                 g_warning ("selector_thread_wakeup_drain_pipes: recv () failed, error (%d) %s\n", WSAGetLastError ());
202                         break;
203                 }
204 #endif
205         }
206 }
207
208 static void
209 selector_thread (gpointer data)
210 {
211         io_thread_status = STATUS_INITIALIZED;
212
213         for (;;) {
214                 guint i;
215                 guint max;
216                 gint ready = 0;
217
218                 mono_gc_set_skip_thread (TRUE);
219
220                 mono_mutex_lock (&threadpool_io->updates_lock);
221                 for (i = 0; i < threadpool_io->updates_size; ++i) {
222                         threadpool_io->backend.update_add (&threadpool_io->updates [i]);
223                 }
224                 if (threadpool_io->updates_size > 0) {
225                         threadpool_io->updates_size = 0;
226                         threadpool_io->updates = g_renew (ThreadPoolIOUpdate, threadpool_io->updates, threadpool_io->updates_size);
227                 }
228                 mono_mutex_unlock (&threadpool_io->updates_lock);
229
230                 ready = threadpool_io->backend.event_wait ();
231
232                 mono_gc_set_skip_thread (FALSE);
233
234                 if (ready == -1 || mono_runtime_is_shutting_down ())
235                         break;
236
237                 max = threadpool_io->backend.event_max ();
238
239                 mono_mutex_lock (&threadpool_io->states_lock);
240                 for (i = 0; i < max && ready > 0; ++i) {
241                         MonoMList *list;
242                         gboolean valid_fd;
243                         gint fd;
244
245                         fd = threadpool_io->backend.event_fd_at (i);
246
247                         if (fd == threadpool_io->wakeup_pipes [0]) {
248                                 selector_thread_wakeup_drain_pipes ();
249                                 ready -= 1;
250                                 continue;
251                         }
252
253                         list = mono_g_hash_table_lookup (threadpool_io->states, GINT_TO_POINTER (fd));
254
255                         valid_fd = threadpool_io->backend.event_create_sockares_at (i, fd, &list);
256                         if (!valid_fd)
257                                 continue;
258
259                         if (list)
260                                 mono_g_hash_table_replace (threadpool_io->states, GINT_TO_POINTER (fd), list);
261                         else
262                                 mono_g_hash_table_remove (threadpool_io->states, GINT_TO_POINTER (fd));
263
264                         ready -= 1;
265                 }
266                 mono_mutex_unlock (&threadpool_io->states_lock);
267         }
268
269         io_thread_status = STATUS_CLEANED_UP;
270 }
271
272 static void
273 wakeup_pipes_init (void)
274 {
275 #if !defined(HOST_WIN32)
276         if (pipe (threadpool_io->wakeup_pipes) == -1)
277                 g_error ("wakeup_pipes_init: pipe () failed, error (%d) %s\n", errno, g_strerror (errno));
278         if (fcntl (threadpool_io->wakeup_pipes [0], F_SETFL, O_NONBLOCK) == -1)
279                 g_error ("wakeup_pipes_init: fcntl () failed, error (%d) %s\n", errno, g_strerror (errno));
280 #else
281         struct sockaddr_in client;
282         struct sockaddr_in server;
283         SOCKET server_sock;
284         gulong arg;
285         gint size;
286
287         server_sock = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
288         g_assert (server_sock != INVALID_SOCKET);
289         threadpool_io->wakeup_pipes [1] = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
290         g_assert (threadpool_io->wakeup_pipes [1] != INVALID_SOCKET);
291
292         server.sin_family = AF_INET;
293         server.sin_addr.s_addr = inet_addr ("127.0.0.1");
294         server.sin_port = 0;
295         if (bind (server_sock, (SOCKADDR*) &server, sizeof (server)) == SOCKET_ERROR) {
296                 closesocket (server_sock);
297                 g_error ("wakeup_pipes_init: bind () failed, error (%d)\n", WSAGetLastError ());
298         }
299
300         size = sizeof (server);
301         if (getsockname (server_sock, (SOCKADDR*) &server, &size) == SOCKET_ERROR) {
302                 closesocket (server_sock);
303                 g_error ("wakeup_pipes_init: getsockname () failed, error (%d)\n", WSAGetLastError ());
304         }
305         if (listen (server_sock, 1024) == SOCKET_ERROR) {
306                 closesocket (server_sock);
307                 g_error ("wakeup_pipes_init: listen () failed, error (%d)\n", WSAGetLastError ());
308         }
309         if (connect ((SOCKET) threadpool_io->wakeup_pipes [1], (SOCKADDR*) &server, sizeof (server)) == SOCKET_ERROR) {
310                 closesocket (server_sock);
311                 g_error ("wakeup_pipes_init: connect () failed, error (%d)\n", WSAGetLastError ());
312         }
313
314         size = sizeof (client);
315         threadpool_io->wakeup_pipes [0] = accept (server_sock, (SOCKADDR *) &client, &size);
316         g_assert (threadpool_io->wakeup_pipes [0] != INVALID_SOCKET);
317
318         arg = 1;
319         if (ioctlsocket (threadpool_io->wakeup_pipes [0], FIONBIO, &arg) == SOCKET_ERROR) {
320                 closesocket (threadpool_io->wakeup_pipes [0]);
321                 closesocket (server_sock);
322                 g_error ("wakeup_pipes_init: ioctlsocket () failed, error (%d)\n", WSAGetLastError ());
323         }
324
325         closesocket (server_sock);
326 #endif
327 }
328
329 static void
330 ensure_initialized (void)
331 {
332         if (io_status >= STATUS_INITIALIZED)
333                 return;
334         if (io_status == STATUS_INITIALIZING || InterlockedCompareExchange (&io_status, STATUS_INITIALIZING, STATUS_NOT_INITIALIZED) != STATUS_NOT_INITIALIZED) {
335                 while (io_status == STATUS_INITIALIZING)
336                         mono_thread_info_yield ();
337                 g_assert (io_status >= STATUS_INITIALIZED);
338                 return;
339         }
340
341         g_assert (!threadpool_io);
342         threadpool_io = g_new0 (ThreadPoolIO, 1);
343         g_assert (threadpool_io);
344
345         threadpool_io->states = mono_g_hash_table_new_type (g_direct_hash, g_direct_equal, MONO_HASH_VALUE_GC);
346         MONO_GC_REGISTER_ROOT_FIXED (threadpool_io->states);
347         mono_mutex_init (&threadpool_io->states_lock);
348
349         threadpool_io->updates = NULL;
350         threadpool_io->updates_size = 0;
351         mono_mutex_init (&threadpool_io->updates_lock);
352
353 #if defined(HAVE_EPOLL)
354         threadpool_io->backend = backend_epoll;
355 #elif defined(HAVE_KQUEUE)
356         threadpool_io->backend = backend_kqueue;
357 #else
358         threadpool_io->backend = backend_poll;
359 #endif
360         if (g_getenv ("MONO_DISABLE_AIO") != NULL)
361                 threadpool_io->backend = backend_poll;
362
363         wakeup_pipes_init ();
364
365         if (!threadpool_io->backend.init (threadpool_io->wakeup_pipes [0]))
366                 g_error ("ensure_initialized: backend->init () failed");
367
368         if (!mono_thread_create_internal (mono_get_root_domain (), selector_thread, NULL, TRUE, SMALL_STACK))
369                 g_error ("ensure_initialized: mono_thread_create_internal () failed");
370
371         io_thread_status = STATUS_INITIALIZING;
372         mono_memory_write_barrier ();
373
374         io_status = STATUS_INITIALIZED;
375 }
376
377 static void
378 ensure_cleanedup (void)
379 {
380         if (io_status == STATUS_NOT_INITIALIZED && InterlockedCompareExchange (&io_status, STATUS_CLEANED_UP, STATUS_NOT_INITIALIZED) == STATUS_NOT_INITIALIZED)
381                 return;
382         if (io_status == STATUS_INITIALIZING) {
383                 while (io_status == STATUS_INITIALIZING)
384                         mono_thread_info_yield ();
385         }
386         if (io_status == STATUS_CLEANED_UP)
387                 return;
388         if (io_status == STATUS_CLEANING_UP || InterlockedCompareExchange (&io_status, STATUS_CLEANING_UP, STATUS_INITIALIZED) != STATUS_INITIALIZED) {
389                 while (io_status == STATUS_CLEANING_UP)
390                         mono_thread_info_yield ();
391                 g_assert (io_status == STATUS_CLEANED_UP);
392                 return;
393         }
394
395         /* we make the assumption along the code that we are
396          * cleaning up only if the runtime is shutting down */
397         g_assert (mono_runtime_is_shutting_down ());
398
399         selector_thread_wakeup ();
400         while (io_thread_status != STATUS_CLEANED_UP)
401                 g_usleep (1000);
402
403         MONO_GC_UNREGISTER_ROOT (threadpool_io->states);
404         mono_g_hash_table_destroy (threadpool_io->states);
405         mono_mutex_destroy (&threadpool_io->states_lock);
406
407         g_free (threadpool_io->updates);
408         mono_mutex_destroy (&threadpool_io->updates_lock);
409
410         threadpool_io->backend.cleanup ();
411
412 #if !defined(HOST_WIN32)
413         close (threadpool_io->wakeup_pipes [0]);
414         close (threadpool_io->wakeup_pipes [1]);
415 #else
416         closesocket (threadpool_io->wakeup_pipes [0]);
417         closesocket (threadpool_io->wakeup_pipes [1]);
418 #endif
419
420         g_assert (threadpool_io);
421         g_free (threadpool_io);
422         threadpool_io = NULL;
423         g_assert (!threadpool_io);
424
425         io_status = STATUS_CLEANED_UP;
426 }
427
428 static gboolean
429 is_socket_async_callback (MonoImage *system_image, MonoClass *class)
430 {
431         MonoClass *socket_async_callback_class = NULL;
432
433         socket_async_callback_class = mono_class_from_name (system_image, "System.Net.Sockets", "SocketAsyncCallback");
434         g_assert (socket_async_callback_class);
435
436         return class == socket_async_callback_class;
437 }
438
439 static gboolean
440 is_async_read_handler (MonoImage *system_image, MonoClass *class)
441 {
442         MonoClass *process_class = NULL;
443
444         process_class = mono_class_from_name (system_image, "System.Diagnostics", "Process");
445         g_assert (process_class);
446
447         return class->nested_in && class->nested_in == process_class && strcmp (class->name, "AsyncReadHandler") == 0;
448 }
449
450 gboolean
451 mono_threadpool_ms_is_io (MonoObject *target, MonoObject *state)
452 {
453         MonoImage *system_image;
454         MonoSocketAsyncResult *sockares;
455
456         system_image = mono_image_loaded ("System");
457         if (!system_image)
458                 return FALSE;
459
460         if (!is_socket_async_callback (system_image, target->vtable->klass) && !is_async_read_handler (system_image, target->vtable->klass))
461                 return FALSE;
462
463         sockares = (MonoSocketAsyncResult*) state;
464         if (sockares->operation < AIO_OP_FIRST || sockares->operation >= AIO_OP_LAST)
465                 return FALSE;
466
467         return TRUE;
468 }
469
470 void
471 mono_threadpool_ms_io_cleanup (void)
472 {
473         ensure_cleanedup ();
474 }
475
476 MonoAsyncResult *
477 mono_threadpool_ms_io_add (MonoAsyncResult *ares, MonoSocketAsyncResult *sockares)
478 {
479         ThreadPoolIOUpdate *update;
480         MonoMList *list;
481         gboolean is_new;
482         gint events;
483         gint fd;
484
485         g_assert (ares);
486         g_assert (sockares);
487
488         if (mono_runtime_is_shutting_down ())
489                 return NULL;
490
491         ensure_initialized ();
492
493         MONO_OBJECT_SETREF (sockares, ares, ares);
494
495         fd = GPOINTER_TO_INT (sockares->handle);
496
497         mono_mutex_lock (&threadpool_io->states_lock);
498         g_assert (threadpool_io->states);
499
500         list = mono_g_hash_table_lookup (threadpool_io->states, GINT_TO_POINTER (fd));
501         is_new = list == NULL;
502         list = mono_mlist_append (list, (MonoObject*) sockares);
503         mono_g_hash_table_replace (threadpool_io->states, sockares->handle, list);
504
505         events = get_events (list);
506
507         mono_mutex_lock (&threadpool_io->updates_lock);
508         threadpool_io->updates_size += 1;
509         threadpool_io->updates = g_renew (ThreadPoolIOUpdate, threadpool_io->updates, threadpool_io->updates_size);
510
511         update = &threadpool_io->updates [threadpool_io->updates_size - 1];
512         update->fd = fd;
513         update->events = events;
514         update->is_new = is_new;
515         mono_mutex_unlock (&threadpool_io->updates_lock);
516
517         mono_mutex_unlock (&threadpool_io->states_lock);
518
519         selector_thread_wakeup ();
520
521         return ares;
522 }
523
524 void
525 mono_threadpool_ms_io_remove_socket (int fd)
526 {
527         MonoMList *list;
528
529         if (io_status != STATUS_INITIALIZED)
530                 return;
531
532         mono_mutex_lock (&threadpool_io->states_lock);
533         g_assert (threadpool_io->states);
534         list = mono_g_hash_table_lookup (threadpool_io->states, GINT_TO_POINTER (fd));
535         if (list)
536                 mono_g_hash_table_remove (threadpool_io->states, GINT_TO_POINTER (fd));
537         mono_mutex_unlock (&threadpool_io->states_lock);
538
539         while (list) {
540                 MonoSocketAsyncResult *sockares, *sockares2;
541
542                 sockares = (MonoSocketAsyncResult*) mono_mlist_get_data (list);
543                 if (sockares->operation == AIO_OP_RECEIVE)
544                         sockares->operation = AIO_OP_RECV_JUST_CALLBACK;
545                 else if (sockares->operation == AIO_OP_SEND)
546                         sockares->operation = AIO_OP_SEND_JUST_CALLBACK;
547
548                 sockares2 = get_sockares_for_event (&list, MONO_POLLIN);
549                 if (sockares2)
550                         mono_threadpool_ms_enqueue_work_item (((MonoObject*) sockares2)->vtable->domain, (MonoObject*) sockares2);
551
552                 if (!list)
553                         break;
554
555                 sockares2 = get_sockares_for_event (&list, MONO_POLLOUT);
556                 if (sockares2)
557                         mono_threadpool_ms_enqueue_work_item (((MonoObject*) sockares2)->vtable->domain, (MonoObject*) sockares2);
558         }
559 }
560
561 static gboolean
562 remove_sockstate_for_domain (gpointer key, gpointer value, gpointer user_data)
563 {
564         MonoMList *list;
565         gboolean remove = FALSE;
566
567         for (list = value; list; list = mono_mlist_next (list)) {
568                 MonoObject *data = mono_mlist_get_data (list);
569                 if (mono_object_domain (data) == user_data) {
570                         remove = TRUE;
571                         mono_mlist_set_data (list, NULL);
572                 }
573         }
574
575         //FIXME is there some sort of additional unregistration we need to perform here?
576         return remove;
577 }
578
579 void
580 mono_threadpool_ms_io_remove_domain_jobs (MonoDomain *domain)
581 {
582         if (io_status == STATUS_INITIALIZED) {
583                 mono_mutex_lock (&threadpool_io->states_lock);
584                 mono_g_hash_table_foreach_remove (threadpool_io->states, remove_sockstate_for_domain, domain);
585                 mono_mutex_unlock (&threadpool_io->states_lock);
586         }
587 }
588
589 #else
590
591 gboolean
592 mono_threadpool_ms_is_io (MonoObject *target, MonoObject *state)
593 {
594         return FALSE;
595 }
596
597 void
598 mono_threadpool_ms_io_cleanup (void)
599 {
600         g_assert_not_reached ();
601 }
602
603 MonoAsyncResult *
604 mono_threadpool_ms_io_add (MonoAsyncResult *ares, MonoSocketAsyncResult *sockares)
605 {
606         g_assert_not_reached ();
607 }
608
609 void
610 mono_threadpool_ms_io_remove_socket (int fd)
611 {
612         g_assert_not_reached ();
613 }
614
615 void
616 mono_threadpool_ms_io_remove_domain_jobs (MonoDomain *domain)
617 {
618         g_assert_not_reached ();
619 }
620
621 #endif