Merge pull request #4621 from alexanderkyte/strdup_env
[mono.git] / mono / metadata / threadpool-io.c
1 /**
2  * \file
3  * Microsoft IO threadpool runtime support
4  *
5  * Author:
6  *      Ludovic Henry (ludovic.henry@xamarin.com)
7  *
8  * Copyright 2015 Xamarin, Inc (http://www.xamarin.com)
9  * Licensed under the MIT license. See LICENSE file in the project root for full license information.
10  */
11
12 #include <config.h>
13
14 #ifndef DISABLE_SOCKETS
15
16 #include <glib.h>
17
18 #if defined(HOST_WIN32)
19 #include <windows.h>
20 #else
21 #include <errno.h>
22 #include <fcntl.h>
23 #endif
24
25 #include <mono/metadata/gc-internals.h>
26 #include <mono/metadata/mono-mlist.h>
27 #include <mono/metadata/threadpool.h>
28 #include <mono/metadata/threadpool-io.h>
29 #include <mono/utils/atomic.h>
30 #include <mono/utils/mono-threads.h>
31 #include <mono/utils/mono-lazy-init.h>
32 #include <mono/utils/mono-logger-internals.h>
33 #include <mono/utils/w32api.h>
34
35 typedef struct {
36         gboolean (*init) (gint wakeup_pipe_fd);
37         void     (*register_fd) (gint fd, gint events, gboolean is_new);
38         void     (*remove_fd) (gint fd);
39         gint     (*event_wait) (void (*callback) (gint fd, gint events, gpointer user_data), gpointer user_data);
40 } ThreadPoolIOBackend;
41
42 /* Keep in sync with System.IOOperation in mcs/class/System/System/IOSelector.cs */
43 enum MonoIOOperation {
44         EVENT_IN   = 1 << 0,
45         EVENT_OUT  = 1 << 1,
46         EVENT_ERR  = 1 << 2, /* not in managed */
47 };
48
49 #include "threadpool-io-epoll.c"
50 #include "threadpool-io-kqueue.c"
51 #include "threadpool-io-poll.c"
52
53 #define UPDATES_CAPACITY 128
54
55 /* Keep in sync with System.IOSelectorJob in mcs/class/System/System/IOSelector.cs */
56 struct _MonoIOSelectorJob {
57         MonoObject object;
58         gint32 operation;
59         MonoObject *callback;
60         MonoObject *state;
61 };
62
63 typedef enum {
64         UPDATE_EMPTY = 0,
65         UPDATE_ADD,
66         UPDATE_REMOVE_SOCKET,
67         UPDATE_REMOVE_DOMAIN,
68 } ThreadPoolIOUpdateType;
69
70 typedef struct {
71         gint fd;
72         MonoIOSelectorJob *job;
73 } ThreadPoolIOUpdate_Add;
74
75 typedef struct {
76         gint fd;
77 } ThreadPoolIOUpdate_RemoveSocket;
78
79 typedef struct {
80         MonoDomain *domain;
81 } ThreadPoolIOUpdate_RemoveDomain;
82
83 typedef struct {
84         ThreadPoolIOUpdateType type;
85         union {
86                 ThreadPoolIOUpdate_Add add;
87                 ThreadPoolIOUpdate_RemoveSocket remove_socket;
88                 ThreadPoolIOUpdate_RemoveDomain remove_domain;
89         } data;
90 } ThreadPoolIOUpdate;
91
92 typedef struct {
93         ThreadPoolIOBackend backend;
94
95         ThreadPoolIOUpdate updates [UPDATES_CAPACITY];
96         gint updates_size;
97         MonoCoopMutex updates_lock;
98         MonoCoopCond updates_cond;
99
100 #if !defined(HOST_WIN32)
101         gint wakeup_pipes [2];
102 #else
103         SOCKET wakeup_pipes [2];
104 #endif
105 } ThreadPoolIO;
106
107 static mono_lazy_init_t io_status = MONO_LAZY_INIT_STATUS_NOT_INITIALIZED;
108
109 static gboolean io_selector_running = FALSE;
110
111 static ThreadPoolIO* threadpool_io;
112
113 static MonoIOSelectorJob*
114 get_job_for_event (MonoMList **list, gint32 event)
115 {
116         MonoMList *current;
117
118         g_assert (list);
119
120         for (current = *list; current; current = mono_mlist_next (current)) {
121                 MonoIOSelectorJob *job = (MonoIOSelectorJob*) mono_mlist_get_data (current);
122                 if (job->operation == event) {
123                         *list = mono_mlist_remove_item (*list, current);
124                         return job;
125                 }
126         }
127
128         return NULL;
129 }
130
131 static gint
132 get_operations_for_jobs (MonoMList *list)
133 {
134         MonoMList *current;
135         gint operations = 0;
136
137         for (current = list; current; current = mono_mlist_next (current))
138                 operations |= ((MonoIOSelectorJob*) mono_mlist_get_data (current))->operation;
139
140         return operations;
141 }
142
143 static void
144 selector_thread_wakeup (void)
145 {
146         gchar msg = 'c';
147         gint written;
148
149         for (;;) {
150 #if !defined(HOST_WIN32)
151                 written = write (threadpool_io->wakeup_pipes [1], &msg, 1);
152                 if (written == 1)
153                         break;
154                 if (written == -1) {
155                         g_warning ("selector_thread_wakeup: write () failed, error (%d) %s\n", errno, g_strerror (errno));
156                         break;
157                 }
158 #else
159                 written = send (threadpool_io->wakeup_pipes [1], &msg, 1, 0);
160                 if (written == 1)
161                         break;
162                 if (written == SOCKET_ERROR) {
163                         g_warning ("selector_thread_wakeup: write () failed, error (%d)\n", WSAGetLastError ());
164                         break;
165                 }
166 #endif
167         }
168 }
169
170 static void
171 selector_thread_wakeup_drain_pipes (void)
172 {
173         gchar buffer [128];
174         gint received;
175
176         for (;;) {
177 #if !defined(HOST_WIN32)
178                 received = read (threadpool_io->wakeup_pipes [0], buffer, sizeof (buffer));
179                 if (received == 0)
180                         break;
181                 if (received == -1) {
182                         if (errno != EINTR && errno != EAGAIN)
183                                 g_warning ("selector_thread_wakeup_drain_pipes: read () failed, error (%d) %s\n", errno, g_strerror (errno));
184                         break;
185                 }
186 #else
187                 received = recv (threadpool_io->wakeup_pipes [0], buffer, sizeof (buffer), 0);
188                 if (received == 0)
189                         break;
190                 if (received == SOCKET_ERROR) {
191                         if (WSAGetLastError () != WSAEINTR && WSAGetLastError () != WSAEWOULDBLOCK)
192                                 g_warning ("selector_thread_wakeup_drain_pipes: recv () failed, error (%d) %s\n", WSAGetLastError ());
193                         break;
194                 }
195 #endif
196         }
197 }
198
199 typedef struct {
200         MonoDomain *domain;
201         MonoGHashTable *states;
202 } FilterSockaresForDomainData;
203
204 static void
205 filter_jobs_for_domain (gpointer key, gpointer value, gpointer user_data)
206 {
207         FilterSockaresForDomainData *data;
208         MonoMList *list = (MonoMList *)value, *element;
209         MonoDomain *domain;
210         MonoGHashTable *states;
211
212         g_assert (user_data);
213         data = (FilterSockaresForDomainData *)user_data;
214         domain = data->domain;
215         states = data->states;
216
217         for (element = list; element; element = mono_mlist_next (element)) {
218                 MonoIOSelectorJob *job = (MonoIOSelectorJob*) mono_mlist_get_data (element);
219                 if (mono_object_domain (job) == domain)
220                         mono_mlist_set_data (element, NULL);
221         }
222
223         /* we skip all the first elements which are NULL */
224         for (; list; list = mono_mlist_next (list)) {
225                 if (mono_mlist_get_data (list))
226                         break;
227         }
228
229         if (list) {
230                 g_assert (mono_mlist_get_data (list));
231
232                 /* we delete all the NULL elements after the first one */
233                 for (element = list; element;) {
234                         MonoMList *next;
235                         if (!(next = mono_mlist_next (element)))
236                                 break;
237                         if (mono_mlist_get_data (next))
238                                 element = next;
239                         else
240                                 mono_mlist_set_next (element, mono_mlist_next (next));
241                 }
242         }
243
244         mono_g_hash_table_replace (states, key, list);
245 }
246
247 static void
248 wait_callback (gint fd, gint events, gpointer user_data)
249 {
250         MonoError error;
251
252         if (mono_runtime_is_shutting_down ())
253                 return;
254
255         if (fd == threadpool_io->wakeup_pipes [0]) {
256                 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: wke");
257                 selector_thread_wakeup_drain_pipes ();
258         } else {
259                 MonoGHashTable *states;
260                 MonoMList *list = NULL;
261                 gpointer k;
262                 gboolean remove_fd = FALSE;
263                 gint operations;
264
265                 g_assert (user_data);
266                 states = (MonoGHashTable *)user_data;
267
268                 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: cal fd %3d, events = %2s | %2s | %3s",
269                         fd, (events & EVENT_IN) ? "RD" : "..", (events & EVENT_OUT) ? "WR" : "..", (events & EVENT_ERR) ? "ERR" : "...");
270
271                 if (!mono_g_hash_table_lookup_extended (states, GINT_TO_POINTER (fd), &k, (gpointer*) &list))
272                         g_error ("wait_callback: fd %d not found in states table", fd);
273
274                 if (list && (events & EVENT_IN) != 0) {
275                         MonoIOSelectorJob *job = get_job_for_event (&list, EVENT_IN);
276                         if (job) {
277                                 mono_threadpool_enqueue_work_item (((MonoObject*) job)->vtable->domain, (MonoObject*) job, &error);
278                                 mono_error_assert_ok (&error);
279                         }
280
281                 }
282                 if (list && (events & EVENT_OUT) != 0) {
283                         MonoIOSelectorJob *job = get_job_for_event (&list, EVENT_OUT);
284                         if (job) {
285                                 mono_threadpool_enqueue_work_item (((MonoObject*) job)->vtable->domain, (MonoObject*) job, &error);
286                                 mono_error_assert_ok (&error);
287                         }
288                 }
289
290                 remove_fd = (events & EVENT_ERR) == EVENT_ERR;
291                 if (!remove_fd) {
292                         mono_g_hash_table_replace (states, GINT_TO_POINTER (fd), list);
293
294                         operations = get_operations_for_jobs (list);
295
296                         mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: res fd %3d, events = %2s | %2s | %3s",
297                                 fd, (operations & EVENT_IN) ? "RD" : "..", (operations & EVENT_OUT) ? "WR" : "..", (operations & EVENT_ERR) ? "ERR" : "...");
298
299                         threadpool_io->backend.register_fd (fd, operations, FALSE);
300                 } else {
301                         mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: err fd %d", fd);
302
303                         mono_g_hash_table_remove (states, GINT_TO_POINTER (fd));
304
305                         threadpool_io->backend.remove_fd (fd);
306                 }
307         }
308 }
309
310 static void
311 selector_thread_interrupt (gpointer unused)
312 {
313         selector_thread_wakeup ();
314 }
315
316 static gsize WINAPI
317 selector_thread (gpointer data)
318 {
319         MonoError error;
320         MonoGHashTable *states;
321
322         if (mono_runtime_is_shutting_down ()) {
323                 io_selector_running = FALSE;
324                 return 0;
325         }
326
327         states = mono_g_hash_table_new_type (g_direct_hash, NULL, MONO_HASH_VALUE_GC, MONO_ROOT_SOURCE_THREAD_POOL, "i/o thread pool states table");
328
329         while (!mono_runtime_is_shutting_down ()) {
330                 gint i, j;
331                 gint res;
332                 gboolean interrupted = FALSE;
333
334                 if (mono_thread_interruption_checkpoint ())
335                         continue;
336
337                 mono_coop_mutex_lock (&threadpool_io->updates_lock);
338
339                 for (i = 0; i < threadpool_io->updates_size; ++i) {
340                         ThreadPoolIOUpdate *update = &threadpool_io->updates [i];
341
342                         switch (update->type) {
343                         case UPDATE_EMPTY:
344                                 break;
345                         case UPDATE_ADD: {
346                                 gint fd;
347                                 gint operations;
348                                 gpointer k;
349                                 gboolean exists;
350                                 MonoMList *list = NULL;
351                                 MonoIOSelectorJob *job;
352
353                                 fd = update->data.add.fd;
354                                 g_assert (fd >= 0);
355
356                                 job = update->data.add.job;
357                                 g_assert (job);
358
359                                 exists = mono_g_hash_table_lookup_extended (states, GINT_TO_POINTER (fd), &k, (gpointer*) &list);
360                                 list = mono_mlist_append_checked (list, (MonoObject*) job, &error);
361                                 mono_error_assert_ok (&error);
362                                 mono_g_hash_table_replace (states, GINT_TO_POINTER (fd), list);
363
364                                 operations = get_operations_for_jobs (list);
365
366                                 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: %3s fd %3d, operations = %2s | %2s | %3s",
367                                         exists ? "mod" : "add", fd, (operations & EVENT_IN) ? "RD" : "..", (operations & EVENT_OUT) ? "WR" : "..", (operations & EVENT_ERR) ? "ERR" : "...");
368
369                                 threadpool_io->backend.register_fd (fd, operations, !exists);
370
371                                 break;
372                         }
373                         case UPDATE_REMOVE_SOCKET: {
374                                 gint fd;
375                                 gpointer k;
376                                 MonoMList *list = NULL;
377
378                                 fd = update->data.remove_socket.fd;
379                                 g_assert (fd >= 0);
380
381                                 if (mono_g_hash_table_lookup_extended (states, GINT_TO_POINTER (fd), &k, (gpointer*) &list)) {
382                                         mono_g_hash_table_remove (states, GINT_TO_POINTER (fd));
383
384                                         for (j = i + 1; j < threadpool_io->updates_size; ++j) {
385                                                 ThreadPoolIOUpdate *update = &threadpool_io->updates [j];
386                                                 if (update->type == UPDATE_ADD && update->data.add.fd == fd)
387                                                         memset (update, 0, sizeof (ThreadPoolIOUpdate));
388                                         }
389
390                                         for (; list; list = mono_mlist_remove_item (list, list)) {
391                                                 mono_threadpool_enqueue_work_item (mono_object_domain (mono_mlist_get_data (list)), mono_mlist_get_data (list), &error);
392                                                 mono_error_assert_ok (&error);
393                                         }
394
395                                         mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: del fd %3d", fd);
396                                         threadpool_io->backend.remove_fd (fd);
397                                 }
398
399                                 break;
400                         }
401                         case UPDATE_REMOVE_DOMAIN: {
402                                 MonoDomain *domain;
403
404                                 domain = update->data.remove_domain.domain;
405                                 g_assert (domain);
406
407                                 FilterSockaresForDomainData user_data = { .domain = domain, .states = states };
408                                 mono_g_hash_table_foreach (states, filter_jobs_for_domain, &user_data);
409
410                                 for (j = i + 1; j < threadpool_io->updates_size; ++j) {
411                                         ThreadPoolIOUpdate *update = &threadpool_io->updates [j];
412                                         if (update->type == UPDATE_ADD && mono_object_domain (update->data.add.job) == domain)
413                                                 memset (update, 0, sizeof (ThreadPoolIOUpdate));
414                                 }
415
416                                 break;
417                         }
418                         default:
419                                 g_assert_not_reached ();
420                         }
421                 }
422
423                 mono_coop_cond_broadcast (&threadpool_io->updates_cond);
424
425                 if (threadpool_io->updates_size > 0) {
426                         threadpool_io->updates_size = 0;
427                         memset (&threadpool_io->updates, 0, UPDATES_CAPACITY * sizeof (ThreadPoolIOUpdate));
428                 }
429
430                 mono_coop_mutex_unlock (&threadpool_io->updates_lock);
431
432                 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: wai");
433
434                 mono_thread_info_install_interrupt (selector_thread_interrupt, NULL, &interrupted);
435                 if (interrupted)
436                         continue;
437
438                 res = threadpool_io->backend.event_wait (wait_callback, states);
439                 if (res == -1)
440                         break;
441
442                 mono_thread_info_uninstall_interrupt (&interrupted);
443         }
444
445         mono_g_hash_table_destroy (states);
446
447         mono_coop_mutex_lock (&threadpool_io->updates_lock);
448
449         io_selector_running = FALSE;
450         mono_coop_cond_broadcast (&threadpool_io->updates_cond);
451
452         mono_coop_mutex_unlock (&threadpool_io->updates_lock);
453
454         return 0;
455 }
456
457 /* Locking: threadpool_io->updates_lock must be held */
458 static ThreadPoolIOUpdate*
459 update_get_new (void)
460 {
461         ThreadPoolIOUpdate *update = NULL;
462         g_assert (threadpool_io->updates_size <= UPDATES_CAPACITY);
463
464         while (threadpool_io->updates_size == UPDATES_CAPACITY) {
465                 /* we wait for updates to be applied in the selector_thread and we loop
466                  * as long as none are available. if it happends too much, then we need
467                  * to increase UPDATES_CAPACITY */
468                 mono_coop_cond_wait (&threadpool_io->updates_cond, &threadpool_io->updates_lock);
469         }
470
471         g_assert (threadpool_io->updates_size < UPDATES_CAPACITY);
472
473         update = &threadpool_io->updates [threadpool_io->updates_size ++];
474
475         return update;
476 }
477
478 static void
479 wakeup_pipes_init (void)
480 {
481 #if !defined(HOST_WIN32)
482         if (pipe (threadpool_io->wakeup_pipes) == -1)
483                 g_error ("wakeup_pipes_init: pipe () failed, error (%d) %s\n", errno, g_strerror (errno));
484         if (fcntl (threadpool_io->wakeup_pipes [0], F_SETFL, O_NONBLOCK) == -1)
485                 g_error ("wakeup_pipes_init: fcntl () failed, error (%d) %s\n", errno, g_strerror (errno));
486 #else
487         struct sockaddr_in client;
488         struct sockaddr_in server;
489         SOCKET server_sock;
490         gulong arg;
491         gint size;
492
493         server_sock = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
494         g_assert (server_sock != INVALID_SOCKET);
495         threadpool_io->wakeup_pipes [1] = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
496         g_assert (threadpool_io->wakeup_pipes [1] != INVALID_SOCKET);
497
498         server.sin_family = AF_INET;
499         server.sin_addr.s_addr = inet_addr ("127.0.0.1");
500         server.sin_port = 0;
501         if (bind (server_sock, (SOCKADDR*) &server, sizeof (server)) == SOCKET_ERROR) {
502                 closesocket (server_sock);
503                 g_error ("wakeup_pipes_init: bind () failed, error (%d)\n", WSAGetLastError ());
504         }
505
506         size = sizeof (server);
507         if (getsockname (server_sock, (SOCKADDR*) &server, &size) == SOCKET_ERROR) {
508                 closesocket (server_sock);
509                 g_error ("wakeup_pipes_init: getsockname () failed, error (%d)\n", WSAGetLastError ());
510         }
511         if (listen (server_sock, 1024) == SOCKET_ERROR) {
512                 closesocket (server_sock);
513                 g_error ("wakeup_pipes_init: listen () failed, error (%d)\n", WSAGetLastError ());
514         }
515         if (connect ((SOCKET) threadpool_io->wakeup_pipes [1], (SOCKADDR*) &server, sizeof (server)) == SOCKET_ERROR) {
516                 closesocket (server_sock);
517                 g_error ("wakeup_pipes_init: connect () failed, error (%d)\n", WSAGetLastError ());
518         }
519
520         size = sizeof (client);
521         threadpool_io->wakeup_pipes [0] = accept (server_sock, (SOCKADDR *) &client, &size);
522         g_assert (threadpool_io->wakeup_pipes [0] != INVALID_SOCKET);
523
524         arg = 1;
525         if (ioctlsocket (threadpool_io->wakeup_pipes [0], FIONBIO, &arg) == SOCKET_ERROR) {
526                 closesocket (threadpool_io->wakeup_pipes [0]);
527                 closesocket (server_sock);
528                 g_error ("wakeup_pipes_init: ioctlsocket () failed, error (%d)\n", WSAGetLastError ());
529         }
530
531         closesocket (server_sock);
532 #endif
533 }
534
535 static void
536 initialize (void)
537 {
538         g_assert (!threadpool_io);
539         threadpool_io = g_new0 (ThreadPoolIO, 1);
540         g_assert (threadpool_io);
541
542         mono_coop_mutex_init (&threadpool_io->updates_lock);
543         mono_coop_cond_init (&threadpool_io->updates_cond);
544         mono_gc_register_root ((char *)&threadpool_io->updates [0], sizeof (threadpool_io->updates), MONO_GC_DESCRIPTOR_NULL, MONO_ROOT_SOURCE_THREAD_POOL, "i/o thread pool updates list");
545
546         threadpool_io->updates_size = 0;
547
548         threadpool_io->backend = backend_poll;
549         if (g_hasenv ("MONO_ENABLE_AIO")) {
550 #if defined(HAVE_EPOLL)
551                 threadpool_io->backend = backend_epoll;
552 #elif defined(HAVE_KQUEUE)
553                 threadpool_io->backend = backend_kqueue;
554 #endif
555         }
556
557         wakeup_pipes_init ();
558
559         if (!threadpool_io->backend.init (threadpool_io->wakeup_pipes [0]))
560                 g_error ("initialize: backend->init () failed");
561
562         mono_coop_mutex_lock (&threadpool_io->updates_lock);
563
564         io_selector_running = TRUE;
565
566         MonoError error;
567         if (!mono_thread_create_internal (mono_get_root_domain (), selector_thread, NULL, MONO_THREAD_CREATE_FLAGS_THREADPOOL | MONO_THREAD_CREATE_FLAGS_SMALL_STACK, &error))
568                 g_error ("initialize: mono_thread_create_internal () failed due to %s", mono_error_get_message (&error));
569
570         mono_coop_mutex_unlock (&threadpool_io->updates_lock);
571 }
572
573 static void
574 cleanup (void)
575 {
576         // FIXME destroy everything
577 }
578
579 void
580 mono_threadpool_io_cleanup (void)
581 {
582         mono_lazy_cleanup (&io_status, cleanup);
583 }
584
585 void
586 ves_icall_System_IOSelector_Add (gpointer handle, MonoIOSelectorJob *job)
587 {
588         ThreadPoolIOUpdate *update;
589
590         g_assert (handle);
591
592         g_assert ((job->operation == EVENT_IN) ^ (job->operation == EVENT_OUT));
593         g_assert (job->callback);
594
595         if (mono_runtime_is_shutting_down ())
596                 return;
597         if (mono_domain_is_unloading (mono_object_domain (job)))
598                 return;
599
600         mono_lazy_initialize (&io_status, initialize);
601
602         mono_coop_mutex_lock (&threadpool_io->updates_lock);
603
604         if (!io_selector_running) {
605                 mono_coop_mutex_unlock (&threadpool_io->updates_lock);
606                 return;
607         }
608
609         update = update_get_new ();
610         update->type = UPDATE_ADD;
611         update->data.add.fd = GPOINTER_TO_INT (handle);
612         update->data.add.job = job;
613         mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */
614
615         selector_thread_wakeup ();
616
617         mono_coop_mutex_unlock (&threadpool_io->updates_lock);
618 }
619
620 void
621 ves_icall_System_IOSelector_Remove (gpointer handle)
622 {
623         mono_threadpool_io_remove_socket (GPOINTER_TO_INT (handle));
624 }
625
626 void
627 mono_threadpool_io_remove_socket (int fd)
628 {
629         ThreadPoolIOUpdate *update;
630
631         if (!mono_lazy_is_initialized (&io_status))
632                 return;
633
634         mono_coop_mutex_lock (&threadpool_io->updates_lock);
635
636         if (!io_selector_running) {
637                 mono_coop_mutex_unlock (&threadpool_io->updates_lock);
638                 return;
639         }
640
641         update = update_get_new ();
642         update->type = UPDATE_REMOVE_SOCKET;
643         update->data.add.fd = fd;
644         mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */
645
646         selector_thread_wakeup ();
647
648         mono_coop_cond_wait (&threadpool_io->updates_cond, &threadpool_io->updates_lock);
649
650         mono_coop_mutex_unlock (&threadpool_io->updates_lock);
651 }
652
653 void
654 mono_threadpool_io_remove_domain_jobs (MonoDomain *domain)
655 {
656         ThreadPoolIOUpdate *update;
657
658         if (!mono_lazy_is_initialized (&io_status))
659                 return;
660
661         mono_coop_mutex_lock (&threadpool_io->updates_lock);
662
663         if (!io_selector_running) {
664                 mono_coop_mutex_unlock (&threadpool_io->updates_lock);
665                 return;
666         }
667
668         update = update_get_new ();
669         update->type = UPDATE_REMOVE_DOMAIN;
670         update->data.remove_domain.domain = domain;
671         mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */
672
673         selector_thread_wakeup ();
674
675         mono_coop_cond_wait (&threadpool_io->updates_cond, &threadpool_io->updates_lock);
676
677         mono_coop_mutex_unlock (&threadpool_io->updates_lock);
678 }
679
680 #else
681
682 void
683 ves_icall_System_IOSelector_Add (gpointer handle, MonoIOSelectorJob *job)
684 {
685         g_assert_not_reached ();
686 }
687
688 void
689 ves_icall_System_IOSelector_Remove (gpointer handle)
690 {
691         g_assert_not_reached ();
692 }
693
694 void
695 mono_threadpool_io_cleanup (void)
696 {
697         g_assert_not_reached ();
698 }
699
700 void
701 mono_threadpool_io_remove_socket (int fd)
702 {
703         g_assert_not_reached ();
704 }
705
706 void
707 mono_threadpool_io_remove_domain_jobs (MonoDomain *domain)
708 {
709         g_assert_not_reached ();
710 }
711
712 #endif