2010-05-21 Zoltan Varga <vargaz@gmail.com>
[mono.git] / mono / metadata / threadpool.c
1 /*
2  * threadpool.c: global thread pool
3  *
4  * Authors:
5  *   Dietmar Maurer (dietmar@ximian.com)
6  *   Gonzalo Paniagua Javier (gonzalo@ximian.com)
7  *
8  * Copyright 2001-2003 Ximian, Inc (http://www.ximian.com)
9  * Copyright 2004-2010 Novell, Inc (http://www.novell.com)
10  */
11
12 #include <config.h>
13 #include <glib.h>
14
15 #ifdef MONO_SMALL_CONFIG
16 #define QUEUE_LENGTH 16 /* Must be 2^N */
17 #else
18 #define QUEUE_LENGTH 64 /* Must be 2^N */
19 #endif
20
21 #include <mono/metadata/domain-internals.h>
22 #include <mono/metadata/tabledefs.h>
23 #include <mono/metadata/threads.h>
24 #include <mono/metadata/threads-types.h>
25 #include <mono/metadata/threadpool-internals.h>
26 #include <mono/metadata/exception.h>
27 #include <mono/metadata/file-io.h>
28 #include <mono/metadata/monitor.h>
29 #include <mono/metadata/mono-mlist.h>
30 #include <mono/metadata/marshal.h>
31 #include <mono/metadata/mono-perfcounters.h>
32 #include <mono/metadata/socket-io.h>
33 #include <mono/metadata/mono-wsq.h>
34 #include <mono/io-layer/io-layer.h>
35 #include <mono/metadata/gc-internal.h>
36 #include <mono/utils/mono-time.h>
37 #include <mono/utils/mono-proclib.h>
38 #include <mono/utils/mono-semaphore.h>
39 #include <errno.h>
40 #ifdef HAVE_SYS_TIME_H
41 #include <sys/time.h>
42 #endif
43 #include <sys/types.h>
44 #include <fcntl.h>
45 #ifdef HAVE_UNISTD_H
46 #include <unistd.h>
47 #endif
48 #include <string.h>
49 #ifdef HAVE_SYS_SOCKET_H
50 #include <sys/socket.h>
51 #endif
52 #include <mono/utils/mono-poll.h>
53 #ifdef HAVE_EPOLL
54 #include <sys/epoll.h>
55 #endif
56
57 #ifndef DISABLE_SOCKETS
58 #include "mono/io-layer/socket-wrappers.h"
59 #endif
60
61 #include "threadpool.h"
62
63 #define THREAD_WANTS_A_BREAK(t) ((t->state & (ThreadState_StopRequested | \
64                                                 ThreadState_SuspendRequested)) != 0)
65
66 #define SPIN_TRYLOCK(i) (InterlockedCompareExchange (&(i), 1, 0) == 0)
67 #define SPIN_LOCK(i) do { \
68                                 if (SPIN_TRYLOCK (i)) \
69                                         break; \
70                         } while (1)
71
72 #define SPIN_UNLOCK(i) i = 0
73
74 #define EPOLL_DEBUG(...)
75 #define EPOLL_DEBUG_STMT(...)
76 #define TP_DEBUG(...)
77 #define TP_DEBUG_STMT(...)
78
79 /* DEBUG: prints tp data every 2s */
80 #undef DEBUG 
81
82 /*
83 #define EPOLL_DEBUG(...) g_message(__VA_ARGS__)
84 #define EPOLL_DEBUG_STMT(...) do { __VA_ARGS__ } while (0)
85 #define TP_DEBUG(...) g_message(__VA_ARGS__)
86 #define TP_DEBUG_STMT(...) do { __VA_ARGS__ } while (0)
87 */
88
89 /* map of CounterSample.cs */
90 struct _MonoCounterSample {
91         gint64 rawValue;
92         gint64 baseValue;
93         gint64 counterFrequency;
94         gint64 systemFrequency;
95         gint64 timeStamp;
96         gint64 timeStamp100nSec;
97         gint64 counterTimeStamp;
98         int counterType;
99 };
100
101 /* mono_thread_pool_init called */
102 static volatile int tp_inited;
103
104 typedef struct {
105         CRITICAL_SECTION io_lock; /* access to sock_to_state */
106         int inited; // 0 -> not initialized , 1->initializing, 2->initialized, 3->cleaned up
107         int pipe [2];
108         MonoGHashTable *sock_to_state;
109
110         HANDLE new_sem; /* access to newpfd and write side of the pipe */
111         mono_pollfd *newpfd;
112         gboolean epoll_disabled;
113 #ifdef HAVE_EPOLL
114         int epollfd;
115 #endif
116 } SocketIOData;
117
118 static SocketIOData socket_io_data;
119
120 /* Keep in sync with the System.MonoAsyncCall class which provides GC tracking */
121 typedef struct {
122         MonoObject         object;
123         MonoMethodMessage *msg;
124         MonoMethod        *cb_method;
125         MonoDelegate      *cb_target;
126         MonoObject        *state;
127         MonoObject        *res;
128         MonoArray         *out_args;
129 } ASyncCall;
130
131 typedef struct {
132         MonoSemType lock;
133         MonoMList *first; /* GC root */
134         MonoMList *last;
135         MonoMList *unused; /* Up to 20 chunks. GC root */
136         gint head;
137         gint tail;
138         MonoSemType new_job;
139         volatile gint waiting; /* threads waiting for a work item */
140
141         /**/
142         volatile gint pool_status; /* 0 -> not initialized, 1 -> initialized, 2 -> cleaning up */
143         /* min, max, n and busy -> Interlocked */
144         volatile gint min_threads;
145         volatile gint max_threads;
146         volatile gint nthreads;
147         volatile gint busy_threads;
148
149         void (*async_invoke) (gpointer data);
150         void *pc_nitems; /* Performance counter for total number of items in added */
151         void *pc_nthreads; /* Performance counter for total number of active threads */
152         /**/
153         volatile gint destroy_thread;
154         volatile gint ignore_times; /* Used when there's a thread being created or destroyed */
155         volatile gint sp_lock; /* spin lock used to protect ignore_times */
156         volatile gint64 last_check;
157         volatile gint64 time_sum;
158         volatile gint n_sum;
159         gint64 averages [2];
160         /**/
161         //TP_DEBUG_ONLY (gint nodes_created);
162         //TP_DEBUG_ONLY (gint nodes_reused);
163         gboolean is_io;
164 } ThreadPool;
165
166 static ThreadPool async_tp;
167 static ThreadPool async_io_tp;
168
169 static void async_invoke_thread (gpointer data);
170 static MonoObject *mono_async_invoke (ThreadPool *tp, MonoAsyncResult *ares);
171 static void threadpool_free_queue (ThreadPool *tp);
172 static void threadpool_append_job (ThreadPool *tp, MonoObject *ar);
173 static void threadpool_append_jobs (ThreadPool *tp, MonoObject **jobs, gint njobs);
174 static void threadpool_init (ThreadPool *tp, int min_threads, int max_threads, void (*async_invoke) (gpointer));
175 static void threadpool_start_idle_threads (ThreadPool *tp);
176 static void threadpool_kill_idle_threads (ThreadPool *tp);
177
178 static MonoClass *async_call_klass;
179 static MonoClass *socket_async_call_klass;
180 static MonoClass *process_async_call_klass;
181
182 static GPtrArray *wsqs;
183 CRITICAL_SECTION wsqs_lock;
184
185 /* Hooks */
186 static MonoThreadPoolFunc tp_start_func;
187 static MonoThreadPoolFunc tp_finish_func;
188 static gpointer tp_hooks_user_data;
189 static MonoThreadPoolItemFunc tp_item_begin_func;
190 static MonoThreadPoolItemFunc tp_item_end_func;
191 static gpointer tp_item_user_data;
192
193 #define INIT_POLLFD(a, b, c) {(a)->fd = b; (a)->events = c; (a)->revents = 0;}
194 enum {
195         AIO_OP_FIRST,
196         AIO_OP_ACCEPT = 0,
197         AIO_OP_CONNECT,
198         AIO_OP_RECEIVE,
199         AIO_OP_RECEIVEFROM,
200         AIO_OP_SEND,
201         AIO_OP_SENDTO,
202         AIO_OP_RECV_JUST_CALLBACK,
203         AIO_OP_SEND_JUST_CALLBACK,
204         AIO_OP_READPIPE,
205         AIO_OP_LAST
206 };
207
208 #ifdef DISABLE_SOCKETS
209
210 #define socket_io_cleanup(x)
211
212 static int
213 get_event_from_state (MonoSocketAsyncResult *state)
214 {
215         g_assert_not_reached ();
216         return -1;
217 }
218
219 static int
220 get_events_from_list (MonoMList *list)
221 {
222         return 0;
223 }
224
225 #else
226
227 static void
228 socket_io_cleanup (SocketIOData *data)
229 {
230         EnterCriticalSection (&data->io_lock);
231         if (data->inited != 2) {
232                 LeaveCriticalSection (&data->io_lock);
233                 return;
234         }
235         data->inited = 3;
236
237 #ifdef HOST_WIN32
238         closesocket (data->pipe [0]);
239         closesocket (data->pipe [1]);
240 #else
241         if (data->pipe [0] > -1)
242                 close (data->pipe [0]);
243         if (data->pipe [1] > -1)
244                 close (data->pipe [1]);
245 #endif
246         data->pipe [0] = -1;
247         data->pipe [1] = -1;
248         if (data->new_sem)
249                 CloseHandle (data->new_sem);
250         data->new_sem = NULL;
251         mono_g_hash_table_destroy (data->sock_to_state);
252         data->sock_to_state = NULL;
253         g_free (data->newpfd);
254         data->newpfd = NULL;
255 #ifdef HAVE_EPOLL
256         if (FALSE == data->epoll_disabled)
257                 close (data->epollfd);
258 #endif
259         LeaveCriticalSection (&data->io_lock);
260 }
261
262 static int
263 get_event_from_state (MonoSocketAsyncResult *state)
264 {
265         switch (state->operation) {
266         case AIO_OP_ACCEPT:
267         case AIO_OP_RECEIVE:
268         case AIO_OP_RECV_JUST_CALLBACK:
269         case AIO_OP_RECEIVEFROM:
270         case AIO_OP_READPIPE:
271                 return MONO_POLLIN;
272         case AIO_OP_SEND:
273         case AIO_OP_SEND_JUST_CALLBACK:
274         case AIO_OP_SENDTO:
275         case AIO_OP_CONNECT:
276                 return MONO_POLLOUT;
277         default: /* Should never happen */
278                 g_message ("get_event_from_state: unknown value in switch!!!");
279                 return 0;
280         }
281 }
282
283 static int
284 get_events_from_list (MonoMList *list)
285 {
286         MonoSocketAsyncResult *state;
287         int events = 0;
288
289         while (list && (state = (MonoSocketAsyncResult *)mono_mlist_get_data (list))) {
290                 events |= get_event_from_state (state);
291                 list = mono_mlist_next (list);
292         }
293
294         return events;
295 }
296
297 #define ICALL_RECV(x)   ves_icall_System_Net_Sockets_Socket_Receive_internal (\
298                                 (SOCKET)(gssize)x->handle, x->buffer, x->offset, x->size,\
299                                  x->socket_flags, &x->error);
300
301 #define ICALL_SEND(x)   ves_icall_System_Net_Sockets_Socket_Send_internal (\
302                                 (SOCKET)(gssize)x->handle, x->buffer, x->offset, x->size,\
303                                  x->socket_flags, &x->error);
304
305 #endif /* !DISABLE_SOCKETS */
306
307 static void
308 threadpool_jobs_inc (MonoObject *obj)
309 {
310         if (obj)
311                 InterlockedIncrement (&obj->vtable->domain->threadpool_jobs);
312 }
313
314 static gboolean
315 threadpool_jobs_dec (MonoObject *obj)
316 {
317         MonoDomain *domain = obj->vtable->domain;
318         int remaining_jobs = InterlockedDecrement (&domain->threadpool_jobs);
319         if (remaining_jobs == 0 && domain->cleanup_semaphore) {
320                 ReleaseSemaphore (domain->cleanup_semaphore, 1, NULL);
321                 return TRUE;
322         }
323         return FALSE;
324 }
325
326 #ifdef HAVE_EPOLL
327 static MonoObject *
328 get_io_event (MonoMList **list, gint event)
329 {
330         MonoObject *state;
331         MonoMList *current;
332         MonoMList *prev;
333
334         current = *list;
335         prev = NULL;
336         state = NULL;
337         while (current) {
338                 state = mono_mlist_get_data (current);
339                 if (get_event_from_state ((MonoSocketAsyncResult *) state) == event)
340                         break;
341
342                 state = NULL;
343                 prev = current;
344                 current = mono_mlist_next (current);
345         }
346
347         if (current) {
348                 if (prev) {
349                         mono_mlist_set_next (prev, mono_mlist_next (current));
350                 } else {
351                         *list = mono_mlist_next (*list);
352                 }
353         }
354
355         return state;
356 }
357 #endif
358
359 static MonoMList *
360 process_io_event (MonoMList *list, int event)
361 {
362         MonoSocketAsyncResult *state;
363         MonoMList *oldlist;
364
365         oldlist = list;
366         state = NULL;
367         while (list) {
368                 state = (MonoSocketAsyncResult *) mono_mlist_get_data (list);
369                 if (get_event_from_state (state) == event)
370                         break;
371                 
372                 list = mono_mlist_next (list);
373         }
374
375         if (list != NULL) {
376                 oldlist = mono_mlist_remove_item (oldlist, list);
377                 EPOLL_DEBUG ("Dispatching event %d on socket %p", event, state->handle);
378                 threadpool_append_job (&async_io_tp, (MonoObject *) state);
379         }
380
381         return oldlist;
382 }
383
384 static int
385 mark_bad_fds (mono_pollfd *pfds, int nfds)
386 {
387         int i, ret;
388         mono_pollfd *pfd;
389         int count = 0;
390
391         for (i = 0; i < nfds; i++) {
392                 pfd = &pfds [i];
393                 if (pfd->fd == -1)
394                         continue;
395
396                 ret = mono_poll (pfd, 1, 0);
397                 if (ret == -1 && errno == EBADF) {
398                         pfd->revents |= MONO_POLLNVAL;
399                         count++;
400                 } else if (ret == 1) {
401                         count++;
402                 }
403         }
404
405         return count;
406 }
407
408 static void
409 socket_io_poll_main (gpointer p)
410 {
411 #if MONO_SMALL_CONFIG
412 #define INITIAL_POLLFD_SIZE     128
413 #else
414 #define INITIAL_POLLFD_SIZE     1024
415 #endif
416 #define POLL_ERRORS (MONO_POLLERR | MONO_POLLHUP | MONO_POLLNVAL)
417         SocketIOData *data = p;
418         mono_pollfd *pfds;
419         gint maxfd = 1;
420         gint allocated;
421         gint i;
422         MonoInternalThread *thread;
423
424         thread = mono_thread_internal_current ();
425
426         allocated = INITIAL_POLLFD_SIZE;
427         pfds = g_new0 (mono_pollfd, allocated);
428         INIT_POLLFD (pfds, data->pipe [0], MONO_POLLIN);
429         for (i = 1; i < allocated; i++)
430                 INIT_POLLFD (&pfds [i], -1, 0);
431
432         while (1) {
433                 int nsock = 0;
434                 mono_pollfd *pfd;
435                 char one [1];
436                 MonoMList *list;
437
438                 do {
439                         if (nsock == -1) {
440                                 if (THREAD_WANTS_A_BREAK (thread))
441                                         mono_thread_interruption_checkpoint ();
442                         }
443
444                         nsock = mono_poll (pfds, maxfd, -1);
445                 } while (nsock == -1 && errno == EINTR);
446
447                 /* 
448                  * Apart from EINTR, we only check EBADF, for the rest:
449                  *  EINVAL: mono_poll() 'protects' us from descriptor
450                  *      numbers above the limit if using select() by marking
451                  *      then as MONO_POLLERR.  If a system poll() is being
452                  *      used, the number of descriptor we're passing will not
453                  *      be over sysconf(_SC_OPEN_MAX), as the error would have
454                  *      happened when opening.
455                  *
456                  *  EFAULT: we own the memory pointed by pfds.
457                  *  ENOMEM: we're doomed anyway
458                  *
459                  */
460
461                 if (nsock == -1 && errno == EBADF) {
462                         pfds->revents = 0; /* Just in case... */
463                         nsock = mark_bad_fds (pfds, maxfd);
464                 }
465
466                 if ((pfds->revents & POLL_ERRORS) != 0) {
467                         /* We're supposed to die now, as the pipe has been closed */
468                         g_free (pfds);
469                         socket_io_cleanup (data);
470                         return;
471                 }
472
473                 /* Got a new socket */
474                 if ((pfds->revents & MONO_POLLIN) != 0) {
475                         int nread;
476
477                         for (i = 1; i < allocated; i++) {
478                                 pfd = &pfds [i];
479                                 if (pfd->fd == -1 || data->newpfd == NULL ||
480                                         pfd->fd == data->newpfd->fd)
481                                         break;
482                         }
483
484                         if (i == allocated) {
485                                 mono_pollfd *oldfd;
486
487                                 oldfd = pfds;
488                                 i = allocated;
489                                 allocated = allocated * 2;
490                                 pfds = g_renew (mono_pollfd, oldfd, allocated);
491                                 g_free (oldfd);
492                                 for (; i < allocated; i++)
493                                         INIT_POLLFD (&pfds [i], -1, 0);
494                         }
495 #ifndef HOST_WIN32
496                         nread = read (data->pipe [0], one, 1);
497 #else
498                         nread = recv ((SOCKET) data->pipe [0], one, 1, 0);
499 #endif
500                         if (nread <= 0) {
501                                 g_free (pfds);
502                                 return; /* we're closed */
503                         }
504
505                         INIT_POLLFD (&pfds [i], data->newpfd->fd, data->newpfd->events);
506                         ReleaseSemaphore (data->new_sem, 1, NULL);
507                         if (i >= maxfd)
508                                 maxfd = i + 1;
509                         nsock--;
510                 }
511
512                 if (nsock == 0)
513                         continue;
514
515                 EnterCriticalSection (&data->io_lock);
516                 if (data->inited == 3) {
517                         g_free (pfds);
518                         LeaveCriticalSection (&data->io_lock);
519                         return; /* cleanup called */
520                 }
521
522                 for (i = 1; i < maxfd && nsock > 0; i++) {
523                         pfd = &pfds [i];
524                         if (pfd->fd == -1 || pfd->revents == 0)
525                                 continue;
526
527                         nsock--;
528                         list = mono_g_hash_table_lookup (data->sock_to_state, GINT_TO_POINTER (pfd->fd));
529                         if (list != NULL && (pfd->revents & (MONO_POLLIN | POLL_ERRORS)) != 0) {
530                                 list = process_io_event (list, MONO_POLLIN);
531                         }
532
533                         if (list != NULL && (pfd->revents & (MONO_POLLOUT | POLL_ERRORS)) != 0) {
534                                 list = process_io_event (list, MONO_POLLOUT);
535                         }
536
537                         if (list != NULL) {
538                                 mono_g_hash_table_replace (data->sock_to_state, GINT_TO_POINTER (pfd->fd), list);
539                                 pfd->events = get_events_from_list (list);
540                         } else {
541                                 mono_g_hash_table_remove (data->sock_to_state, GINT_TO_POINTER (pfd->fd));
542                                 pfd->fd = -1;
543                                 if (i == maxfd - 1)
544                                         maxfd--;
545                         }
546                 }
547                 LeaveCriticalSection (&data->io_lock);
548         }
549 }
550
551 #ifdef HAVE_EPOLL
552 #define EPOLL_ERRORS (EPOLLERR | EPOLLHUP)
553 #define EPOLL_NEVENTS   128
554 static void
555 socket_io_epoll_main (gpointer p)
556 {
557         SocketIOData *data;
558         int epollfd;
559         MonoInternalThread *thread;
560         struct epoll_event *events, *evt;
561         int ready = 0, i;
562         gpointer async_results [EPOLL_NEVENTS * 2]; // * 2 because each loop can add up to 2 results here
563         gint nresults;
564
565         data = p;
566         epollfd = data->epollfd;
567         thread = mono_thread_internal_current ();
568         events = g_new0 (struct epoll_event, EPOLL_NEVENTS);
569
570         while (1) {
571                 do {
572                         if (ready == -1) {
573                                 if (THREAD_WANTS_A_BREAK (thread))
574                                         mono_thread_interruption_checkpoint ();
575                         }
576                         EPOLL_DEBUG ("epoll_wait init");
577                         ready = epoll_wait (epollfd, events, EPOLL_NEVENTS, -1);
578                         EPOLL_DEBUG_STMT(
579                                 int err = errno;
580                                 EPOLL_DEBUG ("epoll_wait end with %d ready sockets (%d %s).", ready, err, (ready == -1) ? g_strerror (err) : "");
581                                 errno = err;
582                         );
583                 } while (ready == -1 && errno == EINTR);
584
585                 if (ready == -1) {
586                         int err = errno;
587                         g_free (events);
588                         if (err != EBADF)
589                                 g_warning ("epoll_wait: %d %s", err, g_strerror (err));
590
591                         close (epollfd);
592                         return;
593                 }
594
595                 EnterCriticalSection (&data->io_lock);
596                 if (data->inited == 3) {
597                         g_free (events);
598                         close (epollfd);
599                         LeaveCriticalSection (&data->io_lock);
600                         return; /* cleanup called */
601                 }
602
603                 nresults = 0;
604                 for (i = 0; i < ready; i++) {
605                         int fd;
606                         MonoMList *list;
607                         MonoObject *ares;
608
609                         evt = &events [i];
610                         fd = evt->data.fd;
611                         list = mono_g_hash_table_lookup (data->sock_to_state, GINT_TO_POINTER (fd));
612                         EPOLL_DEBUG ("Event %d on %d list length: %d", evt->events, fd, mono_mlist_length (list));
613                         if (list != NULL && (evt->events & (EPOLLIN | EPOLL_ERRORS)) != 0) {
614                                 ares = get_io_event (&list, MONO_POLLIN);
615                                 if (ares != NULL)
616                                         async_results [nresults++] = ares;
617                         }
618
619                         if (list != NULL && (evt->events & (EPOLLOUT | EPOLL_ERRORS)) != 0) {
620                                 ares = get_io_event (&list, MONO_POLLOUT);
621                                 if (ares != NULL)
622                                         async_results [nresults++] = ares;
623                         }
624
625                         if (list != NULL) {
626                                 mono_g_hash_table_replace (data->sock_to_state, GINT_TO_POINTER (fd), list);
627                                 evt->events = get_events_from_list (list);
628                                 EPOLL_DEBUG ("MOD %d to %d", fd, evt->events);
629                                 if (epoll_ctl (epollfd, EPOLL_CTL_MOD, fd, evt)) {
630                                         if (epoll_ctl (epollfd, EPOLL_CTL_ADD, fd, evt) == -1) {
631                                                 EPOLL_DEBUG_STMT (
632                                                         int err = errno;
633                                                         EPOLL_DEBUG ("epoll_ctl(MOD): %d %s fd: %d events: %d", err, g_strerror (err), fd, evt->events);
634                                                         errno = err;
635                                                 );
636                                         }
637                                 }
638                         } else {
639                                 mono_g_hash_table_remove (data->sock_to_state, GINT_TO_POINTER (fd));
640                                 EPOLL_DEBUG ("DEL %d", fd);
641                                 epoll_ctl (epollfd, EPOLL_CTL_DEL, fd, evt);
642                         }
643                 }
644                 LeaveCriticalSection (&data->io_lock);
645                 threadpool_append_jobs (&async_io_tp, (MonoObject **) async_results, nresults);
646                 memset (async_results, 0, sizeof (gpointer) * nresults);
647         }
648 }
649 #undef EPOLL_NEVENTS
650 #endif
651
652 /*
653  * select/poll wake up when a socket is closed, but epoll just removes
654  * the socket from its internal list without notification.
655  */
656 void
657 mono_thread_pool_remove_socket (int sock)
658 {
659         MonoMList *list, *next;
660         MonoSocketAsyncResult *state;
661
662         if (socket_io_data.inited == 0)
663                 return;
664
665         EnterCriticalSection (&socket_io_data.io_lock);
666         if (socket_io_data.sock_to_state == NULL) {
667                 LeaveCriticalSection (&socket_io_data.io_lock);
668                 return;
669         }
670         list = mono_g_hash_table_lookup (socket_io_data.sock_to_state, GINT_TO_POINTER (sock));
671         if (list)
672                 mono_g_hash_table_remove (socket_io_data.sock_to_state, GINT_TO_POINTER (sock));
673         LeaveCriticalSection (&socket_io_data.io_lock);
674         
675         while (list) {
676                 state = (MonoSocketAsyncResult *) mono_mlist_get_data (list);
677                 if (state->operation == AIO_OP_RECEIVE)
678                         state->operation = AIO_OP_RECV_JUST_CALLBACK;
679                 else if (state->operation == AIO_OP_SEND)
680                         state->operation = AIO_OP_SEND_JUST_CALLBACK;
681
682                 next = mono_mlist_remove_item (list, list);
683                 list = process_io_event (list, MONO_POLLIN);
684                 if (list)
685                         process_io_event (list, MONO_POLLOUT);
686
687                 list = next;
688         }
689 }
690
691 #ifdef HOST_WIN32
692 static void
693 connect_hack (gpointer x)
694 {
695         struct sockaddr_in *addr = (struct sockaddr_in *) x;
696         int count = 0;
697
698         while (connect ((SOCKET) socket_io_data.pipe [1], (SOCKADDR *) addr, sizeof (struct sockaddr_in))) {
699                 Sleep (500);
700                 if (++count > 3) {
701                         g_warning ("Error initializing async. sockets %d.", WSAGetLastError ());
702                         g_assert (WSAGetLastError ());
703                 }
704         }
705 }
706 #endif
707
708 static void
709 socket_io_init (SocketIOData *data)
710 {
711 #ifdef HOST_WIN32
712         struct sockaddr_in server;
713         struct sockaddr_in client;
714         SOCKET srv;
715         int len;
716 #endif
717         int inited;
718         guint32 stack_size;
719
720         if (data->inited >= 2) // 2 -> initialized, 3-> cleaned up
721                 return;
722
723         inited = InterlockedCompareExchange (&data->inited, 1, 0);
724         if (inited >= 1) {
725                 while (TRUE) {
726                         if (data->inited >= 2)
727                                 return;
728                         SleepEx (1, FALSE);
729                 }
730         }
731
732         EnterCriticalSection (&data->io_lock);
733
734 #ifdef HAVE_EPOLL
735         data->epoll_disabled = (g_getenv ("MONO_DISABLE_AIO") != NULL);
736         if (FALSE == data->epoll_disabled) {
737                 data->epollfd = epoll_create (256);
738                 data->epoll_disabled = (data->epollfd == -1);
739                 if (data->epoll_disabled && g_getenv ("MONO_DEBUG"))
740                         g_message ("epoll_create() failed. Using plain poll().");
741         } else {
742                 data->epollfd = -1;
743         }
744 #else
745         data->epoll_disabled = TRUE;
746 #endif
747
748 #ifndef HOST_WIN32
749         if (data->epoll_disabled) {
750                 if (pipe (data->pipe) != 0) {
751                         int err = errno;
752                         perror ("mono");
753                         g_assert (err);
754                 }
755         } else {
756                 data->pipe [0] = -1;
757                 data->pipe [1] = -1;
758         }
759 #else
760         srv = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
761         g_assert (srv != INVALID_SOCKET);
762         data->pipe [1] = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
763         g_assert (data->pipe [1] != INVALID_SOCKET);
764
765         server.sin_family = AF_INET;
766         server.sin_addr.s_addr = inet_addr ("127.0.0.1");
767         server.sin_port = 0;
768         if (bind (srv, (SOCKADDR *) &server, sizeof (server))) {
769                 g_print ("%d\n", WSAGetLastError ());
770                 g_assert (1 != 0);
771         }
772
773         len = sizeof (server);
774         getsockname (srv, (SOCKADDR *) &server, &len);
775         listen (srv, 1);
776         mono_thread_create (mono_get_root_domain (), connect_hack, &server);
777         len = sizeof (server);
778         data->pipe [0] = accept (srv, (SOCKADDR *) &client, &len);
779         g_assert (data->pipe [0] != INVALID_SOCKET);
780         closesocket (srv);
781 #endif
782         data->sock_to_state = mono_g_hash_table_new_type (g_direct_hash, g_direct_equal, MONO_HASH_VALUE_GC);
783
784         if (data->epoll_disabled) {
785                 data->new_sem = CreateSemaphore (NULL, 1, 1, NULL);
786                 g_assert (data->new_sem != NULL);
787         }
788
789         stack_size = mono_threads_get_default_stacksize ();
790         mono_threads_set_default_stacksize (128 * 1024);
791         if (data->epoll_disabled) {
792                 mono_thread_create_internal (mono_get_root_domain (), socket_io_poll_main, data, TRUE);
793         }
794 #ifdef HAVE_EPOLL
795         else {
796                 mono_thread_create_internal (mono_get_root_domain (), socket_io_epoll_main, data, TRUE);
797         }
798 #endif
799         mono_threads_set_default_stacksize (stack_size);
800         LeaveCriticalSection (&data->io_lock);
801         data->inited = 2;
802 }
803
804 static void
805 socket_io_add_poll (MonoSocketAsyncResult *state)
806 {
807         int events;
808         char msg [1];
809         MonoMList *list;
810         SocketIOData *data = &socket_io_data;
811         int w;
812
813         if (mono_runtime_is_shutting_down () || data->inited == 3 || data->sock_to_state == NULL)
814                 return;
815
816 #if defined(PLATFORM_MACOSX) || defined(PLATFORM_BSD) || defined(HOST_WIN32) || defined(PLATFORM_SOLARIS)
817         /* select() for connect() does not work well on the Mac. Bug #75436. */
818         /* Bug #77637 for the BSD 6 case */
819         /* Bug #78888 for the Windows case */
820         if (state->operation == AIO_OP_CONNECT && state->blocking == TRUE) {
821                 //FIXME: increment number of threads while this one is waiting?
822                 threadpool_append_job (&async_io_tp, (MonoObject *) state);
823                 return;
824         }
825 #endif
826         WaitForSingleObject (data->new_sem, INFINITE);
827         if (data->newpfd == NULL)
828                 data->newpfd = g_new0 (mono_pollfd, 1);
829
830         EnterCriticalSection (&data->io_lock);
831         if (data->sock_to_state == NULL) {
832                 LeaveCriticalSection (&data->io_lock);
833                 return;
834         }
835
836         /* FIXME: 64 bit issue: handle can be a pointer on windows? */
837         list = mono_g_hash_table_lookup (data->sock_to_state, GINT_TO_POINTER (state->handle));
838         if (list == NULL) {
839                 list = mono_mlist_alloc ((MonoObject*)state);
840         } else {
841                 list = mono_mlist_append (list, (MonoObject*)state);
842         }
843
844         events = get_events_from_list (list);
845         INIT_POLLFD (data->newpfd, GPOINTER_TO_INT (state->handle), events);
846         mono_g_hash_table_replace (data->sock_to_state, GINT_TO_POINTER (state->handle), list);
847         LeaveCriticalSection (&data->io_lock);
848         *msg = (char) state->operation;
849 #ifndef HOST_WIN32
850         w = write (data->pipe [1], msg, 1);
851         w = w;
852 #else
853         send ((SOCKET) data->pipe [1], msg, 1, 0);
854 #endif
855 }
856
857 #ifdef HAVE_EPOLL
858 static gboolean
859 socket_io_add_epoll (MonoSocketAsyncResult *state)
860 {
861         MonoMList *list;
862         SocketIOData *data = &socket_io_data;
863         struct epoll_event event;
864         int epoll_op, ievt;
865         int fd;
866
867         if (mono_runtime_is_shutting_down () || data->inited == 3 || data->sock_to_state == NULL)
868                 return TRUE;
869
870         memset (&event, 0, sizeof (struct epoll_event));
871         fd = GPOINTER_TO_INT (state->handle);
872         EnterCriticalSection (&data->io_lock);
873         if (data->sock_to_state == NULL) {
874                 LeaveCriticalSection (&data->io_lock);
875                 return TRUE;
876         }
877         list = mono_g_hash_table_lookup (data->sock_to_state, GINT_TO_POINTER (fd));
878         if (list == NULL) {
879                 list = mono_mlist_alloc ((MonoObject*)state);
880                 epoll_op = EPOLL_CTL_ADD;
881         } else {
882                 list = mono_mlist_append (list, (MonoObject*)state);
883                 epoll_op = EPOLL_CTL_MOD;
884         }
885
886         ievt = get_events_from_list (list);
887         if ((ievt & MONO_POLLIN) != 0)
888                 event.events |= EPOLLIN;
889         if ((ievt & MONO_POLLOUT) != 0)
890                 event.events |= EPOLLOUT;
891
892         mono_g_hash_table_replace (data->sock_to_state, state->handle, list);
893         event.data.fd = fd;
894         EPOLL_DEBUG ("%s %d with %d", epoll_op == EPOLL_CTL_ADD ? "ADD" : "MOD", fd, event.events);
895         if (epoll_ctl (data->epollfd, epoll_op, fd, &event) == -1) {
896                 int err = errno;
897                 if (epoll_op == EPOLL_CTL_ADD && err == EEXIST) {
898                         epoll_op = EPOLL_CTL_MOD;
899                         if (epoll_ctl (data->epollfd, epoll_op, fd, &event) == -1) {
900                                 g_message ("epoll_ctl(MOD): %d %s", err, g_strerror (err));
901                         }
902                 }
903         }
904         LeaveCriticalSection (&data->io_lock);
905
906         return TRUE;
907 }
908 #endif
909
910 static void
911 socket_io_add (MonoAsyncResult *ares, MonoSocketAsyncResult *state)
912 {
913         if (async_tp.pool_status == 2 || mono_runtime_is_shutting_down ())
914                 return;
915
916         socket_io_init (&socket_io_data);
917
918         MONO_OBJECT_SETREF (state, ares, ares);
919 #ifdef HAVE_EPOLL
920         if (socket_io_data.epoll_disabled == FALSE) {
921                 if (socket_io_add_epoll (state))
922                         return;
923         }
924 #endif
925         socket_io_add_poll (state);
926 }
927
928 #ifndef DISABLE_SOCKETS
929 static gboolean
930 socket_io_filter (MonoObject *target, MonoObject *state)
931 {
932         gint op;
933         MonoSocketAsyncResult *sock_res = (MonoSocketAsyncResult *) state;
934         MonoClass *klass;
935
936         if (target == NULL || state == NULL)
937                 return FALSE;
938
939         if (socket_async_call_klass == NULL) {
940                 klass = target->vtable->klass;
941                 /* Check if it's SocketAsyncCall in System.Net.Sockets
942                  * FIXME: check the assembly is signed correctly for extra care
943                  */
944                 if (klass->name [0] == 'S' && strcmp (klass->name, "SocketAsyncCall") == 0 
945                                 && strcmp (mono_image_get_name (klass->image), "System") == 0
946                                 && klass->nested_in && strcmp (klass->nested_in->name, "Socket") == 0)
947                         socket_async_call_klass = klass;
948         }
949
950         if (process_async_call_klass == NULL) {
951                 klass = target->vtable->klass;
952                 /* Check if it's AsyncReadHandler in System.Diagnostics.Process
953                  * FIXME: check the assembly is signed correctly for extra care
954                  */
955                 if (klass->name [0] == 'A' && strcmp (klass->name, "AsyncReadHandler") == 0 
956                                 && strcmp (mono_image_get_name (klass->image), "System") == 0
957                                 && klass->nested_in && strcmp (klass->nested_in->name, "Process") == 0)
958                         process_async_call_klass = klass;
959         }
960         /* return both when socket_async_call_klass has not been seen yet and when
961          * the object is not an instance of the class.
962          */
963         if (target->vtable->klass != socket_async_call_klass && target->vtable->klass != process_async_call_klass)
964                 return FALSE;
965
966         op = sock_res->operation;
967         if (op < AIO_OP_FIRST || op >= AIO_OP_LAST)
968                 return FALSE;
969
970         return TRUE;
971 }
972 #endif /* !DISABLE_SOCKETS */
973
974 /* Returns the exception thrown when invoking, if any */
975 static MonoObject *
976 mono_async_invoke (ThreadPool *tp, MonoAsyncResult *ares)
977 {
978         ASyncCall *ac = (ASyncCall *)ares->object_data;
979         MonoObject *res, *exc = NULL;
980         MonoArray *out_args = NULL;
981         HANDLE wait_event = NULL;
982
983         if (ares->execution_context) {
984                 /* use captured ExecutionContext (if available) */
985                 MONO_OBJECT_SETREF (ares, original_context, mono_thread_get_execution_context ());
986                 mono_thread_set_execution_context (ares->execution_context);
987         } else {
988                 ares->original_context = NULL;
989         }
990
991         if (ac == NULL) {
992                 /* Fast path from ThreadPool.*QueueUserWorkItem */
993                 void *pa = ares->async_state;
994                 mono_runtime_delegate_invoke (ares->async_delegate, &pa, &exc);
995         } else {
996                 ac->msg->exc = NULL;
997                 res = mono_message_invoke (ares->async_delegate, ac->msg, &exc, &out_args);
998                 MONO_OBJECT_SETREF (ac, res, res);
999                 MONO_OBJECT_SETREF (ac, msg->exc, exc);
1000                 MONO_OBJECT_SETREF (ac, out_args, out_args);
1001
1002                 mono_monitor_enter ((MonoObject *) ares);
1003                 ares->completed = 1;
1004                 if (ares->handle != NULL)
1005                         wait_event = mono_wait_handle_get_handle ((MonoWaitHandle *) ares->handle);
1006                 mono_monitor_exit ((MonoObject *) ares);
1007                 /* notify listeners */
1008                 if (wait_event != NULL)
1009                         SetEvent (wait_event);
1010
1011                 /* call async callback if cb_method != null*/
1012                 if (ac != NULL && ac->cb_method) {
1013                         MonoObject *exc = NULL;
1014                         void *pa = &ares;
1015                         mono_runtime_invoke (ac->cb_method, ac->cb_target, pa, &exc);
1016                         /* 'exc' will be the previous ac->msg->exc if not NULL and not
1017                          * catched. If catched, this will be set to NULL and the
1018                          * exception will not be printed. */
1019                         MONO_OBJECT_SETREF (ac->msg, exc, exc);
1020                 }
1021         }
1022
1023         /* restore original thread execution context if flow isn't suppressed, i.e. non null */
1024         if (ares->original_context) {
1025                 mono_thread_set_execution_context (ares->original_context);
1026                 ares->original_context = NULL;
1027         }
1028         return exc;
1029 }
1030
1031 static void
1032 threadpool_start_idle_threads (ThreadPool *tp)
1033 {
1034         int n;
1035
1036         do {
1037                 while (1) {
1038                         n = tp->nthreads;
1039                         if (n >= tp->min_threads)
1040                                 return;
1041                         if (InterlockedCompareExchange (&tp->nthreads, n + 1, n) == n)
1042                                 break;
1043                 }
1044                 mono_perfcounter_update_value (tp->pc_nthreads, TRUE, 1);
1045                 mono_thread_create_internal (mono_get_root_domain (), tp->async_invoke, tp, TRUE);
1046                 SleepEx (100, TRUE);
1047         } while (1);
1048 }
1049
1050 static void
1051 threadpool_init (ThreadPool *tp, int min_threads, int max_threads, void (*async_invoke) (gpointer))
1052 {
1053         memset (tp, 0, sizeof (ThreadPool));
1054         MONO_SEM_INIT (&tp->lock, 1);
1055         tp->min_threads = min_threads;
1056         tp->max_threads = max_threads;
1057         tp->async_invoke = async_invoke;
1058         MONO_SEM_INIT (&tp->new_job, 0);
1059 }
1060
1061 static void *
1062 init_perf_counter (const char *category, const char *counter)
1063 {
1064         MonoString *category_str;
1065         MonoString *counter_str;
1066         MonoString *machine;
1067         MonoDomain *root;
1068         MonoBoolean custom;
1069         int type;
1070
1071         if (category == NULL || counter == NULL)
1072                 return NULL;
1073         root = mono_get_root_domain ();
1074         category_str = mono_string_new (root, category);
1075         counter_str = mono_string_new (root, counter);
1076         machine = mono_string_new (root, ".");
1077         return mono_perfcounter_get_impl (category_str, counter_str, NULL, machine, &type, &custom);
1078 }
1079
1080 #ifdef DEBUG
1081 static void
1082 print_pool_info (ThreadPool *tp)
1083 {
1084
1085 //      if (tp->tail - tp->head == 0)
1086 //              return;
1087
1088         g_print ("Pool status? %d\n", InterlockedCompareExchange (&tp->pool_status, 0, 0));
1089         g_print ("Min. threads: %d\n", InterlockedCompareExchange (&tp->min_threads, 0, 0));
1090         g_print ("Max. threads: %d\n", InterlockedCompareExchange (&tp->max_threads, 0, 0));
1091         g_print ("nthreads: %d\n", InterlockedCompareExchange (&tp->nthreads, 0, 0));
1092         g_print ("busy threads: %d\n", InterlockedCompareExchange (&tp->busy_threads, 0, 0));
1093         g_print ("Waiting: %d\n", InterlockedCompareExchange (&tp->waiting, 0, 0));
1094         g_print ("Queued: %d\n", (tp->tail - tp->head));
1095         if (tp == &async_tp) {
1096                 int i;
1097                 EnterCriticalSection (&wsqs_lock);
1098                 for (i = 0; i < wsqs->len; i++) {
1099                         g_print ("\tWSQ %d: %d\n", i, mono_wsq_count (g_ptr_array_index (wsqs, i)));
1100                 }
1101                 LeaveCriticalSection (&wsqs_lock);
1102         } else {
1103                 g_print ("\tSockets: %d\n", mono_g_hash_table_size (socket_io_data.sock_to_state));
1104         }
1105         g_print ("-------------\n");
1106 }
1107
1108 static void
1109 signal_handler (int signo)
1110 {
1111         ThreadPool *tp;
1112
1113         tp = &async_tp;
1114         MONO_SEM_WAIT (&tp->lock);
1115         g_print ("\n-----Non-IO-----\n");
1116         print_pool_info (tp);
1117         MONO_SEM_POST (&tp->lock);
1118         tp = &async_io_tp;
1119         MONO_SEM_WAIT (&tp->lock);
1120         g_print ("\n-----IO-----\n");
1121         print_pool_info (tp);
1122         MONO_SEM_POST (&tp->lock);
1123         alarm (2);
1124 }
1125 #endif
1126
1127 void
1128 mono_thread_pool_init ()
1129 {
1130         gint threads_per_cpu = 1;
1131         gint thread_count;
1132         gint cpu_count = mono_cpu_count ();
1133         int result;
1134
1135         if (tp_inited == 2)
1136                 return;
1137
1138         result = InterlockedCompareExchange (&tp_inited, 1, 0);
1139         if (result == 1) {
1140                 while (1) {
1141                         SleepEx (1, FALSE);
1142                         if (tp_inited == 2)
1143                                 return;
1144                 }
1145         }
1146
1147         MONO_GC_REGISTER_ROOT (async_tp.first);
1148         MONO_GC_REGISTER_ROOT (async_tp.last);
1149         MONO_GC_REGISTER_ROOT (async_tp.unused);
1150         MONO_GC_REGISTER_ROOT (async_io_tp.first);
1151         MONO_GC_REGISTER_ROOT (async_io_tp.unused);
1152         MONO_GC_REGISTER_ROOT (async_io_tp.last);
1153
1154         MONO_GC_REGISTER_ROOT (socket_io_data.sock_to_state);
1155         InitializeCriticalSection (&socket_io_data.io_lock);
1156         if (g_getenv ("MONO_THREADS_PER_CPU") != NULL) {
1157                 threads_per_cpu = atoi (g_getenv ("MONO_THREADS_PER_CPU"));
1158                 if (threads_per_cpu < 1)
1159                         threads_per_cpu = 1;
1160         }
1161
1162         thread_count = MIN (cpu_count * threads_per_cpu, 100 * cpu_count);
1163         threadpool_init (&async_tp, thread_count, MAX (100 * cpu_count, thread_count), async_invoke_thread);
1164         threadpool_init (&async_io_tp, cpu_count * 2, cpu_count * 4, async_invoke_thread);
1165         async_io_tp.is_io = TRUE;
1166
1167         async_call_klass = mono_class_from_name (mono_defaults.corlib, "System", "MonoAsyncCall");
1168         g_assert (async_call_klass);
1169
1170         InitializeCriticalSection (&wsqs_lock);
1171         wsqs = g_ptr_array_sized_new (MAX (100 * cpu_count, thread_count));
1172         mono_wsq_init ();
1173
1174         async_tp.pc_nitems = init_perf_counter ("Mono Threadpool", "Work Items Added");
1175         g_assert (async_tp.pc_nitems);
1176
1177         async_io_tp.pc_nitems = init_perf_counter ("Mono Threadpool", "IO Work Items Added");
1178         g_assert (async_io_tp.pc_nitems);
1179
1180         async_tp.pc_nthreads = init_perf_counter ("Mono Threadpool", "# of Threads");
1181         g_assert (async_tp.pc_nthreads);
1182
1183         async_io_tp.pc_nthreads = init_perf_counter ("Mono Threadpool", "# of IO Threads");
1184         g_assert (async_io_tp.pc_nthreads);
1185         tp_inited = 2;
1186 #ifdef DEBUG
1187         signal (SIGALRM, signal_handler);
1188         alarm (2);
1189 #endif
1190 }
1191
1192 MonoAsyncResult *
1193 mono_thread_pool_add (MonoObject *target, MonoMethodMessage *msg, MonoDelegate *async_callback,
1194                       MonoObject *state)
1195 {
1196         MonoDomain *domain = mono_domain_get ();
1197         MonoAsyncResult *ares;
1198         ASyncCall *ac;
1199
1200         ac = (ASyncCall*)mono_object_new (domain, async_call_klass);
1201         MONO_OBJECT_SETREF (ac, msg, msg);
1202         MONO_OBJECT_SETREF (ac, state, state);
1203
1204         if (async_callback) {
1205                 ac->cb_method = mono_get_delegate_invoke (((MonoObject *)async_callback)->vtable->klass);
1206                 MONO_OBJECT_SETREF (ac, cb_target, async_callback);
1207         }
1208
1209         ares = mono_async_result_new (domain, NULL, ac->state, NULL, (MonoObject*)ac);
1210         MONO_OBJECT_SETREF (ares, async_delegate, target);
1211
1212 #ifndef DISABLE_SOCKETS
1213         if (socket_io_filter (target, state)) {
1214                 socket_io_add (ares, (MonoSocketAsyncResult *) state);
1215                 return ares;
1216         }
1217 #endif
1218         threadpool_append_job (&async_tp, (MonoObject *) ares);
1219         return ares;
1220 }
1221
1222 MonoObject *
1223 mono_thread_pool_finish (MonoAsyncResult *ares, MonoArray **out_args, MonoObject **exc)
1224 {
1225         ASyncCall *ac;
1226         HANDLE wait_event;
1227
1228         *exc = NULL;
1229         *out_args = NULL;
1230
1231         /* check if already finished */
1232         mono_monitor_enter ((MonoObject *) ares);
1233         
1234         if (ares->endinvoke_called) {
1235                 *exc = (MonoObject *) mono_get_exception_invalid_operation (NULL);
1236                 mono_monitor_exit ((MonoObject *) ares);
1237                 return NULL;
1238         }
1239
1240         ares->endinvoke_called = 1;
1241         /* wait until we are really finished */
1242         if (!ares->completed) {
1243                 if (ares->handle == NULL) {
1244                         wait_event = CreateEvent (NULL, TRUE, FALSE, NULL);
1245                         g_assert(wait_event != 0);
1246                         MONO_OBJECT_SETREF (ares, handle, (MonoObject *) mono_wait_handle_new (mono_object_domain (ares), wait_event));
1247                 } else {
1248                         wait_event = mono_wait_handle_get_handle ((MonoWaitHandle *) ares->handle);
1249                 }
1250                 mono_monitor_exit ((MonoObject *) ares);
1251                 WaitForSingleObjectEx (wait_event, INFINITE, TRUE);
1252         } else {
1253                 mono_monitor_exit ((MonoObject *) ares);
1254         }
1255
1256         ac = (ASyncCall *) ares->object_data;
1257         g_assert (ac != NULL);
1258         *exc = ac->msg->exc; /* FIXME: GC add write barrier */
1259         *out_args = ac->out_args;
1260
1261         return ac->res;
1262 }
1263
1264 static void
1265 threadpool_kill_idle_threads (ThreadPool *tp)
1266 {
1267         gint n;
1268
1269         n = (gint) InterlockedCompareExchange (&tp->max_threads, 0, -1);
1270         while (n) {
1271                 n--;
1272                 MONO_SEM_POST (&tp->new_job);
1273         }
1274 }
1275
1276 void
1277 mono_thread_pool_cleanup (void)
1278 {
1279         if (async_tp.pool_status == 0 || async_tp.pool_status == 2)
1280                 return;
1281
1282         if (async_tp.pool_status == 1 && InterlockedCompareExchange (&async_tp.pool_status, 2, 1) == 2)
1283                 return;
1284
1285         InterlockedExchange (&async_io_tp.pool_status, 2);
1286         MONO_SEM_WAIT (&async_tp.lock);
1287         threadpool_free_queue (&async_tp);
1288         threadpool_kill_idle_threads (&async_tp);
1289         MONO_SEM_POST (&async_tp.lock);
1290
1291         socket_io_cleanup (&socket_io_data); /* Empty when DISABLE_SOCKETS is defined */
1292         MONO_SEM_WAIT (&async_io_tp.lock);
1293         threadpool_free_queue (&async_io_tp);
1294         threadpool_kill_idle_threads (&async_io_tp);
1295         MONO_SEM_POST (&async_io_tp.lock);
1296         MONO_SEM_DESTROY (&async_io_tp.new_job);
1297
1298         EnterCriticalSection (&wsqs_lock);
1299         mono_wsq_cleanup ();
1300         if (wsqs)
1301                 g_ptr_array_free (wsqs, TRUE);
1302         wsqs = NULL;
1303         LeaveCriticalSection (&wsqs_lock);
1304         MONO_SEM_DESTROY (&async_tp.new_job);
1305 }
1306
1307 /* Caller must enter &tp->lock */
1308 static MonoObject*
1309 dequeue_job_nolock (ThreadPool *tp)
1310 {
1311         MonoObject *ar;
1312         MonoArray *array;
1313         MonoMList *list;
1314
1315         list = tp->first;
1316         do {
1317                 if (mono_runtime_is_shutting_down ())
1318                         return NULL;
1319                 if (!list || tp->head == tp->tail)
1320                         return NULL;
1321
1322                 array = (MonoArray *) mono_mlist_get_data (list);
1323                 ar = mono_array_get (array, MonoObject *, tp->head % QUEUE_LENGTH);
1324                 mono_array_set (array, MonoObject *, tp->head % QUEUE_LENGTH, NULL);
1325                 tp->head++;
1326                 if ((tp->head % QUEUE_LENGTH) == 0) {
1327                         list = tp->first;
1328                         tp->first = mono_mlist_next (list);
1329                         if (tp->first == NULL)
1330                                 tp->last = NULL;
1331                         if (mono_mlist_length (tp->unused) < 20) {
1332                                 /* reuse this chunk */
1333                                 tp->unused = mono_mlist_set_next (list, tp->unused);
1334                         }
1335                         tp->head -= QUEUE_LENGTH;
1336                         tp->tail -= QUEUE_LENGTH;
1337                 }
1338                 list = tp->first;
1339         } while (ar == NULL);
1340         return ar;
1341 }
1342
1343 static gboolean
1344 threadpool_start_thread (ThreadPool *tp)
1345 {
1346         gint n;
1347
1348         while (!mono_runtime_is_shutting_down () && (n = tp->nthreads) < tp->max_threads) {
1349                 if (InterlockedCompareExchange (&tp->nthreads, n + 1, n) == n) {
1350                         mono_perfcounter_update_value (tp->pc_nthreads, TRUE, 1);
1351                         mono_thread_create_internal (mono_get_root_domain (), tp->async_invoke, tp, TRUE);
1352                         return TRUE;
1353                 }
1354         }
1355
1356         return FALSE;
1357 }
1358
1359 static void
1360 pulse_on_new_job (ThreadPool *tp)
1361 {
1362         if (tp->waiting)
1363                 MONO_SEM_POST (&tp->new_job);
1364 }
1365
1366 void
1367 icall_append_job (MonoObject *ar)
1368 {
1369         threadpool_append_job (&async_tp, ar);
1370 }
1371
1372 static void
1373 threadpool_append_job (ThreadPool *tp, MonoObject *ar)
1374 {
1375         threadpool_append_jobs (tp, &ar, 1);
1376 }
1377
1378 static MonoMList *
1379 create_or_reuse_list (ThreadPool *tp)
1380 {
1381         MonoMList *list;
1382         MonoArray *array;
1383
1384         list = NULL;
1385         if (tp->unused) {
1386                 list = tp->unused;
1387                 tp->unused = mono_mlist_next (list);
1388                 mono_mlist_set_next (list, NULL);
1389                 //TP_DEBUG (tp->nodes_reused++);
1390         } else {
1391                 array = mono_array_new_cached (mono_get_root_domain (), mono_defaults.object_class, QUEUE_LENGTH);
1392                 list = mono_mlist_alloc ((MonoObject *) array);
1393                 //TP_DEBUG (tp->nodes_created++);
1394         }
1395         return list;
1396 }
1397
1398 static void
1399 threadpool_append_jobs (ThreadPool *tp, MonoObject **jobs, gint njobs)
1400 {
1401         static int job_counter;
1402         MonoArray *array;
1403         MonoMList *list;
1404         MonoObject *ar;
1405         gint i;
1406         gboolean lock_taken = FALSE; /* We won't take the lock when the local queue is used */
1407
1408         if (mono_runtime_is_shutting_down ())
1409                 return;
1410
1411         if (tp->pool_status == 0 && InterlockedCompareExchange (&tp->pool_status, 1, 0) == 0)
1412                 mono_thread_create_internal (mono_get_root_domain (), threadpool_start_idle_threads, tp, TRUE);
1413
1414         for (i = 0; i < njobs; i++) {
1415                 ar = jobs [i];
1416                 if (ar == NULL || mono_domain_is_unloading (ar->vtable->domain))
1417                         continue; /* Might happen when cleaning domain jobs */
1418                 if (!tp->is_io && (InterlockedIncrement (&job_counter) % 10) == 0) {
1419                         MonoAsyncResult *o = (MonoAsyncResult *) ar;
1420                         o->add_time = mono_100ns_ticks ();
1421                 }
1422                 threadpool_jobs_inc (ar); 
1423                 mono_perfcounter_update_value (tp->pc_nitems, TRUE, 1);
1424                 if (!tp->is_io && mono_wsq_local_push (ar))
1425                         continue;
1426
1427                 if (!lock_taken) {
1428                         MONO_SEM_WAIT (&tp->lock);
1429                         lock_taken = TRUE;
1430                 }
1431                 if ((tp->tail % QUEUE_LENGTH) == 0) {
1432                         list = create_or_reuse_list (tp);
1433                         if (tp->last != NULL)
1434                                 mono_mlist_set_next (tp->last, list);
1435                         tp->last = list;
1436                         if (tp->first == NULL)
1437                                 tp->first = tp->last;
1438                 }
1439
1440                 array = (MonoArray *) mono_mlist_get_data (tp->last);
1441                 mono_array_setref (array, tp->tail % QUEUE_LENGTH, ar);
1442                 tp->tail++;
1443         }
1444         if (lock_taken)
1445                 MONO_SEM_POST (&tp->lock);
1446
1447         if (!tp->is_io && tp->waiting == 0) {
1448                 gint64 ticks = mono_100ns_ticks ();
1449
1450                 if (tp->last_check == 0 || (ticks - tp->last_check) > 5000000) {
1451                         SPIN_LOCK (tp->sp_lock);
1452                         tp->last_check = ticks;
1453                         SPIN_UNLOCK (tp->sp_lock);
1454                         threadpool_start_thread (tp);
1455                 }
1456         }
1457         for (i = 0; i < MIN(njobs, tp->max_threads); i++)
1458                 pulse_on_new_job (tp);
1459 }
1460
1461 static void
1462 threadpool_clear_queue (ThreadPool *tp, MonoDomain *domain)
1463 {
1464         MonoMList *current;
1465         MonoArray *array;
1466         MonoObject *obj;
1467         int domain_count;
1468         int i;
1469
1470         domain_count = 0;
1471         MONO_SEM_WAIT (&tp->lock);
1472         current = tp->first;
1473         while (current) {
1474                 array = (MonoArray *) mono_mlist_get_data (current);
1475                 for (i = 0; i < QUEUE_LENGTH; i++) {
1476                         obj = mono_array_get (array, MonoObject*, i);
1477                         if (obj != NULL && obj->vtable->domain == domain) {
1478                                 domain_count++;
1479                                 mono_array_setref (array, i, NULL);
1480                                 threadpool_jobs_dec (obj);
1481                         }
1482                 }
1483                 current = mono_mlist_next (current);
1484         }
1485
1486         if (!domain_count) {
1487                 MONO_SEM_POST (&tp->lock);
1488                 return;
1489         }
1490
1491         current = tp->first;
1492         tp->first = NULL;
1493         tp->last = NULL;
1494         tp->head = 0;
1495         tp->tail = 0;
1496         MONO_SEM_POST (&tp->lock);
1497         /* Re-add everything but the nullified elements */
1498         while (current) {
1499                 array = (MonoArray *) mono_mlist_get_data (current);
1500                 threadpool_append_jobs (tp, mono_array_addr (array, MonoObject *, 0), QUEUE_LENGTH);
1501                 memset (mono_array_addr (array, MonoObject *, 0), 0, sizeof (MonoObject *) * QUEUE_LENGTH);
1502                 current = mono_mlist_next (current);
1503         }
1504 }
1505
1506 /*
1507  * Clean up the threadpool of all domain jobs.
1508  * Can only be called as part of the domain unloading process as
1509  * it will wait for all jobs to be visible to the interruption code. 
1510  */
1511 gboolean
1512 mono_thread_pool_remove_domain_jobs (MonoDomain *domain, int timeout)
1513 {
1514         HANDLE sem_handle;
1515         int result = TRUE;
1516         guint32 start_time = 0;
1517
1518         g_assert (domain->state == MONO_APPDOMAIN_UNLOADING);
1519
1520         threadpool_clear_queue (&async_tp, domain);
1521         threadpool_clear_queue (&async_io_tp, domain);
1522
1523         /*
1524          * There might be some threads out that could be about to execute stuff from the given domain.
1525          * We avoid that by setting up a semaphore to be pulsed by the thread that reaches zero.
1526          */
1527         sem_handle = CreateSemaphore (NULL, 0, 1, NULL);
1528
1529         domain->cleanup_semaphore = sem_handle;
1530         /*
1531          * The memory barrier here is required to have global ordering between assigning to cleanup_semaphone
1532          * and reading threadpool_jobs.
1533          * Otherwise this thread could read a stale version of threadpool_jobs and wait forever.
1534          */
1535         mono_memory_write_barrier ();
1536
1537         if (domain->threadpool_jobs && timeout != -1)
1538                 start_time = mono_msec_ticks ();
1539         while (domain->threadpool_jobs) {
1540                 WaitForSingleObject (sem_handle, timeout);
1541                 if (timeout != -1 && (mono_msec_ticks () - start_time) > timeout) {
1542                         result = FALSE;
1543                         break;
1544                 }
1545         }
1546
1547         domain->cleanup_semaphore = NULL;
1548         CloseHandle (sem_handle);
1549         return result;
1550 }
1551
1552 static void
1553 threadpool_free_queue (ThreadPool *tp)
1554 {
1555         tp->head = tp->tail = 0;
1556         tp->first = NULL;
1557         tp->unused = NULL;
1558 }
1559
1560 gboolean
1561 mono_thread_pool_is_queue_array (MonoArray *o)
1562 {
1563         gpointer obj = o;
1564
1565         // FIXME: need some fix in sgen code.
1566         // There are roots at: async*tp.unused (MonoMList) and wsqs [n]->queue (MonoArray)
1567         return obj == async_tp.first || obj == async_io_tp.first;
1568 }
1569
1570 static void
1571 add_wsq (MonoWSQ *wsq)
1572 {
1573         int i;
1574
1575         if (wsq == NULL)
1576                 return;
1577
1578         EnterCriticalSection (&wsqs_lock);
1579         if (wsqs == NULL) {
1580                 LeaveCriticalSection (&wsqs_lock);
1581                 return;
1582         }
1583         for (i = 0; i < wsqs->len; i++) {
1584                 if (g_ptr_array_index (wsqs, i) == NULL) {
1585                         wsqs->pdata [i] = wsq;
1586                         LeaveCriticalSection (&wsqs_lock);
1587                         return;
1588                 }
1589         }
1590         g_ptr_array_add (wsqs, wsq);
1591         LeaveCriticalSection (&wsqs_lock);
1592 }
1593
1594 static void
1595 remove_wsq (MonoWSQ *wsq)
1596 {
1597         if (wsq == NULL)
1598                 return;
1599
1600         EnterCriticalSection (&wsqs_lock);
1601         if (wsqs == NULL) {
1602                 LeaveCriticalSection (&wsqs_lock);
1603                 return;
1604         }
1605         g_ptr_array_remove_fast (wsqs, wsq);
1606         LeaveCriticalSection (&wsqs_lock);
1607 }
1608
1609 static void
1610 try_steal (gpointer *data, gboolean retry)
1611 {
1612         int i;
1613         int ms;
1614
1615         if (wsqs == NULL || data == NULL || *data != NULL)
1616                 return;
1617
1618         ms = 0;
1619         do {
1620                 if (mono_runtime_is_shutting_down ())
1621                         return;
1622                 for (i = 0; wsqs != NULL && i < wsqs->len; i++) {
1623                         if (mono_runtime_is_shutting_down ()) {
1624                                 return;
1625                         }
1626                         mono_wsq_try_steal (wsqs->pdata [i], data, ms);
1627                         if (*data != NULL) {
1628                                 return;
1629                         }
1630                 }
1631                 ms += 10;
1632         } while (retry && ms < 11);
1633 }
1634
1635 static gboolean
1636 dequeue_or_steal (ThreadPool *tp, gpointer *data)
1637 {
1638         if (mono_runtime_is_shutting_down ())
1639                 return FALSE;
1640         TP_DEBUG ("Dequeue");
1641         MONO_SEM_WAIT (&tp->lock);
1642         *data = dequeue_job_nolock (tp);
1643         MONO_SEM_POST (&tp->lock);
1644         if (!tp->is_io && !*data)
1645                 try_steal (data, FALSE);
1646         return (*data != NULL);
1647 }
1648
1649 static void
1650 process_idle_times (ThreadPool *tp, gint64 t)
1651 {
1652         gint64 ticks;
1653         gint64 avg;
1654         gboolean compute_avg;
1655         gint new_threads;
1656         gint64 per1;
1657
1658         if (tp->ignore_times || t <= 0)
1659                 return;
1660
1661         compute_avg = FALSE;
1662         ticks = mono_100ns_ticks ();
1663         t = ticks - t;
1664         SPIN_LOCK (tp->sp_lock);
1665         if (tp->ignore_times) {
1666                 SPIN_UNLOCK (tp->sp_lock);
1667                 return;
1668         }
1669         tp->time_sum += t;
1670         tp->n_sum++;
1671         if (tp->last_check == 0)
1672                 tp->last_check = ticks;
1673         else if (tp->last_check > 0 && (ticks - tp->last_check) > 5000000) {
1674                 tp->ignore_times = 1;
1675                 compute_avg = TRUE;
1676         }
1677         SPIN_UNLOCK (tp->sp_lock);
1678
1679         if (!compute_avg)
1680                 return;
1681
1682         //printf ("Items: %d Time elapsed: %.3fs\n", tp->n_sum, (ticks - tp->last_check) / 10000.0);
1683         tp->last_check = ticks;
1684         new_threads = 0;
1685         avg = tp->time_sum / tp->n_sum;
1686         if (tp->averages [1] == 0) {
1687                 tp->averages [1] = avg;
1688         } else {
1689                 per1 = ((100 * (ABS (avg - tp->averages [1]))) / tp->averages [1]);
1690                 if (per1 > 5) {
1691                         if (avg > tp->averages [1]) {
1692                                 if (tp->averages [1] < tp->averages [0]) {
1693                                         new_threads = -1;
1694                                 } else {
1695                                         new_threads = 1;
1696                                 }
1697                         } else if (avg < tp->averages [1] && tp->averages [1] < tp->averages [0]) {
1698                                 new_threads = 1;
1699                         }
1700                 } else {
1701                         int min, n;
1702                         min = tp->min_threads;
1703                         n = tp->nthreads;
1704                         if ((n - min) < min && tp->busy_threads == n)
1705                                 new_threads = 1;
1706                 }
1707                 /*
1708                 if (new_threads != 0) {
1709                         printf ("n: %d per1: %lld avg=%lld avg1=%lld avg0=%lld\n", new_threads, per1, avg, tp->averages [1], tp->averages [0]);
1710                 }
1711                 */
1712         }
1713
1714         tp->time_sum = 0;
1715         tp->n_sum = 0;
1716
1717         tp->averages [0] = tp->averages [1];
1718         tp->averages [1] = avg;
1719         tp->ignore_times = 0;
1720
1721         if (tp->waiting == 0 && new_threads == 1) {
1722                 threadpool_start_thread (tp);
1723         } else if (new_threads == -1) {
1724                 if (tp->destroy_thread == 0 && InterlockedCompareExchange (&tp->destroy_thread, 1, 0) == 0)
1725                         pulse_on_new_job (tp);
1726         }
1727 }
1728
1729 static gboolean
1730 should_i_die (ThreadPool *tp)
1731 {
1732         gboolean result = FALSE;
1733         if (tp->destroy_thread == 1 && InterlockedCompareExchange (&tp->destroy_thread, 0, 1) == 1)
1734                 result = (tp->nthreads > tp->min_threads);
1735         return result;
1736 }
1737
1738 static void
1739 async_invoke_thread (gpointer data)
1740 {
1741         MonoDomain *domain;
1742         MonoInternalThread *thread;
1743         MonoWSQ *wsq;
1744         ThreadPool *tp;
1745         gboolean must_die;
1746   
1747         tp = data;
1748         wsq = NULL;
1749         if (!tp->is_io) {
1750                 wsq = mono_wsq_create ();
1751                 add_wsq (wsq);
1752         }
1753
1754         thread = mono_thread_internal_current ();
1755         if (tp_start_func)
1756                 tp_start_func (tp_hooks_user_data);
1757         data = NULL;
1758         for (;;) {
1759                 MonoAsyncResult *ar;
1760                 gboolean is_io_task;
1761                 int n_naps = 0;
1762
1763                 is_io_task = FALSE;
1764                 ar = (MonoAsyncResult *) data;
1765                 if (ar) {
1766                         InterlockedIncrement (&tp->busy_threads);
1767 #ifndef DISABLE_SOCKETS
1768                         is_io_task = (strcmp (((MonoObject *) data)->vtable->klass->name, "AsyncResult"));
1769                         if (is_io_task) {
1770                                 MonoSocketAsyncResult *state = (MonoSocketAsyncResult *) data;
1771                                 ar = state->ares;
1772                                 switch (state->operation) {
1773                                 case AIO_OP_RECEIVE:
1774                                         state->total = ICALL_RECV (state);
1775                                         break;
1776                                 case AIO_OP_SEND:
1777                                         state->total = ICALL_SEND (state);
1778                                         break;
1779                                 }
1780                         }
1781 #endif
1782                         /* worker threads invokes methods in different domains,
1783                          * so we need to set the right domain here */
1784                         domain = ((MonoObject *)ar)->vtable->domain;
1785                         g_assert (domain);
1786
1787                         if (mono_domain_is_unloading (domain) || mono_runtime_is_shutting_down ()) {
1788                                 threadpool_jobs_dec ((MonoObject *)ar);
1789                                 data = NULL;
1790                                 ar = NULL;
1791                                 InterlockedDecrement (&tp->busy_threads);
1792                         } else {
1793                                 mono_thread_push_appdomain_ref (domain);
1794                                 if (threadpool_jobs_dec ((MonoObject *)ar)) {
1795                                         data = NULL;
1796                                         ar = NULL;
1797                                         mono_thread_pop_appdomain_ref ();
1798                                         InterlockedDecrement (&tp->busy_threads);
1799                                         continue;
1800                                 }
1801
1802                                 if (mono_domain_set (domain, FALSE)) {
1803                                         /* ASyncCall *ac; */
1804
1805                                         if (tp_item_begin_func)
1806                                                 tp_item_begin_func (tp_item_user_data);
1807
1808                                         if (!is_io_task && ar->add_time > 0)
1809                                                 process_idle_times (tp, ar->add_time);
1810                                         /*FIXME: Do something with the exception returned? */
1811                                         mono_async_invoke (tp, ar);
1812                                         if (tp_item_end_func)
1813                                                 tp_item_end_func (tp_item_user_data);
1814                                         /*
1815                                         ac = (ASyncCall *) ar->object_data;
1816                                         if (ac->msg->exc != NULL)
1817                                                 mono_unhandled_exception (ac->msg->exc);
1818                                         */
1819                                         mono_domain_set (mono_get_root_domain (), TRUE);
1820                                 }
1821                                 mono_thread_pop_appdomain_ref ();
1822                                 InterlockedDecrement (&tp->busy_threads);
1823                                 /* If the callee changes the background status, set it back to TRUE */
1824                                 if (!mono_thread_test_state (thread , ThreadState_Background))
1825                                         ves_icall_System_Threading_Thread_SetState (thread, ThreadState_Background);
1826                         }
1827                 }
1828
1829                 ar = NULL;
1830                 data = NULL;
1831                 must_die = should_i_die (tp);
1832                 TP_DEBUG ("Trying to get a job");
1833                 if (!must_die && (tp->is_io || !mono_wsq_local_pop (&data)))
1834                         dequeue_or_steal (tp, &data);
1835                 TP_DEBUG ("Done trying to get a job %p", data);
1836
1837                 n_naps = 0;
1838                 while (!must_die && !data && n_naps < 4) {
1839                         gboolean res;
1840
1841                         TP_DEBUG ("Waiting");
1842                         InterlockedIncrement (&tp->waiting);
1843 #if defined(__OpenBSD__)
1844                         while ((res = mono_sem_wait (&tp->new_job, TRUE)) == -1) {// && errno == EINTR) {
1845 #else
1846                         while ((res = mono_sem_timedwait (&tp->new_job, 2000, TRUE)) == -1) {// && errno == EINTR) {
1847 #endif
1848                                 if (mono_runtime_is_shutting_down ())
1849                                         break;
1850                                 if (THREAD_WANTS_A_BREAK (thread))
1851                                         mono_thread_interruption_checkpoint ();
1852                         }
1853                         TP_DEBUG ("Done waiting");
1854                         InterlockedDecrement (&tp->waiting);
1855                         if (mono_runtime_is_shutting_down ())
1856                                 break;
1857                         must_die = should_i_die (tp);
1858                         dequeue_or_steal (tp, &data);
1859                         n_naps++;
1860                 }
1861
1862                 if (!data && tp->is_io && !mono_runtime_is_shutting_down ()) {
1863                         mono_wsq_local_pop (&data);
1864                         if (data && must_die) {
1865                                 InterlockedCompareExchange (&tp->destroy_thread, 1, 0);
1866                                 pulse_on_new_job (tp);
1867                         }
1868                 }
1869
1870                 if (!data) {
1871                         gint nt;
1872                         gboolean down;
1873                         while (1) {
1874                                 nt = tp->nthreads;
1875                                 down = mono_runtime_is_shutting_down ();
1876                                 if (!down && nt <= tp->min_threads)
1877                                         break;
1878                                 if (down || InterlockedCompareExchange (&tp->nthreads, nt - 1, nt) == nt) {
1879                                         mono_perfcounter_update_value (tp->pc_nthreads, TRUE, -1);
1880                                         TP_DEBUG ("DIE");
1881                                         if (!tp->is_io) {
1882                                                 remove_wsq (wsq);
1883                                                 while (mono_wsq_local_pop (&data)) {
1884                                                         threadpool_jobs_dec (data);
1885                                                         data = NULL;
1886                                                 }
1887                                                 mono_wsq_destroy (wsq);
1888                                         }
1889                                         if (tp_finish_func)
1890                                                 tp_finish_func (tp_hooks_user_data);
1891                                         return;
1892                                 }
1893                         }
1894                 }
1895         }
1896
1897         g_assert_not_reached ();
1898 }
1899
1900 void
1901 ves_icall_System_Threading_ThreadPool_GetAvailableThreads (gint *workerThreads, gint *completionPortThreads)
1902 {
1903         *workerThreads = async_tp.max_threads - async_tp.busy_threads;
1904         *completionPortThreads = async_io_tp.max_threads - async_io_tp.busy_threads;
1905 }
1906
1907 void
1908 ves_icall_System_Threading_ThreadPool_GetMaxThreads (gint *workerThreads, gint *completionPortThreads)
1909 {
1910         *workerThreads = async_tp.max_threads;
1911         *completionPortThreads = async_io_tp.max_threads;
1912 }
1913
1914 void
1915 ves_icall_System_Threading_ThreadPool_GetMinThreads (gint *workerThreads, gint *completionPortThreads)
1916 {
1917         *workerThreads = async_tp.min_threads;
1918         *completionPortThreads = async_io_tp.min_threads;
1919 }
1920
1921 MonoBoolean
1922 ves_icall_System_Threading_ThreadPool_SetMinThreads (gint workerThreads, gint completionPortThreads)
1923 {
1924         gint max_threads;
1925         gint max_io_threads;
1926
1927         max_threads = async_tp.max_threads;
1928         if (workerThreads <= 0 || workerThreads > max_threads)
1929                 return FALSE;
1930
1931         max_io_threads = async_io_tp.max_threads;
1932         if (completionPortThreads <= 0 || completionPortThreads > max_io_threads)
1933                 return FALSE;
1934
1935         InterlockedExchange (&async_tp.min_threads, workerThreads);
1936         InterlockedExchange (&async_io_tp.min_threads, completionPortThreads);
1937         if (workerThreads > async_tp.nthreads)
1938                 mono_thread_create_internal (mono_get_root_domain (), threadpool_start_idle_threads, &async_tp, TRUE);
1939         if (completionPortThreads > async_io_tp.nthreads)
1940                 mono_thread_create_internal (mono_get_root_domain (), threadpool_start_idle_threads, &async_io_tp, TRUE);
1941         return TRUE;
1942 }
1943
1944 MonoBoolean
1945 ves_icall_System_Threading_ThreadPool_SetMaxThreads (gint workerThreads, gint completionPortThreads)
1946 {
1947         gint min_threads;
1948         gint min_io_threads;
1949         gint cpu_count;
1950
1951         cpu_count = mono_cpu_count ();
1952         min_threads = async_tp.min_threads;
1953         if (workerThreads < min_threads || workerThreads < cpu_count)
1954                 return FALSE;
1955
1956         /* We don't really have the concept of completion ports. Do we care here? */
1957         min_io_threads = async_io_tp.min_threads;
1958         if (completionPortThreads < min_io_threads || completionPortThreads < cpu_count)
1959                 return FALSE;
1960
1961         InterlockedExchange (&async_tp.max_threads, workerThreads);
1962         InterlockedExchange (&async_io_tp.max_threads, completionPortThreads);
1963         return TRUE;
1964 }
1965
1966 /**
1967  * mono_install_threadpool_thread_hooks
1968  * @start_func: the function to be called right after a new threadpool thread is created. Can be NULL.
1969  * @finish_func: the function to be called right before a thredpool thread is exiting. Can be NULL.
1970  * @user_data: argument passed to @start_func and @finish_func.
1971  *
1972  * @start_fun will be called right after a threadpool thread is created and @finish_func right before a threadpool thread exits.
1973  * The calls will be made from the thread itself.
1974  */
1975 void
1976 mono_install_threadpool_thread_hooks (MonoThreadPoolFunc start_func, MonoThreadPoolFunc finish_func, gpointer user_data)
1977 {
1978         tp_start_func = start_func;
1979         tp_finish_func = finish_func;
1980         tp_hooks_user_data = user_data;
1981 }
1982
1983 /**
1984  * mono_install_threadpool_item_hooks
1985  * @begin_func: the function to be called before a threadpool work item processing starts.
1986  * @end_func: the function to be called after a threadpool work item is finished.
1987  * @user_data: argument passed to @begin_func and @end_func.
1988  *
1989  * The calls will be made from the thread itself and from the same AppDomain
1990  * where the work item was executed.
1991  *
1992  */
1993 void
1994 mono_install_threadpool_item_hooks (MonoThreadPoolItemFunc begin_func, MonoThreadPoolItemFunc end_func, gpointer user_data)
1995 {
1996         tp_item_begin_func = begin_func;
1997         tp_item_end_func = end_func;
1998         tp_item_user_data = user_data;
1999 }
2000