Merge pull request #2338 from BogdanovKirill/httpwritefix3
[mono.git] / mono / metadata / threadpool-ms-io.c
1 /*
2  * threadpool-ms-io.c: Microsoft IO threadpool runtime support
3  *
4  * Author:
5  *      Ludovic Henry (ludovic.henry@xamarin.com)
6  *
7  * Copyright 2015 Xamarin, Inc (http://www.xamarin.com)
8  */
9
10 #include <config.h>
11
12 #ifndef DISABLE_SOCKETS
13
14 #include <glib.h>
15
16 #if defined(HOST_WIN32)
17 #include <windows.h>
18 #else
19 #include <errno.h>
20 #include <fcntl.h>
21 #endif
22
23 #include <mono/metadata/gc-internals.h>
24 #include <mono/metadata/mono-mlist.h>
25 #include <mono/metadata/threadpool-ms.h>
26 #include <mono/metadata/threadpool-ms-io.h>
27 #include <mono/utils/atomic.h>
28 #include <mono/utils/mono-threads.h>
29 #include <mono/utils/mono-lazy-init.h>
30 #include <mono/utils/mono-logger-internals.h>
31
32 typedef struct {
33         gboolean (*init) (gint wakeup_pipe_fd);
34         void     (*cleanup) (void);
35         void     (*register_fd) (gint fd, gint events, gboolean is_new);
36         void     (*remove_fd) (gint fd);
37         gint     (*event_wait) (void (*callback) (gint fd, gint events, gpointer user_data), gpointer user_data);
38 } ThreadPoolIOBackend;
39
40 /* Keep in sync with System.IOOperation in mcs/class/System/System/IOSelector.cs */
41 enum MonoIOOperation {
42         EVENT_IN   = 1 << 0,
43         EVENT_OUT  = 1 << 1,
44         EVENT_ERR  = 1 << 2, /* not in managed */
45 };
46
47 #include "threadpool-ms-io-epoll.c"
48 #include "threadpool-ms-io-kqueue.c"
49 #include "threadpool-ms-io-poll.c"
50
51 #define UPDATES_CAPACITY 128
52
53 /* Keep in sync with System.IOSelectorJob in mcs/class/System/System/IOSelector.cs */
54 struct _MonoIOSelectorJob {
55         MonoObject object;
56         gint32 operation;
57         MonoObject *callback;
58         MonoObject *state;
59 };
60
61 typedef enum {
62         UPDATE_EMPTY = 0,
63         UPDATE_ADD,
64         UPDATE_REMOVE_SOCKET,
65         UPDATE_REMOVE_DOMAIN,
66 } ThreadPoolIOUpdateType;
67
68 typedef struct {
69         gint fd;
70         MonoIOSelectorJob *job;
71 } ThreadPoolIOUpdate_Add;
72
73 typedef struct {
74         gint fd;
75 } ThreadPoolIOUpdate_RemoveSocket;
76
77 typedef struct {
78         MonoDomain *domain;
79 } ThreadPoolIOUpdate_RemoveDomain;
80
81 typedef struct {
82         ThreadPoolIOUpdateType type;
83         union {
84                 ThreadPoolIOUpdate_Add add;
85                 ThreadPoolIOUpdate_RemoveSocket remove_socket;
86                 ThreadPoolIOUpdate_RemoveDomain remove_domain;
87         } data;
88 } ThreadPoolIOUpdate;
89
90 typedef struct {
91         ThreadPoolIOBackend backend;
92
93         ThreadPoolIOUpdate updates [UPDATES_CAPACITY];
94         gint updates_size;
95         MonoCoopMutex updates_lock;
96         MonoCoopCond updates_cond;
97
98 #if !defined(HOST_WIN32)
99         gint wakeup_pipes [2];
100 #else
101         SOCKET wakeup_pipes [2];
102 #endif
103 } ThreadPoolIO;
104
105 static mono_lazy_init_t io_status = MONO_LAZY_INIT_STATUS_NOT_INITIALIZED;
106
107 static gboolean io_selector_running = FALSE;
108
109 static ThreadPoolIO* threadpool_io;
110
111 static MonoIOSelectorJob*
112 get_job_for_event (MonoMList **list, gint32 event)
113 {
114         MonoMList *current;
115
116         g_assert (list);
117
118         for (current = *list; current; current = mono_mlist_next (current)) {
119                 MonoIOSelectorJob *job = (MonoIOSelectorJob*) mono_mlist_get_data (current);
120                 if (job->operation == event) {
121                         *list = mono_mlist_remove_item (*list, current);
122                         return job;
123                 }
124         }
125
126         return NULL;
127 }
128
129 static gint
130 get_operations_for_jobs (MonoMList *list)
131 {
132         MonoMList *current;
133         gint operations = 0;
134
135         for (current = list; current; current = mono_mlist_next (current))
136                 operations |= ((MonoIOSelectorJob*) mono_mlist_get_data (current))->operation;
137
138         return operations;
139 }
140
141 static void
142 selector_thread_wakeup (void)
143 {
144         gchar msg = 'c';
145         gint written;
146
147         for (;;) {
148 #if !defined(HOST_WIN32)
149                 written = write (threadpool_io->wakeup_pipes [1], &msg, 1);
150                 if (written == 1)
151                         break;
152                 if (written == -1) {
153                         g_warning ("selector_thread_wakeup: write () failed, error (%d) %s\n", errno, g_strerror (errno));
154                         break;
155                 }
156 #else
157                 written = send (threadpool_io->wakeup_pipes [1], &msg, 1, 0);
158                 if (written == 1)
159                         break;
160                 if (written == SOCKET_ERROR) {
161                         g_warning ("selector_thread_wakeup: write () failed, error (%d)\n", WSAGetLastError ());
162                         break;
163                 }
164 #endif
165         }
166 }
167
168 static void
169 selector_thread_wakeup_drain_pipes (void)
170 {
171         gchar buffer [128];
172         gint received;
173
174         for (;;) {
175 #if !defined(HOST_WIN32)
176                 received = read (threadpool_io->wakeup_pipes [0], buffer, sizeof (buffer));
177                 if (received == 0)
178                         break;
179                 if (received == -1) {
180                         if (errno != EINTR && errno != EAGAIN)
181                                 g_warning ("selector_thread_wakeup_drain_pipes: read () failed, error (%d) %s\n", errno, g_strerror (errno));
182                         break;
183                 }
184 #else
185                 received = recv (threadpool_io->wakeup_pipes [0], buffer, sizeof (buffer), 0);
186                 if (received == 0)
187                         break;
188                 if (received == SOCKET_ERROR) {
189                         if (WSAGetLastError () != WSAEINTR && WSAGetLastError () != WSAEWOULDBLOCK)
190                                 g_warning ("selector_thread_wakeup_drain_pipes: recv () failed, error (%d) %s\n", WSAGetLastError ());
191                         break;
192                 }
193 #endif
194         }
195 }
196
197 typedef struct {
198         MonoDomain *domain;
199         MonoGHashTable *states;
200 } FilterSockaresForDomainData;
201
202 static void
203 filter_jobs_for_domain (gpointer key, gpointer value, gpointer user_data)
204 {
205         FilterSockaresForDomainData *data;
206         MonoMList *list = (MonoMList *)value, *element;
207         MonoDomain *domain;
208         MonoGHashTable *states;
209
210         g_assert (user_data);
211         data = (FilterSockaresForDomainData *)user_data;
212         domain = data->domain;
213         states = data->states;
214
215         for (element = list; element; element = mono_mlist_next (element)) {
216                 MonoIOSelectorJob *job = (MonoIOSelectorJob*) mono_mlist_get_data (element);
217                 if (mono_object_domain (job) == domain)
218                         mono_mlist_set_data (element, NULL);
219         }
220
221         /* we skip all the first elements which are NULL */
222         for (; list; list = mono_mlist_next (list)) {
223                 if (mono_mlist_get_data (list))
224                         break;
225         }
226
227         if (list) {
228                 g_assert (mono_mlist_get_data (list));
229
230                 /* we delete all the NULL elements after the first one */
231                 for (element = list; element;) {
232                         MonoMList *next;
233                         if (!(next = mono_mlist_next (element)))
234                                 break;
235                         if (mono_mlist_get_data (next))
236                                 element = next;
237                         else
238                                 mono_mlist_set_next (element, mono_mlist_next (next));
239                 }
240         }
241
242         mono_g_hash_table_replace (states, key, list);
243 }
244
245 static void
246 wait_callback (gint fd, gint events, gpointer user_data)
247 {
248         if (mono_runtime_is_shutting_down ())
249                 return;
250
251         if (fd == threadpool_io->wakeup_pipes [0]) {
252                 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: wke");
253                 selector_thread_wakeup_drain_pipes ();
254         } else {
255                 MonoGHashTable *states;
256                 MonoMList *list = NULL;
257                 gpointer k;
258                 gboolean remove_fd = FALSE;
259                 gint operations;
260
261                 g_assert (user_data);
262                 states = (MonoGHashTable *)user_data;
263
264                 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: cal fd %3d, events = %2s | %2s | %3s",
265                         fd, (events & EVENT_IN) ? "RD" : "..", (events & EVENT_OUT) ? "WR" : "..", (events & EVENT_ERR) ? "ERR" : "...");
266
267                 if (!mono_g_hash_table_lookup_extended (states, GINT_TO_POINTER (fd), &k, (gpointer*) &list))
268                         g_error ("wait_callback: fd %d not found in states table", fd);
269
270                 if (list && (events & EVENT_IN) != 0) {
271                         MonoIOSelectorJob *job = get_job_for_event (&list, EVENT_IN);
272                         if (job)
273                                 mono_threadpool_ms_enqueue_work_item (((MonoObject*) job)->vtable->domain, (MonoObject*) job);
274                 }
275                 if (list && (events & EVENT_OUT) != 0) {
276                         MonoIOSelectorJob *job = get_job_for_event (&list, EVENT_OUT);
277                         if (job)
278                                 mono_threadpool_ms_enqueue_work_item (((MonoObject*) job)->vtable->domain, (MonoObject*) job);
279                 }
280
281                 remove_fd = (events & EVENT_ERR) == EVENT_ERR;
282                 if (!remove_fd) {
283                         mono_g_hash_table_replace (states, GINT_TO_POINTER (fd), list);
284
285                         operations = get_operations_for_jobs (list);
286
287                         mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: res fd %3d, events = %2s | %2s | %3s",
288                                 fd, (operations & EVENT_IN) ? "RD" : "..", (operations & EVENT_OUT) ? "WR" : "..", (operations & EVENT_ERR) ? "ERR" : "...");
289
290                         threadpool_io->backend.register_fd (fd, operations, FALSE);
291                 } else {
292                         mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: err fd %d", fd);
293
294                         mono_g_hash_table_remove (states, GINT_TO_POINTER (fd));
295
296                         threadpool_io->backend.remove_fd (fd);
297                 }
298         }
299 }
300
301 static void
302 selector_thread (gpointer data)
303 {
304         MonoGHashTable *states;
305
306         io_selector_running = TRUE;
307
308         if (mono_runtime_is_shutting_down ()) {
309                 io_selector_running = FALSE;
310                 return;
311         }
312
313         states = mono_g_hash_table_new_type (g_direct_hash, g_direct_equal, MONO_HASH_VALUE_GC, MONO_ROOT_SOURCE_THREAD_POOL, "i/o thread pool states table");
314
315         for (;;) {
316                 gint i, j;
317                 gint res;
318
319                 mono_coop_mutex_lock (&threadpool_io->updates_lock);
320
321                 for (i = 0; i < threadpool_io->updates_size; ++i) {
322                         ThreadPoolIOUpdate *update = &threadpool_io->updates [i];
323
324                         switch (update->type) {
325                         case UPDATE_EMPTY:
326                                 break;
327                         case UPDATE_ADD: {
328                                 gint fd;
329                                 gint operations;
330                                 gpointer k;
331                                 gboolean exists;
332                                 MonoMList *list = NULL;
333                                 MonoIOSelectorJob *job;
334
335                                 fd = update->data.add.fd;
336                                 g_assert (fd >= 0);
337
338                                 job = update->data.add.job;
339                                 g_assert (job);
340
341                                 exists = mono_g_hash_table_lookup_extended (states, GINT_TO_POINTER (fd), &k, (gpointer*) &list);
342                                 list = mono_mlist_append (list, (MonoObject*) job);
343                                 mono_g_hash_table_replace (states, GINT_TO_POINTER (fd), list);
344
345                                 operations = get_operations_for_jobs (list);
346
347                                 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: %3s fd %3d, operations = %2s | %2s | %3s",
348                                         exists ? "mod" : "add", fd, (operations & EVENT_IN) ? "RD" : "..", (operations & EVENT_OUT) ? "WR" : "..", (operations & EVENT_ERR) ? "ERR" : "...");
349
350                                 threadpool_io->backend.register_fd (fd, operations, !exists);
351
352                                 break;
353                         }
354                         case UPDATE_REMOVE_SOCKET: {
355                                 gint fd;
356                                 gpointer k;
357                                 MonoMList *list = NULL;
358
359                                 fd = update->data.remove_socket.fd;
360                                 g_assert (fd >= 0);
361
362                                 if (mono_g_hash_table_lookup_extended (states, GINT_TO_POINTER (fd), &k, (gpointer*) &list)) {
363                                         mono_g_hash_table_remove (states, GINT_TO_POINTER (fd));
364
365                                         for (j = i + 1; j < threadpool_io->updates_size; ++j) {
366                                                 ThreadPoolIOUpdate *update = &threadpool_io->updates [j];
367                                                 if (update->type == UPDATE_ADD && update->data.add.fd == fd)
368                                                         memset (update, 0, sizeof (ThreadPoolIOUpdate));
369                                         }
370
371                                         for (; list; list = mono_mlist_remove_item (list, list))
372                                                 mono_threadpool_ms_enqueue_work_item (mono_object_domain (mono_mlist_get_data (list)), mono_mlist_get_data (list));
373
374                                         mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: del fd %3d", fd);
375                                         threadpool_io->backend.remove_fd (fd);
376                                 }
377
378                                 break;
379                         }
380                         case UPDATE_REMOVE_DOMAIN: {
381                                 MonoDomain *domain;
382
383                                 domain = update->data.remove_domain.domain;
384                                 g_assert (domain);
385
386                                 FilterSockaresForDomainData user_data = { .domain = domain, .states = states };
387                                 mono_g_hash_table_foreach (states, filter_jobs_for_domain, &user_data);
388
389                                 for (j = i + 1; j < threadpool_io->updates_size; ++j) {
390                                         ThreadPoolIOUpdate *update = &threadpool_io->updates [j];
391                                         if (update->type == UPDATE_ADD && mono_object_domain (update->data.add.job) == domain)
392                                                 memset (update, 0, sizeof (ThreadPoolIOUpdate));
393                                 }
394
395                                 break;
396                         }
397                         default:
398                                 g_assert_not_reached ();
399                         }
400                 }
401
402                 mono_coop_cond_broadcast (&threadpool_io->updates_cond);
403
404                 if (threadpool_io->updates_size > 0) {
405                         threadpool_io->updates_size = 0;
406                         memset (&threadpool_io->updates, 0, UPDATES_CAPACITY * sizeof (ThreadPoolIOUpdate));
407                 }
408
409                 mono_coop_mutex_unlock (&threadpool_io->updates_lock);
410
411                 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: wai");
412
413                 res = threadpool_io->backend.event_wait (wait_callback, states);
414
415                 if (res == -1 || mono_runtime_is_shutting_down ())
416                         break;
417         }
418
419         mono_g_hash_table_destroy (states);
420
421         io_selector_running = FALSE;
422 }
423
424 /* Locking: threadpool_io->updates_lock must be held */
425 static ThreadPoolIOUpdate*
426 update_get_new (void)
427 {
428         ThreadPoolIOUpdate *update = NULL;
429         g_assert (threadpool_io->updates_size <= UPDATES_CAPACITY);
430
431         while (threadpool_io->updates_size == UPDATES_CAPACITY) {
432                 /* we wait for updates to be applied in the selector_thread and we loop
433                  * as long as none are available. if it happends too much, then we need
434                  * to increase UPDATES_CAPACITY */
435                 mono_coop_cond_wait (&threadpool_io->updates_cond, &threadpool_io->updates_lock);
436         }
437
438         g_assert (threadpool_io->updates_size < UPDATES_CAPACITY);
439
440         update = &threadpool_io->updates [threadpool_io->updates_size ++];
441
442         return update;
443 }
444
445 static void
446 wakeup_pipes_init (void)
447 {
448 #if !defined(HOST_WIN32)
449         if (pipe (threadpool_io->wakeup_pipes) == -1)
450                 g_error ("wakeup_pipes_init: pipe () failed, error (%d) %s\n", errno, g_strerror (errno));
451         if (fcntl (threadpool_io->wakeup_pipes [0], F_SETFL, O_NONBLOCK) == -1)
452                 g_error ("wakeup_pipes_init: fcntl () failed, error (%d) %s\n", errno, g_strerror (errno));
453 #else
454         struct sockaddr_in client;
455         struct sockaddr_in server;
456         SOCKET server_sock;
457         gulong arg;
458         gint size;
459
460         server_sock = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
461         g_assert (server_sock != INVALID_SOCKET);
462         threadpool_io->wakeup_pipes [1] = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
463         g_assert (threadpool_io->wakeup_pipes [1] != INVALID_SOCKET);
464
465         server.sin_family = AF_INET;
466         server.sin_addr.s_addr = inet_addr ("127.0.0.1");
467         server.sin_port = 0;
468         if (bind (server_sock, (SOCKADDR*) &server, sizeof (server)) == SOCKET_ERROR) {
469                 closesocket (server_sock);
470                 g_error ("wakeup_pipes_init: bind () failed, error (%d)\n", WSAGetLastError ());
471         }
472
473         size = sizeof (server);
474         if (getsockname (server_sock, (SOCKADDR*) &server, &size) == SOCKET_ERROR) {
475                 closesocket (server_sock);
476                 g_error ("wakeup_pipes_init: getsockname () failed, error (%d)\n", WSAGetLastError ());
477         }
478         if (listen (server_sock, 1024) == SOCKET_ERROR) {
479                 closesocket (server_sock);
480                 g_error ("wakeup_pipes_init: listen () failed, error (%d)\n", WSAGetLastError ());
481         }
482         if (connect ((SOCKET) threadpool_io->wakeup_pipes [1], (SOCKADDR*) &server, sizeof (server)) == SOCKET_ERROR) {
483                 closesocket (server_sock);
484                 g_error ("wakeup_pipes_init: connect () failed, error (%d)\n", WSAGetLastError ());
485         }
486
487         size = sizeof (client);
488         threadpool_io->wakeup_pipes [0] = accept (server_sock, (SOCKADDR *) &client, &size);
489         g_assert (threadpool_io->wakeup_pipes [0] != INVALID_SOCKET);
490
491         arg = 1;
492         if (ioctlsocket (threadpool_io->wakeup_pipes [0], FIONBIO, &arg) == SOCKET_ERROR) {
493                 closesocket (threadpool_io->wakeup_pipes [0]);
494                 closesocket (server_sock);
495                 g_error ("wakeup_pipes_init: ioctlsocket () failed, error (%d)\n", WSAGetLastError ());
496         }
497
498         closesocket (server_sock);
499 #endif
500 }
501
502 static void
503 initialize (void)
504 {
505         g_assert (!threadpool_io);
506         threadpool_io = g_new0 (ThreadPoolIO, 1);
507         g_assert (threadpool_io);
508
509         mono_coop_mutex_init (&threadpool_io->updates_lock);
510         mono_coop_cond_init (&threadpool_io->updates_cond);
511         mono_gc_register_root ((char *)&threadpool_io->updates [0], sizeof (threadpool_io->updates), MONO_GC_DESCRIPTOR_NULL, MONO_ROOT_SOURCE_THREAD_POOL, "i/o thread pool updates list");
512
513         threadpool_io->updates_size = 0;
514
515         threadpool_io->backend = backend_poll;
516         if (g_getenv ("MONO_ENABLE_AIO") != NULL) {
517 #if defined(HAVE_EPOLL)
518                 threadpool_io->backend = backend_epoll;
519 #elif defined(HAVE_KQUEUE)
520                 threadpool_io->backend = backend_kqueue;
521 #endif
522         }
523
524         wakeup_pipes_init ();
525
526         if (!threadpool_io->backend.init (threadpool_io->wakeup_pipes [0]))
527                 g_error ("initialize: backend->init () failed");
528
529         if (!mono_thread_create_internal (mono_get_root_domain (), selector_thread, NULL, TRUE, SMALL_STACK))
530                 g_error ("initialize: mono_thread_create_internal () failed");
531 }
532
533 static void
534 cleanup (void)
535 {
536         /* we make the assumption along the code that we are
537          * cleaning up only if the runtime is shutting down */
538         g_assert (mono_runtime_is_shutting_down ());
539
540         selector_thread_wakeup ();
541         while (io_selector_running)
542                 g_usleep (1000);
543
544         mono_coop_mutex_destroy (&threadpool_io->updates_lock);
545         mono_coop_cond_destroy (&threadpool_io->updates_cond);
546
547         threadpool_io->backend.cleanup ();
548
549 #if !defined(HOST_WIN32)
550         close (threadpool_io->wakeup_pipes [0]);
551         close (threadpool_io->wakeup_pipes [1]);
552 #else
553         closesocket (threadpool_io->wakeup_pipes [0]);
554         closesocket (threadpool_io->wakeup_pipes [1]);
555 #endif
556
557         g_assert (threadpool_io);
558         g_free (threadpool_io);
559         threadpool_io = NULL;
560         g_assert (!threadpool_io);
561 }
562
563 void
564 mono_threadpool_ms_io_cleanup (void)
565 {
566         mono_lazy_cleanup (&io_status, cleanup);
567 }
568
569 void
570 ves_icall_System_IOSelector_Add (gpointer handle, MonoIOSelectorJob *job)
571 {
572         ThreadPoolIOUpdate *update;
573
574         g_assert (handle >= 0);
575
576         g_assert (job->operation == EVENT_IN ^ job->operation == EVENT_OUT);
577         g_assert (job->callback);
578
579         if (mono_runtime_is_shutting_down ())
580                 return;
581         if (mono_domain_is_unloading (mono_object_domain (job)))
582                 return;
583
584         mono_lazy_initialize (&io_status, initialize);
585
586         mono_coop_mutex_lock (&threadpool_io->updates_lock);
587
588         update = update_get_new ();
589         update->type = UPDATE_ADD;
590         update->data.add.fd = GPOINTER_TO_INT (handle);
591         update->data.add.job = job;
592         mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */
593
594         selector_thread_wakeup ();
595
596         mono_coop_mutex_unlock (&threadpool_io->updates_lock);
597 }
598
599 void
600 ves_icall_System_IOSelector_Remove (gpointer handle)
601 {
602         mono_threadpool_ms_io_remove_socket (GPOINTER_TO_INT (handle));
603 }
604
605 void
606 mono_threadpool_ms_io_remove_socket (int fd)
607 {
608         ThreadPoolIOUpdate *update;
609
610         if (!mono_lazy_is_initialized (&io_status))
611                 return;
612
613         mono_coop_mutex_lock (&threadpool_io->updates_lock);
614
615         update = update_get_new ();
616         update->type = UPDATE_REMOVE_SOCKET;
617         update->data.add.fd = fd;
618         mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */
619
620         selector_thread_wakeup ();
621
622         mono_coop_cond_wait (&threadpool_io->updates_cond, &threadpool_io->updates_lock);
623
624         mono_coop_mutex_unlock (&threadpool_io->updates_lock);
625 }
626
627 void
628 mono_threadpool_ms_io_remove_domain_jobs (MonoDomain *domain)
629 {
630         ThreadPoolIOUpdate *update;
631
632         if (!mono_lazy_is_initialized (&io_status))
633                 return;
634
635         mono_coop_mutex_lock (&threadpool_io->updates_lock);
636
637         update = update_get_new ();
638         update->type = UPDATE_REMOVE_DOMAIN;
639         update->data.remove_domain.domain = domain;
640         mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */
641
642         selector_thread_wakeup ();
643
644         mono_coop_cond_wait (&threadpool_io->updates_cond, &threadpool_io->updates_lock);
645
646         mono_coop_mutex_unlock (&threadpool_io->updates_lock);
647 }
648
649 #else
650
651 void
652 ves_icall_System_IOSelector_Add (gpointer handle, MonoIOSelectorJob *job)
653 {
654         g_assert_not_reached ();
655 }
656
657 void
658 ves_icall_System_IOSelector_Remove (gpointer handle)
659 {
660         g_assert_not_reached ();
661 }
662
663 void
664 mono_threadpool_ms_io_cleanup (void)
665 {
666         g_assert_not_reached ();
667 }
668
669 void
670 mono_threadpool_ms_io_remove_socket (int fd)
671 {
672         g_assert_not_reached ();
673 }
674
675 void
676 mono_threadpool_ms_io_remove_domain_jobs (MonoDomain *domain)
677 {
678         g_assert_not_reached ();
679 }
680
681 #endif