Merge pull request #4381 from BrzVlad/feature-generational-hash
[mono.git] / mono / metadata / threadpool-io.c
1 /*
2  * threadpool-io.c: Microsoft IO threadpool runtime support
3  *
4  * Author:
5  *      Ludovic Henry (ludovic.henry@xamarin.com)
6  *
7  * Copyright 2015 Xamarin, Inc (http://www.xamarin.com)
8  * Licensed under the MIT license. See LICENSE file in the project root for full license information.
9  */
10
11 #include <config.h>
12
13 #ifndef DISABLE_SOCKETS
14
15 #include <glib.h>
16
17 #if defined(HOST_WIN32)
18 #include <windows.h>
19 #else
20 #include <errno.h>
21 #include <fcntl.h>
22 #endif
23
24 #include <mono/metadata/gc-internals.h>
25 #include <mono/metadata/mono-mlist.h>
26 #include <mono/metadata/threadpool.h>
27 #include <mono/metadata/threadpool-io.h>
28 #include <mono/utils/atomic.h>
29 #include <mono/utils/mono-threads.h>
30 #include <mono/utils/mono-lazy-init.h>
31 #include <mono/utils/mono-logger-internals.h>
32 #include <mono/utils/w32api.h>
33
34 typedef struct {
35         gboolean (*init) (gint wakeup_pipe_fd);
36         void     (*register_fd) (gint fd, gint events, gboolean is_new);
37         void     (*remove_fd) (gint fd);
38         gint     (*event_wait) (void (*callback) (gint fd, gint events, gpointer user_data), gpointer user_data);
39 } ThreadPoolIOBackend;
40
41 /* Keep in sync with System.IOOperation in mcs/class/System/System/IOSelector.cs */
42 enum MonoIOOperation {
43         EVENT_IN   = 1 << 0,
44         EVENT_OUT  = 1 << 1,
45         EVENT_ERR  = 1 << 2, /* not in managed */
46 };
47
48 #include "threadpool-io-epoll.c"
49 #include "threadpool-io-kqueue.c"
50 #include "threadpool-io-poll.c"
51
52 #define UPDATES_CAPACITY 128
53
54 /* Keep in sync with System.IOSelectorJob in mcs/class/System/System/IOSelector.cs */
55 struct _MonoIOSelectorJob {
56         MonoObject object;
57         gint32 operation;
58         MonoObject *callback;
59         MonoObject *state;
60 };
61
62 typedef enum {
63         UPDATE_EMPTY = 0,
64         UPDATE_ADD,
65         UPDATE_REMOVE_SOCKET,
66         UPDATE_REMOVE_DOMAIN,
67 } ThreadPoolIOUpdateType;
68
69 typedef struct {
70         gint fd;
71         MonoIOSelectorJob *job;
72 } ThreadPoolIOUpdate_Add;
73
74 typedef struct {
75         gint fd;
76 } ThreadPoolIOUpdate_RemoveSocket;
77
78 typedef struct {
79         MonoDomain *domain;
80 } ThreadPoolIOUpdate_RemoveDomain;
81
82 typedef struct {
83         ThreadPoolIOUpdateType type;
84         union {
85                 ThreadPoolIOUpdate_Add add;
86                 ThreadPoolIOUpdate_RemoveSocket remove_socket;
87                 ThreadPoolIOUpdate_RemoveDomain remove_domain;
88         } data;
89 } ThreadPoolIOUpdate;
90
91 typedef struct {
92         ThreadPoolIOBackend backend;
93
94         ThreadPoolIOUpdate updates [UPDATES_CAPACITY];
95         gint updates_size;
96         MonoCoopMutex updates_lock;
97         MonoCoopCond updates_cond;
98
99 #if !defined(HOST_WIN32)
100         gint wakeup_pipes [2];
101 #else
102         SOCKET wakeup_pipes [2];
103 #endif
104 } ThreadPoolIO;
105
106 static mono_lazy_init_t io_status = MONO_LAZY_INIT_STATUS_NOT_INITIALIZED;
107
108 static gboolean io_selector_running = FALSE;
109
110 static ThreadPoolIO* threadpool_io;
111
112 static MonoIOSelectorJob*
113 get_job_for_event (MonoMList **list, gint32 event)
114 {
115         MonoMList *current;
116
117         g_assert (list);
118
119         for (current = *list; current; current = mono_mlist_next (current)) {
120                 MonoIOSelectorJob *job = (MonoIOSelectorJob*) mono_mlist_get_data (current);
121                 if (job->operation == event) {
122                         *list = mono_mlist_remove_item (*list, current);
123                         return job;
124                 }
125         }
126
127         return NULL;
128 }
129
130 static gint
131 get_operations_for_jobs (MonoMList *list)
132 {
133         MonoMList *current;
134         gint operations = 0;
135
136         for (current = list; current; current = mono_mlist_next (current))
137                 operations |= ((MonoIOSelectorJob*) mono_mlist_get_data (current))->operation;
138
139         return operations;
140 }
141
142 static void
143 selector_thread_wakeup (void)
144 {
145         gchar msg = 'c';
146         gint written;
147
148         for (;;) {
149 #if !defined(HOST_WIN32)
150                 written = write (threadpool_io->wakeup_pipes [1], &msg, 1);
151                 if (written == 1)
152                         break;
153                 if (written == -1) {
154                         g_warning ("selector_thread_wakeup: write () failed, error (%d) %s\n", errno, g_strerror (errno));
155                         break;
156                 }
157 #else
158                 written = send (threadpool_io->wakeup_pipes [1], &msg, 1, 0);
159                 if (written == 1)
160                         break;
161                 if (written == SOCKET_ERROR) {
162                         g_warning ("selector_thread_wakeup: write () failed, error (%d)\n", WSAGetLastError ());
163                         break;
164                 }
165 #endif
166         }
167 }
168
169 static void
170 selector_thread_wakeup_drain_pipes (void)
171 {
172         gchar buffer [128];
173         gint received;
174
175         for (;;) {
176 #if !defined(HOST_WIN32)
177                 received = read (threadpool_io->wakeup_pipes [0], buffer, sizeof (buffer));
178                 if (received == 0)
179                         break;
180                 if (received == -1) {
181                         if (errno != EINTR && errno != EAGAIN)
182                                 g_warning ("selector_thread_wakeup_drain_pipes: read () failed, error (%d) %s\n", errno, g_strerror (errno));
183                         break;
184                 }
185 #else
186                 received = recv (threadpool_io->wakeup_pipes [0], buffer, sizeof (buffer), 0);
187                 if (received == 0)
188                         break;
189                 if (received == SOCKET_ERROR) {
190                         if (WSAGetLastError () != WSAEINTR && WSAGetLastError () != WSAEWOULDBLOCK)
191                                 g_warning ("selector_thread_wakeup_drain_pipes: recv () failed, error (%d) %s\n", WSAGetLastError ());
192                         break;
193                 }
194 #endif
195         }
196 }
197
198 typedef struct {
199         MonoDomain *domain;
200         MonoGHashTable *states;
201 } FilterSockaresForDomainData;
202
203 static void
204 filter_jobs_for_domain (gpointer key, gpointer value, gpointer user_data)
205 {
206         FilterSockaresForDomainData *data;
207         MonoMList *list = (MonoMList *)value, *element;
208         MonoDomain *domain;
209         MonoGHashTable *states;
210
211         g_assert (user_data);
212         data = (FilterSockaresForDomainData *)user_data;
213         domain = data->domain;
214         states = data->states;
215
216         for (element = list; element; element = mono_mlist_next (element)) {
217                 MonoIOSelectorJob *job = (MonoIOSelectorJob*) mono_mlist_get_data (element);
218                 if (mono_object_domain (job) == domain)
219                         mono_mlist_set_data (element, NULL);
220         }
221
222         /* we skip all the first elements which are NULL */
223         for (; list; list = mono_mlist_next (list)) {
224                 if (mono_mlist_get_data (list))
225                         break;
226         }
227
228         if (list) {
229                 g_assert (mono_mlist_get_data (list));
230
231                 /* we delete all the NULL elements after the first one */
232                 for (element = list; element;) {
233                         MonoMList *next;
234                         if (!(next = mono_mlist_next (element)))
235                                 break;
236                         if (mono_mlist_get_data (next))
237                                 element = next;
238                         else
239                                 mono_mlist_set_next (element, mono_mlist_next (next));
240                 }
241         }
242
243         mono_g_hash_table_replace (states, key, list);
244 }
245
246 static void
247 wait_callback (gint fd, gint events, gpointer user_data)
248 {
249         MonoError error;
250
251         if (mono_runtime_is_shutting_down ())
252                 return;
253
254         if (fd == threadpool_io->wakeup_pipes [0]) {
255                 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: wke");
256                 selector_thread_wakeup_drain_pipes ();
257         } else {
258                 MonoGHashTable *states;
259                 MonoMList *list = NULL;
260                 gpointer k;
261                 gboolean remove_fd = FALSE;
262                 gint operations;
263
264                 g_assert (user_data);
265                 states = (MonoGHashTable *)user_data;
266
267                 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: cal fd %3d, events = %2s | %2s | %3s",
268                         fd, (events & EVENT_IN) ? "RD" : "..", (events & EVENT_OUT) ? "WR" : "..", (events & EVENT_ERR) ? "ERR" : "...");
269
270                 if (!mono_g_hash_table_lookup_extended (states, GINT_TO_POINTER (fd), &k, (gpointer*) &list))
271                         g_error ("wait_callback: fd %d not found in states table", fd);
272
273                 if (list && (events & EVENT_IN) != 0) {
274                         MonoIOSelectorJob *job = get_job_for_event (&list, EVENT_IN);
275                         if (job) {
276                                 mono_threadpool_enqueue_work_item (((MonoObject*) job)->vtable->domain, (MonoObject*) job, &error);
277                                 mono_error_assert_ok (&error);
278                         }
279
280                 }
281                 if (list && (events & EVENT_OUT) != 0) {
282                         MonoIOSelectorJob *job = get_job_for_event (&list, EVENT_OUT);
283                         if (job) {
284                                 mono_threadpool_enqueue_work_item (((MonoObject*) job)->vtable->domain, (MonoObject*) job, &error);
285                                 mono_error_assert_ok (&error);
286                         }
287                 }
288
289                 remove_fd = (events & EVENT_ERR) == EVENT_ERR;
290                 if (!remove_fd) {
291                         mono_g_hash_table_replace (states, GINT_TO_POINTER (fd), list);
292
293                         operations = get_operations_for_jobs (list);
294
295                         mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: res fd %3d, events = %2s | %2s | %3s",
296                                 fd, (operations & EVENT_IN) ? "RD" : "..", (operations & EVENT_OUT) ? "WR" : "..", (operations & EVENT_ERR) ? "ERR" : "...");
297
298                         threadpool_io->backend.register_fd (fd, operations, FALSE);
299                 } else {
300                         mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: err fd %d", fd);
301
302                         mono_g_hash_table_remove (states, GINT_TO_POINTER (fd));
303
304                         threadpool_io->backend.remove_fd (fd);
305                 }
306         }
307 }
308
309 static void
310 selector_thread_interrupt (gpointer unused)
311 {
312         selector_thread_wakeup ();
313 }
314
315 static gsize WINAPI
316 selector_thread (gpointer data)
317 {
318         MonoError error;
319         MonoGHashTable *states;
320
321         io_selector_running = TRUE;
322
323         if (mono_runtime_is_shutting_down ()) {
324                 io_selector_running = FALSE;
325                 return 0;
326         }
327
328         states = mono_g_hash_table_new_type (g_direct_hash, NULL, MONO_HASH_VALUE_GC, MONO_ROOT_SOURCE_THREAD_POOL, "i/o thread pool states table");
329
330         while (!mono_runtime_is_shutting_down ()) {
331                 gint i, j;
332                 gint res;
333                 gboolean interrupted = FALSE;
334
335                 if (mono_thread_interruption_checkpoint ())
336                         continue;
337
338                 mono_coop_mutex_lock (&threadpool_io->updates_lock);
339
340                 for (i = 0; i < threadpool_io->updates_size; ++i) {
341                         ThreadPoolIOUpdate *update = &threadpool_io->updates [i];
342
343                         switch (update->type) {
344                         case UPDATE_EMPTY:
345                                 break;
346                         case UPDATE_ADD: {
347                                 gint fd;
348                                 gint operations;
349                                 gpointer k;
350                                 gboolean exists;
351                                 MonoMList *list = NULL;
352                                 MonoIOSelectorJob *job;
353
354                                 fd = update->data.add.fd;
355                                 g_assert (fd >= 0);
356
357                                 job = update->data.add.job;
358                                 g_assert (job);
359
360                                 exists = mono_g_hash_table_lookup_extended (states, GINT_TO_POINTER (fd), &k, (gpointer*) &list);
361                                 list = mono_mlist_append_checked (list, (MonoObject*) job, &error);
362                                 mono_error_assert_ok (&error);
363                                 mono_g_hash_table_replace (states, GINT_TO_POINTER (fd), list);
364
365                                 operations = get_operations_for_jobs (list);
366
367                                 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: %3s fd %3d, operations = %2s | %2s | %3s",
368                                         exists ? "mod" : "add", fd, (operations & EVENT_IN) ? "RD" : "..", (operations & EVENT_OUT) ? "WR" : "..", (operations & EVENT_ERR) ? "ERR" : "...");
369
370                                 threadpool_io->backend.register_fd (fd, operations, !exists);
371
372                                 break;
373                         }
374                         case UPDATE_REMOVE_SOCKET: {
375                                 gint fd;
376                                 gpointer k;
377                                 MonoMList *list = NULL;
378
379                                 fd = update->data.remove_socket.fd;
380                                 g_assert (fd >= 0);
381
382                                 if (mono_g_hash_table_lookup_extended (states, GINT_TO_POINTER (fd), &k, (gpointer*) &list)) {
383                                         mono_g_hash_table_remove (states, GINT_TO_POINTER (fd));
384
385                                         for (j = i + 1; j < threadpool_io->updates_size; ++j) {
386                                                 ThreadPoolIOUpdate *update = &threadpool_io->updates [j];
387                                                 if (update->type == UPDATE_ADD && update->data.add.fd == fd)
388                                                         memset (update, 0, sizeof (ThreadPoolIOUpdate));
389                                         }
390
391                                         for (; list; list = mono_mlist_remove_item (list, list)) {
392                                                 mono_threadpool_enqueue_work_item (mono_object_domain (mono_mlist_get_data (list)), mono_mlist_get_data (list), &error);
393                                                 mono_error_assert_ok (&error);
394                                         }
395
396                                         mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: del fd %3d", fd);
397                                         threadpool_io->backend.remove_fd (fd);
398                                 }
399
400                                 break;
401                         }
402                         case UPDATE_REMOVE_DOMAIN: {
403                                 MonoDomain *domain;
404
405                                 domain = update->data.remove_domain.domain;
406                                 g_assert (domain);
407
408                                 FilterSockaresForDomainData user_data = { .domain = domain, .states = states };
409                                 mono_g_hash_table_foreach (states, filter_jobs_for_domain, &user_data);
410
411                                 for (j = i + 1; j < threadpool_io->updates_size; ++j) {
412                                         ThreadPoolIOUpdate *update = &threadpool_io->updates [j];
413                                         if (update->type == UPDATE_ADD && mono_object_domain (update->data.add.job) == domain)
414                                                 memset (update, 0, sizeof (ThreadPoolIOUpdate));
415                                 }
416
417                                 break;
418                         }
419                         default:
420                                 g_assert_not_reached ();
421                         }
422                 }
423
424                 mono_coop_cond_broadcast (&threadpool_io->updates_cond);
425
426                 if (threadpool_io->updates_size > 0) {
427                         threadpool_io->updates_size = 0;
428                         memset (&threadpool_io->updates, 0, UPDATES_CAPACITY * sizeof (ThreadPoolIOUpdate));
429                 }
430
431                 mono_coop_mutex_unlock (&threadpool_io->updates_lock);
432
433                 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: wai");
434
435                 mono_thread_info_install_interrupt (selector_thread_interrupt, NULL, &interrupted);
436                 if (interrupted)
437                         continue;
438
439                 res = threadpool_io->backend.event_wait (wait_callback, states);
440                 if (res == -1)
441                         break;
442
443                 mono_thread_info_uninstall_interrupt (&interrupted);
444         }
445
446         mono_g_hash_table_destroy (states);
447
448         io_selector_running = FALSE;
449
450         return 0;
451 }
452
453 /* Locking: threadpool_io->updates_lock must be held */
454 static ThreadPoolIOUpdate*
455 update_get_new (void)
456 {
457         ThreadPoolIOUpdate *update = NULL;
458         g_assert (threadpool_io->updates_size <= UPDATES_CAPACITY);
459
460         while (threadpool_io->updates_size == UPDATES_CAPACITY) {
461                 /* we wait for updates to be applied in the selector_thread and we loop
462                  * as long as none are available. if it happends too much, then we need
463                  * to increase UPDATES_CAPACITY */
464                 mono_coop_cond_wait (&threadpool_io->updates_cond, &threadpool_io->updates_lock);
465         }
466
467         g_assert (threadpool_io->updates_size < UPDATES_CAPACITY);
468
469         update = &threadpool_io->updates [threadpool_io->updates_size ++];
470
471         return update;
472 }
473
474 static void
475 wakeup_pipes_init (void)
476 {
477 #if !defined(HOST_WIN32)
478         if (pipe (threadpool_io->wakeup_pipes) == -1)
479                 g_error ("wakeup_pipes_init: pipe () failed, error (%d) %s\n", errno, g_strerror (errno));
480         if (fcntl (threadpool_io->wakeup_pipes [0], F_SETFL, O_NONBLOCK) == -1)
481                 g_error ("wakeup_pipes_init: fcntl () failed, error (%d) %s\n", errno, g_strerror (errno));
482 #else
483         struct sockaddr_in client;
484         struct sockaddr_in server;
485         SOCKET server_sock;
486         gulong arg;
487         gint size;
488
489         server_sock = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
490         g_assert (server_sock != INVALID_SOCKET);
491         threadpool_io->wakeup_pipes [1] = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
492         g_assert (threadpool_io->wakeup_pipes [1] != INVALID_SOCKET);
493
494         server.sin_family = AF_INET;
495         server.sin_addr.s_addr = inet_addr ("127.0.0.1");
496         server.sin_port = 0;
497         if (bind (server_sock, (SOCKADDR*) &server, sizeof (server)) == SOCKET_ERROR) {
498                 closesocket (server_sock);
499                 g_error ("wakeup_pipes_init: bind () failed, error (%d)\n", WSAGetLastError ());
500         }
501
502         size = sizeof (server);
503         if (getsockname (server_sock, (SOCKADDR*) &server, &size) == SOCKET_ERROR) {
504                 closesocket (server_sock);
505                 g_error ("wakeup_pipes_init: getsockname () failed, error (%d)\n", WSAGetLastError ());
506         }
507         if (listen (server_sock, 1024) == SOCKET_ERROR) {
508                 closesocket (server_sock);
509                 g_error ("wakeup_pipes_init: listen () failed, error (%d)\n", WSAGetLastError ());
510         }
511         if (connect ((SOCKET) threadpool_io->wakeup_pipes [1], (SOCKADDR*) &server, sizeof (server)) == SOCKET_ERROR) {
512                 closesocket (server_sock);
513                 g_error ("wakeup_pipes_init: connect () failed, error (%d)\n", WSAGetLastError ());
514         }
515
516         size = sizeof (client);
517         threadpool_io->wakeup_pipes [0] = accept (server_sock, (SOCKADDR *) &client, &size);
518         g_assert (threadpool_io->wakeup_pipes [0] != INVALID_SOCKET);
519
520         arg = 1;
521         if (ioctlsocket (threadpool_io->wakeup_pipes [0], FIONBIO, &arg) == SOCKET_ERROR) {
522                 closesocket (threadpool_io->wakeup_pipes [0]);
523                 closesocket (server_sock);
524                 g_error ("wakeup_pipes_init: ioctlsocket () failed, error (%d)\n", WSAGetLastError ());
525         }
526
527         closesocket (server_sock);
528 #endif
529 }
530
531 static void
532 initialize (void)
533 {
534         g_assert (!threadpool_io);
535         threadpool_io = g_new0 (ThreadPoolIO, 1);
536         g_assert (threadpool_io);
537
538         mono_coop_mutex_init (&threadpool_io->updates_lock);
539         mono_coop_cond_init (&threadpool_io->updates_cond);
540         mono_gc_register_root ((char *)&threadpool_io->updates [0], sizeof (threadpool_io->updates), MONO_GC_DESCRIPTOR_NULL, MONO_ROOT_SOURCE_THREAD_POOL, "i/o thread pool updates list");
541
542         threadpool_io->updates_size = 0;
543
544         threadpool_io->backend = backend_poll;
545         if (g_getenv ("MONO_ENABLE_AIO") != NULL) {
546 #if defined(HAVE_EPOLL)
547                 threadpool_io->backend = backend_epoll;
548 #elif defined(HAVE_KQUEUE)
549                 threadpool_io->backend = backend_kqueue;
550 #endif
551         }
552
553         wakeup_pipes_init ();
554
555         if (!threadpool_io->backend.init (threadpool_io->wakeup_pipes [0]))
556                 g_error ("initialize: backend->init () failed");
557
558         MonoError error;
559         if (!mono_thread_create_internal (mono_get_root_domain (), selector_thread, NULL, TRUE, SMALL_STACK, &error))
560                 g_error ("initialize: mono_thread_create_internal () failed due to %s", mono_error_get_message (&error));
561 }
562
563 static void
564 cleanup (void)
565 {
566         // FIXME destroy everything
567 }
568
569 void
570 mono_threadpool_io_cleanup (void)
571 {
572         mono_lazy_cleanup (&io_status, cleanup);
573 }
574
575 void
576 ves_icall_System_IOSelector_Add (gpointer handle, MonoIOSelectorJob *job)
577 {
578         ThreadPoolIOUpdate *update;
579
580         g_assert (handle);
581
582         g_assert ((job->operation == EVENT_IN) ^ (job->operation == EVENT_OUT));
583         g_assert (job->callback);
584
585         if (mono_runtime_is_shutting_down ())
586                 return;
587         if (mono_domain_is_unloading (mono_object_domain (job)))
588                 return;
589
590         mono_lazy_initialize (&io_status, initialize);
591
592         mono_coop_mutex_lock (&threadpool_io->updates_lock);
593
594         update = update_get_new ();
595         update->type = UPDATE_ADD;
596         update->data.add.fd = GPOINTER_TO_INT (handle);
597         update->data.add.job = job;
598         mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */
599
600         selector_thread_wakeup ();
601
602         mono_coop_mutex_unlock (&threadpool_io->updates_lock);
603 }
604
605 void
606 ves_icall_System_IOSelector_Remove (gpointer handle)
607 {
608         mono_threadpool_io_remove_socket (GPOINTER_TO_INT (handle));
609 }
610
611 void
612 mono_threadpool_io_remove_socket (int fd)
613 {
614         ThreadPoolIOUpdate *update;
615
616         if (!mono_lazy_is_initialized (&io_status))
617                 return;
618
619         mono_coop_mutex_lock (&threadpool_io->updates_lock);
620
621         update = update_get_new ();
622         update->type = UPDATE_REMOVE_SOCKET;
623         update->data.add.fd = fd;
624         mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */
625
626         selector_thread_wakeup ();
627
628         mono_coop_cond_wait (&threadpool_io->updates_cond, &threadpool_io->updates_lock);
629
630         mono_coop_mutex_unlock (&threadpool_io->updates_lock);
631 }
632
633 void
634 mono_threadpool_io_remove_domain_jobs (MonoDomain *domain)
635 {
636         ThreadPoolIOUpdate *update;
637
638         if (!mono_lazy_is_initialized (&io_status))
639                 return;
640
641         mono_coop_mutex_lock (&threadpool_io->updates_lock);
642
643         update = update_get_new ();
644         update->type = UPDATE_REMOVE_DOMAIN;
645         update->data.remove_domain.domain = domain;
646         mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */
647
648         selector_thread_wakeup ();
649
650         mono_coop_cond_wait (&threadpool_io->updates_cond, &threadpool_io->updates_lock);
651
652         mono_coop_mutex_unlock (&threadpool_io->updates_lock);
653 }
654
655 #else
656
657 void
658 ves_icall_System_IOSelector_Add (gpointer handle, MonoIOSelectorJob *job)
659 {
660         g_assert_not_reached ();
661 }
662
663 void
664 ves_icall_System_IOSelector_Remove (gpointer handle)
665 {
666         g_assert_not_reached ();
667 }
668
669 void
670 mono_threadpool_io_cleanup (void)
671 {
672         g_assert_not_reached ();
673 }
674
675 void
676 mono_threadpool_io_remove_socket (int fd)
677 {
678         g_assert_not_reached ();
679 }
680
681 void
682 mono_threadpool_io_remove_domain_jobs (MonoDomain *domain)
683 {
684         g_assert_not_reached ();
685 }
686
687 #endif