2009-07-11 Michael Barker <mike@middlesoft.co.uk>
[mono.git] / mono / metadata / threadpool.c
1 /*
2  * threadpool.c: global thread pool
3  *
4  * Authors:
5  *   Dietmar Maurer (dietmar@ximian.com)
6  *   Gonzalo Paniagua Javier (gonzalo@ximian.com)
7  *
8  * Copyright 2001-2003 Ximian, Inc (http://www.ximian.com)
9  * Copyright 2004-2009 Novell, Inc (http://www.novell.com)
10  */
11
12 #include <config.h>
13 #include <glib.h>
14
15 #define THREADS_PER_CPU 10 /* 20 + THREADS_PER_CPU * number of CPUs */
16 #define THREAD_EXIT_TIMEOUT 1000
17
18 #include <mono/metadata/domain-internals.h>
19 #include <mono/metadata/tabledefs.h>
20 #include <mono/metadata/threads.h>
21 #include <mono/metadata/threads-types.h>
22 #include <mono/metadata/threadpool-internals.h>
23 #include <mono/metadata/exception.h>
24 #include <mono/metadata/file-io.h>
25 #include <mono/metadata/monitor.h>
26 #include <mono/metadata/mono-mlist.h>
27 #include <mono/metadata/marshal.h>
28 #include <mono/metadata/socket-io.h>
29 #include <mono/io-layer/io-layer.h>
30 #include <mono/metadata/gc-internal.h>
31 #include <mono/utils/mono-time.h>
32 #include <mono/utils/mono-proclib.h>
33 #include <errno.h>
34 #ifdef HAVE_SYS_TIME_H
35 #include <sys/time.h>
36 #endif
37 #include <sys/types.h>
38 #include <fcntl.h>
39 #ifdef HAVE_UNISTD_H
40 #include <unistd.h>
41 #endif
42 #include <string.h>
43 #ifdef HAVE_SYS_SOCKET_H
44 #include <sys/socket.h>
45 #endif
46 #include <mono/utils/mono-poll.h>
47 #ifdef HAVE_EPOLL
48 #include <sys/epoll.h>
49 #endif
50
51 #ifndef DISABLE_SOCKETS
52 #include "mono/io-layer/socket-wrappers.h"
53 #endif
54
55 #include "threadpool.h"
56
57 #define THREAD_WANTS_A_BREAK(t) ((t->state & (ThreadState_StopRequested | \
58                                                 ThreadState_SuspendRequested)) != 0)
59
60 #undef EPOLL_DEBUG
61
62 /* maximum number of worker threads */
63 static int mono_max_worker_threads;
64 static int mono_min_worker_threads;
65 static int mono_io_max_worker_threads;
66 static int mono_io_min_worker_threads;
67
68 /* current number of worker threads */
69 static int mono_worker_threads = 0;
70 static int io_worker_threads = 0;
71
72 /* current number of busy threads */
73 static int busy_worker_threads = 0;
74 static int busy_io_worker_threads;
75
76 /* mono_thread_pool_init called */
77 static int tp_inited;
78 /* started idle threads */
79 static int tp_idle_started;
80
81
82 /* we use this to store a reference to the AsyncResult to avoid GC */
83 static MonoGHashTable *ares_htable = NULL;
84
85 static CRITICAL_SECTION ares_lock;
86 static CRITICAL_SECTION io_queue_lock;
87 static int pending_io_items;
88
89 typedef struct {
90         CRITICAL_SECTION io_lock; /* access to sock_to_state */
91         int inited;
92         int pipe [2];
93         MonoGHashTable *sock_to_state;
94
95         HANDLE new_sem; /* access to newpfd and write side of the pipe */
96         mono_pollfd *newpfd;
97         gboolean epoll_disabled;
98 #ifdef HAVE_EPOLL
99         int epollfd;
100 #endif
101 } SocketIOData;
102
103 static SocketIOData socket_io_data;
104
105 /* we append a job */
106 static HANDLE job_added;
107 static HANDLE io_job_added;
108
109 /* Keep in sync with the System.MonoAsyncCall class which provides GC tracking */
110 typedef struct {
111         MonoObject         object;
112         MonoMethodMessage *msg;
113         MonoMethod        *cb_method;
114         MonoDelegate      *cb_target;
115         MonoObject        *state;
116         MonoObject        *res;
117         MonoArray         *out_args;
118         /* This is a HANDLE, we use guint64 so the managed object layout remains constant */
119         guint64           wait_event;
120 } ASyncCall;
121
122 typedef struct {
123         MonoArray *array;
124         int first_elem;
125         int next_elem;
126 } TPQueue;
127
128 static void async_invoke_thread (gpointer data);
129 static void append_job (CRITICAL_SECTION *cs, TPQueue *list, MonoObject *ar);
130 static void start_thread_or_queue (MonoAsyncResult *ares);
131 static void start_tpthread (MonoAsyncResult *ares);
132 static void mono_async_invoke (MonoAsyncResult *ares);
133 static MonoObject* dequeue_job (CRITICAL_SECTION *cs, TPQueue *list);
134 static void free_queue (TPQueue *list);
135
136 static TPQueue async_call_queue = {NULL, 0, 0};
137 static TPQueue async_io_queue = {NULL, 0, 0};
138
139 static MonoClass *async_call_klass;
140 static MonoClass *socket_async_call_klass;
141 static MonoClass *process_async_call_klass;
142
143 #define INIT_POLLFD(a, b, c) {(a)->fd = b; (a)->events = c; (a)->revents = 0;}
144 enum {
145         AIO_OP_FIRST,
146         AIO_OP_ACCEPT = 0,
147         AIO_OP_CONNECT,
148         AIO_OP_RECEIVE,
149         AIO_OP_RECEIVEFROM,
150         AIO_OP_SEND,
151         AIO_OP_SENDTO,
152         AIO_OP_RECV_JUST_CALLBACK,
153         AIO_OP_SEND_JUST_CALLBACK,
154         AIO_OP_READPIPE,
155         AIO_OP_LAST
156 };
157
158 #ifdef DISABLE_SOCKETS
159 #define socket_io_cleanup(x)
160 #else
161 static void
162 socket_io_cleanup (SocketIOData *data)
163 {
164         gint release;
165
166         if (data->inited == 0)
167                 return;
168
169         EnterCriticalSection (&data->io_lock);
170         data->inited = 0;
171 #ifdef PLATFORM_WIN32
172         closesocket (data->pipe [0]);
173         closesocket (data->pipe [1]);
174 #else
175         close (data->pipe [0]);
176         close (data->pipe [1]);
177 #endif
178         data->pipe [0] = -1;
179         data->pipe [1] = -1;
180         if (data->new_sem)
181                 CloseHandle (data->new_sem);
182         data->new_sem = NULL;
183         mono_g_hash_table_destroy (data->sock_to_state);
184         data->sock_to_state = NULL;
185         free_queue (&async_io_queue);
186         release = (gint) InterlockedCompareExchange (&io_worker_threads, 0, -1);
187         if (io_job_added)
188                 ReleaseSemaphore (io_job_added, release, NULL);
189         g_free (data->newpfd);
190         data->newpfd = NULL;
191 #ifdef HAVE_EPOLL
192         if (FALSE == data->epoll_disabled)
193                 close (data->epollfd);
194 #endif
195         LeaveCriticalSection (&data->io_lock);
196 }
197
198 static int
199 get_event_from_state (MonoSocketAsyncResult *state)
200 {
201         switch (state->operation) {
202         case AIO_OP_ACCEPT:
203         case AIO_OP_RECEIVE:
204         case AIO_OP_RECV_JUST_CALLBACK:
205         case AIO_OP_RECEIVEFROM:
206         case AIO_OP_READPIPE:
207                 return MONO_POLLIN;
208         case AIO_OP_SEND:
209         case AIO_OP_SEND_JUST_CALLBACK:
210         case AIO_OP_SENDTO:
211         case AIO_OP_CONNECT:
212                 return MONO_POLLOUT;
213         default: /* Should never happen */
214                 g_print ("get_event_from_state: unknown value in switch!!!\n");
215                 return 0;
216         }
217 }
218
219 static int
220 get_events_from_list (MonoMList *list)
221 {
222         MonoSocketAsyncResult *state;
223         int events = 0;
224
225         while (list && (state = (MonoSocketAsyncResult *)mono_mlist_get_data (list))) {
226                 events |= get_event_from_state (state);
227                 list = mono_mlist_next (list);
228         }
229
230         return events;
231 }
232
233 #define ICALL_RECV(x)   ves_icall_System_Net_Sockets_Socket_Receive_internal (\
234                                 (SOCKET)(gssize)x->handle, x->buffer, x->offset, x->size,\
235                                  x->socket_flags, &x->error);
236
237 #define ICALL_SEND(x)   ves_icall_System_Net_Sockets_Socket_Send_internal (\
238                                 (SOCKET)(gssize)x->handle, x->buffer, x->offset, x->size,\
239                                  x->socket_flags, &x->error);
240
241 #endif /* !DISABLE_SOCKETS */
242
243 static void
244 unregister_job (MonoAsyncResult *obj)
245 {
246         EnterCriticalSection (&ares_lock);
247         mono_g_hash_table_remove (ares_htable, obj);
248         LeaveCriticalSection (&ares_lock);      
249 }
250
251 static void
252 threadpool_jobs_inc (MonoObject *obj)
253 {
254         if (obj)
255                 InterlockedIncrement (&obj->vtable->domain->threadpool_jobs);
256 }
257
258 static gboolean
259 threadpool_jobs_dec (MonoObject *obj)
260 {
261         MonoDomain *domain = obj->vtable->domain;
262         int remaining_jobs = InterlockedDecrement (&domain->threadpool_jobs);
263         if (remaining_jobs == 0 && domain->cleanup_semaphore) {
264                 ReleaseSemaphore (domain->cleanup_semaphore, 1, NULL);
265                 return TRUE;
266         }
267         return FALSE;
268 }
269
270 #ifndef DISABLE_SOCKETS
271 static void
272 async_invoke_io_thread (gpointer data)
273 {
274         MonoDomain *domain;
275         MonoThread *thread;
276         const gchar *version;
277
278         thread = mono_thread_current ();
279
280         version = mono_get_runtime_info ()->framework_version;
281         for (;;) {
282                 MonoSocketAsyncResult *state;
283                 MonoAsyncResult *ar;
284
285                 state = (MonoSocketAsyncResult *) data;
286                 if (state) {
287                         InterlockedDecrement (&pending_io_items);
288                         ar = state->ares;
289                         switch (state->operation) {
290                         case AIO_OP_RECEIVE:
291                                 state->total = ICALL_RECV (state);
292                                 break;
293                         case AIO_OP_SEND:
294                                 state->total = ICALL_SEND (state);
295                                 break;
296                         }
297
298                         /* worker threads invokes methods in different domains,
299                          * so we need to set the right domain here */
300                         domain = ((MonoObject *)ar)->vtable->domain;
301
302                         g_assert (domain);
303
304                         if (domain->state == MONO_APPDOMAIN_UNLOADED || domain->state == MONO_APPDOMAIN_UNLOADING) {
305                                 threadpool_jobs_dec ((MonoObject *)ar);
306                                 unregister_job (ar);
307                                 data = NULL;
308                         } else {
309                                 mono_thread_push_appdomain_ref (domain);
310                                 if (threadpool_jobs_dec ((MonoObject *)ar)) {
311                                         unregister_job (ar);
312                                         data = NULL;
313                                         mono_thread_pop_appdomain_ref ();
314                                         continue;
315                                 }
316                                 if (mono_domain_set (domain, FALSE)) {
317                                         ASyncCall *ac;
318
319                                         mono_async_invoke (ar);
320                                         ac = (ASyncCall *) ar->object_data;
321                                         /*
322                                         if (ac->msg->exc != NULL)
323                                                 mono_unhandled_exception (ac->msg->exc);
324                                         */
325                                         mono_domain_set (mono_get_root_domain (), TRUE);
326                                 }
327                                 mono_thread_pop_appdomain_ref ();
328                                 InterlockedDecrement (&busy_io_worker_threads);
329                                 /* If the callee changes the background status, set it back to TRUE */
330                                 if (*version != '1' && !mono_thread_test_state (thread , ThreadState_Background))
331                                         ves_icall_System_Threading_Thread_SetState (thread, ThreadState_Background);
332                         }
333                 }
334
335                 data = dequeue_job (&io_queue_lock, &async_io_queue);
336         
337                 if (!data) {
338                         guint32 wr;
339                         int timeout = THREAD_EXIT_TIMEOUT;
340                         guint32 start_time = mono_msec_ticks ();
341                         
342                         do {
343                                 wr = WaitForSingleObjectEx (io_job_added, (guint32)timeout, TRUE);
344                                 if (THREAD_WANTS_A_BREAK (thread))
345                                         mono_thread_interruption_checkpoint ();
346                         
347                                 timeout -= mono_msec_ticks () - start_time;
348                         
349                                 if (wr != WAIT_TIMEOUT && wr != WAIT_IO_COMPLETION)
350                                         data = dequeue_job (&io_queue_lock, &async_io_queue);
351                         }
352                         while (!data && timeout > 0);
353                 }
354
355                 if (!data) {
356                         if (InterlockedDecrement (&io_worker_threads) < 2) {
357                                 /* If we have pending items, keep the thread alive */
358                                 if (InterlockedCompareExchange (&pending_io_items, 0, 0) != 0) {
359                                         InterlockedIncrement (&io_worker_threads);
360                                         continue;
361                                 }
362                         }
363                         return;
364                 }
365                 
366                 InterlockedIncrement (&busy_io_worker_threads);
367         }
368
369         g_assert_not_reached ();
370 }
371
372 static void
373 start_io_thread_or_queue (MonoSocketAsyncResult *ares)
374 {
375         int busy, worker;
376
377         busy = (int) InterlockedCompareExchange (&busy_io_worker_threads, 0, -1);
378         worker = (int) InterlockedCompareExchange (&io_worker_threads, 0, -1); 
379         if (worker <= ++busy &&
380             worker < mono_io_max_worker_threads) {
381                 InterlockedIncrement (&busy_io_worker_threads);
382                 InterlockedIncrement (&io_worker_threads);
383                 threadpool_jobs_inc ((MonoObject *)ares);
384                 mono_thread_create_internal (mono_get_root_domain (), async_invoke_io_thread, ares, TRUE);
385         } else {
386                 append_job (&io_queue_lock, &async_io_queue, (MonoObject*)ares);
387                 ReleaseSemaphore (io_job_added, 1, NULL);
388         }
389 }
390
391 static MonoMList *
392 process_io_event (MonoMList *list, int event)
393 {
394         MonoSocketAsyncResult *state;
395         MonoMList *oldlist;
396
397         oldlist = list;
398         state = NULL;
399         while (list) {
400                 state = (MonoSocketAsyncResult *) mono_mlist_get_data (list);
401                 if (get_event_from_state (state) == event)
402                         break;
403                 
404                 list = mono_mlist_next (list);
405         }
406
407         if (list != NULL) {
408                 oldlist = mono_mlist_remove_item (oldlist, list);
409 #ifdef EPOLL_DEBUG
410                 g_print ("Dispatching event %d on socket %d\n", event, state->handle);
411 #endif
412                 InterlockedIncrement (&pending_io_items);
413                 start_io_thread_or_queue (state);
414         }
415
416         return oldlist;
417 }
418
419 static int
420 mark_bad_fds (mono_pollfd *pfds, int nfds)
421 {
422         int i, ret;
423         mono_pollfd *pfd;
424         int count = 0;
425
426         for (i = 0; i < nfds; i++) {
427                 pfd = &pfds [i];
428                 if (pfd->fd == -1)
429                         continue;
430
431                 ret = mono_poll (pfd, 1, 0);
432                 if (ret == -1 && errno == EBADF) {
433                         pfd->revents |= MONO_POLLNVAL;
434                         count++;
435                 } else if (ret == 1) {
436                         count++;
437                 }
438         }
439
440         return count;
441 }
442
443 static void
444 socket_io_poll_main (gpointer p)
445 {
446 #define INITIAL_POLLFD_SIZE     1024
447 #define POLL_ERRORS (MONO_POLLERR | MONO_POLLHUP | MONO_POLLNVAL)
448         SocketIOData *data = p;
449         mono_pollfd *pfds;
450         gint maxfd = 1;
451         gint allocated;
452         gint i;
453         MonoThread *thread;
454
455         thread = mono_thread_current ();
456
457         allocated = INITIAL_POLLFD_SIZE;
458         pfds = g_new0 (mono_pollfd, allocated);
459         INIT_POLLFD (pfds, data->pipe [0], MONO_POLLIN);
460         for (i = 1; i < allocated; i++)
461                 INIT_POLLFD (&pfds [i], -1, 0);
462
463         while (1) {
464                 int nsock = 0;
465                 mono_pollfd *pfd;
466                 char one [1];
467                 MonoMList *list;
468
469                 do {
470                         if (nsock == -1) {
471                                 if (THREAD_WANTS_A_BREAK (thread))
472                                         mono_thread_interruption_checkpoint ();
473                         }
474
475                         nsock = mono_poll (pfds, maxfd, -1);
476                 } while (nsock == -1 && errno == EINTR);
477
478                 /* 
479                  * Apart from EINTR, we only check EBADF, for the rest:
480                  *  EINVAL: mono_poll() 'protects' us from descriptor
481                  *      numbers above the limit if using select() by marking
482                  *      then as MONO_POLLERR.  If a system poll() is being
483                  *      used, the number of descriptor we're passing will not
484                  *      be over sysconf(_SC_OPEN_MAX), as the error would have
485                  *      happened when opening.
486                  *
487                  *  EFAULT: we own the memory pointed by pfds.
488                  *  ENOMEM: we're doomed anyway
489                  *
490                  */
491
492                 if (nsock == -1 && errno == EBADF) {
493                         pfds->revents = 0; /* Just in case... */
494                         nsock = mark_bad_fds (pfds, maxfd);
495                 }
496
497                 if ((pfds->revents & POLL_ERRORS) != 0) {
498                         /* We're supposed to die now, as the pipe has been closed */
499                         g_free (pfds);
500                         socket_io_cleanup (data);
501                         return;
502                 }
503
504                 /* Got a new socket */
505                 if ((pfds->revents & MONO_POLLIN) != 0) {
506                         int nread;
507
508                         for (i = 1; i < allocated; i++) {
509                                 pfd = &pfds [i];
510                                 if (pfd->fd == -1 || pfd->fd == data->newpfd->fd)
511                                         break;
512                         }
513
514                         if (i == allocated) {
515                                 mono_pollfd *oldfd;
516
517                                 oldfd = pfds;
518                                 i = allocated;
519                                 allocated = allocated * 2;
520                                 pfds = g_renew (mono_pollfd, oldfd, allocated);
521                                 g_free (oldfd);
522                                 for (; i < allocated; i++)
523                                         INIT_POLLFD (&pfds [i], -1, 0);
524                         }
525 #ifndef PLATFORM_WIN32
526                         nread = read (data->pipe [0], one, 1);
527 #else
528                         nread = recv ((SOCKET) data->pipe [0], one, 1, 0);
529 #endif
530                         if (nread <= 0) {
531                                 g_free (pfds);
532                                 return; /* we're closed */
533                         }
534
535                         INIT_POLLFD (&pfds [i], data->newpfd->fd, data->newpfd->events);
536                         ReleaseSemaphore (data->new_sem, 1, NULL);
537                         if (i >= maxfd)
538                                 maxfd = i + 1;
539                         nsock--;
540                 }
541
542                 if (nsock == 0)
543                         continue;
544
545                 EnterCriticalSection (&data->io_lock);
546                 if (data->inited == 0) {
547                         g_free (pfds);
548                         LeaveCriticalSection (&data->io_lock);
549                         return; /* cleanup called */
550                 }
551
552                 for (i = 1; i < maxfd && nsock > 0; i++) {
553                         pfd = &pfds [i];
554                         if (pfd->fd == -1 || pfd->revents == 0)
555                                 continue;
556
557                         nsock--;
558                         list = mono_g_hash_table_lookup (data->sock_to_state, GINT_TO_POINTER (pfd->fd));
559                         if (list != NULL && (pfd->revents & (MONO_POLLIN | POLL_ERRORS)) != 0) {
560                                 list = process_io_event (list, MONO_POLLIN);
561                         }
562
563                         if (list != NULL && (pfd->revents & (MONO_POLLOUT | POLL_ERRORS)) != 0) {
564                                 list = process_io_event (list, MONO_POLLOUT);
565                         }
566
567                         if (list != NULL) {
568                                 mono_g_hash_table_replace (data->sock_to_state, GINT_TO_POINTER (pfd->fd), list);
569                                 pfd->events = get_events_from_list (list);
570                         } else {
571                                 mono_g_hash_table_remove (data->sock_to_state, GINT_TO_POINTER (pfd->fd));
572                                 pfd->fd = -1;
573                                 if (i == maxfd - 1)
574                                         maxfd--;
575                         }
576                 }
577                 LeaveCriticalSection (&data->io_lock);
578         }
579 }
580
581 #ifdef HAVE_EPOLL
582 #define EPOLL_ERRORS (EPOLLERR | EPOLLHUP)
583 static void
584 socket_io_epoll_main (gpointer p)
585 {
586         SocketIOData *data;
587         int epollfd;
588         MonoThread *thread;
589         struct epoll_event *events, *evt;
590         const int nevents = 512;
591         int ready = 0, i;
592
593         data = p;
594         epollfd = data->epollfd;
595         thread = mono_thread_current ();
596         events = g_new0 (struct epoll_event, nevents);
597
598         while (1) {
599                 do {
600                         if (ready == -1) {
601                                 if (THREAD_WANTS_A_BREAK (thread))
602                                         mono_thread_interruption_checkpoint ();
603                         }
604 #ifdef EPOLL_DEBUG
605                         g_print ("epoll_wait init\n");
606 #endif
607                         ready = epoll_wait (epollfd, events, nevents, -1);
608 #ifdef EPOLL_DEBUG
609                         {
610                         int err = errno;
611                         g_print ("epoll_wait end with %d ready sockets (%d %s).\n", ready, err, (err) ? g_strerror (err) : "");
612                         errno = err;
613                         }
614 #endif
615                 } while (ready == -1 && errno == EINTR);
616
617                 if (ready == -1) {
618                         int err = errno;
619                         g_free (events);
620                         if (err != EBADF)
621                                 g_warning ("epoll_wait: %d %s\n", err, g_strerror (err));
622
623                         close (epollfd);
624                         return;
625                 }
626
627                 EnterCriticalSection (&data->io_lock);
628                 if (data->inited == 0) {
629 #ifdef EPOLL_DEBUG
630                         g_print ("data->inited == 0\n");
631 #endif
632                         g_free (events);
633                         close (epollfd);
634                         return; /* cleanup called */
635                 }
636
637                 for (i = 0; i < ready; i++) {
638                         int fd;
639                         MonoMList *list;
640
641                         evt = &events [i];
642                         fd = evt->data.fd;
643                         list = mono_g_hash_table_lookup (data->sock_to_state, GINT_TO_POINTER (fd));
644 #ifdef EPOLL_DEBUG
645                         g_print ("Event %d on %d list length: %d\n", evt->events, fd, mono_mlist_length (list));
646 #endif
647                         if (list != NULL && (evt->events & (EPOLLIN | EPOLL_ERRORS)) != 0) {
648                                 list = process_io_event (list, MONO_POLLIN);
649                         }
650
651                         if (list != NULL && (evt->events & (EPOLLOUT | EPOLL_ERRORS)) != 0) {
652                                 list = process_io_event (list, MONO_POLLOUT);
653                         }
654
655                         if (list != NULL) {
656                                 mono_g_hash_table_replace (data->sock_to_state, GINT_TO_POINTER (fd), list);
657                                 evt->events = get_events_from_list (list);
658 #ifdef EPOLL_DEBUG
659                                 g_print ("MOD %d to %d\n", fd, evt->events);
660 #endif
661                                 if (epoll_ctl (epollfd, EPOLL_CTL_MOD, fd, evt)) {
662                                         if (epoll_ctl (epollfd, EPOLL_CTL_ADD, fd, evt) == -1) {
663 #ifdef EPOLL_DEBUG
664                                                 int err = errno;
665                                                 g_message ("epoll_ctl(MOD): %d %s fd: %d events: %d", err, g_strerror (err), fd, evt->events);
666                                                 errno = err;
667 #endif
668                                         }
669                                 }
670                         } else {
671                                 mono_g_hash_table_remove (data->sock_to_state, GINT_TO_POINTER (fd));
672 #ifdef EPOLL_DEBUG
673                                 g_print ("DEL %d\n", fd);
674 #endif
675                                 epoll_ctl (epollfd, EPOLL_CTL_DEL, fd, evt);
676                         }
677                 }
678                 LeaveCriticalSection (&data->io_lock);
679         }
680 }
681 #endif
682
683 /*
684  * select/poll wake up when a socket is closed, but epoll just removes
685  * the socket from its internal list without notification.
686  */
687 void
688 mono_thread_pool_remove_socket (int sock)
689 {
690         MonoMList *list, *next;
691         MonoSocketAsyncResult *state;
692
693         if (socket_io_data.inited == FALSE)
694                 return;
695
696         EnterCriticalSection (&socket_io_data.io_lock);
697         list = mono_g_hash_table_lookup (socket_io_data.sock_to_state, GINT_TO_POINTER (sock));
698         if (list) {
699                 mono_g_hash_table_remove (socket_io_data.sock_to_state, GINT_TO_POINTER (sock));
700         }
701         LeaveCriticalSection (&socket_io_data.io_lock);
702         
703         while (list) {
704                 state = (MonoSocketAsyncResult *) mono_mlist_get_data (list);
705                 if (state->operation == AIO_OP_RECEIVE)
706                         state->operation = AIO_OP_RECV_JUST_CALLBACK;
707                 else if (state->operation == AIO_OP_SEND)
708                         state->operation = AIO_OP_SEND_JUST_CALLBACK;
709
710                 next = mono_mlist_remove_item (list, list);
711                 list = process_io_event (list, MONO_POLLIN);
712                 if (list)
713                         process_io_event (list, MONO_POLLOUT);
714
715                 list = next;
716         }
717 }
718
719 #ifdef PLATFORM_WIN32
720 static void
721 connect_hack (gpointer x)
722 {
723         struct sockaddr_in *addr = (struct sockaddr_in *) x;
724         int count = 0;
725
726         while (connect ((SOCKET) socket_io_data.pipe [1], (SOCKADDR *) addr, sizeof (struct sockaddr_in))) {
727                 Sleep (500);
728                 if (++count > 3) {
729                         g_warning ("Error initializing async. sockets %d.\n", WSAGetLastError ());
730                         g_assert (WSAGetLastError ());
731                 }
732         }
733 }
734 #endif
735
736 static void
737 socket_io_init (SocketIOData *data)
738 {
739 #ifdef PLATFORM_WIN32
740         struct sockaddr_in server;
741         struct sockaddr_in client;
742         SOCKET srv;
743         int len;
744 #endif
745         int inited;
746
747         inited = InterlockedCompareExchange (&data->inited, -1, -1);
748         if (inited == 1)
749                 return;
750
751         EnterCriticalSection (&data->io_lock);
752         inited = InterlockedCompareExchange (&data->inited, -1, -1);
753         if (inited == 1) {
754                 LeaveCriticalSection (&data->io_lock);
755                 return;
756         }
757
758 #ifdef HAVE_EPOLL
759         data->epoll_disabled = (g_getenv ("MONO_DISABLE_AIO") != NULL);
760         if (FALSE == data->epoll_disabled) {
761                 data->epollfd = epoll_create (256);
762                 data->epoll_disabled = (data->epollfd == -1);
763                 if (data->epoll_disabled && g_getenv ("MONO_DEBUG"))
764                         g_message ("epoll_create() failed. Using plain poll().");
765         } else {
766                 data->epollfd = -1;
767         }
768 #else
769         data->epoll_disabled = TRUE;
770 #endif
771
772 #ifndef PLATFORM_WIN32
773         if (data->epoll_disabled) {
774                 if (pipe (data->pipe) != 0) {
775                         int err = errno;
776                         perror ("mono");
777                         g_assert (err);
778                 }
779         } else {
780                 data->pipe [0] = -1;
781                 data->pipe [1] = -1;
782         }
783 #else
784         srv = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
785         g_assert (srv != INVALID_SOCKET);
786         data->pipe [1] = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
787         g_assert (data->pipe [1] != INVALID_SOCKET);
788
789         server.sin_family = AF_INET;
790         server.sin_addr.s_addr = inet_addr ("127.0.0.1");
791         server.sin_port = 0;
792         if (bind (srv, (SOCKADDR *) &server, sizeof (server))) {
793                 g_print ("%d\n", WSAGetLastError ());
794                 g_assert (1 != 0);
795         }
796
797         len = sizeof (server);
798         getsockname (srv, (SOCKADDR *) &server, &len);
799         listen (srv, 1);
800         mono_thread_create (mono_get_root_domain (), connect_hack, &server);
801         len = sizeof (server);
802         data->pipe [0] = accept (srv, (SOCKADDR *) &client, &len);
803         g_assert (data->pipe [0] != INVALID_SOCKET);
804         closesocket (srv);
805 #endif
806         mono_io_max_worker_threads = mono_max_worker_threads / 2;
807         if (mono_io_max_worker_threads < 10)
808                 mono_io_max_worker_threads = 10;
809
810         data->sock_to_state = mono_g_hash_table_new_type (g_direct_hash, g_direct_equal, MONO_HASH_VALUE_GC);
811
812         if (data->epoll_disabled) {
813                 data->new_sem = CreateSemaphore (NULL, 1, 1, NULL);
814                 g_assert (data->new_sem != NULL);
815         }
816         io_job_added = CreateSemaphore (NULL, 0, 0x7fffffff, NULL);
817         g_assert (io_job_added != NULL);
818         if (data->epoll_disabled) {
819                 mono_thread_create_internal (mono_get_root_domain (), socket_io_poll_main, data, TRUE);
820         }
821 #ifdef HAVE_EPOLL
822         else {
823                 mono_thread_create_internal (mono_get_root_domain (), socket_io_epoll_main, data, TRUE);
824         }
825 #endif
826         InterlockedCompareExchange (&data->inited, 1, 0);
827         LeaveCriticalSection (&data->io_lock);
828 }
829
830 static void
831 socket_io_add_poll (MonoSocketAsyncResult *state)
832 {
833         int events;
834         char msg [1];
835         MonoMList *list;
836         SocketIOData *data = &socket_io_data;
837
838 #if defined(PLATFORM_MACOSX) || defined(PLATFORM_BSD) || defined(PLATFORM_WIN32) || defined(PLATFORM_SOLARIS)
839         /* select() for connect() does not work well on the Mac. Bug #75436. */
840         /* Bug #77637 for the BSD 6 case */
841         /* Bug #78888 for the Windows case */
842         if (state->operation == AIO_OP_CONNECT && state->blocking == TRUE) {
843                 start_io_thread_or_queue (state);
844                 return;
845         }
846 #endif
847         WaitForSingleObject (data->new_sem, INFINITE);
848         if (data->newpfd == NULL)
849                 data->newpfd = g_new0 (mono_pollfd, 1);
850
851         EnterCriticalSection (&data->io_lock);
852         /* FIXME: 64 bit issue: handle can be a pointer on windows? */
853         list = mono_g_hash_table_lookup (data->sock_to_state, GINT_TO_POINTER (state->handle));
854         if (list == NULL) {
855                 list = mono_mlist_alloc ((MonoObject*)state);
856         } else {
857                 list = mono_mlist_append (list, (MonoObject*)state);
858         }
859
860         events = get_events_from_list (list);
861         INIT_POLLFD (data->newpfd, GPOINTER_TO_INT (state->handle), events);
862         mono_g_hash_table_replace (data->sock_to_state, GINT_TO_POINTER (state->handle), list);
863         LeaveCriticalSection (&data->io_lock);
864         *msg = (char) state->operation;
865 #ifndef PLATFORM_WIN32
866         write (data->pipe [1], msg, 1);
867 #else
868         send ((SOCKET) data->pipe [1], msg, 1, 0);
869 #endif
870 }
871
872 #ifdef HAVE_EPOLL
873 static gboolean
874 socket_io_add_epoll (MonoSocketAsyncResult *state)
875 {
876         MonoMList *list;
877         SocketIOData *data = &socket_io_data;
878         struct epoll_event event;
879         int epoll_op, ievt;
880         int fd;
881
882         memset (&event, 0, sizeof (struct epoll_event));
883         fd = GPOINTER_TO_INT (state->handle);
884         EnterCriticalSection (&data->io_lock);
885         list = mono_g_hash_table_lookup (data->sock_to_state, GINT_TO_POINTER (fd));
886         if (list == NULL) {
887                 list = mono_mlist_alloc ((MonoObject*)state);
888                 epoll_op = EPOLL_CTL_ADD;
889         } else {
890                 list = mono_mlist_append (list, (MonoObject*)state);
891                 epoll_op = EPOLL_CTL_MOD;
892         }
893
894         ievt = get_events_from_list (list);
895         if ((ievt & MONO_POLLIN) != 0)
896                 event.events |= EPOLLIN;
897         if ((ievt & MONO_POLLOUT) != 0)
898                 event.events |= EPOLLOUT;
899
900         mono_g_hash_table_replace (data->sock_to_state, state->handle, list);
901         event.data.fd = fd;
902 #ifdef EPOLL_DEBUG
903         g_print ("%s %d with %d\n", epoll_op == EPOLL_CTL_ADD ? "ADD" : "MOD", fd, event.events);
904 #endif
905         if (epoll_ctl (data->epollfd, epoll_op, fd, &event) == -1) {
906                 int err = errno;
907                 if (epoll_op == EPOLL_CTL_ADD && err == EEXIST) {
908                         epoll_op = EPOLL_CTL_MOD;
909                         if (epoll_ctl (data->epollfd, epoll_op, fd, &event) == -1) {
910                                 g_message ("epoll_ctl(MOD): %d %s\n", err, g_strerror (err));
911                         }
912                 }
913         }
914
915         LeaveCriticalSection (&data->io_lock);
916         return TRUE;
917 }
918 #endif
919
920 static void
921 socket_io_add (MonoAsyncResult *ares, MonoSocketAsyncResult *state)
922 {
923         socket_io_init (&socket_io_data);
924         MONO_OBJECT_SETREF (state, ares, ares);
925 #ifdef HAVE_EPOLL
926         if (socket_io_data.epoll_disabled == FALSE) {
927                 if (socket_io_add_epoll (state))
928                         return;
929         }
930 #endif
931         socket_io_add_poll (state);
932 }
933
934 static gboolean
935 socket_io_filter (MonoObject *target, MonoObject *state)
936 {
937         gint op;
938         MonoSocketAsyncResult *sock_res = (MonoSocketAsyncResult *) state;
939         MonoClass *klass;
940
941         if (target == NULL || state == NULL)
942                 return FALSE;
943
944         if (socket_async_call_klass == NULL) {
945                 klass = target->vtable->klass;
946                 /* Check if it's SocketAsyncCall in System.Net.Sockets
947                  * FIXME: check the assembly is signed correctly for extra care
948                  */
949                 if (klass->name [0] == 'S' && strcmp (klass->name, "SocketAsyncCall") == 0 
950                                 && strcmp (mono_image_get_name (klass->image), "System") == 0
951                                 && klass->nested_in && strcmp (klass->nested_in->name, "Socket") == 0)
952                         socket_async_call_klass = klass;
953         }
954
955         if (process_async_call_klass == NULL) {
956                 klass = target->vtable->klass;
957                 /* Check if it's AsyncReadHandler in System.Diagnostics.Process
958                  * FIXME: check the assembly is signed correctly for extra care
959                  */
960                 if (klass->name [0] == 'A' && strcmp (klass->name, "AsyncReadHandler") == 0 
961                                 && strcmp (mono_image_get_name (klass->image), "System") == 0
962                                 && klass->nested_in && strcmp (klass->nested_in->name, "Process") == 0)
963                         process_async_call_klass = klass;
964         }
965         /* return both when socket_async_call_klass has not been seen yet and when
966          * the object is not an instance of the class.
967          */
968         if (target->vtable->klass != socket_async_call_klass && target->vtable->klass != process_async_call_klass)
969                 return FALSE;
970
971         op = sock_res->operation;
972         if (op < AIO_OP_FIRST || op >= AIO_OP_LAST)
973                 return FALSE;
974
975         return TRUE;
976 }
977 #endif /* !DISABLE_SOCKETS */
978
979 static void
980 mono_async_invoke (MonoAsyncResult *ares)
981 {
982         ASyncCall *ac = (ASyncCall *)ares->object_data;
983         MonoThread *thread = NULL;
984         MonoObject *res, *exc = NULL;
985         MonoArray *out_args = NULL;
986
987         if (ares->execution_context) {
988                 /* use captured ExecutionContext (if available) */
989                 thread = mono_thread_current ();
990                 MONO_OBJECT_SETREF (ares, original_context, mono_thread_get_execution_context ());
991                 mono_thread_set_execution_context (ares->execution_context);
992         } else {
993                 ares->original_context = NULL;
994         }
995
996         ac->msg->exc = NULL;
997         res = mono_message_invoke (ares->async_delegate, ac->msg, &exc, &out_args);
998         MONO_OBJECT_SETREF (ac, res, res);
999         MONO_OBJECT_SETREF (ac, msg->exc, exc);
1000         MONO_OBJECT_SETREF (ac, out_args, out_args);
1001
1002         ares->completed = 1;
1003
1004         /* call async callback if cb_method != null*/
1005         if (ac->cb_method) {
1006                 MonoObject *exc = NULL;
1007                 void *pa = &ares;
1008                 mono_runtime_invoke (ac->cb_method, ac->cb_target, pa, &exc);
1009                 /* 'exc' will be the previous ac->msg->exc if not NULL and not
1010                  * catched. If catched, this will be set to NULL and the
1011                  * exception will not be printed. */
1012                 MONO_OBJECT_SETREF (ac->msg, exc, exc);
1013         }
1014
1015         /* restore original thread execution context if flow isn't suppressed, i.e. non null */
1016         if (ares->original_context) {
1017                 mono_thread_set_execution_context (ares->original_context);
1018                 ares->original_context = NULL;
1019         }
1020
1021         /* notify listeners */
1022         mono_monitor_enter ((MonoObject *) ares);
1023         if (ares->handle != NULL) {
1024                 ac->wait_event = (gsize) mono_wait_handle_get_handle ((MonoWaitHandle *) ares->handle);
1025                 SetEvent ((gpointer)(gsize)ac->wait_event);
1026         }
1027         mono_monitor_exit ((MonoObject *) ares);
1028
1029         EnterCriticalSection (&ares_lock);
1030         mono_g_hash_table_remove (ares_htable, ares);
1031         LeaveCriticalSection (&ares_lock);
1032 }
1033
1034 static void
1035 start_idle_threads (MonoAsyncResult *data)
1036 {
1037         int needed;
1038         int existing;
1039
1040         needed = (int) InterlockedCompareExchange (&mono_min_worker_threads, 0, -1); 
1041         do {
1042                 existing = (int) InterlockedCompareExchange (&mono_worker_threads, 0, -1); 
1043                 if ((needed - existing) > 0) {
1044                         start_tpthread (data);
1045                         if (data) 
1046                                 threadpool_jobs_dec ((MonoObject*)data);
1047                         data = NULL;
1048                         Sleep (500);
1049                 }
1050         } while ((needed - existing) > 0);
1051
1052         /* If we don't start any thread here, make sure 'data' is processed. */
1053         if (data != NULL) {
1054                 start_thread_or_queue (data);
1055                 threadpool_jobs_dec ((MonoObject*)data);
1056         }
1057 }
1058
1059 static void
1060 start_tpthread (MonoAsyncResult *data)
1061 {
1062         InterlockedIncrement (&mono_worker_threads);
1063         InterlockedIncrement (&busy_worker_threads);
1064         threadpool_jobs_inc ((MonoObject *)data);
1065         mono_thread_create_internal (mono_get_root_domain (), async_invoke_thread, data, TRUE);
1066 }
1067
1068 void
1069 mono_thread_pool_init ()
1070 {
1071         int threads_per_cpu = THREADS_PER_CPU;
1072         int cpu_count;
1073
1074         if ((int) InterlockedCompareExchange (&tp_inited, 1, 0) == 1)
1075                 return;
1076
1077         MONO_GC_REGISTER_ROOT (ares_htable);
1078         MONO_GC_REGISTER_ROOT (socket_io_data.sock_to_state);
1079         InitializeCriticalSection (&socket_io_data.io_lock);
1080         InitializeCriticalSection (&ares_lock);
1081         InitializeCriticalSection (&io_queue_lock);
1082         ares_htable = mono_g_hash_table_new_type (NULL, NULL, MONO_HASH_KEY_VALUE_GC);
1083         job_added = CreateSemaphore (NULL, 0, 0x7fffffff, NULL);
1084         g_assert (job_added != NULL);
1085         if (g_getenv ("MONO_THREADS_PER_CPU") != NULL) {
1086                 threads_per_cpu = atoi (g_getenv ("MONO_THREADS_PER_CPU"));
1087                 if (threads_per_cpu <= 0)
1088                         threads_per_cpu = THREADS_PER_CPU;
1089         }
1090
1091         cpu_count = mono_cpu_count ();
1092         mono_max_worker_threads = 20 + threads_per_cpu * cpu_count;
1093         mono_min_worker_threads = cpu_count; /* 1 idle thread per cpu */
1094
1095         async_call_klass = mono_class_from_name (mono_defaults.corlib, "System", "MonoAsyncCall");
1096         g_assert (async_call_klass);
1097 }
1098
1099 MonoAsyncResult *
1100 mono_thread_pool_add (MonoObject *target, MonoMethodMessage *msg, MonoDelegate *async_callback,
1101                       MonoObject *state)
1102 {
1103         MonoDomain *domain = mono_domain_get ();
1104         MonoAsyncResult *ares;
1105         ASyncCall *ac;
1106
1107         ac = (ASyncCall*)mono_object_new (mono_domain_get (), async_call_klass);
1108         MONO_OBJECT_SETREF (ac, msg, msg);
1109         MONO_OBJECT_SETREF (ac, state, state);
1110
1111         if (async_callback) {
1112                 ac->cb_method = mono_get_delegate_invoke (((MonoObject *)async_callback)->vtable->klass);
1113                 MONO_OBJECT_SETREF (ac, cb_target, async_callback);
1114         }
1115
1116         ares = mono_async_result_new (domain, NULL, ac->state, NULL, (MonoObject*)ac);
1117         MONO_OBJECT_SETREF (ares, async_delegate, target);
1118
1119         if (domain->state == MONO_APPDOMAIN_UNLOADED || domain->state == MONO_APPDOMAIN_UNLOADING)
1120                 return ares;
1121
1122         EnterCriticalSection (&ares_lock);
1123         mono_g_hash_table_insert (ares_htable, ares, ares);
1124         LeaveCriticalSection (&ares_lock);
1125
1126 #ifndef DISABLE_SOCKETS
1127         if (socket_io_filter (target, state)) {
1128                 socket_io_add (ares, (MonoSocketAsyncResult *) state);
1129                 return ares;
1130         }
1131 #endif
1132         
1133         start_thread_or_queue (ares);
1134         return ares;
1135 }
1136
1137 static void
1138 start_thread_or_queue (MonoAsyncResult *ares)
1139 {
1140         int busy, worker;
1141
1142         if ((int) InterlockedCompareExchange (&tp_idle_started, 1, 0) == 0) {
1143                 threadpool_jobs_inc ((MonoObject*)ares);
1144                 mono_thread_create_internal (mono_get_root_domain (), start_idle_threads, ares, TRUE);
1145                 return;
1146         }
1147
1148         busy = (int) InterlockedCompareExchange (&busy_worker_threads, 0, -1);
1149         worker = (int) InterlockedCompareExchange (&mono_worker_threads, 0, -1); 
1150         if (worker <= ++busy &&
1151             worker < mono_max_worker_threads) {
1152                 start_tpthread (ares);
1153         } else {
1154                 append_job (&mono_delegate_section, &async_call_queue, (MonoObject*)ares);
1155                 ReleaseSemaphore (job_added, 1, NULL);
1156         }
1157 }
1158
1159 MonoObject *
1160 mono_thread_pool_finish (MonoAsyncResult *ares, MonoArray **out_args, MonoObject **exc)
1161 {
1162         ASyncCall *ac;
1163
1164         *exc = NULL;
1165         *out_args = NULL;
1166
1167         /* check if already finished */
1168         mono_monitor_enter ((MonoObject *) ares);
1169         
1170         if (ares->endinvoke_called) {
1171                 *exc = (MonoObject *)mono_exception_from_name (mono_defaults.corlib, "System", 
1172                                               "InvalidOperationException");
1173                 mono_monitor_exit ((MonoObject *) ares);
1174                 return NULL;
1175         }
1176
1177         ares->endinvoke_called = 1;
1178         ac = (ASyncCall *)ares->object_data;
1179
1180         g_assert (ac != NULL);
1181
1182         /* wait until we are really finished */
1183         if (!ares->completed) {
1184                 if (ares->handle == NULL) {
1185                         ac->wait_event = (gsize)CreateEvent (NULL, TRUE, FALSE, NULL);
1186                         g_assert(ac->wait_event != 0);
1187                         MONO_OBJECT_SETREF (ares, handle, (MonoObject *) mono_wait_handle_new (mono_object_domain (ares), (gpointer)(gsize)ac->wait_event));
1188                 }
1189                 mono_monitor_exit ((MonoObject *) ares);
1190                 WaitForSingleObjectEx ((gpointer)(gsize)ac->wait_event, INFINITE, TRUE);
1191         } else {
1192                 mono_monitor_exit ((MonoObject *) ares);
1193         }
1194
1195         *exc = ac->msg->exc; /* FIXME: GC add write barrier */
1196         *out_args = ac->out_args;
1197
1198         return ac->res;
1199 }
1200
1201 void
1202 mono_thread_pool_cleanup (void)
1203 {
1204         gint release;
1205
1206         EnterCriticalSection (&mono_delegate_section);
1207         free_queue (&async_call_queue);
1208         release = (gint) InterlockedCompareExchange (&mono_worker_threads, 0, -1);
1209         LeaveCriticalSection (&mono_delegate_section);
1210         if (job_added)
1211                 ReleaseSemaphore (job_added, release, NULL);
1212
1213         socket_io_cleanup (&socket_io_data);
1214 }
1215
1216 static void
1217 append_job (CRITICAL_SECTION *cs, TPQueue *list, MonoObject *ar)
1218 {
1219         threadpool_jobs_inc (ar); 
1220
1221         EnterCriticalSection (cs);
1222         if (list->array && (list->next_elem < mono_array_length (list->array))) {
1223                 mono_array_setref (list->array, list->next_elem, ar);
1224                 list->next_elem++;
1225                 LeaveCriticalSection (cs);
1226                 return;
1227         }
1228         if (!list->array) {
1229                 MONO_GC_REGISTER_ROOT (list->array);
1230                 list->array = mono_array_new_cached (mono_get_root_domain (), mono_defaults.object_class, 16);
1231         } else {
1232                 int count = list->next_elem - list->first_elem;
1233                 /* slide the array or create a larger one if it's full */
1234                 if (list->first_elem) {
1235                         mono_array_memcpy_refs (list->array, 0, list->array, list->first_elem, count);
1236                 } else {
1237                         MonoArray *newa = mono_array_new_cached (mono_get_root_domain (), mono_defaults.object_class, mono_array_length (list->array) * 2);
1238                         mono_array_memcpy_refs (newa, 0, list->array, list->first_elem, count);
1239                         list->array = newa;
1240                 }
1241                 list->first_elem = 0;
1242                 list->next_elem = count;
1243         }
1244         mono_array_setref (list->array, list->next_elem, ar);
1245         list->next_elem++;
1246         LeaveCriticalSection (cs);
1247 }
1248
1249
1250 static void
1251 clear_queue (CRITICAL_SECTION *cs, TPQueue *list, MonoDomain *domain)
1252 {
1253         int i, count = 0;
1254         EnterCriticalSection (cs);
1255         /*remove*/
1256         for (i = list->first_elem; i < list->next_elem; ++i) {
1257                 MonoObject *obj = mono_array_get (list->array, MonoObject*, i);
1258                 if (obj->vtable->domain == domain) {
1259                         unregister_job ((MonoAsyncResult*)obj);
1260
1261                         mono_array_set (list->array, MonoObject*, i, NULL);
1262                         InterlockedDecrement (&domain->threadpool_jobs);
1263                         ++count;
1264                 }
1265         }
1266         /*compact*/
1267         if (count) {
1268                 int idx = 0;
1269                 for (i = list->first_elem; i < list->next_elem; ++i) {
1270                         MonoObject *obj = mono_array_get (list->array, MonoObject*, i);
1271                         if (obj)
1272                                 mono_array_set (list->array, MonoObject*, idx++, obj);
1273                 }
1274                 list->first_elem = 0;
1275                 list->next_elem = count;
1276         }
1277         LeaveCriticalSection (cs);
1278 }
1279
1280 /*
1281  * Clean up the threadpool of all domain jobs.
1282  * Can only be called as part of the domain unloading process as
1283  * it will wait for all jobs to be visible to the interruption code. 
1284  */
1285 gboolean
1286 mono_thread_pool_remove_domain_jobs (MonoDomain *domain, int timeout)
1287 {
1288         HANDLE sem_handle;
1289         int result = TRUE;
1290         guint32 start_time = 0;
1291
1292         clear_queue (&mono_delegate_section, &async_call_queue, domain);
1293         clear_queue (&io_queue_lock, &async_io_queue, domain);
1294
1295         /*
1296          * There might be some threads out that could be about to execute stuff from the given domain.
1297          * We avoid that by setting up a semaphore to be pulsed by the thread that reaches zero.
1298          */
1299         sem_handle = CreateSemaphore (NULL, 0, 1, NULL);
1300         
1301         domain->cleanup_semaphore = sem_handle;
1302         /*
1303          * The memory barrier here is required to have global ordering between assigning to cleanup_semaphone
1304          * and reading threadpool_jobs.
1305          * Otherwise this thread could read a stale version of threadpool_jobs and wait forever.
1306          */
1307         mono_memory_write_barrier ();
1308
1309         if (domain->threadpool_jobs && timeout != -1)
1310                 start_time = mono_msec_ticks ();
1311         while (domain->threadpool_jobs) {
1312                 WaitForSingleObject (sem_handle, timeout);
1313                 if (timeout != -1 && (mono_msec_ticks () - start_time) > timeout) {
1314                         result = FALSE;
1315                         break;
1316                 }
1317         }
1318
1319         domain->cleanup_semaphore = NULL;
1320         CloseHandle (sem_handle);
1321         return result;
1322 }
1323
1324
1325 static MonoObject*
1326 dequeue_job (CRITICAL_SECTION *cs, TPQueue *list)
1327 {
1328         MonoObject *ar;
1329         int count;
1330
1331         EnterCriticalSection (cs);
1332         if (!list->array || list->first_elem == list->next_elem) {
1333                 LeaveCriticalSection (cs);
1334                 return NULL;
1335         }
1336         ar = mono_array_get (list->array, MonoObject*, list->first_elem);
1337         mono_array_setref (list->array, list->first_elem, NULL);
1338         list->first_elem++;
1339         count = list->next_elem - list->first_elem;
1340         /* reduce the size of the array if it's mostly empty */
1341         if (mono_array_length (list->array) > 16 && count < (mono_array_length (list->array) / 3)) {
1342                 MonoArray *newa = mono_array_new_cached (mono_get_root_domain (), mono_defaults.object_class, mono_array_length (list->array) / 2);
1343                 mono_array_memcpy_refs (newa, 0, list->array, list->first_elem, count);
1344                 list->array = newa;
1345                 list->first_elem = 0;
1346                 list->next_elem = count;
1347         }
1348         LeaveCriticalSection (cs);
1349
1350         return ar;
1351 }
1352
1353 static void
1354 free_queue (TPQueue *list)
1355 {
1356         list->array = NULL;
1357         list->first_elem = list->next_elem = 0;
1358 }
1359
1360 static void
1361 async_invoke_thread (gpointer data)
1362 {
1363         MonoDomain *domain;
1364         MonoThread *thread;
1365         int workers, min;
1366         const gchar *version;
1367  
1368         thread = mono_thread_current ();
1369         version = mono_get_runtime_info ()->framework_version;
1370         for (;;) {
1371                 MonoAsyncResult *ar;
1372
1373                 ar = (MonoAsyncResult *) data;
1374                 if (ar) {
1375                         /* worker threads invokes methods in different domains,
1376                          * so we need to set the right domain here */
1377                         domain = ((MonoObject *)ar)->vtable->domain;
1378
1379                         g_assert (domain);
1380
1381                         if (domain->state == MONO_APPDOMAIN_UNLOADED || domain->state == MONO_APPDOMAIN_UNLOADING) {
1382                                 threadpool_jobs_dec ((MonoObject *)ar);
1383                                 unregister_job (ar);
1384                                 data = NULL;
1385                         } else {
1386                                 mono_thread_push_appdomain_ref (domain);
1387                                 if (threadpool_jobs_dec ((MonoObject *)ar)) {
1388                                         unregister_job (ar);
1389                                         data = NULL;
1390                                         mono_thread_pop_appdomain_ref ();
1391                                         continue;
1392                                 }
1393
1394                                 if (mono_domain_set (domain, FALSE)) {
1395                                         ASyncCall *ac;
1396
1397                                         mono_async_invoke (ar);
1398                                         ac = (ASyncCall *) ar->object_data;
1399                                         /*
1400                                         if (ac->msg->exc != NULL)
1401                                                 mono_unhandled_exception (ac->msg->exc);
1402                                         */
1403                                         mono_domain_set (mono_get_root_domain (), TRUE);
1404                                 }
1405                                 mono_thread_pop_appdomain_ref ();
1406                                 InterlockedDecrement (&busy_worker_threads);
1407                                 /* If the callee changes the background status, set it back to TRUE */
1408                                 if (*version != '1' && !mono_thread_test_state (thread , ThreadState_Background))
1409                                         ves_icall_System_Threading_Thread_SetState (thread, ThreadState_Background);
1410                         }
1411                 }
1412
1413                 data = dequeue_job (&mono_delegate_section, &async_call_queue);
1414
1415                 if (!data) {
1416                         guint32 wr;
1417                         int timeout = THREAD_EXIT_TIMEOUT;
1418                         guint32 start_time = mono_msec_ticks ();
1419                         
1420                         do {
1421                                 wr = WaitForSingleObjectEx (job_added, (guint32)timeout, TRUE);
1422                                 if (THREAD_WANTS_A_BREAK (thread))
1423                                         mono_thread_interruption_checkpoint ();
1424                         
1425                                 timeout -= mono_msec_ticks () - start_time;
1426                         
1427                                 if (wr != WAIT_TIMEOUT && wr != WAIT_IO_COMPLETION)
1428                                         data = dequeue_job (&mono_delegate_section, &async_call_queue);
1429                         }
1430                         while (!data && timeout > 0);
1431                 }
1432
1433                 if (!data) {
1434                         workers = (int) InterlockedCompareExchange (&mono_worker_threads, 0, -1); 
1435                         min = (int) InterlockedCompareExchange (&mono_min_worker_threads, 0, -1); 
1436         
1437                         while (!data && workers <= min) {
1438                                 WaitForSingleObjectEx (job_added, INFINITE, TRUE);
1439                                 if (THREAD_WANTS_A_BREAK (thread))
1440                                         mono_thread_interruption_checkpoint ();
1441                         
1442                                 data = dequeue_job (&mono_delegate_section, &async_call_queue);
1443                                 workers = (int) InterlockedCompareExchange (&mono_worker_threads, 0, -1); 
1444                                 min = (int) InterlockedCompareExchange (&mono_min_worker_threads, 0, -1); 
1445                         }
1446                 }
1447         
1448                 if (!data) {
1449                         InterlockedDecrement (&mono_worker_threads);
1450                         return;
1451                 }
1452                 
1453                 InterlockedIncrement (&busy_worker_threads);
1454         }
1455
1456         g_assert_not_reached ();
1457 }
1458
1459 void
1460 ves_icall_System_Threading_ThreadPool_GetAvailableThreads (gint *workerThreads, gint *completionPortThreads)
1461 {
1462         gint busy, busy_io;
1463
1464         MONO_ARCH_SAVE_REGS;
1465
1466         busy = (gint) InterlockedCompareExchange (&busy_worker_threads, 0, -1);
1467         busy_io = (gint) InterlockedCompareExchange (&busy_io_worker_threads, 0, -1);
1468         *workerThreads = mono_max_worker_threads - busy;
1469         *completionPortThreads = mono_io_max_worker_threads - busy_io;
1470 }
1471
1472 void
1473 ves_icall_System_Threading_ThreadPool_GetMaxThreads (gint *workerThreads, gint *completionPortThreads)
1474 {
1475         MONO_ARCH_SAVE_REGS;
1476
1477         *workerThreads = mono_max_worker_threads;
1478         *completionPortThreads = mono_io_max_worker_threads;
1479 }
1480
1481 void
1482 ves_icall_System_Threading_ThreadPool_GetMinThreads (gint *workerThreads, gint *completionPortThreads)
1483 {
1484         gint workers, workers_io;
1485
1486         MONO_ARCH_SAVE_REGS;
1487
1488         workers = (gint) InterlockedCompareExchange (&mono_min_worker_threads, 0, -1);
1489         workers_io = (gint) InterlockedCompareExchange (&mono_io_min_worker_threads, 0, -1);
1490
1491         *workerThreads = workers;
1492         *completionPortThreads = workers_io;
1493 }
1494
1495 MonoBoolean
1496 ves_icall_System_Threading_ThreadPool_SetMinThreads (gint workerThreads, gint completionPortThreads)
1497 {
1498         MONO_ARCH_SAVE_REGS;
1499
1500         if (workerThreads < 0 || workerThreads > mono_max_worker_threads)
1501                 return FALSE;
1502
1503         if (completionPortThreads < 0 || completionPortThreads > mono_io_max_worker_threads)
1504                 return FALSE;
1505
1506         InterlockedExchange (&mono_min_worker_threads, workerThreads);
1507         InterlockedExchange (&mono_io_min_worker_threads, completionPortThreads);
1508         mono_thread_create_internal (mono_get_root_domain (), start_idle_threads, NULL, TRUE);
1509         return TRUE;
1510 }
1511
1512 MonoBoolean
1513 ves_icall_System_Threading_ThreadPool_SetMaxThreads (gint workerThreads, gint completionPortThreads)
1514 {
1515         MONO_ARCH_SAVE_REGS;
1516
1517         if (workerThreads < mono_max_worker_threads)
1518                 return FALSE;
1519
1520         if (completionPortThreads < mono_io_max_worker_threads)
1521                 return FALSE;
1522
1523         InterlockedExchange (&mono_max_worker_threads, workerThreads);
1524         InterlockedExchange (&mono_io_max_worker_threads, completionPortThreads);
1525         return TRUE;
1526 }
1527