First set of licensing changes
[mono.git] / mono / metadata / threadpool-ms-io.c
1 /*
2  * threadpool-ms-io.c: Microsoft IO threadpool runtime support
3  *
4  * Author:
5  *      Ludovic Henry (ludovic.henry@xamarin.com)
6  *
7  * Copyright 2015 Xamarin, Inc (http://www.xamarin.com)
8  * Licensed under the MIT license. See LICENSE file in the project root for full license information.
9  */
10
11 #include <config.h>
12
13 #ifndef DISABLE_SOCKETS
14
15 #include <glib.h>
16
17 #if defined(HOST_WIN32)
18 #include <windows.h>
19 #else
20 #include <errno.h>
21 #include <fcntl.h>
22 #endif
23
24 #include <mono/metadata/gc-internals.h>
25 #include <mono/metadata/mono-mlist.h>
26 #include <mono/metadata/threadpool-ms.h>
27 #include <mono/metadata/threadpool-ms-io.h>
28 #include <mono/utils/atomic.h>
29 #include <mono/utils/mono-threads.h>
30 #include <mono/utils/mono-lazy-init.h>
31 #include <mono/utils/mono-logger-internals.h>
32
33 typedef struct {
34         gboolean (*init) (gint wakeup_pipe_fd);
35         void     (*cleanup) (void);
36         void     (*register_fd) (gint fd, gint events, gboolean is_new);
37         void     (*remove_fd) (gint fd);
38         gint     (*event_wait) (void (*callback) (gint fd, gint events, gpointer user_data), gpointer user_data);
39 } ThreadPoolIOBackend;
40
41 /* Keep in sync with System.IOOperation in mcs/class/System/System/IOSelector.cs */
42 enum MonoIOOperation {
43         EVENT_IN   = 1 << 0,
44         EVENT_OUT  = 1 << 1,
45         EVENT_ERR  = 1 << 2, /* not in managed */
46 };
47
48 #include "threadpool-ms-io-epoll.c"
49 #include "threadpool-ms-io-kqueue.c"
50 #include "threadpool-ms-io-poll.c"
51
52 #define UPDATES_CAPACITY 128
53
54 /* Keep in sync with System.IOSelectorJob in mcs/class/System/System/IOSelector.cs */
55 struct _MonoIOSelectorJob {
56         MonoObject object;
57         gint32 operation;
58         MonoObject *callback;
59         MonoObject *state;
60 };
61
62 typedef enum {
63         UPDATE_EMPTY = 0,
64         UPDATE_ADD,
65         UPDATE_REMOVE_SOCKET,
66         UPDATE_REMOVE_DOMAIN,
67 } ThreadPoolIOUpdateType;
68
69 typedef struct {
70         gint fd;
71         MonoIOSelectorJob *job;
72 } ThreadPoolIOUpdate_Add;
73
74 typedef struct {
75         gint fd;
76 } ThreadPoolIOUpdate_RemoveSocket;
77
78 typedef struct {
79         MonoDomain *domain;
80 } ThreadPoolIOUpdate_RemoveDomain;
81
82 typedef struct {
83         ThreadPoolIOUpdateType type;
84         union {
85                 ThreadPoolIOUpdate_Add add;
86                 ThreadPoolIOUpdate_RemoveSocket remove_socket;
87                 ThreadPoolIOUpdate_RemoveDomain remove_domain;
88         } data;
89 } ThreadPoolIOUpdate;
90
91 typedef struct {
92         ThreadPoolIOBackend backend;
93
94         ThreadPoolIOUpdate updates [UPDATES_CAPACITY];
95         gint updates_size;
96         MonoCoopMutex updates_lock;
97         MonoCoopCond updates_cond;
98
99 #if !defined(HOST_WIN32)
100         gint wakeup_pipes [2];
101 #else
102         SOCKET wakeup_pipes [2];
103 #endif
104 } ThreadPoolIO;
105
106 static mono_lazy_init_t io_status = MONO_LAZY_INIT_STATUS_NOT_INITIALIZED;
107
108 static gboolean io_selector_running = FALSE;
109
110 static ThreadPoolIO* threadpool_io;
111
112 static MonoIOSelectorJob*
113 get_job_for_event (MonoMList **list, gint32 event)
114 {
115         MonoMList *current;
116
117         g_assert (list);
118
119         for (current = *list; current; current = mono_mlist_next (current)) {
120                 MonoIOSelectorJob *job = (MonoIOSelectorJob*) mono_mlist_get_data (current);
121                 if (job->operation == event) {
122                         *list = mono_mlist_remove_item (*list, current);
123                         return job;
124                 }
125         }
126
127         return NULL;
128 }
129
130 static gint
131 get_operations_for_jobs (MonoMList *list)
132 {
133         MonoMList *current;
134         gint operations = 0;
135
136         for (current = list; current; current = mono_mlist_next (current))
137                 operations |= ((MonoIOSelectorJob*) mono_mlist_get_data (current))->operation;
138
139         return operations;
140 }
141
142 static void
143 selector_thread_wakeup (void)
144 {
145         gchar msg = 'c';
146         gint written;
147
148         for (;;) {
149 #if !defined(HOST_WIN32)
150                 written = write (threadpool_io->wakeup_pipes [1], &msg, 1);
151                 if (written == 1)
152                         break;
153                 if (written == -1) {
154                         g_warning ("selector_thread_wakeup: write () failed, error (%d) %s\n", errno, g_strerror (errno));
155                         break;
156                 }
157 #else
158                 written = send (threadpool_io->wakeup_pipes [1], &msg, 1, 0);
159                 if (written == 1)
160                         break;
161                 if (written == SOCKET_ERROR) {
162                         g_warning ("selector_thread_wakeup: write () failed, error (%d)\n", WSAGetLastError ());
163                         break;
164                 }
165 #endif
166         }
167 }
168
169 static void
170 selector_thread_wakeup_drain_pipes (void)
171 {
172         gchar buffer [128];
173         gint received;
174
175         for (;;) {
176 #if !defined(HOST_WIN32)
177                 received = read (threadpool_io->wakeup_pipes [0], buffer, sizeof (buffer));
178                 if (received == 0)
179                         break;
180                 if (received == -1) {
181                         if (errno != EINTR && errno != EAGAIN)
182                                 g_warning ("selector_thread_wakeup_drain_pipes: read () failed, error (%d) %s\n", errno, g_strerror (errno));
183                         break;
184                 }
185 #else
186                 received = recv (threadpool_io->wakeup_pipes [0], buffer, sizeof (buffer), 0);
187                 if (received == 0)
188                         break;
189                 if (received == SOCKET_ERROR) {
190                         if (WSAGetLastError () != WSAEINTR && WSAGetLastError () != WSAEWOULDBLOCK)
191                                 g_warning ("selector_thread_wakeup_drain_pipes: recv () failed, error (%d) %s\n", WSAGetLastError ());
192                         break;
193                 }
194 #endif
195         }
196 }
197
198 typedef struct {
199         MonoDomain *domain;
200         MonoGHashTable *states;
201 } FilterSockaresForDomainData;
202
203 static void
204 filter_jobs_for_domain (gpointer key, gpointer value, gpointer user_data)
205 {
206         FilterSockaresForDomainData *data;
207         MonoMList *list = (MonoMList *)value, *element;
208         MonoDomain *domain;
209         MonoGHashTable *states;
210
211         g_assert (user_data);
212         data = (FilterSockaresForDomainData *)user_data;
213         domain = data->domain;
214         states = data->states;
215
216         for (element = list; element; element = mono_mlist_next (element)) {
217                 MonoIOSelectorJob *job = (MonoIOSelectorJob*) mono_mlist_get_data (element);
218                 if (mono_object_domain (job) == domain)
219                         mono_mlist_set_data (element, NULL);
220         }
221
222         /* we skip all the first elements which are NULL */
223         for (; list; list = mono_mlist_next (list)) {
224                 if (mono_mlist_get_data (list))
225                         break;
226         }
227
228         if (list) {
229                 g_assert (mono_mlist_get_data (list));
230
231                 /* we delete all the NULL elements after the first one */
232                 for (element = list; element;) {
233                         MonoMList *next;
234                         if (!(next = mono_mlist_next (element)))
235                                 break;
236                         if (mono_mlist_get_data (next))
237                                 element = next;
238                         else
239                                 mono_mlist_set_next (element, mono_mlist_next (next));
240                 }
241         }
242
243         mono_g_hash_table_replace (states, key, list);
244 }
245
246 static void
247 wait_callback (gint fd, gint events, gpointer user_data)
248 {
249         MonoError error;
250
251         if (mono_runtime_is_shutting_down ())
252                 return;
253
254         if (fd == threadpool_io->wakeup_pipes [0]) {
255                 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: wke");
256                 selector_thread_wakeup_drain_pipes ();
257         } else {
258                 MonoGHashTable *states;
259                 MonoMList *list = NULL;
260                 gpointer k;
261                 gboolean remove_fd = FALSE;
262                 gint operations;
263
264                 g_assert (user_data);
265                 states = (MonoGHashTable *)user_data;
266
267                 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: cal fd %3d, events = %2s | %2s | %3s",
268                         fd, (events & EVENT_IN) ? "RD" : "..", (events & EVENT_OUT) ? "WR" : "..", (events & EVENT_ERR) ? "ERR" : "...");
269
270                 if (!mono_g_hash_table_lookup_extended (states, GINT_TO_POINTER (fd), &k, (gpointer*) &list))
271                         g_error ("wait_callback: fd %d not found in states table", fd);
272
273                 if (list && (events & EVENT_IN) != 0) {
274                         MonoIOSelectorJob *job = get_job_for_event (&list, EVENT_IN);
275                         if (job) {
276                                 mono_threadpool_ms_enqueue_work_item (((MonoObject*) job)->vtable->domain, (MonoObject*) job, &error);
277                                 mono_error_raise_exception (&error); /* FIXME don't raise here */
278                         }
279
280                 }
281                 if (list && (events & EVENT_OUT) != 0) {
282                         MonoIOSelectorJob *job = get_job_for_event (&list, EVENT_OUT);
283                         if (job) {
284                                 mono_threadpool_ms_enqueue_work_item (((MonoObject*) job)->vtable->domain, (MonoObject*) job, &error);
285                                 mono_error_raise_exception (&error); /* FIXME don't raise here */
286                         }
287                 }
288
289                 remove_fd = (events & EVENT_ERR) == EVENT_ERR;
290                 if (!remove_fd) {
291                         mono_g_hash_table_replace (states, GINT_TO_POINTER (fd), list);
292
293                         operations = get_operations_for_jobs (list);
294
295                         mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: res fd %3d, events = %2s | %2s | %3s",
296                                 fd, (operations & EVENT_IN) ? "RD" : "..", (operations & EVENT_OUT) ? "WR" : "..", (operations & EVENT_ERR) ? "ERR" : "...");
297
298                         threadpool_io->backend.register_fd (fd, operations, FALSE);
299                 } else {
300                         mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: err fd %d", fd);
301
302                         mono_g_hash_table_remove (states, GINT_TO_POINTER (fd));
303
304                         threadpool_io->backend.remove_fd (fd);
305                 }
306         }
307 }
308
309 static void
310 selector_thread (gpointer data)
311 {
312         MonoError error;
313         MonoGHashTable *states;
314
315         io_selector_running = TRUE;
316
317         if (mono_runtime_is_shutting_down ()) {
318                 io_selector_running = FALSE;
319                 return;
320         }
321
322         states = mono_g_hash_table_new_type (g_direct_hash, g_direct_equal, MONO_HASH_VALUE_GC, MONO_ROOT_SOURCE_THREAD_POOL, "i/o thread pool states table");
323
324         for (;;) {
325                 gint i, j;
326                 gint res;
327
328                 mono_coop_mutex_lock (&threadpool_io->updates_lock);
329
330                 for (i = 0; i < threadpool_io->updates_size; ++i) {
331                         ThreadPoolIOUpdate *update = &threadpool_io->updates [i];
332
333                         switch (update->type) {
334                         case UPDATE_EMPTY:
335                                 break;
336                         case UPDATE_ADD: {
337                                 gint fd;
338                                 gint operations;
339                                 gpointer k;
340                                 gboolean exists;
341                                 MonoMList *list = NULL;
342                                 MonoIOSelectorJob *job;
343
344                                 fd = update->data.add.fd;
345                                 g_assert (fd >= 0);
346
347                                 job = update->data.add.job;
348                                 g_assert (job);
349
350                                 exists = mono_g_hash_table_lookup_extended (states, GINT_TO_POINTER (fd), &k, (gpointer*) &list);
351                                 list = mono_mlist_append (list, (MonoObject*) job);
352                                 mono_g_hash_table_replace (states, GINT_TO_POINTER (fd), list);
353
354                                 operations = get_operations_for_jobs (list);
355
356                                 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: %3s fd %3d, operations = %2s | %2s | %3s",
357                                         exists ? "mod" : "add", fd, (operations & EVENT_IN) ? "RD" : "..", (operations & EVENT_OUT) ? "WR" : "..", (operations & EVENT_ERR) ? "ERR" : "...");
358
359                                 threadpool_io->backend.register_fd (fd, operations, !exists);
360
361                                 break;
362                         }
363                         case UPDATE_REMOVE_SOCKET: {
364                                 gint fd;
365                                 gpointer k;
366                                 MonoMList *list = NULL;
367
368                                 fd = update->data.remove_socket.fd;
369                                 g_assert (fd >= 0);
370
371                                 if (mono_g_hash_table_lookup_extended (states, GINT_TO_POINTER (fd), &k, (gpointer*) &list)) {
372                                         mono_g_hash_table_remove (states, GINT_TO_POINTER (fd));
373
374                                         for (j = i + 1; j < threadpool_io->updates_size; ++j) {
375                                                 ThreadPoolIOUpdate *update = &threadpool_io->updates [j];
376                                                 if (update->type == UPDATE_ADD && update->data.add.fd == fd)
377                                                         memset (update, 0, sizeof (ThreadPoolIOUpdate));
378                                         }
379
380                                         for (; list; list = mono_mlist_remove_item (list, list)) {
381                                                 mono_threadpool_ms_enqueue_work_item (mono_object_domain (mono_mlist_get_data (list)), mono_mlist_get_data (list), &error);
382                                                 mono_error_raise_exception (&error); /* FIXME don't raise here */
383                                         }
384
385                                         mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: del fd %3d", fd);
386                                         threadpool_io->backend.remove_fd (fd);
387                                 }
388
389                                 break;
390                         }
391                         case UPDATE_REMOVE_DOMAIN: {
392                                 MonoDomain *domain;
393
394                                 domain = update->data.remove_domain.domain;
395                                 g_assert (domain);
396
397                                 FilterSockaresForDomainData user_data = { .domain = domain, .states = states };
398                                 mono_g_hash_table_foreach (states, filter_jobs_for_domain, &user_data);
399
400                                 for (j = i + 1; j < threadpool_io->updates_size; ++j) {
401                                         ThreadPoolIOUpdate *update = &threadpool_io->updates [j];
402                                         if (update->type == UPDATE_ADD && mono_object_domain (update->data.add.job) == domain)
403                                                 memset (update, 0, sizeof (ThreadPoolIOUpdate));
404                                 }
405
406                                 break;
407                         }
408                         default:
409                                 g_assert_not_reached ();
410                         }
411                 }
412
413                 mono_coop_cond_broadcast (&threadpool_io->updates_cond);
414
415                 if (threadpool_io->updates_size > 0) {
416                         threadpool_io->updates_size = 0;
417                         memset (&threadpool_io->updates, 0, UPDATES_CAPACITY * sizeof (ThreadPoolIOUpdate));
418                 }
419
420                 mono_coop_mutex_unlock (&threadpool_io->updates_lock);
421
422                 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: wai");
423
424                 res = threadpool_io->backend.event_wait (wait_callback, states);
425
426                 if (res == -1 || mono_runtime_is_shutting_down ())
427                         break;
428         }
429
430         mono_g_hash_table_destroy (states);
431
432         io_selector_running = FALSE;
433 }
434
435 /* Locking: threadpool_io->updates_lock must be held */
436 static ThreadPoolIOUpdate*
437 update_get_new (void)
438 {
439         ThreadPoolIOUpdate *update = NULL;
440         g_assert (threadpool_io->updates_size <= UPDATES_CAPACITY);
441
442         while (threadpool_io->updates_size == UPDATES_CAPACITY) {
443                 /* we wait for updates to be applied in the selector_thread and we loop
444                  * as long as none are available. if it happends too much, then we need
445                  * to increase UPDATES_CAPACITY */
446                 mono_coop_cond_wait (&threadpool_io->updates_cond, &threadpool_io->updates_lock);
447         }
448
449         g_assert (threadpool_io->updates_size < UPDATES_CAPACITY);
450
451         update = &threadpool_io->updates [threadpool_io->updates_size ++];
452
453         return update;
454 }
455
456 static void
457 wakeup_pipes_init (void)
458 {
459 #if !defined(HOST_WIN32)
460         if (pipe (threadpool_io->wakeup_pipes) == -1)
461                 g_error ("wakeup_pipes_init: pipe () failed, error (%d) %s\n", errno, g_strerror (errno));
462         if (fcntl (threadpool_io->wakeup_pipes [0], F_SETFL, O_NONBLOCK) == -1)
463                 g_error ("wakeup_pipes_init: fcntl () failed, error (%d) %s\n", errno, g_strerror (errno));
464 #else
465         struct sockaddr_in client;
466         struct sockaddr_in server;
467         SOCKET server_sock;
468         gulong arg;
469         gint size;
470
471         server_sock = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
472         g_assert (server_sock != INVALID_SOCKET);
473         threadpool_io->wakeup_pipes [1] = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
474         g_assert (threadpool_io->wakeup_pipes [1] != INVALID_SOCKET);
475
476         server.sin_family = AF_INET;
477         server.sin_addr.s_addr = inet_addr ("127.0.0.1");
478         server.sin_port = 0;
479         if (bind (server_sock, (SOCKADDR*) &server, sizeof (server)) == SOCKET_ERROR) {
480                 closesocket (server_sock);
481                 g_error ("wakeup_pipes_init: bind () failed, error (%d)\n", WSAGetLastError ());
482         }
483
484         size = sizeof (server);
485         if (getsockname (server_sock, (SOCKADDR*) &server, &size) == SOCKET_ERROR) {
486                 closesocket (server_sock);
487                 g_error ("wakeup_pipes_init: getsockname () failed, error (%d)\n", WSAGetLastError ());
488         }
489         if (listen (server_sock, 1024) == SOCKET_ERROR) {
490                 closesocket (server_sock);
491                 g_error ("wakeup_pipes_init: listen () failed, error (%d)\n", WSAGetLastError ());
492         }
493         if (connect ((SOCKET) threadpool_io->wakeup_pipes [1], (SOCKADDR*) &server, sizeof (server)) == SOCKET_ERROR) {
494                 closesocket (server_sock);
495                 g_error ("wakeup_pipes_init: connect () failed, error (%d)\n", WSAGetLastError ());
496         }
497
498         size = sizeof (client);
499         threadpool_io->wakeup_pipes [0] = accept (server_sock, (SOCKADDR *) &client, &size);
500         g_assert (threadpool_io->wakeup_pipes [0] != INVALID_SOCKET);
501
502         arg = 1;
503         if (ioctlsocket (threadpool_io->wakeup_pipes [0], FIONBIO, &arg) == SOCKET_ERROR) {
504                 closesocket (threadpool_io->wakeup_pipes [0]);
505                 closesocket (server_sock);
506                 g_error ("wakeup_pipes_init: ioctlsocket () failed, error (%d)\n", WSAGetLastError ());
507         }
508
509         closesocket (server_sock);
510 #endif
511 }
512
513 static void
514 initialize (void)
515 {
516         g_assert (!threadpool_io);
517         threadpool_io = g_new0 (ThreadPoolIO, 1);
518         g_assert (threadpool_io);
519
520         mono_coop_mutex_init (&threadpool_io->updates_lock);
521         mono_coop_cond_init (&threadpool_io->updates_cond);
522         mono_gc_register_root ((char *)&threadpool_io->updates [0], sizeof (threadpool_io->updates), MONO_GC_DESCRIPTOR_NULL, MONO_ROOT_SOURCE_THREAD_POOL, "i/o thread pool updates list");
523
524         threadpool_io->updates_size = 0;
525
526         threadpool_io->backend = backend_poll;
527         if (g_getenv ("MONO_ENABLE_AIO") != NULL) {
528 #if defined(HAVE_EPOLL)
529                 threadpool_io->backend = backend_epoll;
530 #elif defined(HAVE_KQUEUE)
531                 threadpool_io->backend = backend_kqueue;
532 #endif
533         }
534
535         wakeup_pipes_init ();
536
537         if (!threadpool_io->backend.init (threadpool_io->wakeup_pipes [0]))
538                 g_error ("initialize: backend->init () failed");
539
540         if (!mono_thread_create_internal (mono_get_root_domain (), selector_thread, NULL, TRUE, SMALL_STACK))
541                 g_error ("initialize: mono_thread_create_internal () failed");
542 }
543
544 static void
545 cleanup (void)
546 {
547         /* we make the assumption along the code that we are
548          * cleaning up only if the runtime is shutting down */
549         g_assert (mono_runtime_is_shutting_down ());
550
551         selector_thread_wakeup ();
552         while (io_selector_running)
553                 mono_thread_info_usleep (1000);
554
555         mono_coop_mutex_destroy (&threadpool_io->updates_lock);
556         mono_coop_cond_destroy (&threadpool_io->updates_cond);
557
558         threadpool_io->backend.cleanup ();
559
560 #if !defined(HOST_WIN32)
561         close (threadpool_io->wakeup_pipes [0]);
562         close (threadpool_io->wakeup_pipes [1]);
563 #else
564         closesocket (threadpool_io->wakeup_pipes [0]);
565         closesocket (threadpool_io->wakeup_pipes [1]);
566 #endif
567
568         g_assert (threadpool_io);
569         g_free (threadpool_io);
570         threadpool_io = NULL;
571         g_assert (!threadpool_io);
572 }
573
574 void
575 mono_threadpool_ms_io_cleanup (void)
576 {
577         mono_lazy_cleanup (&io_status, cleanup);
578 }
579
580 void
581 ves_icall_System_IOSelector_Add (gpointer handle, MonoIOSelectorJob *job)
582 {
583         ThreadPoolIOUpdate *update;
584
585         g_assert (handle >= 0);
586
587         g_assert (job->operation == EVENT_IN ^ job->operation == EVENT_OUT);
588         g_assert (job->callback);
589
590         if (mono_runtime_is_shutting_down ())
591                 return;
592         if (mono_domain_is_unloading (mono_object_domain (job)))
593                 return;
594
595         mono_lazy_initialize (&io_status, initialize);
596
597         mono_coop_mutex_lock (&threadpool_io->updates_lock);
598
599         update = update_get_new ();
600         update->type = UPDATE_ADD;
601         update->data.add.fd = GPOINTER_TO_INT (handle);
602         update->data.add.job = job;
603         mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */
604
605         selector_thread_wakeup ();
606
607         mono_coop_mutex_unlock (&threadpool_io->updates_lock);
608 }
609
610 void
611 ves_icall_System_IOSelector_Remove (gpointer handle)
612 {
613         mono_threadpool_ms_io_remove_socket (GPOINTER_TO_INT (handle));
614 }
615
616 void
617 mono_threadpool_ms_io_remove_socket (int fd)
618 {
619         ThreadPoolIOUpdate *update;
620
621         if (!mono_lazy_is_initialized (&io_status))
622                 return;
623
624         mono_coop_mutex_lock (&threadpool_io->updates_lock);
625
626         update = update_get_new ();
627         update->type = UPDATE_REMOVE_SOCKET;
628         update->data.add.fd = fd;
629         mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */
630
631         selector_thread_wakeup ();
632
633         mono_coop_cond_wait (&threadpool_io->updates_cond, &threadpool_io->updates_lock);
634
635         mono_coop_mutex_unlock (&threadpool_io->updates_lock);
636 }
637
638 void
639 mono_threadpool_ms_io_remove_domain_jobs (MonoDomain *domain)
640 {
641         ThreadPoolIOUpdate *update;
642
643         if (!mono_lazy_is_initialized (&io_status))
644                 return;
645
646         mono_coop_mutex_lock (&threadpool_io->updates_lock);
647
648         update = update_get_new ();
649         update->type = UPDATE_REMOVE_DOMAIN;
650         update->data.remove_domain.domain = domain;
651         mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */
652
653         selector_thread_wakeup ();
654
655         mono_coop_cond_wait (&threadpool_io->updates_cond, &threadpool_io->updates_lock);
656
657         mono_coop_mutex_unlock (&threadpool_io->updates_lock);
658 }
659
660 #else
661
662 void
663 ves_icall_System_IOSelector_Add (gpointer handle, MonoIOSelectorJob *job)
664 {
665         g_assert_not_reached ();
666 }
667
668 void
669 ves_icall_System_IOSelector_Remove (gpointer handle)
670 {
671         g_assert_not_reached ();
672 }
673
674 void
675 mono_threadpool_ms_io_cleanup (void)
676 {
677         g_assert_not_reached ();
678 }
679
680 void
681 mono_threadpool_ms_io_remove_socket (int fd)
682 {
683         g_assert_not_reached ();
684 }
685
686 void
687 mono_threadpool_ms_io_remove_domain_jobs (MonoDomain *domain)
688 {
689         g_assert_not_reached ();
690 }
691
692 #endif