[io-layer] Extract socket (#4241)
[mono.git] / mono / metadata / threadpool-io.c
1 /*
2  * threadpool-io.c: Microsoft IO threadpool runtime support
3  *
4  * Author:
5  *      Ludovic Henry (ludovic.henry@xamarin.com)
6  *
7  * Copyright 2015 Xamarin, Inc (http://www.xamarin.com)
8  * Licensed under the MIT license. See LICENSE file in the project root for full license information.
9  */
10
11 #include <config.h>
12
13 #ifndef DISABLE_SOCKETS
14
15 #include <glib.h>
16
17 #if defined(HOST_WIN32)
18 #include <windows.h>
19 #else
20 #include <errno.h>
21 #include <fcntl.h>
22 #endif
23
24 #include <mono/metadata/gc-internals.h>
25 #include <mono/metadata/mono-mlist.h>
26 #include <mono/metadata/threadpool.h>
27 #include <mono/metadata/threadpool-io.h>
28 #include <mono/utils/atomic.h>
29 #include <mono/utils/mono-threads.h>
30 #include <mono/utils/mono-lazy-init.h>
31 #include <mono/utils/mono-logger-internals.h>
32 #include <mono/io-layer/io-layer.h>
33
34 typedef struct {
35         gboolean (*init) (gint wakeup_pipe_fd);
36         void     (*register_fd) (gint fd, gint events, gboolean is_new);
37         void     (*remove_fd) (gint fd);
38         gint     (*event_wait) (void (*callback) (gint fd, gint events, gpointer user_data), gpointer user_data);
39 } ThreadPoolIOBackend;
40
41 /* Keep in sync with System.IOOperation in mcs/class/System/System/IOSelector.cs */
42 enum MonoIOOperation {
43         EVENT_IN   = 1 << 0,
44         EVENT_OUT  = 1 << 1,
45         EVENT_ERR  = 1 << 2, /* not in managed */
46 };
47
48 #include "threadpool-io-epoll.c"
49 #include "threadpool-io-kqueue.c"
50 #include "threadpool-io-poll.c"
51
52 #define UPDATES_CAPACITY 128
53
54 /* Keep in sync with System.IOSelectorJob in mcs/class/System/System/IOSelector.cs */
55 struct _MonoIOSelectorJob {
56         MonoObject object;
57         gint32 operation;
58         MonoObject *callback;
59         MonoObject *state;
60 };
61
62 typedef enum {
63         UPDATE_EMPTY = 0,
64         UPDATE_ADD,
65         UPDATE_REMOVE_SOCKET,
66         UPDATE_REMOVE_DOMAIN,
67 } ThreadPoolIOUpdateType;
68
69 typedef struct {
70         gint fd;
71         MonoIOSelectorJob *job;
72 } ThreadPoolIOUpdate_Add;
73
74 typedef struct {
75         gint fd;
76 } ThreadPoolIOUpdate_RemoveSocket;
77
78 typedef struct {
79         MonoDomain *domain;
80 } ThreadPoolIOUpdate_RemoveDomain;
81
82 typedef struct {
83         ThreadPoolIOUpdateType type;
84         union {
85                 ThreadPoolIOUpdate_Add add;
86                 ThreadPoolIOUpdate_RemoveSocket remove_socket;
87                 ThreadPoolIOUpdate_RemoveDomain remove_domain;
88         } data;
89 } ThreadPoolIOUpdate;
90
91 typedef struct {
92         ThreadPoolIOBackend backend;
93
94         ThreadPoolIOUpdate updates [UPDATES_CAPACITY];
95         gint updates_size;
96         MonoCoopMutex updates_lock;
97         MonoCoopCond updates_cond;
98
99 #if !defined(HOST_WIN32)
100         gint wakeup_pipes [2];
101 #else
102         SOCKET wakeup_pipes [2];
103 #endif
104 } ThreadPoolIO;
105
106 static mono_lazy_init_t io_status = MONO_LAZY_INIT_STATUS_NOT_INITIALIZED;
107
108 static gboolean io_selector_running = FALSE;
109
110 static ThreadPoolIO* threadpool_io;
111
112 static MonoIOSelectorJob*
113 get_job_for_event (MonoMList **list, gint32 event)
114 {
115         MonoMList *current;
116
117         g_assert (list);
118
119         for (current = *list; current; current = mono_mlist_next (current)) {
120                 MonoIOSelectorJob *job = (MonoIOSelectorJob*) mono_mlist_get_data (current);
121                 if (job->operation == event) {
122                         *list = mono_mlist_remove_item (*list, current);
123                         return job;
124                 }
125         }
126
127         return NULL;
128 }
129
130 static gint
131 get_operations_for_jobs (MonoMList *list)
132 {
133         MonoMList *current;
134         gint operations = 0;
135
136         for (current = list; current; current = mono_mlist_next (current))
137                 operations |= ((MonoIOSelectorJob*) mono_mlist_get_data (current))->operation;
138
139         return operations;
140 }
141
142 static void
143 selector_thread_wakeup (void)
144 {
145         gchar msg = 'c';
146         gint written;
147
148         for (;;) {
149 #if !defined(HOST_WIN32)
150                 written = write (threadpool_io->wakeup_pipes [1], &msg, 1);
151                 if (written == 1)
152                         break;
153                 if (written == -1) {
154                         g_warning ("selector_thread_wakeup: write () failed, error (%d) %s\n", errno, g_strerror (errno));
155                         break;
156                 }
157 #else
158                 written = send (threadpool_io->wakeup_pipes [1], &msg, 1, 0);
159                 if (written == 1)
160                         break;
161                 if (written == SOCKET_ERROR) {
162                         g_warning ("selector_thread_wakeup: write () failed, error (%d)\n", WSAGetLastError ());
163                         break;
164                 }
165 #endif
166         }
167 }
168
169 static void
170 selector_thread_wakeup_drain_pipes (void)
171 {
172         gchar buffer [128];
173         gint received;
174
175         for (;;) {
176 #if !defined(HOST_WIN32)
177                 received = read (threadpool_io->wakeup_pipes [0], buffer, sizeof (buffer));
178                 if (received == 0)
179                         break;
180                 if (received == -1) {
181                         if (errno != EINTR && errno != EAGAIN)
182                                 g_warning ("selector_thread_wakeup_drain_pipes: read () failed, error (%d) %s\n", errno, g_strerror (errno));
183                         break;
184                 }
185 #else
186                 received = recv (threadpool_io->wakeup_pipes [0], buffer, sizeof (buffer), 0);
187                 if (received == 0)
188                         break;
189                 if (received == SOCKET_ERROR) {
190                         if (WSAGetLastError () != WSAEINTR && WSAGetLastError () != WSAEWOULDBLOCK)
191                                 g_warning ("selector_thread_wakeup_drain_pipes: recv () failed, error (%d) %s\n", WSAGetLastError ());
192                         break;
193                 }
194 #endif
195         }
196 }
197
198 typedef struct {
199         MonoDomain *domain;
200         MonoGHashTable *states;
201 } FilterSockaresForDomainData;
202
203 static void
204 filter_jobs_for_domain (gpointer key, gpointer value, gpointer user_data)
205 {
206         FilterSockaresForDomainData *data;
207         MonoMList *list = (MonoMList *)value, *element;
208         MonoDomain *domain;
209         MonoGHashTable *states;
210
211         g_assert (user_data);
212         data = (FilterSockaresForDomainData *)user_data;
213         domain = data->domain;
214         states = data->states;
215
216         for (element = list; element; element = mono_mlist_next (element)) {
217                 MonoIOSelectorJob *job = (MonoIOSelectorJob*) mono_mlist_get_data (element);
218                 if (mono_object_domain (job) == domain)
219                         mono_mlist_set_data (element, NULL);
220         }
221
222         /* we skip all the first elements which are NULL */
223         for (; list; list = mono_mlist_next (list)) {
224                 if (mono_mlist_get_data (list))
225                         break;
226         }
227
228         if (list) {
229                 g_assert (mono_mlist_get_data (list));
230
231                 /* we delete all the NULL elements after the first one */
232                 for (element = list; element;) {
233                         MonoMList *next;
234                         if (!(next = mono_mlist_next (element)))
235                                 break;
236                         if (mono_mlist_get_data (next))
237                                 element = next;
238                         else
239                                 mono_mlist_set_next (element, mono_mlist_next (next));
240                 }
241         }
242
243         mono_g_hash_table_replace (states, key, list);
244 }
245
246 static void
247 wait_callback (gint fd, gint events, gpointer user_data)
248 {
249         MonoError error;
250
251         if (mono_runtime_is_shutting_down ())
252                 return;
253
254         if (fd == threadpool_io->wakeup_pipes [0]) {
255                 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: wke");
256                 selector_thread_wakeup_drain_pipes ();
257         } else {
258                 MonoGHashTable *states;
259                 MonoMList *list = NULL;
260                 gpointer k;
261                 gboolean remove_fd = FALSE;
262                 gint operations;
263
264                 g_assert (user_data);
265                 states = (MonoGHashTable *)user_data;
266
267                 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: cal fd %3d, events = %2s | %2s | %3s",
268                         fd, (events & EVENT_IN) ? "RD" : "..", (events & EVENT_OUT) ? "WR" : "..", (events & EVENT_ERR) ? "ERR" : "...");
269
270                 if (!mono_g_hash_table_lookup_extended (states, GINT_TO_POINTER (fd), &k, (gpointer*) &list))
271                         g_error ("wait_callback: fd %d not found in states table", fd);
272
273                 if (list && (events & EVENT_IN) != 0) {
274                         MonoIOSelectorJob *job = get_job_for_event (&list, EVENT_IN);
275                         if (job) {
276                                 mono_threadpool_enqueue_work_item (((MonoObject*) job)->vtable->domain, (MonoObject*) job, &error);
277                                 mono_error_assert_ok (&error);
278                         }
279
280                 }
281                 if (list && (events & EVENT_OUT) != 0) {
282                         MonoIOSelectorJob *job = get_job_for_event (&list, EVENT_OUT);
283                         if (job) {
284                                 mono_threadpool_enqueue_work_item (((MonoObject*) job)->vtable->domain, (MonoObject*) job, &error);
285                                 mono_error_assert_ok (&error);
286                         }
287                 }
288
289                 remove_fd = (events & EVENT_ERR) == EVENT_ERR;
290                 if (!remove_fd) {
291                         mono_g_hash_table_replace (states, GINT_TO_POINTER (fd), list);
292
293                         operations = get_operations_for_jobs (list);
294
295                         mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: res fd %3d, events = %2s | %2s | %3s",
296                                 fd, (operations & EVENT_IN) ? "RD" : "..", (operations & EVENT_OUT) ? "WR" : "..", (operations & EVENT_ERR) ? "ERR" : "...");
297
298                         threadpool_io->backend.register_fd (fd, operations, FALSE);
299                 } else {
300                         mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: err fd %d", fd);
301
302                         mono_g_hash_table_remove (states, GINT_TO_POINTER (fd));
303
304                         threadpool_io->backend.remove_fd (fd);
305                 }
306         }
307 }
308
309 static gsize WINAPI
310 selector_thread (gpointer data)
311 {
312         MonoError error;
313         MonoGHashTable *states;
314
315         io_selector_running = TRUE;
316
317         if (mono_runtime_is_shutting_down ()) {
318                 io_selector_running = FALSE;
319                 return 0;
320         }
321
322         states = mono_g_hash_table_new_type (g_direct_hash, g_direct_equal, MONO_HASH_VALUE_GC, MONO_ROOT_SOURCE_THREAD_POOL, "i/o thread pool states table");
323
324         for (;;) {
325                 gint i, j;
326                 gint res;
327
328                 mono_coop_mutex_lock (&threadpool_io->updates_lock);
329
330                 for (i = 0; i < threadpool_io->updates_size; ++i) {
331                         ThreadPoolIOUpdate *update = &threadpool_io->updates [i];
332
333                         switch (update->type) {
334                         case UPDATE_EMPTY:
335                                 break;
336                         case UPDATE_ADD: {
337                                 gint fd;
338                                 gint operations;
339                                 gpointer k;
340                                 gboolean exists;
341                                 MonoMList *list = NULL;
342                                 MonoIOSelectorJob *job;
343
344                                 fd = update->data.add.fd;
345                                 g_assert (fd >= 0);
346
347                                 job = update->data.add.job;
348                                 g_assert (job);
349
350                                 exists = mono_g_hash_table_lookup_extended (states, GINT_TO_POINTER (fd), &k, (gpointer*) &list);
351                                 list = mono_mlist_append_checked (list, (MonoObject*) job, &error);
352                                 mono_error_assert_ok (&error);
353                                 mono_g_hash_table_replace (states, GINT_TO_POINTER (fd), list);
354
355                                 operations = get_operations_for_jobs (list);
356
357                                 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: %3s fd %3d, operations = %2s | %2s | %3s",
358                                         exists ? "mod" : "add", fd, (operations & EVENT_IN) ? "RD" : "..", (operations & EVENT_OUT) ? "WR" : "..", (operations & EVENT_ERR) ? "ERR" : "...");
359
360                                 threadpool_io->backend.register_fd (fd, operations, !exists);
361
362                                 break;
363                         }
364                         case UPDATE_REMOVE_SOCKET: {
365                                 gint fd;
366                                 gpointer k;
367                                 MonoMList *list = NULL;
368
369                                 fd = update->data.remove_socket.fd;
370                                 g_assert (fd >= 0);
371
372                                 if (mono_g_hash_table_lookup_extended (states, GINT_TO_POINTER (fd), &k, (gpointer*) &list)) {
373                                         mono_g_hash_table_remove (states, GINT_TO_POINTER (fd));
374
375                                         for (j = i + 1; j < threadpool_io->updates_size; ++j) {
376                                                 ThreadPoolIOUpdate *update = &threadpool_io->updates [j];
377                                                 if (update->type == UPDATE_ADD && update->data.add.fd == fd)
378                                                         memset (update, 0, sizeof (ThreadPoolIOUpdate));
379                                         }
380
381                                         for (; list; list = mono_mlist_remove_item (list, list)) {
382                                                 mono_threadpool_enqueue_work_item (mono_object_domain (mono_mlist_get_data (list)), mono_mlist_get_data (list), &error);
383                                                 mono_error_assert_ok (&error);
384                                         }
385
386                                         mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: del fd %3d", fd);
387                                         threadpool_io->backend.remove_fd (fd);
388                                 }
389
390                                 break;
391                         }
392                         case UPDATE_REMOVE_DOMAIN: {
393                                 MonoDomain *domain;
394
395                                 domain = update->data.remove_domain.domain;
396                                 g_assert (domain);
397
398                                 FilterSockaresForDomainData user_data = { .domain = domain, .states = states };
399                                 mono_g_hash_table_foreach (states, filter_jobs_for_domain, &user_data);
400
401                                 for (j = i + 1; j < threadpool_io->updates_size; ++j) {
402                                         ThreadPoolIOUpdate *update = &threadpool_io->updates [j];
403                                         if (update->type == UPDATE_ADD && mono_object_domain (update->data.add.job) == domain)
404                                                 memset (update, 0, sizeof (ThreadPoolIOUpdate));
405                                 }
406
407                                 break;
408                         }
409                         default:
410                                 g_assert_not_reached ();
411                         }
412                 }
413
414                 mono_coop_cond_broadcast (&threadpool_io->updates_cond);
415
416                 if (threadpool_io->updates_size > 0) {
417                         threadpool_io->updates_size = 0;
418                         memset (&threadpool_io->updates, 0, UPDATES_CAPACITY * sizeof (ThreadPoolIOUpdate));
419                 }
420
421                 mono_coop_mutex_unlock (&threadpool_io->updates_lock);
422
423                 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: wai");
424
425                 res = threadpool_io->backend.event_wait (wait_callback, states);
426
427                 if (res == -1 || mono_runtime_is_shutting_down ())
428                         break;
429         }
430
431         mono_g_hash_table_destroy (states);
432
433         io_selector_running = FALSE;
434
435         return 0;
436 }
437
438 /* Locking: threadpool_io->updates_lock must be held */
439 static ThreadPoolIOUpdate*
440 update_get_new (void)
441 {
442         ThreadPoolIOUpdate *update = NULL;
443         g_assert (threadpool_io->updates_size <= UPDATES_CAPACITY);
444
445         while (threadpool_io->updates_size == UPDATES_CAPACITY) {
446                 /* we wait for updates to be applied in the selector_thread and we loop
447                  * as long as none are available. if it happends too much, then we need
448                  * to increase UPDATES_CAPACITY */
449                 mono_coop_cond_wait (&threadpool_io->updates_cond, &threadpool_io->updates_lock);
450         }
451
452         g_assert (threadpool_io->updates_size < UPDATES_CAPACITY);
453
454         update = &threadpool_io->updates [threadpool_io->updates_size ++];
455
456         return update;
457 }
458
459 static void
460 wakeup_pipes_init (void)
461 {
462 #if !defined(HOST_WIN32)
463         if (pipe (threadpool_io->wakeup_pipes) == -1)
464                 g_error ("wakeup_pipes_init: pipe () failed, error (%d) %s\n", errno, g_strerror (errno));
465         if (fcntl (threadpool_io->wakeup_pipes [0], F_SETFL, O_NONBLOCK) == -1)
466                 g_error ("wakeup_pipes_init: fcntl () failed, error (%d) %s\n", errno, g_strerror (errno));
467 #else
468         struct sockaddr_in client;
469         struct sockaddr_in server;
470         SOCKET server_sock;
471         gulong arg;
472         gint size;
473
474         server_sock = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
475         g_assert (server_sock != INVALID_SOCKET);
476         threadpool_io->wakeup_pipes [1] = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
477         g_assert (threadpool_io->wakeup_pipes [1] != INVALID_SOCKET);
478
479         server.sin_family = AF_INET;
480         server.sin_addr.s_addr = inet_addr ("127.0.0.1");
481         server.sin_port = 0;
482         if (bind (server_sock, (SOCKADDR*) &server, sizeof (server)) == SOCKET_ERROR) {
483                 closesocket (server_sock);
484                 g_error ("wakeup_pipes_init: bind () failed, error (%d)\n", WSAGetLastError ());
485         }
486
487         size = sizeof (server);
488         if (getsockname (server_sock, (SOCKADDR*) &server, &size) == SOCKET_ERROR) {
489                 closesocket (server_sock);
490                 g_error ("wakeup_pipes_init: getsockname () failed, error (%d)\n", WSAGetLastError ());
491         }
492         if (listen (server_sock, 1024) == SOCKET_ERROR) {
493                 closesocket (server_sock);
494                 g_error ("wakeup_pipes_init: listen () failed, error (%d)\n", WSAGetLastError ());
495         }
496         if (connect ((SOCKET) threadpool_io->wakeup_pipes [1], (SOCKADDR*) &server, sizeof (server)) == SOCKET_ERROR) {
497                 closesocket (server_sock);
498                 g_error ("wakeup_pipes_init: connect () failed, error (%d)\n", WSAGetLastError ());
499         }
500
501         size = sizeof (client);
502         threadpool_io->wakeup_pipes [0] = accept (server_sock, (SOCKADDR *) &client, &size);
503         g_assert (threadpool_io->wakeup_pipes [0] != INVALID_SOCKET);
504
505         arg = 1;
506         if (ioctlsocket (threadpool_io->wakeup_pipes [0], FIONBIO, &arg) == SOCKET_ERROR) {
507                 closesocket (threadpool_io->wakeup_pipes [0]);
508                 closesocket (server_sock);
509                 g_error ("wakeup_pipes_init: ioctlsocket () failed, error (%d)\n", WSAGetLastError ());
510         }
511
512         closesocket (server_sock);
513 #endif
514 }
515
516 static void
517 initialize (void)
518 {
519         g_assert (!threadpool_io);
520         threadpool_io = g_new0 (ThreadPoolIO, 1);
521         g_assert (threadpool_io);
522
523         mono_coop_mutex_init (&threadpool_io->updates_lock);
524         mono_coop_cond_init (&threadpool_io->updates_cond);
525         mono_gc_register_root ((char *)&threadpool_io->updates [0], sizeof (threadpool_io->updates), MONO_GC_DESCRIPTOR_NULL, MONO_ROOT_SOURCE_THREAD_POOL, "i/o thread pool updates list");
526
527         threadpool_io->updates_size = 0;
528
529         threadpool_io->backend = backend_poll;
530         if (g_getenv ("MONO_ENABLE_AIO") != NULL) {
531 #if defined(HAVE_EPOLL)
532                 threadpool_io->backend = backend_epoll;
533 #elif defined(HAVE_KQUEUE)
534                 threadpool_io->backend = backend_kqueue;
535 #endif
536         }
537
538         wakeup_pipes_init ();
539
540         if (!threadpool_io->backend.init (threadpool_io->wakeup_pipes [0]))
541                 g_error ("initialize: backend->init () failed");
542
543         MonoError error;
544         if (!mono_thread_create_internal (mono_get_root_domain (), selector_thread, NULL, TRUE, SMALL_STACK, &error))
545                 g_error ("initialize: mono_thread_create_internal () failed due to %s", mono_error_get_message (&error));
546 }
547
548 static void
549 cleanup (void)
550 {
551         /* we make the assumption along the code that we are
552          * cleaning up only if the runtime is shutting down */
553         g_assert (mono_runtime_is_shutting_down ());
554
555         selector_thread_wakeup ();
556         while (io_selector_running)
557                 mono_thread_info_usleep (1000);
558 }
559
560 void
561 mono_threadpool_io_cleanup (void)
562 {
563         mono_lazy_cleanup (&io_status, cleanup);
564 }
565
566 void
567 ves_icall_System_IOSelector_Add (gpointer handle, MonoIOSelectorJob *job)
568 {
569         ThreadPoolIOUpdate *update;
570
571         g_assert (handle);
572
573         g_assert ((job->operation == EVENT_IN) ^ (job->operation == EVENT_OUT));
574         g_assert (job->callback);
575
576         if (mono_runtime_is_shutting_down ())
577                 return;
578         if (mono_domain_is_unloading (mono_object_domain (job)))
579                 return;
580
581         mono_lazy_initialize (&io_status, initialize);
582
583         mono_coop_mutex_lock (&threadpool_io->updates_lock);
584
585         update = update_get_new ();
586         update->type = UPDATE_ADD;
587         update->data.add.fd = GPOINTER_TO_INT (handle);
588         update->data.add.job = job;
589         mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */
590
591         selector_thread_wakeup ();
592
593         mono_coop_mutex_unlock (&threadpool_io->updates_lock);
594 }
595
596 void
597 ves_icall_System_IOSelector_Remove (gpointer handle)
598 {
599         mono_threadpool_io_remove_socket (GPOINTER_TO_INT (handle));
600 }
601
602 void
603 mono_threadpool_io_remove_socket (int fd)
604 {
605         ThreadPoolIOUpdate *update;
606
607         if (!mono_lazy_is_initialized (&io_status))
608                 return;
609
610         mono_coop_mutex_lock (&threadpool_io->updates_lock);
611
612         update = update_get_new ();
613         update->type = UPDATE_REMOVE_SOCKET;
614         update->data.add.fd = fd;
615         mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */
616
617         selector_thread_wakeup ();
618
619         mono_coop_cond_wait (&threadpool_io->updates_cond, &threadpool_io->updates_lock);
620
621         mono_coop_mutex_unlock (&threadpool_io->updates_lock);
622 }
623
624 void
625 mono_threadpool_io_remove_domain_jobs (MonoDomain *domain)
626 {
627         ThreadPoolIOUpdate *update;
628
629         if (!mono_lazy_is_initialized (&io_status))
630                 return;
631
632         mono_coop_mutex_lock (&threadpool_io->updates_lock);
633
634         update = update_get_new ();
635         update->type = UPDATE_REMOVE_DOMAIN;
636         update->data.remove_domain.domain = domain;
637         mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */
638
639         selector_thread_wakeup ();
640
641         mono_coop_cond_wait (&threadpool_io->updates_cond, &threadpool_io->updates_lock);
642
643         mono_coop_mutex_unlock (&threadpool_io->updates_lock);
644 }
645
646 #else
647
648 void
649 ves_icall_System_IOSelector_Add (gpointer handle, MonoIOSelectorJob *job)
650 {
651         g_assert_not_reached ();
652 }
653
654 void
655 ves_icall_System_IOSelector_Remove (gpointer handle)
656 {
657         g_assert_not_reached ();
658 }
659
660 void
661 mono_threadpool_io_cleanup (void)
662 {
663         g_assert_not_reached ();
664 }
665
666 void
667 mono_threadpool_io_remove_socket (int fd)
668 {
669         g_assert_not_reached ();
670 }
671
672 void
673 mono_threadpool_io_remove_domain_jobs (MonoDomain *domain)
674 {
675         g_assert_not_reached ();
676 }
677
678 #endif