Merge pull request #2003 from esdrubal/seq_test_fix2
[mono.git] / mono / metadata / threadpool-ms-io.c
1 /*
2  * threadpool-ms-io.c: Microsoft IO threadpool runtime support
3  *
4  * Author:
5  *      Ludovic Henry (ludovic.henry@xamarin.com)
6  *
7  * Copyright 2015 Xamarin, Inc (http://www.xamarin.com)
8  */
9
10 #include <config.h>
11
12 #ifndef DISABLE_SOCKETS
13
14 #include <glib.h>
15
16 #if defined(HOST_WIN32)
17 #include <windows.h>
18 #else
19 #include <errno.h>
20 #include <fcntl.h>
21 #endif
22
23 #include <mono/metadata/gc-internal.h>
24 #include <mono/metadata/mono-mlist.h>
25 #include <mono/metadata/threadpool-ms.h>
26 #include <mono/metadata/threadpool-ms-io.h>
27 #include <mono/utils/atomic.h>
28 #include <mono/utils/mono-threads.h>
29 #include <mono/utils/mono-lazy-init.h>
30 #include <mono/utils/mono-logger-internal.h>
31
32 typedef struct {
33         gboolean (*init) (gint wakeup_pipe_fd);
34         void     (*cleanup) (void);
35         void     (*register_fd) (gint fd, gint events, gboolean is_new);
36         void     (*remove_fd) (gint fd);
37         gint     (*event_wait) (void (*callback) (gint fd, gint events, gpointer user_data), gpointer user_data);
38 } ThreadPoolIOBackend;
39
40 enum {
41         EVENT_IN   = 1 << 0,
42         EVENT_OUT  = 1 << 1,
43 } ThreadPoolIOEvent;
44
45 #include "threadpool-ms-io-epoll.c"
46 #include "threadpool-ms-io-kqueue.c"
47 #include "threadpool-ms-io-poll.c"
48
49 #define UPDATES_CAPACITY 128
50
51 /* Keep in sync with System.Net.Sockets.Socket.SocketOperation */
52 enum {
53         AIO_OP_FIRST,
54         AIO_OP_ACCEPT = 0,
55         AIO_OP_CONNECT,
56         AIO_OP_RECEIVE,
57         AIO_OP_RECEIVEFROM,
58         AIO_OP_SEND,
59         AIO_OP_SENDTO,
60         AIO_OP_RECV_JUST_CALLBACK,
61         AIO_OP_SEND_JUST_CALLBACK,
62         AIO_OP_READPIPE,
63         AIO_OP_CONSOLE2,
64         AIO_OP_DISCONNECT,
65         AIO_OP_ACCEPTRECEIVE,
66         AIO_OP_RECEIVE_BUFFERS,
67         AIO_OP_SEND_BUFFERS,
68         AIO_OP_LAST
69 };
70
71 typedef enum {
72         UPDATE_EMPTY = 0,
73         UPDATE_ADD,
74         UPDATE_REMOVE_SOCKET,
75         UPDATE_REMOVE_DOMAIN,
76 } ThreadPoolIOUpdateType;
77
78 typedef struct {
79         gint fd;
80         MonoSocketAsyncResult *sockares;
81 } ThreadPoolIOUpdate_Add;
82
83 typedef struct {
84         gint fd;
85 } ThreadPoolIOUpdate_RemoveSocket;
86
87 typedef struct {
88         MonoDomain *domain;
89 } ThreadPoolIOUpdate_RemoveDomain;
90
91 typedef struct {
92         ThreadPoolIOUpdateType type;
93         union {
94                 ThreadPoolIOUpdate_Add add;
95                 ThreadPoolIOUpdate_RemoveSocket remove_socket;
96                 ThreadPoolIOUpdate_RemoveDomain remove_domain;
97         } data;
98 } ThreadPoolIOUpdate;
99
100 typedef struct {
101         ThreadPoolIOBackend backend;
102
103         ThreadPoolIOUpdate updates [UPDATES_CAPACITY];
104         gint updates_size;
105         mono_mutex_t updates_lock;
106         mono_cond_t updates_cond;
107
108 #if !defined(HOST_WIN32)
109         gint wakeup_pipes [2];
110 #else
111         SOCKET wakeup_pipes [2];
112 #endif
113 } ThreadPoolIO;
114
115 static mono_lazy_init_t io_status = MONO_LAZY_INIT_STATUS_NOT_INITIALIZED;
116
117 static gboolean io_selector_running = FALSE;
118
119 static ThreadPoolIO* threadpool_io;
120
121 static int
122 get_events_from_sockares (MonoSocketAsyncResult *ares)
123 {
124         switch (ares->operation) {
125         case AIO_OP_ACCEPT:
126         case AIO_OP_RECEIVE:
127         case AIO_OP_RECV_JUST_CALLBACK:
128         case AIO_OP_RECEIVEFROM:
129         case AIO_OP_READPIPE:
130         case AIO_OP_ACCEPTRECEIVE:
131         case AIO_OP_RECEIVE_BUFFERS:
132                 return EVENT_IN;
133         case AIO_OP_SEND:
134         case AIO_OP_SEND_JUST_CALLBACK:
135         case AIO_OP_SENDTO:
136         case AIO_OP_CONNECT:
137         case AIO_OP_SEND_BUFFERS:
138         case AIO_OP_DISCONNECT:
139                 return EVENT_OUT;
140         default:
141                 g_assert_not_reached ();
142         }
143 }
144
145 static MonoSocketAsyncResult*
146 get_sockares_for_event (MonoMList **list, gint event)
147 {
148         MonoMList *current;
149
150         g_assert (list);
151
152         for (current = *list; current; current = mono_mlist_next (current)) {
153                 MonoSocketAsyncResult *ares = (MonoSocketAsyncResult*) mono_mlist_get_data (current);
154                 if (get_events_from_sockares (ares) == event) {
155                         *list = mono_mlist_remove_item (*list, current);
156                         return ares;
157                 }
158         }
159
160         return NULL;
161 }
162
163 static gint
164 get_events (MonoMList *list)
165 {
166         MonoMList *current;
167         gint events = 0;
168
169         for (current = list; current; current = mono_mlist_next (current))
170                 events |= get_events_from_sockares ((MonoSocketAsyncResult*) mono_mlist_get_data (current));
171
172         return events;
173 }
174
175 static void
176 selector_thread_wakeup (void)
177 {
178         gchar msg = 'c';
179         gint written;
180
181         for (;;) {
182 #if !defined(HOST_WIN32)
183                 written = write (threadpool_io->wakeup_pipes [1], &msg, 1);
184                 if (written == 1)
185                         break;
186                 if (written == -1) {
187                         g_warning ("selector_thread_wakeup: write () failed, error (%d) %s\n", errno, g_strerror (errno));
188                         break;
189                 }
190 #else
191                 written = send (threadpool_io->wakeup_pipes [1], &msg, 1, 0);
192                 if (written == 1)
193                         break;
194                 if (written == SOCKET_ERROR) {
195                         g_warning ("selector_thread_wakeup: write () failed, error (%d)\n", WSAGetLastError ());
196                         break;
197                 }
198 #endif
199         }
200 }
201
202 static void
203 selector_thread_wakeup_drain_pipes (void)
204 {
205         gchar buffer [128];
206         gint received;
207
208         for (;;) {
209 #if !defined(HOST_WIN32)
210                 received = read (threadpool_io->wakeup_pipes [0], buffer, sizeof (buffer));
211                 if (received == 0)
212                         break;
213                 if (received == -1) {
214                         if (errno != EINTR && errno != EAGAIN)
215                                 g_warning ("selector_thread_wakeup_drain_pipes: read () failed, error (%d) %s\n", errno, g_strerror (errno));
216                         break;
217                 }
218 #else
219                 received = recv (threadpool_io->wakeup_pipes [0], buffer, sizeof (buffer), 0);
220                 if (received == 0)
221                         break;
222                 if (received == SOCKET_ERROR) {
223                         if (WSAGetLastError () != WSAEINTR && WSAGetLastError () != WSAEWOULDBLOCK)
224                                 g_warning ("selector_thread_wakeup_drain_pipes: recv () failed, error (%d) %s\n", WSAGetLastError ());
225                         break;
226                 }
227 #endif
228         }
229 }
230
231 typedef struct {
232         MonoDomain *domain;
233         MonoGHashTable *states;
234 } FilterSockaresForDomainData;
235
236 static void
237 filter_sockares_for_domain (gpointer key, gpointer value, gpointer user_data)
238 {
239         FilterSockaresForDomainData *data;
240         MonoMList *list = value, *element;
241         MonoDomain *domain;
242         MonoGHashTable *states;
243
244         g_assert (user_data);
245         data = user_data;
246         domain = data->domain;
247         states = data->states;
248
249         for (element = list; element; element = mono_mlist_next (element)) {
250                 MonoSocketAsyncResult *sockares = (MonoSocketAsyncResult*) mono_mlist_get_data (element);
251                 if (mono_object_domain (sockares) == domain)
252                         mono_mlist_set_data (element, NULL);
253         }
254
255         /* we skip all the first elements which are NULL */
256         for (; list; list = mono_mlist_next (list)) {
257                 if (mono_mlist_get_data (list))
258                         break;
259         }
260
261         if (list) {
262                 g_assert (mono_mlist_get_data (list));
263
264                 /* we delete all the NULL elements after the first one */
265                 for (element = list; element;) {
266                         MonoMList *next;
267                         if (!(next = mono_mlist_next (element)))
268                                 break;
269                         if (mono_mlist_get_data (next))
270                                 element = next;
271                         else
272                                 mono_mlist_set_next (element, mono_mlist_next (next));
273                 }
274         }
275
276         mono_g_hash_table_replace (states, key, list);
277 }
278
279 static void
280 wait_callback (gint fd, gint events, gpointer user_data)
281 {
282         if (mono_runtime_is_shutting_down ())
283                 return;
284
285         if (fd == threadpool_io->wakeup_pipes [0]) {
286                 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: wke");
287                 selector_thread_wakeup_drain_pipes ();
288         } else {
289                 MonoGHashTable *states;
290                 MonoMList *list = NULL;
291                 gpointer k;
292
293                 g_assert (user_data);
294                 states = user_data;
295
296                 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: cal fd %3d, events = %2s | %2s",
297                         fd, (events & EVENT_IN) ? "RD" : "..", (events & EVENT_OUT) ? "WR" : "..");
298
299                 if (!mono_g_hash_table_lookup_extended (states, GINT_TO_POINTER (fd), &k, (gpointer*) &list))
300                         g_error ("wait_callback: fd %d not found in states table", fd);
301
302                 if (list && (events & EVENT_IN) != 0) {
303                         MonoSocketAsyncResult *sockares = get_sockares_for_event (&list, EVENT_IN);
304                         if (sockares)
305                                 mono_threadpool_ms_enqueue_work_item (((MonoObject*) sockares)->vtable->domain, (MonoObject*) sockares);
306                 }
307                 if (list && (events & EVENT_OUT) != 0) {
308                         MonoSocketAsyncResult *sockares = get_sockares_for_event (&list, EVENT_OUT);
309                         if (sockares)
310                                 mono_threadpool_ms_enqueue_work_item (((MonoObject*) sockares)->vtable->domain, (MonoObject*) sockares);
311                 }
312
313                 mono_g_hash_table_replace (states, GINT_TO_POINTER (fd), list);
314
315                 events = get_events (list);
316
317                 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: res fd %3d, events = %2s | %2s",
318                         fd, (events & EVENT_IN) ? "RD" : "..", (events & EVENT_OUT) ? "WR" : "..");
319
320                 threadpool_io->backend.register_fd (fd, events, FALSE);
321         }
322 }
323
324 static void
325 selector_thread (gpointer data)
326 {
327         MonoGHashTable *states;
328
329         io_selector_running = TRUE;
330
331         if (mono_runtime_is_shutting_down ()) {
332                 io_selector_running = FALSE;
333                 return;
334         }
335
336         states = mono_g_hash_table_new_type (g_direct_hash, g_direct_equal, MONO_HASH_VALUE_GC);
337
338         for (;;) {
339                 gint i, j;
340                 gint res;
341
342                 mono_mutex_lock (&threadpool_io->updates_lock);
343
344                 for (i = 0; i < threadpool_io->updates_size; ++i) {
345                         ThreadPoolIOUpdate *update = &threadpool_io->updates [i];
346
347                         switch (update->type) {
348                         case UPDATE_EMPTY:
349                                 break;
350                         case UPDATE_ADD: {
351                                 gint fd;
352                                 gint events;
353                                 gpointer k;
354                                 gboolean exists;
355                                 MonoMList *list = NULL;
356                                 MonoSocketAsyncResult *sockares;
357
358                                 fd = update->data.add.fd;
359                                 g_assert (fd >= 0);
360
361                                 sockares = update->data.add.sockares;
362                                 g_assert (sockares);
363
364                                 exists = mono_g_hash_table_lookup_extended (states, GINT_TO_POINTER (fd), &k, (gpointer*) &list);
365                                 list = mono_mlist_append (list, (MonoObject*) sockares);
366                                 mono_g_hash_table_replace (states, sockares->handle, list);
367
368                                 events = get_events (list);
369
370                                 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: %3s fd %3d, events = %2s | %2s",
371                                         exists ? "mod" : "add", fd, (events & EVENT_IN) ? "RD" : "..", (events & EVENT_OUT) ? "WR" : "..");
372
373                                 threadpool_io->backend.register_fd (fd, events, !exists);
374
375                                 break;
376                         }
377                         case UPDATE_REMOVE_SOCKET: {
378                                 gint fd;
379                                 gpointer k;
380                                 MonoMList *list = NULL;
381
382                                 fd = update->data.remove_socket.fd;
383                                 g_assert (fd >= 0);
384
385                                 if (mono_g_hash_table_lookup_extended (states, GINT_TO_POINTER (fd), &k, (gpointer*) &list)) {
386                                         mono_g_hash_table_remove (states, GINT_TO_POINTER (fd));
387
388                                         for (j = i + 1; j < threadpool_io->updates_size; ++j) {
389                                                 ThreadPoolIOUpdate *update = &threadpool_io->updates [j];
390                                                 if (update->type == UPDATE_ADD && update->data.add.fd == fd)
391                                                         memset (update, 0, sizeof (ThreadPoolIOUpdate));
392                                         }
393
394                                         for (; list; list = mono_mlist_remove_item (list, list))
395                                                 mono_threadpool_ms_enqueue_work_item (mono_object_domain (mono_mlist_get_data (list)), mono_mlist_get_data (list));
396
397                                         mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: del fd %3d", fd);
398                                         threadpool_io->backend.remove_fd (fd);
399                                 }
400
401                                 break;
402                         }
403                         case UPDATE_REMOVE_DOMAIN: {
404                                 MonoDomain *domain;
405
406                                 domain = update->data.remove_domain.domain;
407                                 g_assert (domain);
408
409                                 FilterSockaresForDomainData user_data = { .domain = domain, .states = states };
410                                 mono_g_hash_table_foreach (states, filter_sockares_for_domain, &user_data);
411
412                                 for (j = i + 1; j < threadpool_io->updates_size; ++j) {
413                                         ThreadPoolIOUpdate *update = &threadpool_io->updates [j];
414                                         if (update->type == UPDATE_ADD && mono_object_domain (update->data.add.sockares) == domain)
415                                                 memset (update, 0, sizeof (ThreadPoolIOUpdate));
416                                 }
417
418                                 break;
419                         }
420                         default:
421                                 g_assert_not_reached ();
422                         }
423                 }
424
425                 mono_cond_broadcast (&threadpool_io->updates_cond);
426
427                 if (threadpool_io->updates_size > 0) {
428                         threadpool_io->updates_size = 0;
429                         memset (&threadpool_io->updates, 0, UPDATES_CAPACITY * sizeof (ThreadPoolIOUpdate));
430                 }
431
432                 mono_mutex_unlock (&threadpool_io->updates_lock);
433
434                 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: wai");
435
436                 res = threadpool_io->backend.event_wait (wait_callback, states);
437
438                 if (res == -1 || mono_runtime_is_shutting_down ())
439                         break;
440         }
441
442         mono_g_hash_table_destroy (states);
443
444         io_selector_running = FALSE;
445 }
446
447 /* Locking: threadpool_io->updates_lock must be held */
448 static ThreadPoolIOUpdate*
449 update_get_new (void)
450 {
451         ThreadPoolIOUpdate *update = NULL;
452         g_assert (threadpool_io->updates_size <= UPDATES_CAPACITY);
453
454         while (threadpool_io->updates_size == UPDATES_CAPACITY) {
455                 /* we wait for updates to be applied in the selector_thread and we loop
456                  * as long as none are available. if it happends too much, then we need
457                  * to increase UPDATES_CAPACITY */
458                 mono_cond_wait (&threadpool_io->updates_cond, &threadpool_io->updates_lock);
459         }
460
461         g_assert (threadpool_io->updates_size < UPDATES_CAPACITY);
462
463         update = &threadpool_io->updates [threadpool_io->updates_size ++];
464
465         return update;
466 }
467
468 static void
469 wakeup_pipes_init (void)
470 {
471 #if !defined(HOST_WIN32)
472         if (pipe (threadpool_io->wakeup_pipes) == -1)
473                 g_error ("wakeup_pipes_init: pipe () failed, error (%d) %s\n", errno, g_strerror (errno));
474         if (fcntl (threadpool_io->wakeup_pipes [0], F_SETFL, O_NONBLOCK) == -1)
475                 g_error ("wakeup_pipes_init: fcntl () failed, error (%d) %s\n", errno, g_strerror (errno));
476 #else
477         struct sockaddr_in client;
478         struct sockaddr_in server;
479         SOCKET server_sock;
480         gulong arg;
481         gint size;
482
483         server_sock = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
484         g_assert (server_sock != INVALID_SOCKET);
485         threadpool_io->wakeup_pipes [1] = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
486         g_assert (threadpool_io->wakeup_pipes [1] != INVALID_SOCKET);
487
488         server.sin_family = AF_INET;
489         server.sin_addr.s_addr = inet_addr ("127.0.0.1");
490         server.sin_port = 0;
491         if (bind (server_sock, (SOCKADDR*) &server, sizeof (server)) == SOCKET_ERROR) {
492                 closesocket (server_sock);
493                 g_error ("wakeup_pipes_init: bind () failed, error (%d)\n", WSAGetLastError ());
494         }
495
496         size = sizeof (server);
497         if (getsockname (server_sock, (SOCKADDR*) &server, &size) == SOCKET_ERROR) {
498                 closesocket (server_sock);
499                 g_error ("wakeup_pipes_init: getsockname () failed, error (%d)\n", WSAGetLastError ());
500         }
501         if (listen (server_sock, 1024) == SOCKET_ERROR) {
502                 closesocket (server_sock);
503                 g_error ("wakeup_pipes_init: listen () failed, error (%d)\n", WSAGetLastError ());
504         }
505         if (connect ((SOCKET) threadpool_io->wakeup_pipes [1], (SOCKADDR*) &server, sizeof (server)) == SOCKET_ERROR) {
506                 closesocket (server_sock);
507                 g_error ("wakeup_pipes_init: connect () failed, error (%d)\n", WSAGetLastError ());
508         }
509
510         size = sizeof (client);
511         threadpool_io->wakeup_pipes [0] = accept (server_sock, (SOCKADDR *) &client, &size);
512         g_assert (threadpool_io->wakeup_pipes [0] != INVALID_SOCKET);
513
514         arg = 1;
515         if (ioctlsocket (threadpool_io->wakeup_pipes [0], FIONBIO, &arg) == SOCKET_ERROR) {
516                 closesocket (threadpool_io->wakeup_pipes [0]);
517                 closesocket (server_sock);
518                 g_error ("wakeup_pipes_init: ioctlsocket () failed, error (%d)\n", WSAGetLastError ());
519         }
520
521         closesocket (server_sock);
522 #endif
523 }
524
525 static void
526 initialize (void)
527 {
528         g_assert (!threadpool_io);
529         threadpool_io = g_new0 (ThreadPoolIO, 1);
530         g_assert (threadpool_io);
531
532         mono_mutex_init_recursive (&threadpool_io->updates_lock);
533         mono_cond_init (&threadpool_io->updates_cond, NULL);
534         mono_gc_register_root ((void*)&threadpool_io->updates [0], sizeof (threadpool_io->updates), MONO_GC_DESCRIPTOR_NULL);
535
536         threadpool_io->updates_size = 0;
537
538         threadpool_io->backend = backend_poll;
539         if (g_getenv ("MONO_ENABLE_AIO") != NULL) {
540 #if defined(HAVE_EPOLL)
541                 threadpool_io->backend = backend_epoll;
542 #elif defined(HAVE_KQUEUE)
543                 threadpool_io->backend = backend_kqueue;
544 #endif
545         }
546
547         wakeup_pipes_init ();
548
549         if (!threadpool_io->backend.init (threadpool_io->wakeup_pipes [0]))
550                 g_error ("initialize: backend->init () failed");
551
552         if (!mono_thread_create_internal (mono_get_root_domain (), selector_thread, NULL, TRUE, SMALL_STACK))
553                 g_error ("initialize: mono_thread_create_internal () failed");
554 }
555
556 static void
557 cleanup (void)
558 {
559         /* we make the assumption along the code that we are
560          * cleaning up only if the runtime is shutting down */
561         g_assert (mono_runtime_is_shutting_down ());
562
563         selector_thread_wakeup ();
564         while (io_selector_running)
565                 g_usleep (1000);
566
567         mono_mutex_destroy (&threadpool_io->updates_lock);
568         mono_cond_destroy (&threadpool_io->updates_cond);
569
570         threadpool_io->backend.cleanup ();
571
572 #if !defined(HOST_WIN32)
573         close (threadpool_io->wakeup_pipes [0]);
574         close (threadpool_io->wakeup_pipes [1]);
575 #else
576         closesocket (threadpool_io->wakeup_pipes [0]);
577         closesocket (threadpool_io->wakeup_pipes [1]);
578 #endif
579
580         g_assert (threadpool_io);
581         g_free (threadpool_io);
582         threadpool_io = NULL;
583         g_assert (!threadpool_io);
584 }
585
586 static gboolean
587 is_socket_async_callback (MonoImage *system_image, MonoClass *class)
588 {
589         MonoClass *socket_async_callback_class = NULL;
590
591         socket_async_callback_class = mono_class_from_name (system_image, "System.Net.Sockets", "SocketAsyncCallback");
592
593         return class == socket_async_callback_class;
594 }
595
596 static gboolean
597 is_async_read_handler (MonoImage *system_image, MonoClass *class)
598 {
599         MonoClass *async_read_handler_class = NULL;
600
601         async_read_handler_class = mono_class_from_name (system_image, "System.Diagnostics", "Process/AsyncReadHandler");
602
603         return class == async_read_handler_class;
604 }
605
606 gboolean
607 mono_threadpool_ms_is_io (MonoObject *target, MonoObject *state)
608 {
609         MonoImage *system_image;
610         MonoSocketAsyncResult *sockares;
611
612         system_image = mono_image_loaded ("System");
613         if (!system_image)
614                 return FALSE;
615
616         if (!is_socket_async_callback (system_image, target->vtable->klass) && !is_async_read_handler (system_image, target->vtable->klass))
617                 return FALSE;
618
619         sockares = (MonoSocketAsyncResult*) state;
620         if (sockares->operation < AIO_OP_FIRST || sockares->operation >= AIO_OP_LAST)
621                 return FALSE;
622
623         return TRUE;
624 }
625
626 void
627 mono_threadpool_ms_io_cleanup (void)
628 {
629         mono_lazy_cleanup (&io_status, cleanup);
630 }
631
632 MonoAsyncResult *
633 mono_threadpool_ms_io_add (MonoAsyncResult *ares, MonoSocketAsyncResult *sockares)
634 {
635         ThreadPoolIOUpdate *update;
636
637         g_assert (ares);
638         g_assert (sockares);
639
640         if (mono_runtime_is_shutting_down ())
641                 return NULL;
642         if (mono_domain_is_unloading (mono_object_domain (sockares)))
643                 return NULL;
644
645         mono_lazy_initialize (&io_status, initialize);
646
647         MONO_OBJECT_SETREF (sockares, ares, ares);
648
649         mono_mutex_lock (&threadpool_io->updates_lock);
650
651         update = update_get_new ();
652         update->type = UPDATE_ADD;
653         update->data.add.fd = GPOINTER_TO_INT (sockares->handle);
654         update->data.add.sockares = sockares;
655         mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */
656
657         selector_thread_wakeup ();
658
659         mono_mutex_unlock (&threadpool_io->updates_lock);
660
661         return ares;
662 }
663
664 void
665 mono_threadpool_ms_io_remove_socket (int fd)
666 {
667         ThreadPoolIOUpdate *update;
668
669         if (!mono_lazy_is_initialized (&io_status))
670                 return;
671
672         mono_mutex_lock (&threadpool_io->updates_lock);
673
674         update = update_get_new ();
675         update->type = UPDATE_REMOVE_SOCKET;
676         update->data.add.fd = fd;
677         mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */
678
679         selector_thread_wakeup ();
680
681         mono_cond_wait (&threadpool_io->updates_cond, &threadpool_io->updates_lock);
682
683         mono_mutex_unlock (&threadpool_io->updates_lock);
684 }
685
686 void
687 mono_threadpool_ms_io_remove_domain_jobs (MonoDomain *domain)
688 {
689         ThreadPoolIOUpdate *update;
690
691         if (!mono_lazy_is_initialized (&io_status))
692                 return;
693
694         mono_mutex_lock (&threadpool_io->updates_lock);
695
696         update = update_get_new ();
697         update->type = UPDATE_REMOVE_DOMAIN;
698         update->data.remove_domain.domain = domain;
699         mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */
700
701         selector_thread_wakeup ();
702
703         mono_cond_wait (&threadpool_io->updates_cond, &threadpool_io->updates_lock);
704
705         mono_mutex_unlock (&threadpool_io->updates_lock);
706 }
707
708 void
709 icall_append_io_job (MonoObject *target, MonoSocketAsyncResult *state)
710 {
711         MonoAsyncResult *ares;
712
713         /* Don't call mono_async_result_new() to avoid capturing the context */
714         ares = (MonoAsyncResult *) mono_object_new (mono_domain_get (), mono_defaults.asyncresult_class);
715         MONO_OBJECT_SETREF (ares, async_delegate, target);
716         MONO_OBJECT_SETREF (ares, async_state, state);
717
718         mono_threadpool_ms_io_add (ares, state);
719         return;
720 }
721
722 #else
723
724 gboolean
725 mono_threadpool_ms_is_io (MonoObject *target, MonoObject *state)
726 {
727         return FALSE;
728 }
729
730 void
731 mono_threadpool_ms_io_cleanup (void)
732 {
733         g_assert_not_reached ();
734 }
735
736 MonoAsyncResult *
737 mono_threadpool_ms_io_add (MonoAsyncResult *ares, MonoSocketAsyncResult *sockares)
738 {
739         g_assert_not_reached ();
740 }
741
742 void
743 mono_threadpool_ms_io_remove_socket (int fd)
744 {
745         g_assert_not_reached ();
746 }
747
748 void
749 mono_threadpool_ms_io_remove_domain_jobs (MonoDomain *domain)
750 {
751         g_assert_not_reached ();
752 }
753
754 void
755 icall_append_io_job (MonoObject *target, MonoSocketAsyncResult *state)
756 {
757         g_assert_not_reached ();
758 }
759
760 #endif