abf51355b138426d6b78d28dbc5adacace601c4d
[mono.git] / mono / metadata / threadpool.c
1 /*
2  * threadpool.c: global thread pool
3  *
4  * Authors:
5  *   Dietmar Maurer (dietmar@ximian.com)
6  *   Gonzalo Paniagua Javier (gonzalo@ximian.com)
7  *
8  * (C) 2001-2003 Ximian, Inc.
9  * (c) 2004,2005 Novell, Inc. (http://www.novell.com)
10  */
11
12 #include <config.h>
13 #include <glib.h>
14
15 #ifdef PLATFORM_WIN32
16 #define WINVER 0x0500
17 #define _WIN32_WINNT 0x0500
18 #endif
19
20 #define THREADS_PER_CPU 5 /* 20 + THREADS_PER_CPU * number of CPUs */
21
22 #include <mono/metadata/domain-internals.h>
23 #include <mono/metadata/tabledefs.h>
24 #include <mono/metadata/threads.h>
25 #include <mono/metadata/threads-types.h>
26 #include <mono/metadata/threadpool-internals.h>
27 #include <mono/metadata/exception.h>
28 #include <mono/metadata/file-io.h>
29 #include <mono/metadata/monitor.h>
30 #include <mono/metadata/marshal.h>
31 #include <mono/metadata/socket-io.h>
32 #include <mono/io-layer/io-layer.h>
33 #include <mono/os/gc_wrapper.h>
34 #include <errno.h>
35 #include <sys/time.h>
36 #include <sys/types.h>
37 #include <fcntl.h>
38 #include <unistd.h>
39 #include <string.h>
40
41 #include <mono/utils/mono-poll.h>
42 #ifdef HAVE_EPOLL
43 #include <sys/epoll.h>
44 #endif
45
46 #include "mono/io-layer/socket-wrappers.h"
47
48 #include "threadpool.h"
49
50 #define THREAD_WANTS_A_BREAK(t) ((t->state & (ThreadState_StopRequested | \
51                                                 ThreadState_SuspendRequested)) != 0)
52
53 #undef EPOLL_DEBUG
54
55 /* maximum number of worker threads */
56 static int mono_max_worker_threads;
57 static int mono_min_worker_threads;
58 static int mono_io_max_worker_threads;
59
60 /* current number of worker threads */
61 static int mono_worker_threads = 0;
62 static int io_worker_threads = 0;
63
64 /* current number of busy threads */
65 static int busy_worker_threads = 0;
66 static int busy_io_worker_threads;
67
68 /* mono_thread_pool_init called */
69 static int tp_inited;
70
71 /* we use this to store a reference to the AsyncResult to avoid GC */
72 static MonoGHashTable *ares_htable = NULL;
73
74 static CRITICAL_SECTION ares_lock;
75 static CRITICAL_SECTION io_queue_lock;
76 static int pending_io_items;
77
78 typedef struct {
79         CRITICAL_SECTION io_lock; /* access to sock_to_state */
80         int inited;
81         int pipe [2];
82         GHashTable *sock_to_state;
83
84         HANDLE new_sem; /* access to newpfd and write side of the pipe */
85         mono_pollfd *newpfd;
86         gboolean epoll_disabled;
87 #ifdef HAVE_EPOLL
88         int epollfd;
89 #endif
90 } SocketIOData;
91
92 static SocketIOData socket_io_data;
93
94 /* we append a job */
95 static HANDLE job_added;
96 static HANDLE io_job_added;
97
98 /* Keep in sync with the System.MonoAsyncCall class which provides GC tracking */
99 typedef struct {
100         MonoObject         object;
101         MonoMethodMessage *msg;
102         MonoMethod        *cb_method;
103         MonoDelegate      *cb_target;
104         MonoObject        *state;
105         MonoObject        *res;
106         MonoArray         *out_args;
107         /* This is a HANDLE, we use guint64 so the managed object layout remains constant */
108         guint64           wait_event;
109 } ASyncCall;
110
111 static void async_invoke_thread (gpointer data);
112 static void append_job (CRITICAL_SECTION *cs, GList **plist, gpointer ar);
113 static void start_thread_or_queue (MonoAsyncResult *ares);
114 static void mono_async_invoke (MonoAsyncResult *ares);
115 static gpointer dequeue_job (CRITICAL_SECTION *cs, GList **plist);
116
117 static GList *async_call_queue = NULL;
118 static GList *async_io_queue = NULL;
119
120 static MonoClass *async_call_klass;
121 static MonoClass *socket_async_call_klass;
122
123 #define INIT_POLLFD(a, b, c) {(a)->fd = b; (a)->events = c; (a)->revents = 0;}
124 enum {
125         AIO_OP_FIRST,
126         AIO_OP_ACCEPT = 0,
127         AIO_OP_CONNECT,
128         AIO_OP_RECEIVE,
129         AIO_OP_RECEIVEFROM,
130         AIO_OP_SEND,
131         AIO_OP_SENDTO,
132         AIO_OP_RECV_JUST_CALLBACK,
133         AIO_OP_SEND_JUST_CALLBACK,
134         AIO_OP_LAST
135 };
136
137 static void
138 socket_io_cleanup (SocketIOData *data)
139 {
140         gint release;
141
142         if (data->inited == 0)
143                 return;
144
145         EnterCriticalSection (&data->io_lock);
146         data->inited = 0;
147 #ifdef PLATFORM_WIN32
148         closesocket (data->pipe [0]);
149         closesocket (data->pipe [1]);
150 #else
151         close (data->pipe [0]);
152         close (data->pipe [1]);
153 #endif
154         data->pipe [0] = -1;
155         data->pipe [1] = -1;
156         if (data->new_sem)
157                 CloseHandle (data->new_sem);
158         data->new_sem = NULL;
159         g_hash_table_destroy (data->sock_to_state);
160         data->sock_to_state = NULL;
161         g_list_free (async_io_queue);
162         async_io_queue = NULL;
163         release = (gint) InterlockedCompareExchange (&io_worker_threads, 0, -1);
164         if (io_job_added)
165                 ReleaseSemaphore (io_job_added, release, NULL);
166         g_free (data->newpfd);
167         data->newpfd = NULL;
168 #ifdef HAVE_EPOLL
169         if (FALSE == data->epoll_disabled)
170                 close (data->epollfd);
171 #endif
172         LeaveCriticalSection (&data->io_lock);
173 }
174
175 static int
176 get_event_from_state (MonoSocketAsyncResult *state)
177 {
178         switch (state->operation) {
179         case AIO_OP_ACCEPT:
180         case AIO_OP_RECEIVE:
181         case AIO_OP_RECV_JUST_CALLBACK:
182         case AIO_OP_RECEIVEFROM:
183                 return MONO_POLLIN;
184         case AIO_OP_SEND:
185         case AIO_OP_SEND_JUST_CALLBACK:
186         case AIO_OP_SENDTO:
187         case AIO_OP_CONNECT:
188                 return MONO_POLLOUT;
189         default: /* Should never happen */
190                 g_print ("get_event_from_state: unknown value in switch!!!\n");
191                 return 0;
192         }
193 }
194
195 static int
196 get_events_from_list (GSList *list)
197 {
198         MonoSocketAsyncResult *state;
199         int events = 0;
200
201         while (list && list->data) {
202                 state = (MonoSocketAsyncResult *) list->data;
203                 events |= get_event_from_state (state);
204                 list = list->next;
205         }
206
207         return events;
208 }
209
210 #define ICALL_RECV(x)   ves_icall_System_Net_Sockets_Socket_Receive_internal (\
211                                 (SOCKET) x->handle, x->buffer, x->offset, x->size,\
212                                  x->socket_flags, &x->error);
213
214 #define ICALL_SEND(x)   ves_icall_System_Net_Sockets_Socket_Send_internal (\
215                                 (SOCKET) x->handle, x->buffer, x->offset, x->size,\
216                                  x->socket_flags, &x->error);
217
218 static void
219 async_invoke_io_thread (gpointer data)
220 {
221         MonoDomain *domain;
222         MonoThread *thread;
223         thread = mono_thread_current ();
224         thread->threadpool_thread = TRUE;
225         ves_icall_System_Threading_Thread_SetState (thread, ThreadState_Background);
226
227         for (;;) {
228                 MonoSocketAsyncResult *state;
229                 MonoAsyncResult *ar;
230
231                 state = (MonoSocketAsyncResult *) data;
232                 if (state) {
233                         InterlockedDecrement (&pending_io_items);
234                         ar = state->ares;
235                         /* worker threads invokes methods in different domains,
236                          * so we need to set the right domain here */
237                         switch (state->operation) {
238                         case AIO_OP_RECEIVE:
239                                 state->total = ICALL_RECV (state);
240                                 break;
241                         case AIO_OP_SEND:
242                                 state->total = ICALL_SEND (state);
243                                 break;
244                         }
245
246                         domain = ((MonoObject *)ar)->vtable->domain;
247                         mono_thread_push_appdomain_ref (domain);
248                         if (mono_domain_set (domain, FALSE)) {
249                                 ASyncCall *ac;
250
251                                 mono_async_invoke (ar);
252                                 ac = (ASyncCall *) ar->object_data;
253                                 /*
254                                 if (ac->msg->exc != NULL)
255                                         mono_unhandled_exception (ac->msg->exc);
256                                 */
257                                 mono_domain_set (mono_get_root_domain (), TRUE);
258                         }
259                         mono_thread_pop_appdomain_ref ();
260                         InterlockedDecrement (&busy_io_worker_threads);
261                 }
262
263                 data = dequeue_job (&io_queue_lock, &async_io_queue);
264         
265                 if (!data) {
266                         guint32 wr;
267                         int timeout = 10000;
268                         guint32 start_time = GetTickCount ();
269                         
270                         do {
271                                 wr = WaitForSingleObjectEx (io_job_added, (guint32)timeout, TRUE);
272                                 if (THREAD_WANTS_A_BREAK (thread))
273                                         mono_thread_interruption_checkpoint ();
274                         
275                                 timeout -= GetTickCount () - start_time;
276                         
277                                 if (wr != WAIT_TIMEOUT)
278                                         data = dequeue_job (&io_queue_lock, &async_io_queue);
279                         }
280                         while (!data && timeout > 0);
281                 }
282
283                 if (!data) {
284                         if (InterlockedDecrement (&io_worker_threads) < 2) {
285                                 /* If we have pending items, keep the thread alive */
286                                 if (InterlockedCompareExchange (&pending_io_items, 0, 0) != 0) {
287                                         InterlockedIncrement (&io_worker_threads);
288                                         continue;
289                                 }
290                         }
291                         return;
292                 }
293                 
294                 InterlockedIncrement (&busy_io_worker_threads);
295         }
296
297         g_assert_not_reached ();
298 }
299
300 static void
301 start_io_thread_or_queue (MonoSocketAsyncResult *ares)
302 {
303         int busy, worker;
304         MonoDomain *domain;
305
306         busy = (int) InterlockedCompareExchange (&busy_io_worker_threads, 0, -1);
307         worker = (int) InterlockedCompareExchange (&io_worker_threads, 0, -1); 
308         if (worker <= ++busy &&
309             worker < mono_io_max_worker_threads) {
310                 InterlockedIncrement (&busy_io_worker_threads);
311                 InterlockedIncrement (&io_worker_threads);
312                 domain = ((ares) ? ((MonoObject *) ares)->vtable->domain : mono_domain_get ());
313                 mono_thread_create (mono_get_root_domain (), async_invoke_io_thread, ares);
314         } else {
315                 append_job (&io_queue_lock, &async_io_queue, ares);
316                 ReleaseSemaphore (io_job_added, 1, NULL);
317         }
318 }
319
320 static GSList *
321 process_io_event (GSList *list, int event)
322 {
323         MonoSocketAsyncResult *state;
324         GSList *oldlist;
325
326         oldlist = list;
327         state = NULL;
328         while (list) {
329                 state = (MonoSocketAsyncResult *) list->data;
330                 if (get_event_from_state (state) == event)
331                         break;
332                 
333                 list = list->next;
334         }
335
336         if (list != NULL) {
337                 oldlist = g_slist_remove_link (oldlist, list);
338                 g_slist_free_1 (list);
339 #ifdef EPOLL_DEBUG
340                 g_print ("Dispatching event %d on socket %d\n", event, state->handle);
341 #endif
342                 InterlockedIncrement (&pending_io_items);
343                 start_io_thread_or_queue (state);
344         }
345
346         return oldlist;
347 }
348
349 static int
350 mark_bad_fds (mono_pollfd *pfds, int nfds)
351 {
352         int i, ret;
353         mono_pollfd *pfd;
354         int count = 0;
355
356         for (i = 0; i < nfds; i++) {
357                 pfd = &pfds [i];
358                 if (pfd->fd == -1)
359                         continue;
360
361                 ret = mono_poll (pfd, 1, 0);
362                 if (ret == -1 && errno == EBADF) {
363                         pfd->revents |= MONO_POLLNVAL;
364                         count++;
365                 } else if (ret == 1) {
366                         count++;
367                 }
368         }
369
370         return count;
371 }
372
373 static void
374 socket_io_poll_main (gpointer p)
375 {
376 #define INITIAL_POLLFD_SIZE     1024
377 #define POLL_ERRORS (MONO_POLLERR | MONO_POLLHUP | MONO_POLLNVAL)
378         SocketIOData *data = p;
379         mono_pollfd *pfds;
380         gint maxfd = 1;
381         gint allocated;
382         gint i;
383         MonoThread *thread;
384
385         thread = mono_thread_current ();
386         thread->threadpool_thread = TRUE;
387         ves_icall_System_Threading_Thread_SetState (thread, ThreadState_Background);
388
389         allocated = INITIAL_POLLFD_SIZE;
390         pfds = g_new0 (mono_pollfd, allocated);
391         INIT_POLLFD (pfds, data->pipe [0], MONO_POLLIN);
392         for (i = 1; i < allocated; i++)
393                 INIT_POLLFD (&pfds [i], -1, 0);
394
395         while (1) {
396                 int nsock = 0;
397                 mono_pollfd *pfd;
398                 char one [1];
399                 GSList *list;
400
401                 do {
402                         if (nsock == -1) {
403                                 if (THREAD_WANTS_A_BREAK (thread))
404                                         mono_thread_interruption_checkpoint ();
405                         }
406
407                         nsock = mono_poll (pfds, maxfd, -1);
408                 } while (nsock == -1 && errno == EINTR);
409
410                 /* 
411                  * Apart from EINTR, we only check EBADF, for the rest:
412                  *  EINVAL: mono_poll() 'protects' us from descriptor
413                  *      numbers above the limit if using select() by marking
414                  *      then as MONO_POLLERR.  If a system poll() is being
415                  *      used, the number of descriptor we're passing will not
416                  *      be over sysconf(_SC_OPEN_MAX), as the error would have
417                  *      happened when opening.
418                  *
419                  *  EFAULT: we own the memory pointed by pfds.
420                  *  ENOMEM: we're doomed anyway
421                  *
422                  */
423
424                 if (nsock == -1 && errno == EBADF) {
425                         pfds->revents = 0; /* Just in case... */
426                         nsock = mark_bad_fds (pfds, maxfd);
427                 }
428
429                 if ((pfds->revents & POLL_ERRORS) != 0) {
430                         /* We're supposed to die now, as the pipe has been closed */
431                         g_free (pfds);
432                         socket_io_cleanup (data);
433                         return;
434                 }
435
436                 /* Got a new socket */
437                 if ((pfds->revents & MONO_POLLIN) != 0) {
438                         int nread;
439
440                         for (i = 1; i < allocated; i++) {
441                                 pfd = &pfds [i];
442                                 if (pfd->fd == -1 || pfd->fd == data->newpfd->fd)
443                                         break;
444                         }
445
446                         if (i == allocated) {
447                                 mono_pollfd *oldfd;
448
449                                 oldfd = pfds;
450                                 i = allocated;
451                                 allocated = allocated * 2;
452                                 pfds = g_renew (mono_pollfd, oldfd, allocated);
453                                 g_free (oldfd);
454                                 for (; i < allocated; i++)
455                                         INIT_POLLFD (&pfds [i], -1, 0);
456                         }
457 #ifndef PLATFORM_WIN32
458                         nread = read (data->pipe [0], one, 1);
459 #else
460                         nread = recv ((SOCKET) data->pipe [0], one, 1, 0);
461 #endif
462                         if (nread <= 0) {
463                                 g_free (pfds);
464                                 return; /* we're closed */
465                         }
466
467                         INIT_POLLFD (&pfds [i], data->newpfd->fd, data->newpfd->events);
468                         ReleaseSemaphore (data->new_sem, 1, NULL);
469                         if (i >= maxfd)
470                                 maxfd = i + 1;
471                         nsock--;
472                 }
473
474                 if (nsock == 0)
475                         continue;
476
477                 EnterCriticalSection (&data->io_lock);
478                 if (data->inited == 0) {
479                         g_free (pfds);
480                         return; /* cleanup called */
481                 }
482
483                 for (i = 1; i < maxfd && nsock > 0; i++) {
484                         pfd = &pfds [i];
485                         if (pfd->fd == -1 || pfd->revents == 0)
486                                 continue;
487
488                         nsock--;
489                         list = g_hash_table_lookup (data->sock_to_state, GINT_TO_POINTER (pfd->fd));
490                         if (list != NULL && (pfd->revents & (MONO_POLLIN | POLL_ERRORS)) != 0) {
491                                 list = process_io_event (list, MONO_POLLIN);
492                         }
493
494                         if (list != NULL && (pfd->revents & (MONO_POLLOUT | POLL_ERRORS)) != 0) {
495                                 list = process_io_event (list, MONO_POLLOUT);
496                         }
497
498                         if (list != NULL) {
499                                 g_hash_table_replace (data->sock_to_state, GINT_TO_POINTER (pfd->fd), list);
500                                 pfd->events = get_events_from_list (list);
501                         } else {
502                                 g_hash_table_remove (data->sock_to_state, GINT_TO_POINTER (pfd->fd));
503                                 pfd->fd = -1;
504                                 if (i == maxfd - 1)
505                                         maxfd--;
506                         }
507                 }
508                 LeaveCriticalSection (&data->io_lock);
509         }
510 }
511
512 #ifdef HAVE_EPOLL
513 #define EPOLL_ERRORS (EPOLLERR | EPOLLHUP)
514 static void
515 socket_io_epoll_main (gpointer p)
516 {
517         SocketIOData *data;
518         int epollfd;
519         MonoThread *thread;
520         struct epoll_event *events, *evt;
521         const int nevents = 512;
522         int ready = 0, i;
523
524         data = p;
525         epollfd = data->epollfd;
526         thread = mono_thread_current ();
527         thread->threadpool_thread = TRUE;
528         ves_icall_System_Threading_Thread_SetState (thread, ThreadState_Background);
529         events = g_new0 (struct epoll_event, nevents);
530
531         while (1) {
532                 do {
533                         if (ready == -1) {
534                                 if (THREAD_WANTS_A_BREAK (thread))
535                                         mono_thread_interruption_checkpoint ();
536                         }
537 #ifdef EPOLL_DEBUG
538                         g_print ("epoll_wait init\n");
539 #endif
540                         ready = epoll_wait (epollfd, events, nevents, -1);
541 #ifdef EPOLL_DEBUG
542                         {
543                         int err = errno;
544                         g_print ("epoll_wait end with %d ready sockets (%d %s).\n", ready, err, (err) ? g_strerror (err) : "");
545                         errno = err;
546                         }
547 #endif
548                 } while (ready == -1 && errno == EINTR);
549
550                 if (ready == -1) {
551                         int err = errno;
552                         g_free (events);
553                         if (err != EBADF)
554                                 g_warning ("epoll_wait: %d %s\n", err, g_strerror (err));
555
556                         close (epollfd);
557                         return;
558                 }
559
560                 EnterCriticalSection (&data->io_lock);
561                 if (data->inited == 0) {
562 #ifdef EPOLL_DEBUG
563                         g_print ("data->inited == 0\n");
564 #endif
565                         g_free (events);
566                         close (epollfd);
567                         return; /* cleanup called */
568                 }
569
570                 for (i = 0; i < ready; i++) {
571                         int fd;
572                         GSList *list;
573
574                         evt = &events [i];
575                         fd = evt->data.fd;
576                         list = g_hash_table_lookup (data->sock_to_state, GINT_TO_POINTER (fd));
577 #ifdef EPOLL_DEBUG
578                         g_print ("Event %d on %d list length: %d\n", evt->events, fd, g_slist_length (list));
579 #endif
580                         if (list != NULL && (evt->events & (EPOLLIN | EPOLL_ERRORS)) != 0) {
581                                 list = process_io_event (list, MONO_POLLIN);
582                         }
583
584                         if (list != NULL && (evt->events & (EPOLLOUT | EPOLL_ERRORS)) != 0) {
585                                 list = process_io_event (list, MONO_POLLOUT);
586                         }
587
588                         if (list != NULL) {
589                                 g_hash_table_replace (data->sock_to_state, GINT_TO_POINTER (fd), list);
590                                 evt->events = get_events_from_list (list);
591 #ifdef EPOLL_DEBUG
592                                 g_print ("MOD %d to %d\n", fd, evt->events);
593 #endif
594                                 if (epoll_ctl (epollfd, EPOLL_CTL_MOD, fd, evt)) {
595                                         if (epoll_ctl (epollfd, EPOLL_CTL_ADD, fd, evt) == -1) {
596 #ifdef EPOLL_DEBUG
597                                                 int err = errno;
598                                                 g_message ("epoll_ctl(MOD): %d %s fd: %d events: %d", err, g_strerror (err), fd, evt->events);
599                                                 errno = err;
600 #endif
601                                         }
602                                 }
603                         } else {
604                                 g_hash_table_remove (data->sock_to_state, GINT_TO_POINTER (fd));
605 #ifdef EPOLL_DEBUG
606                                 g_print ("DEL %d\n", fd);
607 #endif
608                                 epoll_ctl (epollfd, EPOLL_CTL_DEL, fd, evt);
609                         }
610                 }
611                 LeaveCriticalSection (&data->io_lock);
612         }
613 }
614 #endif
615
616 /*
617  * select/poll wake up when a socket is closed, but epoll just removes
618  * the socket from its internal list without notification.
619  */
620 void
621 mono_thread_pool_remove_socket (int sock)
622 {
623 #ifdef HAVE_EPOLL
624         GSList *list, *next;
625         MonoSocketAsyncResult *state;
626
627         if (socket_io_data.epoll_disabled == TRUE || socket_io_data.inited == FALSE)
628                 return;
629
630         EnterCriticalSection (&socket_io_data.io_lock);
631         list = g_hash_table_lookup (socket_io_data.sock_to_state, GINT_TO_POINTER (sock));
632         if (list) {
633                 g_hash_table_remove (socket_io_data.sock_to_state, GINT_TO_POINTER (sock));
634         }
635         LeaveCriticalSection (&socket_io_data.io_lock);
636         
637         while (list) {
638                 state = (MonoSocketAsyncResult *) list->data;
639                 if (state->operation == AIO_OP_RECEIVE)
640                         state->operation = AIO_OP_RECV_JUST_CALLBACK;
641                 else if (state->operation == AIO_OP_SEND)
642                         state->operation = AIO_OP_SEND_JUST_CALLBACK;
643
644                 next = g_slist_remove_link (list, list);
645                 list = process_io_event (list, MONO_POLLIN);
646                 if (list)
647                         process_io_event (list, MONO_POLLOUT);
648
649                 list = next;
650         }
651 #endif
652 }
653
654 #ifdef PLATFORM_WIN32
655 static void
656 connect_hack (gpointer x)
657 {
658         struct sockaddr_in *addr = (struct sockaddr_in *) x;
659         int count = 0;
660
661         while (connect ((SOCKET) socket_io_data.pipe [1], (SOCKADDR *) addr, sizeof (struct sockaddr_in))) {
662                 Sleep (500);
663                 if (++count > 3) {
664                         g_warning ("Error initializing async. sockets %d.\n", WSAGetLastError ());
665                         g_assert (WSAGetLastError ());
666                 }
667         }
668 }
669 #endif
670
671 static void
672 socket_io_init (SocketIOData *data)
673 {
674 #ifdef PLATFORM_WIN32
675         struct sockaddr_in server;
676         struct sockaddr_in client;
677         SOCKET srv;
678         int len;
679 #endif
680         int inited;
681
682         inited = InterlockedCompareExchange (&data->inited, -1, -1);
683         if (inited == 1)
684                 return;
685
686         EnterCriticalSection (&data->io_lock);
687         inited = InterlockedCompareExchange (&data->inited, -1, -1);
688         if (inited == 1) {
689                 LeaveCriticalSection (&data->io_lock);
690                 return;
691         }
692
693 #ifdef HAVE_EPOLL
694         data->epoll_disabled = (g_getenv ("MONO_DISABLE_AIO") != NULL);
695         if (FALSE == data->epoll_disabled) {
696                 data->epollfd = epoll_create (256);
697                 data->epoll_disabled = (data->epollfd == -1);
698                 if (data->epoll_disabled && g_getenv ("MONO_DEBUG"))
699                         g_message ("epoll_create() failed. Using plain poll().");
700         } else {
701                 data->epollfd = -1;
702         }
703 #else
704         data->epoll_disabled = TRUE;
705 #endif
706
707 #ifndef PLATFORM_WIN32
708         if (data->epoll_disabled) {
709                 if (pipe (data->pipe) != 0) {
710                         int err = errno;
711                         perror ("mono");
712                         g_assert (err);
713                 }
714         } else {
715                 data->pipe [0] = -1;
716                 data->pipe [1] = -1;
717         }
718 #else
719         srv = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
720         g_assert (srv != INVALID_SOCKET);
721         data->pipe [1] = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
722         g_assert (data->pipe [1] != INVALID_SOCKET);
723
724         server.sin_family = AF_INET;
725         server.sin_addr.s_addr = inet_addr ("127.0.0.1");
726         server.sin_port = 0;
727         if (bind (srv, (SOCKADDR *) &server, sizeof (server))) {
728                 g_print ("%d\n", WSAGetLastError ());
729                 g_assert (1 != 0);
730         }
731
732         len = sizeof (server);
733         getsockname (srv, (SOCKADDR *) &server, &len);
734         listen (srv, 1);
735         mono_thread_create (mono_get_root_domain (), connect_hack, &server);
736         len = sizeof (server);
737         data->pipe [0] = accept (srv, (SOCKADDR *) &client, &len);
738         g_assert (data->pipe [0] != INVALID_SOCKET);
739         closesocket (srv);
740 #endif
741         mono_io_max_worker_threads = mono_max_worker_threads / 2;
742         if (mono_io_max_worker_threads < 10)
743                 mono_io_max_worker_threads = 10;
744
745         data->sock_to_state = g_hash_table_new (g_direct_hash, g_direct_equal);
746
747         if (data->epoll_disabled)
748                 data->new_sem = CreateSemaphore (NULL, 1, 1, NULL);
749         io_job_added = CreateSemaphore (NULL, 0, 0x7fffffff, NULL);
750         InitializeCriticalSection (&io_queue_lock);
751         if (data->epoll_disabled) {
752                 mono_thread_create (mono_get_root_domain (), socket_io_poll_main, data);
753         }
754 #ifdef HAVE_EPOLL
755         else {
756                 mono_thread_create (mono_get_root_domain (), socket_io_epoll_main, data);
757         }
758 #endif
759         InterlockedCompareExchange (&data->inited, 1, 0);
760         LeaveCriticalSection (&data->io_lock);
761 }
762
763 static void
764 socket_io_add_poll (MonoSocketAsyncResult *state)
765 {
766         int events;
767         char msg [1];
768         GSList *list;
769         SocketIOData *data = &socket_io_data;
770
771 #if defined(PLATFORM_MACOSX) || defined(PLATFORM_BSD6) || defined(PLATFORM_WIN32)
772         /* select() for connect() does not work well on the Mac. Bug #75436. */
773         /* Bug #77637 for the BSD 6 case */
774         /* Bug #78888 for the Windows case */
775         if (state->operation == AIO_OP_CONNECT && state->blocking == TRUE) {
776                 start_io_thread_or_queue (state);
777                 return;
778         }
779 #endif
780         WaitForSingleObject (data->new_sem, INFINITE);
781         if (data->newpfd == NULL)
782                 data->newpfd = g_new0 (mono_pollfd, 1);
783
784         EnterCriticalSection (&data->io_lock);
785         list = g_hash_table_lookup (data->sock_to_state, GINT_TO_POINTER (state->handle));
786         if (list == NULL) {
787                 list = g_slist_alloc ();
788                 list->data = state;
789         } else {
790                 list = g_slist_append (list, state);
791         }
792
793         events = get_events_from_list (list);
794         INIT_POLLFD (data->newpfd, GPOINTER_TO_INT (state->handle), events);
795         g_hash_table_replace (data->sock_to_state, GINT_TO_POINTER (state->handle), list);
796         LeaveCriticalSection (&data->io_lock);
797         *msg = (char) state->operation;
798 #ifndef PLATFORM_WIN32
799         write (data->pipe [1], msg, 1);
800 #else
801         send ((SOCKET) data->pipe [1], msg, 1, 0);
802 #endif
803 }
804
805 #ifdef HAVE_EPOLL
806 static gboolean
807 socket_io_add_epoll (MonoSocketAsyncResult *state)
808 {
809         GSList *list;
810         SocketIOData *data = &socket_io_data;
811         struct epoll_event event;
812         int epoll_op, ievt;
813         int fd;
814
815         memset (&event, 0, sizeof (struct epoll_event));
816         fd = GPOINTER_TO_INT (state->handle);
817         EnterCriticalSection (&data->io_lock);
818         list = g_hash_table_lookup (data->sock_to_state, GINT_TO_POINTER (fd));
819         if (list == NULL) {
820                 list = g_slist_alloc ();
821                 list->data = state;
822                 epoll_op = EPOLL_CTL_ADD;
823         } else {
824                 list = g_slist_append (list, state);
825                 epoll_op = EPOLL_CTL_MOD;
826         }
827
828         ievt = get_events_from_list (list);
829         if ((ievt & MONO_POLLIN) != 0)
830                 event.events |= EPOLLIN;
831         if ((ievt & MONO_POLLOUT) != 0)
832                 event.events |= EPOLLOUT;
833
834         g_hash_table_replace (data->sock_to_state, state->handle, list);
835         event.data.fd = fd;
836 #ifdef EPOLL_DEBUG
837         g_print ("%s %d with %d\n", epoll_op == EPOLL_CTL_ADD ? "ADD" : "MOD", fd, event.events);
838 #endif
839         if (epoll_ctl (data->epollfd, epoll_op, fd, &event) == -1) {
840                 int err = errno;
841                 if (epoll_op == EPOLL_CTL_ADD && err == EEXIST) {
842                         epoll_op = EPOLL_CTL_MOD;
843                         if (epoll_ctl (data->epollfd, epoll_op, fd, &event) == -1) {
844                                 g_message ("epoll_ctl(MOD): %d %s\n", err, g_strerror (err));
845                         }
846                 }
847         }
848
849         LeaveCriticalSection (&data->io_lock);
850         return TRUE;
851 }
852 #endif
853
854 static void
855 socket_io_add (MonoAsyncResult *ares, MonoSocketAsyncResult *state)
856 {
857         socket_io_init (&socket_io_data);
858         MONO_OBJECT_SETREF (state, ares, ares);
859 #ifdef HAVE_EPOLL
860         if (socket_io_data.epoll_disabled == FALSE) {
861                 if (socket_io_add_epoll (state))
862                         return;
863         }
864 #endif
865         socket_io_add_poll (state);
866 }
867
868 static gboolean
869 socket_io_filter (MonoObject *target, MonoObject *state)
870 {
871         gint op;
872         MonoSocketAsyncResult *sock_res = (MonoSocketAsyncResult *) state;
873         MonoClass *klass;
874
875         if (target == NULL || state == NULL)
876                 return FALSE;
877
878         if (socket_async_call_klass == NULL) {
879                 klass = target->vtable->klass;
880                 /* Check if it's SocketAsyncCall in System
881                  * FIXME: check the assembly is signed correctly for extra care
882                  */
883                 if (klass->name [0] == 'S' && strcmp (klass->name, "SocketAsyncCall") == 0 
884                                 && strcmp (mono_image_get_name (klass->image), "System") == 0
885                                 && klass->nested_in && strcmp (klass->nested_in->name, "Socket") == 0)
886                         socket_async_call_klass = klass;
887         }
888
889         /* return both when socket_async_call_klass has not been seen yet and when
890          * the object is not an instance of the class.
891          */
892         if (target->vtable->klass != socket_async_call_klass)
893                 return FALSE;
894
895         op = sock_res->operation;
896         if (op < AIO_OP_FIRST || op >= AIO_OP_LAST)
897                 return FALSE;
898
899         return TRUE;
900 }
901
902 static void
903 mono_async_invoke (MonoAsyncResult *ares)
904 {
905         ASyncCall *ac = (ASyncCall *)ares->object_data;
906         MonoThread *thread = NULL;
907
908         if (ares->execution_context) {
909                 /* use captured ExecutionContext (if available) */
910                 thread = mono_thread_current ();
911                 MONO_OBJECT_SETREF (ares, original_context, thread->execution_context);
912                 MONO_OBJECT_SETREF (thread, execution_context, ares->execution_context);
913         } else {
914                 ares->original_context = NULL;
915         }
916
917         ac->msg->exc = NULL;
918         ac->res = mono_message_invoke (ares->async_delegate, ac->msg, 
919                                        &ac->msg->exc, &ac->out_args);
920
921         ares->completed = 1;
922
923         /* call async callback if cb_method != null*/
924         if (ac->cb_method) {
925                 MonoObject *exc = NULL;
926                 void *pa = &ares;
927                 mono_runtime_invoke (ac->cb_method, ac->cb_target, pa, &exc);
928                 /* 'exc' will be the previous ac->msg->exc if not NULL and not
929                  * catched. If catched, this will be set to NULL and the
930                  * exception will not be printed. */
931                 MONO_OBJECT_SETREF (ac->msg, exc, exc);
932         }
933
934         /* restore original thread execution context if flow isn't suppressed, i.e. non null */
935         if (ares->original_context) {
936                 MONO_OBJECT_SETREF (thread, execution_context, ares->original_context);
937                 ares->original_context = NULL;
938         }
939
940         /* notify listeners */
941         mono_monitor_enter ((MonoObject *) ares);
942         if (ares->handle != NULL) {
943                 ac->wait_event = (gsize)((MonoWaitHandle *) ares->handle)->handle;
944                 SetEvent ((gpointer)(gsize)ac->wait_event);
945         }
946         mono_monitor_exit ((MonoObject *) ares);
947
948         EnterCriticalSection (&ares_lock);
949         mono_g_hash_table_remove (ares_htable, ares);
950         LeaveCriticalSection (&ares_lock);
951 }
952
953 void
954 mono_thread_pool_init ()
955 {
956         SYSTEM_INFO info;
957         int threads_per_cpu = THREADS_PER_CPU;
958
959         if ((int) InterlockedCompareExchange (&tp_inited, 1, 0) == 1)
960                 return;
961
962         MONO_GC_REGISTER_ROOT (ares_htable);
963         InitializeCriticalSection (&socket_io_data.io_lock);
964         InitializeCriticalSection (&ares_lock);
965         ares_htable = mono_g_hash_table_new_type (NULL, NULL, MONO_HASH_KEY_VALUE_GC);
966         job_added = CreateSemaphore (NULL, 0, 0x7fffffff, NULL);
967         GetSystemInfo (&info);
968         if (g_getenv ("MONO_THREADS_PER_CPU") != NULL) {
969                 threads_per_cpu = atoi (g_getenv ("MONO_THREADS_PER_CPU"));
970                 if (threads_per_cpu <= 0)
971                         threads_per_cpu = THREADS_PER_CPU;
972         }
973
974         mono_max_worker_threads = 20 + threads_per_cpu * info.dwNumberOfProcessors;
975
976         async_call_klass = mono_class_from_name (mono_defaults.corlib, "System", "MonoAsyncCall");
977         g_assert (async_call_klass);
978 }
979
980 MonoAsyncResult *
981 mono_thread_pool_add (MonoObject *target, MonoMethodMessage *msg, MonoDelegate *async_callback,
982                       MonoObject *state)
983 {
984         MonoDomain *domain = mono_domain_get ();
985         MonoAsyncResult *ares;
986         ASyncCall *ac;
987
988 #ifdef HAVE_BOEHM_GC
989         ac = GC_MALLOC (sizeof (ASyncCall));
990 #elif defined(HAVE_SGEN_GC)
991         ac = mono_object_new (mono_domain_get (), async_call_klass);
992 #else
993         /* We'll leak the event if creaated... */
994         ac = g_new0 (ASyncCall, 1);
995 #endif
996         ac->wait_event = 0;
997         MONO_OBJECT_SETREF (ac, msg, msg);
998         MONO_OBJECT_SETREF (ac, state, state);
999
1000         if (async_callback) {
1001                 ac->cb_method = mono_get_delegate_invoke (((MonoObject *)async_callback)->vtable->klass);
1002                 ac->cb_target = async_callback;
1003         }
1004
1005         ares = mono_async_result_new (domain, NULL, ac->state, NULL, (MonoObject*)ac);
1006         MONO_OBJECT_SETREF (ares, async_delegate, target);
1007
1008         EnterCriticalSection (&ares_lock);
1009         mono_g_hash_table_insert (ares_htable, ares, ares);
1010         LeaveCriticalSection (&ares_lock);
1011
1012         if (socket_io_filter (target, state)) {
1013                 socket_io_add (ares, (MonoSocketAsyncResult *) state);
1014                 return ares;
1015         }
1016
1017         start_thread_or_queue (ares);
1018         return ares;
1019 }
1020
1021 static void
1022 start_thread_or_queue (MonoAsyncResult *ares)
1023 {
1024         int busy, worker;
1025         MonoDomain *domain;
1026
1027         busy = (int) InterlockedCompareExchange (&busy_worker_threads, 0, -1);
1028         worker = (int) InterlockedCompareExchange (&mono_worker_threads, 0, -1); 
1029         if (worker <= ++busy &&
1030             worker < mono_max_worker_threads) {
1031                 InterlockedIncrement (&mono_worker_threads);
1032                 InterlockedIncrement (&busy_worker_threads);
1033                 domain = ((MonoObject *) ares)->vtable->domain;
1034                 mono_thread_create (mono_get_root_domain (), async_invoke_thread, ares);
1035         } else {
1036                 append_job (&mono_delegate_section, &async_call_queue, ares);
1037                 ReleaseSemaphore (job_added, 1, NULL);
1038         }
1039 }
1040
1041 MonoObject *
1042 mono_thread_pool_finish (MonoAsyncResult *ares, MonoArray **out_args, MonoObject **exc)
1043 {
1044         ASyncCall *ac;
1045
1046         *exc = NULL;
1047         *out_args = NULL;
1048
1049         /* check if already finished */
1050         mono_monitor_enter ((MonoObject *) ares);
1051         
1052         if (ares->endinvoke_called) {
1053                 *exc = (MonoObject *)mono_exception_from_name (mono_defaults.corlib, "System", 
1054                                               "InvalidOperationException");
1055                 mono_monitor_exit ((MonoObject *) ares);
1056                 return NULL;
1057         }
1058
1059         ares->endinvoke_called = 1;
1060         ac = (ASyncCall *)ares->object_data;
1061
1062         g_assert (ac != NULL);
1063
1064         /* wait until we are really finished */
1065         if (!ares->completed) {
1066                 if (ares->handle == NULL) {
1067                         ac->wait_event = (gsize)CreateEvent (NULL, TRUE, FALSE, NULL);
1068                         MONO_OBJECT_SETREF (ares, handle, (MonoObject *) mono_wait_handle_new (mono_object_domain (ares), (gpointer)(gsize)ac->wait_event));
1069                 }
1070                 mono_monitor_exit ((MonoObject *) ares);
1071                 WaitForSingleObjectEx ((gpointer)(gsize)ac->wait_event, INFINITE, TRUE);
1072         } else {
1073                 mono_monitor_exit ((MonoObject *) ares);
1074         }
1075
1076         *exc = ac->msg->exc; /* FIXME: GC add write barrier */
1077         *out_args = ac->out_args;
1078
1079         return ac->res;
1080 }
1081
1082 void
1083 mono_thread_pool_cleanup (void)
1084 {
1085         gint release;
1086
1087         EnterCriticalSection (&mono_delegate_section);
1088         g_list_free (async_call_queue);
1089         async_call_queue = NULL;
1090         release = (gint) InterlockedCompareExchange (&mono_worker_threads, 0, -1);
1091         LeaveCriticalSection (&mono_delegate_section);
1092         if (job_added)
1093                 ReleaseSemaphore (job_added, release, NULL);
1094
1095         socket_io_cleanup (&socket_io_data);
1096 }
1097
1098 static void
1099 append_job (CRITICAL_SECTION *cs, GList **plist, gpointer ar)
1100 {
1101         GList *tmp, *list;
1102
1103         EnterCriticalSection (cs);
1104         list = *plist;
1105         if (list == NULL) {
1106                 list = g_list_append (list, ar); 
1107         } else {
1108                 for (tmp = list; tmp && tmp->data != NULL; tmp = tmp->next);
1109                 if (tmp == NULL) {
1110                         list = g_list_append (list, ar); 
1111                 } else {
1112                         tmp->data = ar;
1113                 }
1114         }
1115         *plist = list;
1116         LeaveCriticalSection (cs);
1117 }
1118
1119 static gpointer
1120 dequeue_job (CRITICAL_SECTION *cs, GList **plist)
1121 {
1122         gpointer ar = NULL;
1123         GList *tmp, *tmp2, *list;
1124
1125         EnterCriticalSection (cs);
1126         list = *plist;
1127         tmp = list;
1128         if (tmp) {
1129                 ar = tmp->data;
1130                 tmp->data = NULL;
1131                 tmp2 = tmp;
1132                 for (tmp2 = tmp; tmp2->next != NULL; tmp2 = tmp2->next);
1133                 if (tmp2 != tmp) {
1134                         list = tmp->next;
1135                         tmp->next = NULL;
1136                         tmp2->next = tmp;
1137                         tmp->prev = tmp2;
1138                 }
1139         }
1140         *plist = list;
1141         LeaveCriticalSection (cs);
1142
1143         return ar;
1144 }
1145
1146 static void
1147 async_invoke_thread (gpointer data)
1148 {
1149         MonoDomain *domain;
1150         MonoThread *thread;
1151         int workers, min;
1152  
1153         thread = mono_thread_current ();
1154         thread->threadpool_thread = TRUE;
1155         ves_icall_System_Threading_Thread_SetState (thread, ThreadState_Background);
1156
1157         for (;;) {
1158                 MonoAsyncResult *ar;
1159
1160                 ar = (MonoAsyncResult *) data;
1161                 if (ar) {
1162                         /* worker threads invokes methods in different domains,
1163                          * so we need to set the right domain here */
1164                         domain = ((MonoObject *)ar)->vtable->domain;
1165                         mono_thread_push_appdomain_ref (domain);
1166                         if (mono_domain_set (domain, FALSE)) {
1167                                 ASyncCall *ac;
1168
1169                                 mono_async_invoke (ar);
1170                                 ac = (ASyncCall *) ar->object_data;
1171                                 /*
1172                                 if (ac->msg->exc != NULL)
1173                                         mono_unhandled_exception (ac->msg->exc);
1174                                 */
1175                                 mono_domain_set (mono_get_root_domain (), TRUE);
1176                         }
1177                         mono_thread_pop_appdomain_ref ();
1178                         InterlockedDecrement (&busy_worker_threads);
1179                 }
1180
1181                 data = dequeue_job (&mono_delegate_section, &async_call_queue);
1182
1183                 if (!data) {
1184                         guint32 wr;
1185                         int timeout = 10000;
1186                         guint32 start_time = GetTickCount ();
1187                         
1188                         do {
1189                                 wr = WaitForSingleObjectEx (job_added, (guint32)timeout, TRUE);
1190                                 if (THREAD_WANTS_A_BREAK (thread))
1191                                         mono_thread_interruption_checkpoint ();
1192                         
1193                                 timeout -= GetTickCount () - start_time;
1194                         
1195                                 if (wr != WAIT_TIMEOUT)
1196                                         data = dequeue_job (&mono_delegate_section, &async_call_queue);
1197                         }
1198                         while (!data && timeout > 0);
1199                 }
1200
1201                 if (!data) {
1202                         workers = (int) InterlockedCompareExchange (&mono_worker_threads, 0, -1); 
1203                         min = (int) InterlockedCompareExchange (&mono_min_worker_threads, 0, -1); 
1204         
1205                         while (!data && workers <= min) {
1206                                 WaitForSingleObjectEx (job_added, INFINITE, TRUE);
1207                                 if (THREAD_WANTS_A_BREAK (thread))
1208                                         mono_thread_interruption_checkpoint ();
1209                         
1210                                 data = dequeue_job (&mono_delegate_section, &async_call_queue);
1211                                 workers = (int) InterlockedCompareExchange (&mono_worker_threads, 0, -1); 
1212                                 min = (int) InterlockedCompareExchange (&mono_min_worker_threads, 0, -1); 
1213                         }
1214                 }
1215         
1216                 if (!data) {
1217                         InterlockedDecrement (&mono_worker_threads);
1218                         return;
1219                 }
1220                 
1221                 InterlockedIncrement (&busy_worker_threads);
1222         }
1223
1224         g_assert_not_reached ();
1225 }
1226
1227 void
1228 ves_icall_System_Threading_ThreadPool_GetAvailableThreads (gint *workerThreads, gint *completionPortThreads)
1229 {
1230         gint busy;
1231
1232         MONO_ARCH_SAVE_REGS;
1233
1234         busy = (gint) InterlockedCompareExchange (&busy_worker_threads, 0, -1);
1235         *workerThreads = mono_max_worker_threads - busy;
1236         *completionPortThreads = 0;
1237 }
1238
1239 void
1240 ves_icall_System_Threading_ThreadPool_GetMaxThreads (gint *workerThreads, gint *completionPortThreads)
1241 {
1242         MONO_ARCH_SAVE_REGS;
1243
1244         *workerThreads = mono_max_worker_threads;
1245         *completionPortThreads = 0;
1246 }
1247
1248 void
1249 ves_icall_System_Threading_ThreadPool_GetMinThreads (gint *workerThreads, gint *completionPortThreads)
1250 {
1251         gint workers;
1252
1253         MONO_ARCH_SAVE_REGS;
1254
1255         workers = (gint) InterlockedCompareExchange (&mono_min_worker_threads, 0, -1);
1256         *workerThreads = workers;
1257         *completionPortThreads = 0;
1258 }
1259
1260 MonoBoolean
1261 ves_icall_System_Threading_ThreadPool_SetMinThreads (gint workerThreads, gint completionPortThreads)
1262 {
1263         MONO_ARCH_SAVE_REGS;
1264
1265         if (workerThreads < 0 || workerThreads > mono_max_worker_threads)
1266                 return FALSE;
1267         InterlockedExchange (&mono_min_worker_threads, workerThreads);
1268         /* FIXME: should actually start the idle threads if needed */
1269         return TRUE;
1270 }
1271