[threadpool-ms-io] Remove unnecessary asserts. Fixes #32947.
[mono.git] / mono / metadata / threadpool-ms-io.c
1 /*
2  * threadpool-ms-io.c: Microsoft IO threadpool runtime support
3  *
4  * Author:
5  *      Ludovic Henry (ludovic.henry@xamarin.com)
6  *
7  * Copyright 2015 Xamarin, Inc (http://www.xamarin.com)
8  */
9
10 #include <config.h>
11
12 #ifndef DISABLE_SOCKETS
13
14 #include <glib.h>
15
16 #if defined(HOST_WIN32)
17 #include <windows.h>
18 #else
19 #include <errno.h>
20 #include <fcntl.h>
21 #endif
22
23 #include <mono/metadata/gc-internal.h>
24 #include <mono/metadata/mono-mlist.h>
25 #include <mono/metadata/threadpool-ms.h>
26 #include <mono/metadata/threadpool-ms-io.h>
27 #include <mono/utils/atomic.h>
28 #include <mono/utils/mono-poll.h>
29 #include <mono/utils/mono-threads.h>
30 #include <mono/utils/mono-lazy-init.h>
31
32 typedef struct {
33         gboolean (*init) (gint wakeup_pipe_fd);
34         void     (*cleanup) (void);
35         void     (*register_fd) (gint fd, gint events, gboolean is_new);
36         gint     (*event_wait) (void);
37         gint     (*event_get_fd_max) (void);
38         gint     (*event_get_fd_at) (gint i, gint *events);
39 } ThreadPoolIOBackend;
40
41 #include "threadpool-ms-io-epoll.c"
42 #include "threadpool-ms-io-kqueue.c"
43 #include "threadpool-ms-io-poll.c"
44
45 /* Keep in sync with System.Net.Sockets.Socket.SocketOperation */
46 enum {
47         AIO_OP_FIRST,
48         AIO_OP_ACCEPT = 0,
49         AIO_OP_CONNECT,
50         AIO_OP_RECEIVE,
51         AIO_OP_RECEIVEFROM,
52         AIO_OP_SEND,
53         AIO_OP_SENDTO,
54         AIO_OP_RECV_JUST_CALLBACK,
55         AIO_OP_SEND_JUST_CALLBACK,
56         AIO_OP_READPIPE,
57         AIO_OP_CONSOLE2,
58         AIO_OP_DISCONNECT,
59         AIO_OP_ACCEPTRECEIVE,
60         AIO_OP_RECEIVE_BUFFERS,
61         AIO_OP_SEND_BUFFERS,
62         AIO_OP_LAST
63 };
64
65 typedef struct {
66         gint fd;
67         MonoSocketAsyncResult *sockares;
68 } ThreadPoolIOUpdate;
69
70 typedef struct {
71         ThreadPoolIOBackend backend;
72
73         mono_mutex_t lock;
74
75         mono_cond_t updates_signal;
76
77         MonoGHashTable *states;
78
79         ThreadPoolIOUpdate *updates;
80         guint updates_size;
81         guint updates_capacity;
82
83 #if !defined(HOST_WIN32)
84         gint wakeup_pipes [2];
85 #else
86         SOCKET wakeup_pipes [2];
87 #endif
88 } ThreadPoolIO;
89
90 static mono_lazy_init_t io_status = MONO_LAZY_INIT_STATUS_NOT_INITIALIZED;
91
92 static gboolean io_selector_running = FALSE;
93
94 static ThreadPoolIO* threadpool_io;
95
96 static int
97 get_events_from_sockares (MonoSocketAsyncResult *ares)
98 {
99         switch (ares->operation) {
100         case AIO_OP_ACCEPT:
101         case AIO_OP_RECEIVE:
102         case AIO_OP_RECV_JUST_CALLBACK:
103         case AIO_OP_RECEIVEFROM:
104         case AIO_OP_READPIPE:
105         case AIO_OP_ACCEPTRECEIVE:
106         case AIO_OP_RECEIVE_BUFFERS:
107                 return MONO_POLLIN;
108         case AIO_OP_SEND:
109         case AIO_OP_SEND_JUST_CALLBACK:
110         case AIO_OP_SENDTO:
111         case AIO_OP_CONNECT:
112         case AIO_OP_SEND_BUFFERS:
113         case AIO_OP_DISCONNECT:
114                 return MONO_POLLOUT;
115         default:
116                 g_assert_not_reached ();
117         }
118 }
119
120 static MonoSocketAsyncResult*
121 get_sockares_for_event (MonoMList **list, gint event)
122 {
123         MonoMList *current;
124
125         g_assert (list);
126
127         for (current = *list; current; current = mono_mlist_next (current)) {
128                 MonoSocketAsyncResult *ares = (MonoSocketAsyncResult*) mono_mlist_get_data (current);
129                 if (get_events_from_sockares (ares) == event) {
130                         *list = mono_mlist_remove_item (*list, current);
131                         return ares;
132                 }
133         }
134
135         return NULL;
136 }
137
138 static gint
139 get_events (MonoMList *list)
140 {
141         MonoMList *current;
142         gint events = 0;
143
144         for (current = list; current; current = mono_mlist_next (current)) {
145                 MonoSocketAsyncResult *ares = (MonoSocketAsyncResult*) mono_mlist_get_data (current);
146                 if (ares)
147                         events |= get_events_from_sockares (ares);
148         }
149
150         return events;
151 }
152
153 static void
154 selector_thread_wakeup (void);
155
156 /*
157  * If sockares is NULL, then it means we want to delete the corresponding fd
158  */
159 static void
160 update_add (gint fd, MonoSocketAsyncResult *sockares)
161 {
162         ThreadPoolIOUpdate *update;
163
164         mono_mutex_lock (&threadpool_io->lock);
165
166         threadpool_io->updates_size += 1;
167         if (threadpool_io->updates_size > threadpool_io->updates_capacity) {
168                 ThreadPoolIOUpdate *updates_new, *updates_old;
169                 gint updates_new_capacity, updates_old_capacity;
170
171                 updates_old_capacity = threadpool_io->updates_capacity;
172                 updates_new_capacity = updates_old_capacity + 16;
173
174                 updates_old = threadpool_io->updates;
175                 updates_new = mono_gc_alloc_fixed (sizeof (ThreadPoolIOUpdate) * updates_new_capacity, MONO_GC_DESCRIPTOR_NULL);
176                 g_assert (updates_new);
177
178                 if (updates_old)
179                         memcpy (updates_new, updates_old, sizeof (ThreadPoolIOUpdate) * updates_old_capacity);
180
181                 threadpool_io->updates = updates_new;
182                 threadpool_io->updates_capacity = updates_new_capacity;
183
184                 if (updates_old)
185                         mono_gc_free_fixed (updates_old);
186         }
187
188         update = &threadpool_io->updates [threadpool_io->updates_size - 1];
189         update->fd = fd;
190         update->sockares = sockares;
191
192         selector_thread_wakeup ();
193
194         mono_cond_wait (&threadpool_io->updates_signal, &threadpool_io->lock);
195
196         mono_mutex_unlock (&threadpool_io->lock);
197 }
198
199 static void
200 update_drain (void (*callback) (gint fd, gint events, gboolean is_new))
201 {
202         gint i;
203
204         mono_mutex_lock (&threadpool_io->lock);
205
206         for (i = 0; i < threadpool_io->updates_size; ++i) {
207                 ThreadPoolIOUpdate *update;
208                 MonoMList *list = NULL;
209                 gpointer k;
210                 gboolean is_new;
211
212                 update = &threadpool_io->updates [i];
213
214                 is_new = !mono_g_hash_table_lookup_extended (threadpool_io->states, GINT_TO_POINTER (update->fd), &k, (gpointer*) &list);
215
216                 if (!update->sockares) {
217                         callback (update->fd, 0, is_new);
218                 } else {
219                         list = mono_mlist_append (list, (MonoObject*) update->sockares);
220                         mono_g_hash_table_replace (threadpool_io->states, update->sockares->handle, list);
221
222                         callback (update->fd, get_events (list), is_new);
223                 }
224         }
225
226         mono_cond_broadcast (&threadpool_io->updates_signal);
227
228         if (threadpool_io->updates_size > 0) {
229                 ThreadPoolIOUpdate *updates_old;
230
231                 threadpool_io->updates_size = 0;
232                 threadpool_io->updates_capacity = 16;
233
234                 updates_old = threadpool_io->updates;
235
236                 threadpool_io->updates = mono_gc_alloc_fixed (sizeof (ThreadPoolIOUpdate) * threadpool_io->updates_capacity, MONO_GC_DESCRIPTOR_NULL);
237                 g_assert (threadpool_io->updates);
238
239                 mono_gc_free_fixed (updates_old);
240         }
241
242         mono_mutex_unlock (&threadpool_io->lock);
243 }
244
245 static void
246 update_remove (gboolean (*predicate) (ThreadPoolIOUpdate *update, gpointer user_data), gpointer user_data)
247 {
248         gint i;
249
250         mono_mutex_lock (&threadpool_io->lock);
251
252         for (i = 0; i < threadpool_io->updates_size; ++i) {
253                 if (predicate (&threadpool_io->updates [i], user_data)) {
254                         if (i < threadpool_io->updates_size - 1)
255                                 memmove (threadpool_io->updates + i, threadpool_io->updates + i + 1, sizeof (ThreadPoolIOUpdate) * threadpool_io->updates_size - i - 1);
256                         memset (threadpool_io->updates + threadpool_io->updates_size - 1, 0, sizeof (ThreadPoolIOUpdate));
257
258                         threadpool_io->updates_size --;
259                         i --;
260                 }
261         }
262
263         mono_mutex_unlock (&threadpool_io->lock);
264 }
265
266 static void
267 selector_thread_wakeup (void)
268 {
269         gchar msg = 'c';
270         gint written;
271
272         for (;;) {
273 #if !defined(HOST_WIN32)
274                 written = write (threadpool_io->wakeup_pipes [1], &msg, 1);
275                 if (written == 1)
276                         break;
277                 if (written == -1) {
278                         g_warning ("selector_thread_wakeup: write () failed, error (%d) %s\n", errno, g_strerror (errno));
279                         break;
280                 }
281 #else
282                 written = send (threadpool_io->wakeup_pipes [1], &msg, 1, 0);
283                 if (written == 1)
284                         break;
285                 if (written == SOCKET_ERROR) {
286                         g_warning ("selector_thread_wakeup: write () failed, error (%d)\n", WSAGetLastError ());
287                         break;
288                 }
289 #endif
290         }
291 }
292
293 static void
294 selector_thread_wakeup_drain_pipes (void)
295 {
296         gchar buffer [128];
297         gint received;
298
299         for (;;) {
300 #if !defined(HOST_WIN32)
301                 received = read (threadpool_io->wakeup_pipes [0], buffer, sizeof (buffer));
302                 if (received == 0)
303                         break;
304                 if (received == -1) {
305                         if (errno != EINTR && errno != EAGAIN)
306                                 g_warning ("selector_thread_wakeup_drain_pipes: read () failed, error (%d) %s\n", errno, g_strerror (errno));
307                         break;
308                 }
309 #else
310                 received = recv (threadpool_io->wakeup_pipes [0], buffer, sizeof (buffer), 0);
311                 if (received == 0)
312                         break;
313                 if (received == SOCKET_ERROR) {
314                         if (WSAGetLastError () != WSAEINTR && WSAGetLastError () != WSAEWOULDBLOCK)
315                                 g_warning ("selector_thread_wakeup_drain_pipes: recv () failed, error (%d) %s\n", WSAGetLastError ());
316                         break;
317                 }
318 #endif
319         }
320 }
321
322 static void
323 selector_thread (gpointer data)
324 {
325         io_selector_running = TRUE;
326
327         if (mono_runtime_is_shutting_down ()) {
328                 io_selector_running = FALSE;
329                 return;
330         }
331
332         mono_mutex_lock (&threadpool_io->lock);
333
334         for (;;) {
335                 guint i;
336                 guint max;
337                 gint ready = 0;
338
339                 update_drain (threadpool_io->backend.register_fd);
340
341                 mono_mutex_unlock (&threadpool_io->lock);
342
343                 mono_gc_set_skip_thread (TRUE);
344
345                 ready = threadpool_io->backend.event_wait ();
346
347                 mono_gc_set_skip_thread (FALSE);
348
349                 mono_mutex_lock (&threadpool_io->lock);
350
351                 if (ready == -1 || mono_runtime_is_shutting_down ())
352                         break;
353
354                 max = threadpool_io->backend.event_get_fd_max ();
355
356                 for (i = 0; i < max && ready > 0; ++i) {
357                         gint events;
358                         gint fd = threadpool_io->backend.event_get_fd_at (i, &events);
359
360                         if (fd == -1)
361                                 continue;
362
363                         if (fd == threadpool_io->wakeup_pipes [0]) {
364                                 selector_thread_wakeup_drain_pipes ();
365                         } else {
366                                 MonoMList *list = NULL;
367                                 gpointer k;
368
369                                 if (mono_g_hash_table_lookup_extended (threadpool_io->states, GINT_TO_POINTER (fd), &k, (gpointer*) &list)) {
370                                         if (list && (events & MONO_POLLIN) != 0) {
371                                                 MonoSocketAsyncResult *sockares = get_sockares_for_event (&list, MONO_POLLIN);
372                                                 if (sockares)
373                                                         mono_threadpool_ms_enqueue_work_item (((MonoObject*) sockares)->vtable->domain, (MonoObject*) sockares);
374                                         }
375                                         if (list && (events & MONO_POLLOUT) != 0) {
376                                                 MonoSocketAsyncResult *sockares = get_sockares_for_event (&list, MONO_POLLOUT);
377                                                 if (sockares)
378                                                         mono_threadpool_ms_enqueue_work_item (((MonoObject*) sockares)->vtable->domain, (MonoObject*) sockares);
379                                         }
380
381                                         if (!list)
382                                                 mono_g_hash_table_remove (threadpool_io->states, GINT_TO_POINTER (fd));
383                                         else
384                                                 mono_g_hash_table_replace (threadpool_io->states, GINT_TO_POINTER (fd), list);
385
386                                         threadpool_io->backend.register_fd (fd, get_events (list), FALSE);
387                                 }
388                         }
389
390                         ready -= 1;
391                 }
392         }
393
394         mono_mutex_unlock (&threadpool_io->lock);
395
396         io_selector_running = FALSE;
397 }
398
399 static void
400 wakeup_pipes_init (void)
401 {
402 #if !defined(HOST_WIN32)
403         if (pipe (threadpool_io->wakeup_pipes) == -1)
404                 g_error ("wakeup_pipes_init: pipe () failed, error (%d) %s\n", errno, g_strerror (errno));
405         if (fcntl (threadpool_io->wakeup_pipes [0], F_SETFL, O_NONBLOCK) == -1)
406                 g_error ("wakeup_pipes_init: fcntl () failed, error (%d) %s\n", errno, g_strerror (errno));
407 #else
408         struct sockaddr_in client;
409         struct sockaddr_in server;
410         SOCKET server_sock;
411         gulong arg;
412         gint size;
413
414         server_sock = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
415         g_assert (server_sock != INVALID_SOCKET);
416         threadpool_io->wakeup_pipes [1] = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
417         g_assert (threadpool_io->wakeup_pipes [1] != INVALID_SOCKET);
418
419         server.sin_family = AF_INET;
420         server.sin_addr.s_addr = inet_addr ("127.0.0.1");
421         server.sin_port = 0;
422         if (bind (server_sock, (SOCKADDR*) &server, sizeof (server)) == SOCKET_ERROR) {
423                 closesocket (server_sock);
424                 g_error ("wakeup_pipes_init: bind () failed, error (%d)\n", WSAGetLastError ());
425         }
426
427         size = sizeof (server);
428         if (getsockname (server_sock, (SOCKADDR*) &server, &size) == SOCKET_ERROR) {
429                 closesocket (server_sock);
430                 g_error ("wakeup_pipes_init: getsockname () failed, error (%d)\n", WSAGetLastError ());
431         }
432         if (listen (server_sock, 1024) == SOCKET_ERROR) {
433                 closesocket (server_sock);
434                 g_error ("wakeup_pipes_init: listen () failed, error (%d)\n", WSAGetLastError ());
435         }
436         if (connect ((SOCKET) threadpool_io->wakeup_pipes [1], (SOCKADDR*) &server, sizeof (server)) == SOCKET_ERROR) {
437                 closesocket (server_sock);
438                 g_error ("wakeup_pipes_init: connect () failed, error (%d)\n", WSAGetLastError ());
439         }
440
441         size = sizeof (client);
442         threadpool_io->wakeup_pipes [0] = accept (server_sock, (SOCKADDR *) &client, &size);
443         g_assert (threadpool_io->wakeup_pipes [0] != INVALID_SOCKET);
444
445         arg = 1;
446         if (ioctlsocket (threadpool_io->wakeup_pipes [0], FIONBIO, &arg) == SOCKET_ERROR) {
447                 closesocket (threadpool_io->wakeup_pipes [0]);
448                 closesocket (server_sock);
449                 g_error ("wakeup_pipes_init: ioctlsocket () failed, error (%d)\n", WSAGetLastError ());
450         }
451
452         closesocket (server_sock);
453 #endif
454 }
455
456 static void
457 initialize (void)
458 {
459         g_assert (!threadpool_io);
460         threadpool_io = g_new0 (ThreadPoolIO, 1);
461         g_assert (threadpool_io);
462
463         mono_mutex_init_recursive (&threadpool_io->lock);
464
465         mono_cond_init (&threadpool_io->updates_signal, NULL);
466
467         threadpool_io->states = mono_g_hash_table_new_type (g_direct_hash, g_direct_equal, MONO_HASH_VALUE_GC);
468         MONO_GC_REGISTER_ROOT_FIXED (threadpool_io->states);
469
470         threadpool_io->updates = NULL;
471         threadpool_io->updates_size = 0;
472         threadpool_io->updates_capacity = 0;
473
474 #if defined(HAVE_EPOLL)
475         threadpool_io->backend = backend_epoll;
476 #elif defined(HAVE_KQUEUE)
477         threadpool_io->backend = backend_kqueue;
478 #else
479         threadpool_io->backend = backend_poll;
480 #endif
481         if (g_getenv ("MONO_DISABLE_AIO") != NULL)
482                 threadpool_io->backend = backend_poll;
483
484         wakeup_pipes_init ();
485
486         if (!threadpool_io->backend.init (threadpool_io->wakeup_pipes [0]))
487                 g_error ("initialize: backend->init () failed");
488
489         if (!mono_thread_create_internal (mono_get_root_domain (), selector_thread, NULL, TRUE, SMALL_STACK))
490                 g_error ("initialize: mono_thread_create_internal () failed");
491 }
492
493 static void
494 cleanup (void)
495 {
496         /* we make the assumption along the code that we are
497          * cleaning up only if the runtime is shutting down */
498         g_assert (mono_runtime_is_shutting_down ());
499
500         selector_thread_wakeup ();
501         while (io_selector_running)
502                 g_usleep (1000);
503
504         mono_mutex_destroy (&threadpool_io->lock);
505
506         mono_cond_destroy (&threadpool_io->updates_signal);
507
508         MONO_GC_UNREGISTER_ROOT (threadpool_io->states);
509         mono_g_hash_table_destroy (threadpool_io->states);
510
511         if (threadpool_io->updates)
512                 mono_gc_free_fixed (threadpool_io->updates);
513
514         threadpool_io->backend.cleanup ();
515
516 #if !defined(HOST_WIN32)
517         close (threadpool_io->wakeup_pipes [0]);
518         close (threadpool_io->wakeup_pipes [1]);
519 #else
520         closesocket (threadpool_io->wakeup_pipes [0]);
521         closesocket (threadpool_io->wakeup_pipes [1]);
522 #endif
523
524         g_assert (threadpool_io);
525         g_free (threadpool_io);
526         threadpool_io = NULL;
527         g_assert (!threadpool_io);
528 }
529
530 static gboolean
531 is_socket_async_callback (MonoImage *system_image, MonoClass *class)
532 {
533         MonoClass *socket_async_callback_class = NULL;
534
535         socket_async_callback_class = mono_class_from_name (system_image, "System.Net.Sockets", "SocketAsyncCallback");
536
537         return class == socket_async_callback_class;
538 }
539
540 static gboolean
541 is_async_read_handler (MonoImage *system_image, MonoClass *class)
542 {
543         MonoClass *async_read_handler_class = NULL;
544
545         async_read_handler_class = mono_class_from_name (system_image, "System.Diagnostics", "Process/AsyncReadHandler");
546
547         return class == async_read_handler_class;
548 }
549
550 gboolean
551 mono_threadpool_ms_is_io (MonoObject *target, MonoObject *state)
552 {
553         MonoImage *system_image;
554         MonoSocketAsyncResult *sockares;
555
556         system_image = mono_image_loaded ("System");
557         if (!system_image)
558                 return FALSE;
559
560         if (!is_socket_async_callback (system_image, target->vtable->klass) && !is_async_read_handler (system_image, target->vtable->klass))
561                 return FALSE;
562
563         sockares = (MonoSocketAsyncResult*) state;
564         if (sockares->operation < AIO_OP_FIRST || sockares->operation >= AIO_OP_LAST)
565                 return FALSE;
566
567         return TRUE;
568 }
569
570 void
571 mono_threadpool_ms_io_cleanup (void)
572 {
573         mono_lazy_cleanup (&io_status, cleanup);
574 }
575
576 MonoAsyncResult *
577 mono_threadpool_ms_io_add (MonoAsyncResult *ares, MonoSocketAsyncResult *sockares)
578 {
579         g_assert (ares);
580         g_assert (sockares);
581
582         if (mono_runtime_is_shutting_down ())
583                 return NULL;
584
585         mono_lazy_initialize (&io_status, initialize);
586
587         MONO_OBJECT_SETREF (sockares, ares, ares);
588
589         update_add (GPOINTER_TO_INT (sockares->handle), sockares);
590
591         return ares;
592 }
593
594 static gboolean
595 remove_update_for_socket (ThreadPoolIOUpdate *update, gpointer user_data)
596 {
597         if (!update->sockares)
598                 return FALSE;
599
600         return GPOINTER_TO_INT (update->sockares->handle) == GPOINTER_TO_INT (user_data);
601 }
602
603 void
604 mono_threadpool_ms_io_remove_socket (int fd)
605 {
606         MonoMList *list = NULL;
607         gpointer k;
608
609         if (!mono_lazy_is_initialized (&io_status))
610                 return;
611
612         mono_mutex_lock (&threadpool_io->lock);
613
614         g_assert (threadpool_io->states);
615
616         if (mono_g_hash_table_lookup_extended (threadpool_io->states, GINT_TO_POINTER (fd), &k, (gpointer*) &list))
617                 mono_g_hash_table_remove (threadpool_io->states, GINT_TO_POINTER (fd));
618
619         update_remove (remove_update_for_socket, GINT_TO_POINTER (fd));
620
621         mono_mutex_unlock (&threadpool_io->lock);
622
623         for (; list; list = mono_mlist_remove_item (list, list)) {
624                 MonoSocketAsyncResult *sockares = (MonoSocketAsyncResult*) mono_mlist_get_data (list);
625
626                 if (!sockares)
627                         continue;
628
629                 switch (sockares->operation) {
630                 case AIO_OP_RECEIVE:
631                         sockares->operation = AIO_OP_RECV_JUST_CALLBACK;
632                         break;
633                 case AIO_OP_SEND:
634                         sockares->operation = AIO_OP_SEND_JUST_CALLBACK;
635                         break;
636                 }
637
638                 mono_threadpool_ms_enqueue_work_item (((MonoObject*) sockares)->vtable->domain, (MonoObject*) sockares);
639         }
640
641         update_add (fd, NULL);
642 }
643
644 static gboolean
645 remove_sockstate_for_domain (gpointer key, gpointer value, gpointer user_data)
646 {
647         MonoMList *list;
648         gboolean remove = FALSE;
649
650         for (list = value; list; list = mono_mlist_next (list)) {
651                 MonoObject *data = mono_mlist_get_data (list);
652                 if (mono_object_domain (data) == user_data) {
653                         remove = TRUE;
654                         mono_mlist_set_data (list, NULL);
655                 }
656         }
657
658         //FIXME is there some sort of additional unregistration we need to perform here?
659         return remove;
660 }
661
662 static gboolean
663 remove_update_for_domain (ThreadPoolIOUpdate *update, gpointer user_data)
664 {
665         if (!update->sockares)
666                 return FALSE;
667
668         return mono_object_domain (update->sockares) == (MonoDomain*) user_data;
669 }
670
671 void
672 mono_threadpool_ms_io_remove_domain_jobs (MonoDomain *domain)
673 {
674         if (!mono_lazy_is_initialized (&io_status))
675                 return;
676
677         mono_mutex_lock (&threadpool_io->lock);
678
679         mono_g_hash_table_foreach_remove (threadpool_io->states, remove_sockstate_for_domain, domain);
680
681         update_remove (remove_update_for_domain, domain);
682
683         mono_mutex_unlock (&threadpool_io->lock);
684 }
685
686 void
687 icall_append_io_job (MonoObject *target, MonoSocketAsyncResult *state)
688 {
689         MonoAsyncResult *ares;
690
691         /* Don't call mono_async_result_new() to avoid capturing the context */
692         ares = (MonoAsyncResult *) mono_object_new (mono_domain_get (), mono_defaults.asyncresult_class);
693         MONO_OBJECT_SETREF (ares, async_delegate, target);
694         MONO_OBJECT_SETREF (ares, async_state, state);
695
696         mono_threadpool_ms_io_add (ares, state);
697         return;
698 }
699
700 #else
701
702 gboolean
703 mono_threadpool_ms_is_io (MonoObject *target, MonoObject *state)
704 {
705         return FALSE;
706 }
707
708 void
709 mono_threadpool_ms_io_cleanup (void)
710 {
711         g_assert_not_reached ();
712 }
713
714 MonoAsyncResult *
715 mono_threadpool_ms_io_add (MonoAsyncResult *ares, MonoSocketAsyncResult *sockares)
716 {
717         g_assert_not_reached ();
718 }
719
720 void
721 mono_threadpool_ms_io_remove_socket (int fd)
722 {
723         g_assert_not_reached ();
724 }
725
726 void
727 mono_threadpool_ms_io_remove_domain_jobs (MonoDomain *domain)
728 {
729         g_assert_not_reached ();
730 }
731
732 void
733 icall_append_io_job (MonoObject *target, MonoSocketAsyncResult *state)
734 {
735         g_assert_not_reached ();
736 }
737
738 #endif