[threadpool-io] Ensure selector thread is running before waiting on it (#4572)
[mono.git] / mono / metadata / threadpool-io.c
1 /*
2  * threadpool-io.c: Microsoft IO threadpool runtime support
3  *
4  * Author:
5  *      Ludovic Henry (ludovic.henry@xamarin.com)
6  *
7  * Copyright 2015 Xamarin, Inc (http://www.xamarin.com)
8  * Licensed under the MIT license. See LICENSE file in the project root for full license information.
9  */
10
11 #include <config.h>
12
13 #ifndef DISABLE_SOCKETS
14
15 #include <glib.h>
16
17 #if defined(HOST_WIN32)
18 #include <windows.h>
19 #else
20 #include <errno.h>
21 #include <fcntl.h>
22 #endif
23
24 #include <mono/metadata/gc-internals.h>
25 #include <mono/metadata/mono-mlist.h>
26 #include <mono/metadata/threadpool.h>
27 #include <mono/metadata/threadpool-io.h>
28 #include <mono/utils/atomic.h>
29 #include <mono/utils/mono-threads.h>
30 #include <mono/utils/mono-lazy-init.h>
31 #include <mono/utils/mono-logger-internals.h>
32 #include <mono/utils/w32api.h>
33
34 typedef struct {
35         gboolean (*init) (gint wakeup_pipe_fd);
36         void     (*register_fd) (gint fd, gint events, gboolean is_new);
37         void     (*remove_fd) (gint fd);
38         gint     (*event_wait) (void (*callback) (gint fd, gint events, gpointer user_data), gpointer user_data);
39 } ThreadPoolIOBackend;
40
41 /* Keep in sync with System.IOOperation in mcs/class/System/System/IOSelector.cs */
42 enum MonoIOOperation {
43         EVENT_IN   = 1 << 0,
44         EVENT_OUT  = 1 << 1,
45         EVENT_ERR  = 1 << 2, /* not in managed */
46 };
47
48 #include "threadpool-io-epoll.c"
49 #include "threadpool-io-kqueue.c"
50 #include "threadpool-io-poll.c"
51
52 #define UPDATES_CAPACITY 128
53
54 /* Keep in sync with System.IOSelectorJob in mcs/class/System/System/IOSelector.cs */
55 struct _MonoIOSelectorJob {
56         MonoObject object;
57         gint32 operation;
58         MonoObject *callback;
59         MonoObject *state;
60 };
61
62 typedef enum {
63         UPDATE_EMPTY = 0,
64         UPDATE_ADD,
65         UPDATE_REMOVE_SOCKET,
66         UPDATE_REMOVE_DOMAIN,
67 } ThreadPoolIOUpdateType;
68
69 typedef struct {
70         gint fd;
71         MonoIOSelectorJob *job;
72 } ThreadPoolIOUpdate_Add;
73
74 typedef struct {
75         gint fd;
76 } ThreadPoolIOUpdate_RemoveSocket;
77
78 typedef struct {
79         MonoDomain *domain;
80 } ThreadPoolIOUpdate_RemoveDomain;
81
82 typedef struct {
83         ThreadPoolIOUpdateType type;
84         union {
85                 ThreadPoolIOUpdate_Add add;
86                 ThreadPoolIOUpdate_RemoveSocket remove_socket;
87                 ThreadPoolIOUpdate_RemoveDomain remove_domain;
88         } data;
89 } ThreadPoolIOUpdate;
90
91 typedef struct {
92         ThreadPoolIOBackend backend;
93
94         ThreadPoolIOUpdate updates [UPDATES_CAPACITY];
95         gint updates_size;
96         MonoCoopMutex updates_lock;
97         MonoCoopCond updates_cond;
98
99 #if !defined(HOST_WIN32)
100         gint wakeup_pipes [2];
101 #else
102         SOCKET wakeup_pipes [2];
103 #endif
104 } ThreadPoolIO;
105
106 static mono_lazy_init_t io_status = MONO_LAZY_INIT_STATUS_NOT_INITIALIZED;
107
108 static gboolean io_selector_running = FALSE;
109
110 static ThreadPoolIO* threadpool_io;
111
112 static MonoIOSelectorJob*
113 get_job_for_event (MonoMList **list, gint32 event)
114 {
115         MonoMList *current;
116
117         g_assert (list);
118
119         for (current = *list; current; current = mono_mlist_next (current)) {
120                 MonoIOSelectorJob *job = (MonoIOSelectorJob*) mono_mlist_get_data (current);
121                 if (job->operation == event) {
122                         *list = mono_mlist_remove_item (*list, current);
123                         return job;
124                 }
125         }
126
127         return NULL;
128 }
129
130 static gint
131 get_operations_for_jobs (MonoMList *list)
132 {
133         MonoMList *current;
134         gint operations = 0;
135
136         for (current = list; current; current = mono_mlist_next (current))
137                 operations |= ((MonoIOSelectorJob*) mono_mlist_get_data (current))->operation;
138
139         return operations;
140 }
141
142 static void
143 selector_thread_wakeup (void)
144 {
145         gchar msg = 'c';
146         gint written;
147
148         for (;;) {
149 #if !defined(HOST_WIN32)
150                 written = write (threadpool_io->wakeup_pipes [1], &msg, 1);
151                 if (written == 1)
152                         break;
153                 if (written == -1) {
154                         g_warning ("selector_thread_wakeup: write () failed, error (%d) %s\n", errno, g_strerror (errno));
155                         break;
156                 }
157 #else
158                 written = send (threadpool_io->wakeup_pipes [1], &msg, 1, 0);
159                 if (written == 1)
160                         break;
161                 if (written == SOCKET_ERROR) {
162                         g_warning ("selector_thread_wakeup: write () failed, error (%d)\n", WSAGetLastError ());
163                         break;
164                 }
165 #endif
166         }
167 }
168
169 static void
170 selector_thread_wakeup_drain_pipes (void)
171 {
172         gchar buffer [128];
173         gint received;
174
175         for (;;) {
176 #if !defined(HOST_WIN32)
177                 received = read (threadpool_io->wakeup_pipes [0], buffer, sizeof (buffer));
178                 if (received == 0)
179                         break;
180                 if (received == -1) {
181                         if (errno != EINTR && errno != EAGAIN)
182                                 g_warning ("selector_thread_wakeup_drain_pipes: read () failed, error (%d) %s\n", errno, g_strerror (errno));
183                         break;
184                 }
185 #else
186                 received = recv (threadpool_io->wakeup_pipes [0], buffer, sizeof (buffer), 0);
187                 if (received == 0)
188                         break;
189                 if (received == SOCKET_ERROR) {
190                         if (WSAGetLastError () != WSAEINTR && WSAGetLastError () != WSAEWOULDBLOCK)
191                                 g_warning ("selector_thread_wakeup_drain_pipes: recv () failed, error (%d) %s\n", WSAGetLastError ());
192                         break;
193                 }
194 #endif
195         }
196 }
197
198 typedef struct {
199         MonoDomain *domain;
200         MonoGHashTable *states;
201 } FilterSockaresForDomainData;
202
203 static void
204 filter_jobs_for_domain (gpointer key, gpointer value, gpointer user_data)
205 {
206         FilterSockaresForDomainData *data;
207         MonoMList *list = (MonoMList *)value, *element;
208         MonoDomain *domain;
209         MonoGHashTable *states;
210
211         g_assert (user_data);
212         data = (FilterSockaresForDomainData *)user_data;
213         domain = data->domain;
214         states = data->states;
215
216         for (element = list; element; element = mono_mlist_next (element)) {
217                 MonoIOSelectorJob *job = (MonoIOSelectorJob*) mono_mlist_get_data (element);
218                 if (mono_object_domain (job) == domain)
219                         mono_mlist_set_data (element, NULL);
220         }
221
222         /* we skip all the first elements which are NULL */
223         for (; list; list = mono_mlist_next (list)) {
224                 if (mono_mlist_get_data (list))
225                         break;
226         }
227
228         if (list) {
229                 g_assert (mono_mlist_get_data (list));
230
231                 /* we delete all the NULL elements after the first one */
232                 for (element = list; element;) {
233                         MonoMList *next;
234                         if (!(next = mono_mlist_next (element)))
235                                 break;
236                         if (mono_mlist_get_data (next))
237                                 element = next;
238                         else
239                                 mono_mlist_set_next (element, mono_mlist_next (next));
240                 }
241         }
242
243         mono_g_hash_table_replace (states, key, list);
244 }
245
246 static void
247 wait_callback (gint fd, gint events, gpointer user_data)
248 {
249         MonoError error;
250
251         if (mono_runtime_is_shutting_down ())
252                 return;
253
254         if (fd == threadpool_io->wakeup_pipes [0]) {
255                 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: wke");
256                 selector_thread_wakeup_drain_pipes ();
257         } else {
258                 MonoGHashTable *states;
259                 MonoMList *list = NULL;
260                 gpointer k;
261                 gboolean remove_fd = FALSE;
262                 gint operations;
263
264                 g_assert (user_data);
265                 states = (MonoGHashTable *)user_data;
266
267                 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: cal fd %3d, events = %2s | %2s | %3s",
268                         fd, (events & EVENT_IN) ? "RD" : "..", (events & EVENT_OUT) ? "WR" : "..", (events & EVENT_ERR) ? "ERR" : "...");
269
270                 if (!mono_g_hash_table_lookup_extended (states, GINT_TO_POINTER (fd), &k, (gpointer*) &list))
271                         g_error ("wait_callback: fd %d not found in states table", fd);
272
273                 if (list && (events & EVENT_IN) != 0) {
274                         MonoIOSelectorJob *job = get_job_for_event (&list, EVENT_IN);
275                         if (job) {
276                                 mono_threadpool_enqueue_work_item (((MonoObject*) job)->vtable->domain, (MonoObject*) job, &error);
277                                 mono_error_assert_ok (&error);
278                         }
279
280                 }
281                 if (list && (events & EVENT_OUT) != 0) {
282                         MonoIOSelectorJob *job = get_job_for_event (&list, EVENT_OUT);
283                         if (job) {
284                                 mono_threadpool_enqueue_work_item (((MonoObject*) job)->vtable->domain, (MonoObject*) job, &error);
285                                 mono_error_assert_ok (&error);
286                         }
287                 }
288
289                 remove_fd = (events & EVENT_ERR) == EVENT_ERR;
290                 if (!remove_fd) {
291                         mono_g_hash_table_replace (states, GINT_TO_POINTER (fd), list);
292
293                         operations = get_operations_for_jobs (list);
294
295                         mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: res fd %3d, events = %2s | %2s | %3s",
296                                 fd, (operations & EVENT_IN) ? "RD" : "..", (operations & EVENT_OUT) ? "WR" : "..", (operations & EVENT_ERR) ? "ERR" : "...");
297
298                         threadpool_io->backend.register_fd (fd, operations, FALSE);
299                 } else {
300                         mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: err fd %d", fd);
301
302                         mono_g_hash_table_remove (states, GINT_TO_POINTER (fd));
303
304                         threadpool_io->backend.remove_fd (fd);
305                 }
306         }
307 }
308
309 static void
310 selector_thread_interrupt (gpointer unused)
311 {
312         selector_thread_wakeup ();
313 }
314
315 static gsize WINAPI
316 selector_thread (gpointer data)
317 {
318         MonoError error;
319         MonoGHashTable *states;
320
321         if (mono_runtime_is_shutting_down ()) {
322                 io_selector_running = FALSE;
323                 return 0;
324         }
325
326         states = mono_g_hash_table_new_type (g_direct_hash, NULL, MONO_HASH_VALUE_GC, MONO_ROOT_SOURCE_THREAD_POOL, "i/o thread pool states table");
327
328         while (!mono_runtime_is_shutting_down ()) {
329                 gint i, j;
330                 gint res;
331                 gboolean interrupted = FALSE;
332
333                 if (mono_thread_interruption_checkpoint ())
334                         continue;
335
336                 mono_coop_mutex_lock (&threadpool_io->updates_lock);
337
338                 for (i = 0; i < threadpool_io->updates_size; ++i) {
339                         ThreadPoolIOUpdate *update = &threadpool_io->updates [i];
340
341                         switch (update->type) {
342                         case UPDATE_EMPTY:
343                                 break;
344                         case UPDATE_ADD: {
345                                 gint fd;
346                                 gint operations;
347                                 gpointer k;
348                                 gboolean exists;
349                                 MonoMList *list = NULL;
350                                 MonoIOSelectorJob *job;
351
352                                 fd = update->data.add.fd;
353                                 g_assert (fd >= 0);
354
355                                 job = update->data.add.job;
356                                 g_assert (job);
357
358                                 exists = mono_g_hash_table_lookup_extended (states, GINT_TO_POINTER (fd), &k, (gpointer*) &list);
359                                 list = mono_mlist_append_checked (list, (MonoObject*) job, &error);
360                                 mono_error_assert_ok (&error);
361                                 mono_g_hash_table_replace (states, GINT_TO_POINTER (fd), list);
362
363                                 operations = get_operations_for_jobs (list);
364
365                                 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: %3s fd %3d, operations = %2s | %2s | %3s",
366                                         exists ? "mod" : "add", fd, (operations & EVENT_IN) ? "RD" : "..", (operations & EVENT_OUT) ? "WR" : "..", (operations & EVENT_ERR) ? "ERR" : "...");
367
368                                 threadpool_io->backend.register_fd (fd, operations, !exists);
369
370                                 break;
371                         }
372                         case UPDATE_REMOVE_SOCKET: {
373                                 gint fd;
374                                 gpointer k;
375                                 MonoMList *list = NULL;
376
377                                 fd = update->data.remove_socket.fd;
378                                 g_assert (fd >= 0);
379
380                                 if (mono_g_hash_table_lookup_extended (states, GINT_TO_POINTER (fd), &k, (gpointer*) &list)) {
381                                         mono_g_hash_table_remove (states, GINT_TO_POINTER (fd));
382
383                                         for (j = i + 1; j < threadpool_io->updates_size; ++j) {
384                                                 ThreadPoolIOUpdate *update = &threadpool_io->updates [j];
385                                                 if (update->type == UPDATE_ADD && update->data.add.fd == fd)
386                                                         memset (update, 0, sizeof (ThreadPoolIOUpdate));
387                                         }
388
389                                         for (; list; list = mono_mlist_remove_item (list, list)) {
390                                                 mono_threadpool_enqueue_work_item (mono_object_domain (mono_mlist_get_data (list)), mono_mlist_get_data (list), &error);
391                                                 mono_error_assert_ok (&error);
392                                         }
393
394                                         mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: del fd %3d", fd);
395                                         threadpool_io->backend.remove_fd (fd);
396                                 }
397
398                                 break;
399                         }
400                         case UPDATE_REMOVE_DOMAIN: {
401                                 MonoDomain *domain;
402
403                                 domain = update->data.remove_domain.domain;
404                                 g_assert (domain);
405
406                                 FilterSockaresForDomainData user_data = { .domain = domain, .states = states };
407                                 mono_g_hash_table_foreach (states, filter_jobs_for_domain, &user_data);
408
409                                 for (j = i + 1; j < threadpool_io->updates_size; ++j) {
410                                         ThreadPoolIOUpdate *update = &threadpool_io->updates [j];
411                                         if (update->type == UPDATE_ADD && mono_object_domain (update->data.add.job) == domain)
412                                                 memset (update, 0, sizeof (ThreadPoolIOUpdate));
413                                 }
414
415                                 break;
416                         }
417                         default:
418                                 g_assert_not_reached ();
419                         }
420                 }
421
422                 mono_coop_cond_broadcast (&threadpool_io->updates_cond);
423
424                 if (threadpool_io->updates_size > 0) {
425                         threadpool_io->updates_size = 0;
426                         memset (&threadpool_io->updates, 0, UPDATES_CAPACITY * sizeof (ThreadPoolIOUpdate));
427                 }
428
429                 mono_coop_mutex_unlock (&threadpool_io->updates_lock);
430
431                 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: wai");
432
433                 mono_thread_info_install_interrupt (selector_thread_interrupt, NULL, &interrupted);
434                 if (interrupted)
435                         continue;
436
437                 res = threadpool_io->backend.event_wait (wait_callback, states);
438                 if (res == -1)
439                         break;
440
441                 mono_thread_info_uninstall_interrupt (&interrupted);
442         }
443
444         mono_g_hash_table_destroy (states);
445
446         mono_coop_mutex_lock (&threadpool_io->updates_lock);
447
448         io_selector_running = FALSE;
449         mono_coop_cond_broadcast (&threadpool_io->updates_cond);
450
451         mono_coop_mutex_unlock (&threadpool_io->updates_lock);
452
453         return 0;
454 }
455
456 /* Locking: threadpool_io->updates_lock must be held */
457 static ThreadPoolIOUpdate*
458 update_get_new (void)
459 {
460         ThreadPoolIOUpdate *update = NULL;
461         g_assert (threadpool_io->updates_size <= UPDATES_CAPACITY);
462
463         while (threadpool_io->updates_size == UPDATES_CAPACITY) {
464                 /* we wait for updates to be applied in the selector_thread and we loop
465                  * as long as none are available. if it happends too much, then we need
466                  * to increase UPDATES_CAPACITY */
467                 mono_coop_cond_wait (&threadpool_io->updates_cond, &threadpool_io->updates_lock);
468         }
469
470         g_assert (threadpool_io->updates_size < UPDATES_CAPACITY);
471
472         update = &threadpool_io->updates [threadpool_io->updates_size ++];
473
474         return update;
475 }
476
477 static void
478 wakeup_pipes_init (void)
479 {
480 #if !defined(HOST_WIN32)
481         if (pipe (threadpool_io->wakeup_pipes) == -1)
482                 g_error ("wakeup_pipes_init: pipe () failed, error (%d) %s\n", errno, g_strerror (errno));
483         if (fcntl (threadpool_io->wakeup_pipes [0], F_SETFL, O_NONBLOCK) == -1)
484                 g_error ("wakeup_pipes_init: fcntl () failed, error (%d) %s\n", errno, g_strerror (errno));
485 #else
486         struct sockaddr_in client;
487         struct sockaddr_in server;
488         SOCKET server_sock;
489         gulong arg;
490         gint size;
491
492         server_sock = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
493         g_assert (server_sock != INVALID_SOCKET);
494         threadpool_io->wakeup_pipes [1] = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
495         g_assert (threadpool_io->wakeup_pipes [1] != INVALID_SOCKET);
496
497         server.sin_family = AF_INET;
498         server.sin_addr.s_addr = inet_addr ("127.0.0.1");
499         server.sin_port = 0;
500         if (bind (server_sock, (SOCKADDR*) &server, sizeof (server)) == SOCKET_ERROR) {
501                 closesocket (server_sock);
502                 g_error ("wakeup_pipes_init: bind () failed, error (%d)\n", WSAGetLastError ());
503         }
504
505         size = sizeof (server);
506         if (getsockname (server_sock, (SOCKADDR*) &server, &size) == SOCKET_ERROR) {
507                 closesocket (server_sock);
508                 g_error ("wakeup_pipes_init: getsockname () failed, error (%d)\n", WSAGetLastError ());
509         }
510         if (listen (server_sock, 1024) == SOCKET_ERROR) {
511                 closesocket (server_sock);
512                 g_error ("wakeup_pipes_init: listen () failed, error (%d)\n", WSAGetLastError ());
513         }
514         if (connect ((SOCKET) threadpool_io->wakeup_pipes [1], (SOCKADDR*) &server, sizeof (server)) == SOCKET_ERROR) {
515                 closesocket (server_sock);
516                 g_error ("wakeup_pipes_init: connect () failed, error (%d)\n", WSAGetLastError ());
517         }
518
519         size = sizeof (client);
520         threadpool_io->wakeup_pipes [0] = accept (server_sock, (SOCKADDR *) &client, &size);
521         g_assert (threadpool_io->wakeup_pipes [0] != INVALID_SOCKET);
522
523         arg = 1;
524         if (ioctlsocket (threadpool_io->wakeup_pipes [0], FIONBIO, &arg) == SOCKET_ERROR) {
525                 closesocket (threadpool_io->wakeup_pipes [0]);
526                 closesocket (server_sock);
527                 g_error ("wakeup_pipes_init: ioctlsocket () failed, error (%d)\n", WSAGetLastError ());
528         }
529
530         closesocket (server_sock);
531 #endif
532 }
533
534 static void
535 initialize (void)
536 {
537         g_assert (!threadpool_io);
538         threadpool_io = g_new0 (ThreadPoolIO, 1);
539         g_assert (threadpool_io);
540
541         mono_coop_mutex_init (&threadpool_io->updates_lock);
542         mono_coop_cond_init (&threadpool_io->updates_cond);
543         mono_gc_register_root ((char *)&threadpool_io->updates [0], sizeof (threadpool_io->updates), MONO_GC_DESCRIPTOR_NULL, MONO_ROOT_SOURCE_THREAD_POOL, "i/o thread pool updates list");
544
545         threadpool_io->updates_size = 0;
546
547         threadpool_io->backend = backend_poll;
548         if (g_getenv ("MONO_ENABLE_AIO") != NULL) {
549 #if defined(HAVE_EPOLL)
550                 threadpool_io->backend = backend_epoll;
551 #elif defined(HAVE_KQUEUE)
552                 threadpool_io->backend = backend_kqueue;
553 #endif
554         }
555
556         wakeup_pipes_init ();
557
558         if (!threadpool_io->backend.init (threadpool_io->wakeup_pipes [0]))
559                 g_error ("initialize: backend->init () failed");
560
561         mono_coop_mutex_lock (&threadpool_io->updates_lock);
562
563         io_selector_running = TRUE;
564
565         MonoError error;
566         if (!mono_thread_create_internal (mono_get_root_domain (), selector_thread, NULL, MONO_THREAD_CREATE_FLAGS_THREADPOOL | MONO_THREAD_CREATE_FLAGS_SMALL_STACK, &error))
567                 g_error ("initialize: mono_thread_create_internal () failed due to %s", mono_error_get_message (&error));
568
569         mono_coop_mutex_unlock (&threadpool_io->updates_lock);
570 }
571
572 static void
573 cleanup (void)
574 {
575         // FIXME destroy everything
576 }
577
578 void
579 mono_threadpool_io_cleanup (void)
580 {
581         mono_lazy_cleanup (&io_status, cleanup);
582 }
583
584 void
585 ves_icall_System_IOSelector_Add (gpointer handle, MonoIOSelectorJob *job)
586 {
587         ThreadPoolIOUpdate *update;
588
589         g_assert (handle);
590
591         g_assert ((job->operation == EVENT_IN) ^ (job->operation == EVENT_OUT));
592         g_assert (job->callback);
593
594         if (mono_runtime_is_shutting_down ())
595                 return;
596         if (mono_domain_is_unloading (mono_object_domain (job)))
597                 return;
598
599         mono_lazy_initialize (&io_status, initialize);
600
601         mono_coop_mutex_lock (&threadpool_io->updates_lock);
602
603         if (!io_selector_running) {
604                 mono_coop_mutex_unlock (&threadpool_io->updates_lock);
605                 return;
606         }
607
608         update = update_get_new ();
609         update->type = UPDATE_ADD;
610         update->data.add.fd = GPOINTER_TO_INT (handle);
611         update->data.add.job = job;
612         mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */
613
614         selector_thread_wakeup ();
615
616         mono_coop_mutex_unlock (&threadpool_io->updates_lock);
617 }
618
619 void
620 ves_icall_System_IOSelector_Remove (gpointer handle)
621 {
622         mono_threadpool_io_remove_socket (GPOINTER_TO_INT (handle));
623 }
624
625 void
626 mono_threadpool_io_remove_socket (int fd)
627 {
628         ThreadPoolIOUpdate *update;
629
630         if (!mono_lazy_is_initialized (&io_status))
631                 return;
632
633         mono_coop_mutex_lock (&threadpool_io->updates_lock);
634
635         if (!io_selector_running) {
636                 mono_coop_mutex_unlock (&threadpool_io->updates_lock);
637                 return;
638         }
639
640         update = update_get_new ();
641         update->type = UPDATE_REMOVE_SOCKET;
642         update->data.add.fd = fd;
643         mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */
644
645         selector_thread_wakeup ();
646
647         mono_coop_cond_wait (&threadpool_io->updates_cond, &threadpool_io->updates_lock);
648
649         mono_coop_mutex_unlock (&threadpool_io->updates_lock);
650 }
651
652 void
653 mono_threadpool_io_remove_domain_jobs (MonoDomain *domain)
654 {
655         ThreadPoolIOUpdate *update;
656
657         if (!mono_lazy_is_initialized (&io_status))
658                 return;
659
660         mono_coop_mutex_lock (&threadpool_io->updates_lock);
661
662         if (!io_selector_running) {
663                 mono_coop_mutex_unlock (&threadpool_io->updates_lock);
664                 return;
665         }
666
667         update = update_get_new ();
668         update->type = UPDATE_REMOVE_DOMAIN;
669         update->data.remove_domain.domain = domain;
670         mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */
671
672         selector_thread_wakeup ();
673
674         mono_coop_cond_wait (&threadpool_io->updates_cond, &threadpool_io->updates_lock);
675
676         mono_coop_mutex_unlock (&threadpool_io->updates_lock);
677 }
678
679 #else
680
681 void
682 ves_icall_System_IOSelector_Add (gpointer handle, MonoIOSelectorJob *job)
683 {
684         g_assert_not_reached ();
685 }
686
687 void
688 ves_icall_System_IOSelector_Remove (gpointer handle)
689 {
690         g_assert_not_reached ();
691 }
692
693 void
694 mono_threadpool_io_cleanup (void)
695 {
696         g_assert_not_reached ();
697 }
698
699 void
700 mono_threadpool_io_remove_socket (int fd)
701 {
702         g_assert_not_reached ();
703 }
704
705 void
706 mono_threadpool_io_remove_domain_jobs (MonoDomain *domain)
707 {
708         g_assert_not_reached ();
709 }
710
711 #endif