Update
[mono.git] / mono / metadata / threadpool.c
1 /*
2  * threadpool.c: global thread pool
3  *
4  * Authors:
5  *   Dietmar Maurer (dietmar@ximian.com)
6  *   Gonzalo Paniagua Javier (gonzalo@ximian.com)
7  *
8  * (C) 2001-2003 Ximian, Inc.
9  * (c) 2004 Novell, Inc. (http://www.novell.com)
10  */
11
12 #include <config.h>
13 #include <glib.h>
14
15 #ifdef PLATFORM_WIN32
16 #define WINVER 0x0500
17 #define _WIN32_WINNT 0x0500
18 #define THREADS_PER_CPU 25
19 #else
20 #define THREADS_PER_CPU 50
21 #endif
22
23 #include <mono/metadata/domain-internals.h>
24 #include <mono/metadata/tabledefs.h>
25 #include <mono/metadata/threads.h>
26 #include <mono/metadata/threads-types.h>
27 #include <mono/metadata/exception.h>
28 #include <mono/metadata/file-io.h>
29 #include <mono/metadata/monitor.h>
30 #include <mono/metadata/marshal.h>
31 #include <mono/io-layer/io-layer.h>
32 #include <mono/os/gc_wrapper.h>
33
34 #include "threadpool.h"
35
36 /* maximum number of worker threads */
37 int mono_max_worker_threads = THREADS_PER_CPU;
38 static int mono_min_worker_threads = 0;
39
40 /* current number of worker threads */
41 static int mono_worker_threads = 0;
42
43 /* current number of busy threads */
44 static int busy_worker_threads = 0;
45
46 /* mono_thread_pool_init called */
47 static int tp_inited;
48
49 /* we use this to store a reference to the AsyncResult to avoid GC */
50 static MonoGHashTable *ares_htable = NULL;
51
52 static CRITICAL_SECTION ares_lock;
53
54 /* we append a job */
55 static HANDLE job_added;
56
57 typedef struct {
58         MonoMethodMessage *msg;
59         HANDLE             wait_event;
60         MonoMethod        *cb_method;
61         MonoDelegate      *cb_target;
62         MonoObject        *state;
63         MonoObject        *res;
64         MonoArray         *out_args;
65 } ASyncCall;
66
67 static void async_invoke_thread (gpointer data);
68 static void append_job (MonoAsyncResult *ar);
69
70 static GList *async_call_queue = NULL;
71
72 static void
73 mono_async_invoke (MonoAsyncResult *ares)
74 {
75         ASyncCall *ac = (ASyncCall *)ares->data;
76
77         ac->msg->exc = NULL;
78         ac->res = mono_message_invoke (ares->async_delegate, ac->msg, 
79                                        &ac->msg->exc, &ac->out_args);
80
81         ares->completed = 1;
82
83         /* call async callback if cb_method != null*/
84         if (ac->cb_method) {
85                 MonoObject *exc = NULL;
86                 void *pa = &ares;
87                 mono_runtime_invoke (ac->cb_method, ac->cb_target, pa, &exc);
88                 if (!ac->msg->exc)
89                         ac->msg->exc = exc;
90         }
91
92         /* notify listeners */
93         mono_monitor_enter ((MonoObject *) ares);
94         
95         if (ares->handle != NULL) {
96                 ac->wait_event = ((MonoWaitHandle *) ares->handle)->handle;
97                 SetEvent (ac->wait_event);
98         }
99         mono_monitor_exit ((MonoObject *) ares);
100
101         EnterCriticalSection (&ares_lock);
102         mono_g_hash_table_remove (ares_htable, ares);
103         LeaveCriticalSection (&ares_lock);
104 }
105
106 void
107 mono_thread_pool_init ()
108 {
109         SYSTEM_INFO info;
110         int threads_per_cpu = THREADS_PER_CPU;
111
112         if ((int) InterlockedCompareExchange (&tp_inited, 1, 0) == 1)
113                 return;
114
115         MONO_GC_REGISTER_ROOT (ares_htable);
116         InitializeCriticalSection (&ares_lock);
117         ares_htable = mono_g_hash_table_new (NULL, NULL);
118         job_added = CreateSemaphore (NULL, 0, 0x7fffffff, NULL);
119         GetSystemInfo (&info);
120         if (getenv ("MONO_THREADS_PER_CPU") != NULL) {
121                 threads_per_cpu = atoi (getenv ("MONO_THREADS_PER_CPU"));
122                 if (threads_per_cpu <= 0)
123                         threads_per_cpu = THREADS_PER_CPU;
124         }
125
126         mono_max_worker_threads = threads_per_cpu * info.dwNumberOfProcessors;
127 }
128
129 MonoAsyncResult *
130 mono_thread_pool_add (MonoObject *target, MonoMethodMessage *msg, MonoDelegate *async_callback,
131                       MonoObject *state)
132 {
133         MonoDomain *domain = mono_domain_get ();
134         MonoAsyncResult *ares;
135         ASyncCall *ac;
136         int busy, worker;
137
138 #ifdef HAVE_BOEHM_GC
139         ac = GC_MALLOC (sizeof (ASyncCall));
140 #else
141         /* We'll leak the event if creaated... */
142         ac = g_new0 (ASyncCall, 1);
143 #endif
144         ac->wait_event = NULL;
145         ac->msg = msg;
146         ac->state = state;
147
148         if (async_callback) {
149                 ac->cb_method = mono_get_delegate_invoke (((MonoObject *)async_callback)->vtable->klass);
150                 ac->cb_target = async_callback;
151         }
152
153         ares = mono_async_result_new (domain, NULL, ac->state, ac);
154         ares->async_delegate = target;
155
156         EnterCriticalSection (&ares_lock);
157         mono_g_hash_table_insert (ares_htable, ares, ares);
158         LeaveCriticalSection (&ares_lock);
159         
160         busy = (int) InterlockedCompareExchange (&busy_worker_threads, 0, -1);
161         worker = (int) InterlockedCompareExchange (&mono_worker_threads, 0, -1); 
162         if (worker <= ++busy &&
163             worker < mono_max_worker_threads) {
164                 InterlockedIncrement (&mono_worker_threads);
165                 InterlockedIncrement (&busy_worker_threads);
166                 mono_thread_create (domain, async_invoke_thread, ares);
167         } else {
168                 append_job (ares);
169                 ReleaseSemaphore (job_added, 1, NULL);
170         }
171
172         return ares;
173 }
174
175 MonoObject *
176 mono_thread_pool_finish (MonoAsyncResult *ares, MonoArray **out_args, MonoObject **exc)
177 {
178         ASyncCall *ac;
179
180         *exc = NULL;
181         *out_args = NULL;
182
183         /* check if already finished */
184         mono_monitor_enter ((MonoObject *) ares);
185         
186         if (ares->endinvoke_called) {
187                 *exc = (MonoObject *)mono_exception_from_name (mono_defaults.corlib, "System", 
188                                               "InvalidOperationException");
189                 mono_monitor_exit ((MonoObject *) ares);
190                 return NULL;
191         }
192
193         ares->endinvoke_called = 1;
194         ac = (ASyncCall *)ares->data;
195
196         g_assert (ac != NULL);
197
198         /* wait until we are really finished */
199         if (!ares->completed) {
200                 if (ares->handle == NULL) {
201                         ac->wait_event = CreateEvent (NULL, TRUE, FALSE, NULL);
202                         ares->handle = (MonoObject *) mono_wait_handle_new (mono_object_domain (ares), ac->wait_event);
203                 }
204                 mono_monitor_exit ((MonoObject *) ares);
205                 WaitForSingleObjectEx (ac->wait_event, INFINITE, TRUE);
206         } else {
207                 mono_monitor_exit ((MonoObject *) ares);
208         }
209
210         *exc = ac->msg->exc;
211         *out_args = ac->out_args;
212
213         return ac->res;
214 }
215
216 void
217 mono_thread_pool_cleanup (void)
218 {
219         gint release;
220
221         EnterCriticalSection (&mono_delegate_section);
222         g_list_free (async_call_queue);
223         async_call_queue = NULL;
224         release = (gint) InterlockedCompareExchange (&busy_worker_threads, 0, -1);
225         LeaveCriticalSection (&mono_delegate_section);
226         if (job_added)
227                 ReleaseSemaphore (job_added, release, NULL);
228 }
229
230 static void
231 append_job (MonoAsyncResult *ar)
232 {
233         GList *tmp;
234
235         EnterCriticalSection (&mono_delegate_section);
236         if (async_call_queue == NULL) {
237                 async_call_queue = g_list_append (async_call_queue, ar); 
238         } else {
239                 for (tmp = async_call_queue; tmp && tmp->data != NULL; tmp = tmp->next);
240                 if (tmp == NULL) {
241                         async_call_queue = g_list_append (async_call_queue, ar); 
242                 } else {
243                         tmp->data = ar;
244                 }
245         }
246         LeaveCriticalSection (&mono_delegate_section);
247 }
248
249 static MonoAsyncResult *
250 dequeue_job (void)
251 {
252         MonoAsyncResult *ar = NULL;
253         GList *tmp, *tmp2;
254
255         EnterCriticalSection (&mono_delegate_section);
256         tmp = async_call_queue;
257         if (tmp) {
258                 ar = (MonoAsyncResult *) tmp->data;
259                 tmp->data = NULL;
260                 tmp2 = tmp;
261                 for (tmp2 = tmp; tmp2->next != NULL; tmp2 = tmp2->next);
262                 if (tmp2 != tmp) {
263                         async_call_queue = tmp->next;
264                         tmp->next = NULL;
265                         tmp2->next = tmp;
266                         tmp->prev = tmp2;
267                 }
268         }
269         LeaveCriticalSection (&mono_delegate_section);
270
271         return ar;
272 }
273
274 static void
275 async_invoke_thread (gpointer data)
276 {
277         MonoDomain *domain;
278         MonoThread *thread;
279         int workers, min;
280  
281         thread = mono_thread_current ();
282         thread->threadpool_thread = TRUE;
283         thread->state |= ThreadState_Background;
284
285         for (;;) {
286                 MonoAsyncResult *ar;
287
288                 ar = (MonoAsyncResult *) data;
289                 if (ar) {
290                         /* worker threads invokes methods in different domains,
291                          * so we need to set the right domain here */
292                         domain = ((MonoObject *)ar)->vtable->domain;
293                         if (mono_domain_set (domain, FALSE)) {
294                                 mono_thread_push_appdomain_ref (domain);
295                                 mono_async_invoke (ar);
296                                 mono_thread_pop_appdomain_ref ();
297                         }
298                         InterlockedDecrement (&busy_worker_threads);
299                 }
300
301                 data = dequeue_job ();
302         
303                 if (!data) {
304                         guint32 wr;
305                         int timeout = 10000;
306                         guint32 start_time = GetTickCount ();
307                         
308                         do {
309                                 wr = WaitForSingleObjectEx (job_added, (guint32)timeout, TRUE);
310                                 if ((thread->state & ThreadState_StopRequested)!=0)
311                                         mono_thread_interruption_checkpoint ();
312                         
313                                 timeout -= GetTickCount () - start_time;
314                         
315                                 if (wr != WAIT_TIMEOUT)
316                                         data = dequeue_job ();
317                         }
318                         while (!data && timeout > 0);
319                 }
320
321                 if (!data) {
322                         workers = (int) InterlockedCompareExchange (&mono_worker_threads, 0, -1); 
323                         min = (int) InterlockedCompareExchange (&mono_min_worker_threads, 0, -1); 
324         
325                         while (!data && workers <= min) {
326                                 WaitForSingleObjectEx (job_added, INFINITE, TRUE);
327                                 if ((thread->state & ThreadState_StopRequested)!=0)
328                                         mono_thread_interruption_checkpoint ();
329                         
330                                 data = dequeue_job ();
331                                 workers = (int) InterlockedCompareExchange (&mono_worker_threads, 0, -1); 
332                                 min = (int) InterlockedCompareExchange (&mono_min_worker_threads, 0, -1); 
333                         }
334                 }
335         
336                 if (!data) {
337                         InterlockedDecrement (&mono_worker_threads);
338                         return;
339                 }
340                 
341                 InterlockedIncrement (&busy_worker_threads);
342         }
343
344         g_assert_not_reached ();
345 }
346
347 void
348 ves_icall_System_Threading_ThreadPool_GetAvailableThreads (gint *workerThreads, gint *completionPortThreads)
349 {
350         gint busy;
351
352         MONO_ARCH_SAVE_REGS;
353
354         busy = (gint) InterlockedCompareExchange (&busy_worker_threads, 0, -1);
355         *workerThreads = mono_max_worker_threads - busy;
356         *completionPortThreads = 0;
357 }
358
359 void
360 ves_icall_System_Threading_ThreadPool_GetMaxThreads (gint *workerThreads, gint *completionPortThreads)
361 {
362         MONO_ARCH_SAVE_REGS;
363
364         *workerThreads = mono_max_worker_threads;
365         *completionPortThreads = 0;
366 }
367
368 void
369 ves_icall_System_Threading_ThreadPool_GetMinThreads (gint *workerThreads, gint *completionPortThreads)
370 {
371         gint workers;
372
373         MONO_ARCH_SAVE_REGS;
374
375         workers = (gint) InterlockedCompareExchange (&mono_min_worker_threads, 0, -1);
376         *workerThreads = workers;
377         *completionPortThreads = 0;
378 }
379
380 MonoBoolean
381 ves_icall_System_Threading_ThreadPool_SetMinThreads (gint workerThreads, gint completionPortThreads)
382 {
383         MONO_ARCH_SAVE_REGS;
384
385         if (workerThreads < 0 || workerThreads > mono_max_worker_threads)
386                 return FALSE;
387         InterlockedExchange (&mono_min_worker_threads, workerThreads);
388         /* FIXME: should actually start the idle threads if needed */
389         return TRUE;
390 }
391
392 static void
393 overlapped_callback (guint32 error, guint32 numbytes, WapiOverlapped *overlapped)
394 {
395         MonoFSAsyncResult *ares;
396         MonoThread *thread;
397  
398         MONO_ARCH_SAVE_REGS;
399
400         ares = (MonoFSAsyncResult *) overlapped->handle1;
401         ares->completed = TRUE;
402         if (ares->bytes_read != -1)
403                 ares->bytes_read = numbytes;
404         else
405                 ares->count = numbytes;
406
407         thread = mono_thread_attach (mono_object_domain (ares));
408         if (ares->async_callback != NULL) {
409                 gpointer p [1];
410
411                 *p = ares;
412                 mono_runtime_invoke (ares->async_callback->method_info->method, NULL, p, NULL);
413         }
414
415         SetEvent (ares->wait_handle->handle);
416         mono_thread_detach (thread);
417         g_free (overlapped);
418 }
419
420 MonoBoolean
421 ves_icall_System_Threading_ThreadPool_BindHandle (gpointer handle)
422 {
423         MONO_ARCH_SAVE_REGS;
424
425 #ifdef PLATFORM_WIN32
426         return FALSE;
427 #else
428         if (!BindIoCompletionCallback (handle, overlapped_callback, 0)) {
429                 gint error = GetLastError ();
430                 MonoException *exc;
431                 gchar *msg;
432
433                 if (error == ERROR_INVALID_PARAMETER) {
434                         exc = mono_get_exception_argument (NULL, "Invalid parameter.");
435                 } else {
436                         msg = g_strdup_printf ("Win32 error %d.", error);
437                         exc = mono_exception_from_name_msg (mono_defaults.corlib,
438                                                             "System",
439                                                             "ApplicationException", msg);
440                         g_free (msg);
441                 }
442
443                 mono_raise_exception (exc);
444         }
445
446         return TRUE;
447 #endif
448 }
449