[sgen] Evacuate from emptier blocks to fuller ones
[mono.git] / mono / metadata / threadpool-ms-io.c
1 /*
2  * threadpool-ms-io.c: Microsoft IO threadpool runtime support
3  *
4  * Author:
5  *      Ludovic Henry (ludovic.henry@xamarin.com)
6  *
7  * Copyright 2015 Xamarin, Inc (http://www.xamarin.com)
8  */
9
10 #include <config.h>
11
12 #ifndef DISABLE_SOCKETS
13
14 #include <glib.h>
15
16 #if defined(HOST_WIN32)
17 #include <windows.h>
18 #else
19 #include <errno.h>
20 #include <fcntl.h>
21 #endif
22
23 #include <mono/metadata/gc-internals.h>
24 #include <mono/metadata/mono-mlist.h>
25 #include <mono/metadata/threadpool-ms.h>
26 #include <mono/metadata/threadpool-ms-io.h>
27 #include <mono/utils/atomic.h>
28 #include <mono/utils/mono-threads.h>
29 #include <mono/utils/mono-lazy-init.h>
30 #include <mono/utils/mono-logger-internals.h>
31
32 typedef struct {
33         gboolean (*init) (gint wakeup_pipe_fd);
34         void     (*cleanup) (void);
35         void     (*register_fd) (gint fd, gint events, gboolean is_new);
36         void     (*remove_fd) (gint fd);
37         gint     (*event_wait) (void (*callback) (gint fd, gint events, gpointer user_data), gpointer user_data);
38 } ThreadPoolIOBackend;
39
40 /* Keep in sync with System.IOOperation in mcs/class/System/System/IOSelector.cs */
41 enum MonoIOOperation {
42         EVENT_IN   = 1 << 0,
43         EVENT_OUT  = 1 << 1,
44         EVENT_ERR  = 1 << 2, /* not in managed */
45 };
46
47 #include "threadpool-ms-io-epoll.c"
48 #include "threadpool-ms-io-kqueue.c"
49 #include "threadpool-ms-io-poll.c"
50
51 #define UPDATES_CAPACITY 128
52
53 /* Keep in sync with System.IOSelectorJob in mcs/class/System/System/IOSelector.cs */
54 struct _MonoIOSelectorJob {
55         MonoObject object;
56         gint32 operation;
57         MonoObject *callback;
58         MonoObject *state;
59 };
60
61 typedef enum {
62         UPDATE_EMPTY = 0,
63         UPDATE_ADD,
64         UPDATE_REMOVE_SOCKET,
65         UPDATE_REMOVE_DOMAIN,
66 } ThreadPoolIOUpdateType;
67
68 typedef struct {
69         gint fd;
70         MonoIOSelectorJob *job;
71 } ThreadPoolIOUpdate_Add;
72
73 typedef struct {
74         gint fd;
75 } ThreadPoolIOUpdate_RemoveSocket;
76
77 typedef struct {
78         MonoDomain *domain;
79 } ThreadPoolIOUpdate_RemoveDomain;
80
81 typedef struct {
82         ThreadPoolIOUpdateType type;
83         union {
84                 ThreadPoolIOUpdate_Add add;
85                 ThreadPoolIOUpdate_RemoveSocket remove_socket;
86                 ThreadPoolIOUpdate_RemoveDomain remove_domain;
87         } data;
88 } ThreadPoolIOUpdate;
89
90 typedef struct {
91         ThreadPoolIOBackend backend;
92
93         ThreadPoolIOUpdate updates [UPDATES_CAPACITY];
94         gint updates_size;
95         MonoCoopMutex updates_lock;
96         MonoCoopCond updates_cond;
97
98 #if !defined(HOST_WIN32)
99         gint wakeup_pipes [2];
100 #else
101         SOCKET wakeup_pipes [2];
102 #endif
103 } ThreadPoolIO;
104
105 static mono_lazy_init_t io_status = MONO_LAZY_INIT_STATUS_NOT_INITIALIZED;
106
107 static gboolean io_selector_running = FALSE;
108
109 static ThreadPoolIO* threadpool_io;
110
111 static MonoIOSelectorJob*
112 get_job_for_event (MonoMList **list, gint32 event)
113 {
114         MonoMList *current;
115
116         g_assert (list);
117
118         for (current = *list; current; current = mono_mlist_next (current)) {
119                 MonoIOSelectorJob *job = (MonoIOSelectorJob*) mono_mlist_get_data (current);
120                 if (job->operation == event) {
121                         *list = mono_mlist_remove_item (*list, current);
122                         return job;
123                 }
124         }
125
126         return NULL;
127 }
128
129 static gint
130 get_operations_for_jobs (MonoMList *list)
131 {
132         MonoMList *current;
133         gint operations = 0;
134
135         for (current = list; current; current = mono_mlist_next (current))
136                 operations |= ((MonoIOSelectorJob*) mono_mlist_get_data (current))->operation;
137
138         return operations;
139 }
140
141 static void
142 selector_thread_wakeup (void)
143 {
144         gchar msg = 'c';
145         gint written;
146
147         for (;;) {
148 #if !defined(HOST_WIN32)
149                 written = write (threadpool_io->wakeup_pipes [1], &msg, 1);
150                 if (written == 1)
151                         break;
152                 if (written == -1) {
153                         g_warning ("selector_thread_wakeup: write () failed, error (%d) %s\n", errno, g_strerror (errno));
154                         break;
155                 }
156 #else
157                 written = send (threadpool_io->wakeup_pipes [1], &msg, 1, 0);
158                 if (written == 1)
159                         break;
160                 if (written == SOCKET_ERROR) {
161                         g_warning ("selector_thread_wakeup: write () failed, error (%d)\n", WSAGetLastError ());
162                         break;
163                 }
164 #endif
165         }
166 }
167
168 static void
169 selector_thread_wakeup_drain_pipes (void)
170 {
171         gchar buffer [128];
172         gint received;
173
174         for (;;) {
175 #if !defined(HOST_WIN32)
176                 received = read (threadpool_io->wakeup_pipes [0], buffer, sizeof (buffer));
177                 if (received == 0)
178                         break;
179                 if (received == -1) {
180                         if (errno != EINTR && errno != EAGAIN)
181                                 g_warning ("selector_thread_wakeup_drain_pipes: read () failed, error (%d) %s\n", errno, g_strerror (errno));
182                         break;
183                 }
184 #else
185                 received = recv (threadpool_io->wakeup_pipes [0], buffer, sizeof (buffer), 0);
186                 if (received == 0)
187                         break;
188                 if (received == SOCKET_ERROR) {
189                         if (WSAGetLastError () != WSAEINTR && WSAGetLastError () != WSAEWOULDBLOCK)
190                                 g_warning ("selector_thread_wakeup_drain_pipes: recv () failed, error (%d) %s\n", WSAGetLastError ());
191                         break;
192                 }
193 #endif
194         }
195 }
196
197 typedef struct {
198         MonoDomain *domain;
199         MonoGHashTable *states;
200 } FilterSockaresForDomainData;
201
202 static void
203 filter_jobs_for_domain (gpointer key, gpointer value, gpointer user_data)
204 {
205         FilterSockaresForDomainData *data;
206         MonoMList *list = (MonoMList *)value, *element;
207         MonoDomain *domain;
208         MonoGHashTable *states;
209
210         g_assert (user_data);
211         data = (FilterSockaresForDomainData *)user_data;
212         domain = data->domain;
213         states = data->states;
214
215         for (element = list; element; element = mono_mlist_next (element)) {
216                 MonoIOSelectorJob *job = (MonoIOSelectorJob*) mono_mlist_get_data (element);
217                 if (mono_object_domain (job) == domain)
218                         mono_mlist_set_data (element, NULL);
219         }
220
221         /* we skip all the first elements which are NULL */
222         for (; list; list = mono_mlist_next (list)) {
223                 if (mono_mlist_get_data (list))
224                         break;
225         }
226
227         if (list) {
228                 g_assert (mono_mlist_get_data (list));
229
230                 /* we delete all the NULL elements after the first one */
231                 for (element = list; element;) {
232                         MonoMList *next;
233                         if (!(next = mono_mlist_next (element)))
234                                 break;
235                         if (mono_mlist_get_data (next))
236                                 element = next;
237                         else
238                                 mono_mlist_set_next (element, mono_mlist_next (next));
239                 }
240         }
241
242         mono_g_hash_table_replace (states, key, list);
243 }
244
245 static void
246 wait_callback (gint fd, gint events, gpointer user_data)
247 {
248         MonoError error;
249
250         if (mono_runtime_is_shutting_down ())
251                 return;
252
253         if (fd == threadpool_io->wakeup_pipes [0]) {
254                 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: wke");
255                 selector_thread_wakeup_drain_pipes ();
256         } else {
257                 MonoGHashTable *states;
258                 MonoMList *list = NULL;
259                 gpointer k;
260                 gboolean remove_fd = FALSE;
261                 gint operations;
262
263                 g_assert (user_data);
264                 states = (MonoGHashTable *)user_data;
265
266                 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: cal fd %3d, events = %2s | %2s | %3s",
267                         fd, (events & EVENT_IN) ? "RD" : "..", (events & EVENT_OUT) ? "WR" : "..", (events & EVENT_ERR) ? "ERR" : "...");
268
269                 if (!mono_g_hash_table_lookup_extended (states, GINT_TO_POINTER (fd), &k, (gpointer*) &list))
270                         g_error ("wait_callback: fd %d not found in states table", fd);
271
272                 if (list && (events & EVENT_IN) != 0) {
273                         MonoIOSelectorJob *job = get_job_for_event (&list, EVENT_IN);
274                         if (job) {
275                                 mono_threadpool_ms_enqueue_work_item (((MonoObject*) job)->vtable->domain, (MonoObject*) job, &error);
276                                 mono_error_raise_exception (&error); /* FIXME don't raise here */
277                         }
278
279                 }
280                 if (list && (events & EVENT_OUT) != 0) {
281                         MonoIOSelectorJob *job = get_job_for_event (&list, EVENT_OUT);
282                         if (job) {
283                                 mono_threadpool_ms_enqueue_work_item (((MonoObject*) job)->vtable->domain, (MonoObject*) job, &error);
284                                 mono_error_raise_exception (&error); /* FIXME don't raise here */
285                         }
286                 }
287
288                 remove_fd = (events & EVENT_ERR) == EVENT_ERR;
289                 if (!remove_fd) {
290                         mono_g_hash_table_replace (states, GINT_TO_POINTER (fd), list);
291
292                         operations = get_operations_for_jobs (list);
293
294                         mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: res fd %3d, events = %2s | %2s | %3s",
295                                 fd, (operations & EVENT_IN) ? "RD" : "..", (operations & EVENT_OUT) ? "WR" : "..", (operations & EVENT_ERR) ? "ERR" : "...");
296
297                         threadpool_io->backend.register_fd (fd, operations, FALSE);
298                 } else {
299                         mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: err fd %d", fd);
300
301                         mono_g_hash_table_remove (states, GINT_TO_POINTER (fd));
302
303                         threadpool_io->backend.remove_fd (fd);
304                 }
305         }
306 }
307
308 static void
309 selector_thread (gpointer data)
310 {
311         MonoError error;
312         MonoGHashTable *states;
313
314         io_selector_running = TRUE;
315
316         if (mono_runtime_is_shutting_down ()) {
317                 io_selector_running = FALSE;
318                 return;
319         }
320
321         states = mono_g_hash_table_new_type (g_direct_hash, g_direct_equal, MONO_HASH_VALUE_GC, MONO_ROOT_SOURCE_THREAD_POOL, "i/o thread pool states table");
322
323         for (;;) {
324                 gint i, j;
325                 gint res;
326
327                 mono_coop_mutex_lock (&threadpool_io->updates_lock);
328
329                 for (i = 0; i < threadpool_io->updates_size; ++i) {
330                         ThreadPoolIOUpdate *update = &threadpool_io->updates [i];
331
332                         switch (update->type) {
333                         case UPDATE_EMPTY:
334                                 break;
335                         case UPDATE_ADD: {
336                                 gint fd;
337                                 gint operations;
338                                 gpointer k;
339                                 gboolean exists;
340                                 MonoMList *list = NULL;
341                                 MonoIOSelectorJob *job;
342
343                                 fd = update->data.add.fd;
344                                 g_assert (fd >= 0);
345
346                                 job = update->data.add.job;
347                                 g_assert (job);
348
349                                 exists = mono_g_hash_table_lookup_extended (states, GINT_TO_POINTER (fd), &k, (gpointer*) &list);
350                                 list = mono_mlist_append (list, (MonoObject*) job);
351                                 mono_g_hash_table_replace (states, GINT_TO_POINTER (fd), list);
352
353                                 operations = get_operations_for_jobs (list);
354
355                                 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: %3s fd %3d, operations = %2s | %2s | %3s",
356                                         exists ? "mod" : "add", fd, (operations & EVENT_IN) ? "RD" : "..", (operations & EVENT_OUT) ? "WR" : "..", (operations & EVENT_ERR) ? "ERR" : "...");
357
358                                 threadpool_io->backend.register_fd (fd, operations, !exists);
359
360                                 break;
361                         }
362                         case UPDATE_REMOVE_SOCKET: {
363                                 gint fd;
364                                 gpointer k;
365                                 MonoMList *list = NULL;
366
367                                 fd = update->data.remove_socket.fd;
368                                 g_assert (fd >= 0);
369
370                                 if (mono_g_hash_table_lookup_extended (states, GINT_TO_POINTER (fd), &k, (gpointer*) &list)) {
371                                         mono_g_hash_table_remove (states, GINT_TO_POINTER (fd));
372
373                                         for (j = i + 1; j < threadpool_io->updates_size; ++j) {
374                                                 ThreadPoolIOUpdate *update = &threadpool_io->updates [j];
375                                                 if (update->type == UPDATE_ADD && update->data.add.fd == fd)
376                                                         memset (update, 0, sizeof (ThreadPoolIOUpdate));
377                                         }
378
379                                         for (; list; list = mono_mlist_remove_item (list, list)) {
380                                                 mono_threadpool_ms_enqueue_work_item (mono_object_domain (mono_mlist_get_data (list)), mono_mlist_get_data (list), &error);
381                                                 mono_error_raise_exception (&error); /* FIXME don't raise here */
382                                         }
383
384                                         mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: del fd %3d", fd);
385                                         threadpool_io->backend.remove_fd (fd);
386                                 }
387
388                                 break;
389                         }
390                         case UPDATE_REMOVE_DOMAIN: {
391                                 MonoDomain *domain;
392
393                                 domain = update->data.remove_domain.domain;
394                                 g_assert (domain);
395
396                                 FilterSockaresForDomainData user_data = { .domain = domain, .states = states };
397                                 mono_g_hash_table_foreach (states, filter_jobs_for_domain, &user_data);
398
399                                 for (j = i + 1; j < threadpool_io->updates_size; ++j) {
400                                         ThreadPoolIOUpdate *update = &threadpool_io->updates [j];
401                                         if (update->type == UPDATE_ADD && mono_object_domain (update->data.add.job) == domain)
402                                                 memset (update, 0, sizeof (ThreadPoolIOUpdate));
403                                 }
404
405                                 break;
406                         }
407                         default:
408                                 g_assert_not_reached ();
409                         }
410                 }
411
412                 mono_coop_cond_broadcast (&threadpool_io->updates_cond);
413
414                 if (threadpool_io->updates_size > 0) {
415                         threadpool_io->updates_size = 0;
416                         memset (&threadpool_io->updates, 0, UPDATES_CAPACITY * sizeof (ThreadPoolIOUpdate));
417                 }
418
419                 mono_coop_mutex_unlock (&threadpool_io->updates_lock);
420
421                 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_IO_THREADPOOL, "io threadpool: wai");
422
423                 res = threadpool_io->backend.event_wait (wait_callback, states);
424
425                 if (res == -1 || mono_runtime_is_shutting_down ())
426                         break;
427         }
428
429         mono_g_hash_table_destroy (states);
430
431         io_selector_running = FALSE;
432 }
433
434 /* Locking: threadpool_io->updates_lock must be held */
435 static ThreadPoolIOUpdate*
436 update_get_new (void)
437 {
438         ThreadPoolIOUpdate *update = NULL;
439         g_assert (threadpool_io->updates_size <= UPDATES_CAPACITY);
440
441         while (threadpool_io->updates_size == UPDATES_CAPACITY) {
442                 /* we wait for updates to be applied in the selector_thread and we loop
443                  * as long as none are available. if it happends too much, then we need
444                  * to increase UPDATES_CAPACITY */
445                 mono_coop_cond_wait (&threadpool_io->updates_cond, &threadpool_io->updates_lock);
446         }
447
448         g_assert (threadpool_io->updates_size < UPDATES_CAPACITY);
449
450         update = &threadpool_io->updates [threadpool_io->updates_size ++];
451
452         return update;
453 }
454
455 static void
456 wakeup_pipes_init (void)
457 {
458 #if !defined(HOST_WIN32)
459         if (pipe (threadpool_io->wakeup_pipes) == -1)
460                 g_error ("wakeup_pipes_init: pipe () failed, error (%d) %s\n", errno, g_strerror (errno));
461         if (fcntl (threadpool_io->wakeup_pipes [0], F_SETFL, O_NONBLOCK) == -1)
462                 g_error ("wakeup_pipes_init: fcntl () failed, error (%d) %s\n", errno, g_strerror (errno));
463 #else
464         struct sockaddr_in client;
465         struct sockaddr_in server;
466         SOCKET server_sock;
467         gulong arg;
468         gint size;
469
470         server_sock = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
471         g_assert (server_sock != INVALID_SOCKET);
472         threadpool_io->wakeup_pipes [1] = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
473         g_assert (threadpool_io->wakeup_pipes [1] != INVALID_SOCKET);
474
475         server.sin_family = AF_INET;
476         server.sin_addr.s_addr = inet_addr ("127.0.0.1");
477         server.sin_port = 0;
478         if (bind (server_sock, (SOCKADDR*) &server, sizeof (server)) == SOCKET_ERROR) {
479                 closesocket (server_sock);
480                 g_error ("wakeup_pipes_init: bind () failed, error (%d)\n", WSAGetLastError ());
481         }
482
483         size = sizeof (server);
484         if (getsockname (server_sock, (SOCKADDR*) &server, &size) == SOCKET_ERROR) {
485                 closesocket (server_sock);
486                 g_error ("wakeup_pipes_init: getsockname () failed, error (%d)\n", WSAGetLastError ());
487         }
488         if (listen (server_sock, 1024) == SOCKET_ERROR) {
489                 closesocket (server_sock);
490                 g_error ("wakeup_pipes_init: listen () failed, error (%d)\n", WSAGetLastError ());
491         }
492         if (connect ((SOCKET) threadpool_io->wakeup_pipes [1], (SOCKADDR*) &server, sizeof (server)) == SOCKET_ERROR) {
493                 closesocket (server_sock);
494                 g_error ("wakeup_pipes_init: connect () failed, error (%d)\n", WSAGetLastError ());
495         }
496
497         size = sizeof (client);
498         threadpool_io->wakeup_pipes [0] = accept (server_sock, (SOCKADDR *) &client, &size);
499         g_assert (threadpool_io->wakeup_pipes [0] != INVALID_SOCKET);
500
501         arg = 1;
502         if (ioctlsocket (threadpool_io->wakeup_pipes [0], FIONBIO, &arg) == SOCKET_ERROR) {
503                 closesocket (threadpool_io->wakeup_pipes [0]);
504                 closesocket (server_sock);
505                 g_error ("wakeup_pipes_init: ioctlsocket () failed, error (%d)\n", WSAGetLastError ());
506         }
507
508         closesocket (server_sock);
509 #endif
510 }
511
512 static void
513 initialize (void)
514 {
515         g_assert (!threadpool_io);
516         threadpool_io = g_new0 (ThreadPoolIO, 1);
517         g_assert (threadpool_io);
518
519         mono_coop_mutex_init (&threadpool_io->updates_lock);
520         mono_coop_cond_init (&threadpool_io->updates_cond);
521         mono_gc_register_root ((char *)&threadpool_io->updates [0], sizeof (threadpool_io->updates), MONO_GC_DESCRIPTOR_NULL, MONO_ROOT_SOURCE_THREAD_POOL, "i/o thread pool updates list");
522
523         threadpool_io->updates_size = 0;
524
525         threadpool_io->backend = backend_poll;
526         if (g_getenv ("MONO_ENABLE_AIO") != NULL) {
527 #if defined(HAVE_EPOLL)
528                 threadpool_io->backend = backend_epoll;
529 #elif defined(HAVE_KQUEUE)
530                 threadpool_io->backend = backend_kqueue;
531 #endif
532         }
533
534         wakeup_pipes_init ();
535
536         if (!threadpool_io->backend.init (threadpool_io->wakeup_pipes [0]))
537                 g_error ("initialize: backend->init () failed");
538
539         if (!mono_thread_create_internal (mono_get_root_domain (), selector_thread, NULL, TRUE, SMALL_STACK))
540                 g_error ("initialize: mono_thread_create_internal () failed");
541 }
542
543 static void
544 cleanup (void)
545 {
546         /* we make the assumption along the code that we are
547          * cleaning up only if the runtime is shutting down */
548         g_assert (mono_runtime_is_shutting_down ());
549
550         selector_thread_wakeup ();
551         while (io_selector_running)
552                 mono_thread_info_usleep (1000);
553
554         mono_coop_mutex_destroy (&threadpool_io->updates_lock);
555         mono_coop_cond_destroy (&threadpool_io->updates_cond);
556
557         threadpool_io->backend.cleanup ();
558
559 #if !defined(HOST_WIN32)
560         close (threadpool_io->wakeup_pipes [0]);
561         close (threadpool_io->wakeup_pipes [1]);
562 #else
563         closesocket (threadpool_io->wakeup_pipes [0]);
564         closesocket (threadpool_io->wakeup_pipes [1]);
565 #endif
566
567         g_assert (threadpool_io);
568         g_free (threadpool_io);
569         threadpool_io = NULL;
570         g_assert (!threadpool_io);
571 }
572
573 void
574 mono_threadpool_ms_io_cleanup (void)
575 {
576         mono_lazy_cleanup (&io_status, cleanup);
577 }
578
579 void
580 ves_icall_System_IOSelector_Add (gpointer handle, MonoIOSelectorJob *job)
581 {
582         ThreadPoolIOUpdate *update;
583
584         g_assert (handle >= 0);
585
586         g_assert (job->operation == EVENT_IN ^ job->operation == EVENT_OUT);
587         g_assert (job->callback);
588
589         if (mono_runtime_is_shutting_down ())
590                 return;
591         if (mono_domain_is_unloading (mono_object_domain (job)))
592                 return;
593
594         mono_lazy_initialize (&io_status, initialize);
595
596         mono_coop_mutex_lock (&threadpool_io->updates_lock);
597
598         update = update_get_new ();
599         update->type = UPDATE_ADD;
600         update->data.add.fd = GPOINTER_TO_INT (handle);
601         update->data.add.job = job;
602         mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */
603
604         selector_thread_wakeup ();
605
606         mono_coop_mutex_unlock (&threadpool_io->updates_lock);
607 }
608
609 void
610 ves_icall_System_IOSelector_Remove (gpointer handle)
611 {
612         mono_threadpool_ms_io_remove_socket (GPOINTER_TO_INT (handle));
613 }
614
615 void
616 mono_threadpool_ms_io_remove_socket (int fd)
617 {
618         ThreadPoolIOUpdate *update;
619
620         if (!mono_lazy_is_initialized (&io_status))
621                 return;
622
623         mono_coop_mutex_lock (&threadpool_io->updates_lock);
624
625         update = update_get_new ();
626         update->type = UPDATE_REMOVE_SOCKET;
627         update->data.add.fd = fd;
628         mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */
629
630         selector_thread_wakeup ();
631
632         mono_coop_cond_wait (&threadpool_io->updates_cond, &threadpool_io->updates_lock);
633
634         mono_coop_mutex_unlock (&threadpool_io->updates_lock);
635 }
636
637 void
638 mono_threadpool_ms_io_remove_domain_jobs (MonoDomain *domain)
639 {
640         ThreadPoolIOUpdate *update;
641
642         if (!mono_lazy_is_initialized (&io_status))
643                 return;
644
645         mono_coop_mutex_lock (&threadpool_io->updates_lock);
646
647         update = update_get_new ();
648         update->type = UPDATE_REMOVE_DOMAIN;
649         update->data.remove_domain.domain = domain;
650         mono_memory_barrier (); /* Ensure this is safely published before we wake up the selector */
651
652         selector_thread_wakeup ();
653
654         mono_coop_cond_wait (&threadpool_io->updates_cond, &threadpool_io->updates_lock);
655
656         mono_coop_mutex_unlock (&threadpool_io->updates_lock);
657 }
658
659 #else
660
661 void
662 ves_icall_System_IOSelector_Add (gpointer handle, MonoIOSelectorJob *job)
663 {
664         g_assert_not_reached ();
665 }
666
667 void
668 ves_icall_System_IOSelector_Remove (gpointer handle)
669 {
670         g_assert_not_reached ();
671 }
672
673 void
674 mono_threadpool_ms_io_cleanup (void)
675 {
676         g_assert_not_reached ();
677 }
678
679 void
680 mono_threadpool_ms_io_remove_socket (int fd)
681 {
682         g_assert_not_reached ();
683 }
684
685 void
686 mono_threadpool_ms_io_remove_domain_jobs (MonoDomain *domain)
687 {
688         g_assert_not_reached ();
689 }
690
691 #endif