2004-11-28 Martin Baulig <martin@ximian.com>
[mono.git] / mono / metadata / threadpool.c
1 /*
2  * threadpool.c: global thread pool
3  *
4  * Authors:
5  *   Dietmar Maurer (dietmar@ximian.com)
6  *   Gonzalo Paniagua Javier (gonzalo@ximian.com)
7  *
8  * (C) 2001-2003 Ximian, Inc.
9  * (c) 2004 Novell, Inc. (http://www.novell.com)
10  */
11
12 #include <config.h>
13 #include <glib.h>
14
/*
 * Platform setup.  On Windows, WINVER and _WIN32_WINNT are defined before
 * the headers below are included.  THREADS_PER_CPU is the default number
 * of worker threads allowed per processor; it differs per platform.
 */
#ifdef PLATFORM_WIN32
#define WINVER 0x0500
#define _WIN32_WINNT 0x0500
#define THREADS_PER_CPU 25
#else
#define THREADS_PER_CPU 50
#endif
22
23 #include <mono/metadata/domain-internals.h>
24 #include <mono/metadata/tabledefs.h>
25 #include <mono/metadata/threads.h>
26 #include <mono/metadata/threads-types.h>
27 #include <mono/metadata/exception.h>
28 #include <mono/metadata/file-io.h>
29 #include <mono/metadata/monitor.h>
30 #include <mono/metadata/marshal.h>
31 #include <mono/io-layer/io-layer.h>
32 #include <mono/os/gc_wrapper.h>
33
34 #include "threadpool.h"
35
/*
 * Maximum number of worker threads.  Starts out as THREADS_PER_CPU and is
 * recomputed by mono_thread_pool_init () as threads-per-CPU multiplied by
 * the number of processors.
 */
int mono_max_worker_threads = THREADS_PER_CPU;
/* minimum number of worker threads; changed through the SetMinThreads icall */
static int mono_min_worker_threads = 0;

/* current number of worker threads */
static int mono_worker_threads = 0;

/* current number of busy threads */
static int busy_worker_threads = 0;

/* set to 1 once mono_thread_pool_init has been called */
static int tp_inited;

/* we use this to store a reference to the AsyncResult to avoid GC */
static MonoGHashTable *ares_htable = NULL;

/* held around every access to ares_htable */
static CRITICAL_SECTION ares_lock;

/*
 * Semaphore released whenever a job is appended to the queue (and during
 * cleanup); idle worker threads wait on it.
 */
static HANDLE job_added;
56
/*
 * Per-call bookkeeping attached to a MonoAsyncResult through its data field.
 */
typedef struct {
        MonoMethodMessage *msg;        /* message describing the call; msg->exc receives any exception */
        HANDLE             wait_event; /* event signalled once the call has completed (NULL until needed) */
        MonoMethod        *cb_method;  /* Invoke method of the completion callback delegate, if any */
        MonoDelegate      *cb_target;  /* the completion callback delegate itself */
        MonoObject        *state;      /* caller-supplied state object */
        MonoObject        *res;        /* value returned by the invoked method */
        MonoArray         *out_args;   /* out arguments produced by the invoked method */
} ASyncCall;
66
/* forward declarations */
static void async_invoke_thread (gpointer data);
static void append_job (MonoAsyncResult *ar);

/* queue of pending jobs; every access happens under mono_delegate_section */
static GList *async_call_queue = NULL;
71
72 static void
73 mono_async_invoke (MonoAsyncResult *ares)
74 {
75         ASyncCall *ac = (ASyncCall *)ares->data;
76
77         ac->msg->exc = NULL;
78         ac->res = mono_message_invoke (ares->async_delegate, ac->msg, 
79                                        &ac->msg->exc, &ac->out_args);
80
81         ares->completed = 1;
82
83         /* call async callback if cb_method != null*/
84         if (ac->cb_method) {
85                 MonoObject *exc = NULL;
86                 void *pa = &ares;
87                 mono_runtime_invoke (ac->cb_method, ac->cb_target, pa, &exc);
88                 if (!ac->msg->exc)
89                         ac->msg->exc = exc;
90         }
91
92         /* notify listeners */
93         mono_monitor_enter ((MonoObject *) ares);
94         
95         if (ares->handle != NULL) {
96                 ac->wait_event = ((MonoWaitHandle *) ares->handle)->handle;
97                 SetEvent (ac->wait_event);
98         }
99         mono_monitor_exit ((MonoObject *) ares);
100
101         EnterCriticalSection (&ares_lock);
102         mono_g_hash_table_remove (ares_htable, ares);
103         LeaveCriticalSection (&ares_lock);
104 }
105
106 void
107 mono_thread_pool_init ()
108 {
109         SYSTEM_INFO info;
110         int threads_per_cpu = THREADS_PER_CPU;
111
112         if ((int) InterlockedCompareExchange (&tp_inited, 1, 0) == 1)
113                 return;
114
115         MONO_GC_REGISTER_ROOT (ares_htable);
116         InitializeCriticalSection (&ares_lock);
117         ares_htable = mono_g_hash_table_new (NULL, NULL);
118         job_added = CreateSemaphore (NULL, 0, 0x7fffffff, NULL);
119         GetSystemInfo (&info);
120         if (getenv ("MONO_THREADS_PER_CPU") != NULL) {
121                 threads_per_cpu = atoi (getenv ("MONO_THREADS_PER_CPU"));
122                 if (threads_per_cpu <= 0)
123                         threads_per_cpu = THREADS_PER_CPU;
124         }
125
126         mono_max_worker_threads = threads_per_cpu * info.dwNumberOfProcessors;
127 }
128
129 MonoAsyncResult *
130 mono_thread_pool_add (MonoObject *target, MonoMethodMessage *msg, MonoDelegate *async_callback,
131                       MonoObject *state)
132 {
133         MonoDomain *domain = mono_domain_get ();
134         MonoAsyncResult *ares;
135         ASyncCall *ac;
136         int busy, worker;
137
138 #ifdef HAVE_BOEHM_GC
139         ac = GC_MALLOC (sizeof (ASyncCall));
140 #else
141         /* We'll leak the event if creaated... */
142         ac = g_new0 (ASyncCall, 1);
143 #endif
144         ac->wait_event = NULL;
145         ac->msg = msg;
146         ac->state = state;
147
148         if (async_callback) {
149                 ac->cb_method = mono_get_delegate_invoke (((MonoObject *)async_callback)->vtable->klass);
150                 ac->cb_target = async_callback;
151         }
152
153         ares = mono_async_result_new (domain, NULL, ac->state, ac);
154         ares->async_delegate = target;
155
156         EnterCriticalSection (&ares_lock);
157         mono_g_hash_table_insert (ares_htable, ares, ares);
158         LeaveCriticalSection (&ares_lock);
159         
160         busy = (int) InterlockedCompareExchange (&busy_worker_threads, 0, -1);
161         worker = (int) InterlockedCompareExchange (&mono_worker_threads, 0, -1); 
162         if (worker <= ++busy &&
163             worker < mono_max_worker_threads) {
164                 InterlockedIncrement (&mono_worker_threads);
165                 InterlockedIncrement (&busy_worker_threads);
166                 mono_thread_create (domain, async_invoke_thread, ares);
167         } else {
168                 append_job (ares);
169                 ReleaseSemaphore (job_added, 1, NULL);
170         }
171
172         return ares;
173 }
174
175 MonoObject *
176 mono_thread_pool_finish (MonoAsyncResult *ares, MonoArray **out_args, MonoObject **exc)
177 {
178         ASyncCall *ac;
179
180         *exc = NULL;
181         *out_args = NULL;
182
183         /* check if already finished */
184         mono_monitor_enter ((MonoObject *) ares);
185         
186         if (ares->endinvoke_called) {
187                 *exc = (MonoObject *)mono_exception_from_name (mono_defaults.corlib, "System", 
188                                               "InvalidOperationException");
189                 mono_monitor_exit ((MonoObject *) ares);
190                 return NULL;
191         }
192
193         ares->endinvoke_called = 1;
194         ac = (ASyncCall *)ares->data;
195
196         g_assert (ac != NULL);
197
198         /* wait until we are really finished */
199         if (!ares->completed) {
200                 if (ares->handle == NULL) {
201                         ac->wait_event = CreateEvent (NULL, TRUE, FALSE, NULL);
202                         ares->handle = (MonoObject *) mono_wait_handle_new (mono_object_domain (ares), ac->wait_event);
203                 }
204                 mono_monitor_exit ((MonoObject *) ares);
205                 WaitForSingleObjectEx (ac->wait_event, INFINITE, TRUE);
206         } else {
207                 mono_monitor_exit ((MonoObject *) ares);
208         }
209
210         *exc = ac->msg->exc;
211         *out_args = ac->out_args;
212
213         return ac->res;
214 }
215
216 void
217 mono_thread_pool_cleanup (void)
218 {
219         gint release;
220
221         EnterCriticalSection (&mono_delegate_section);
222         g_list_free (async_call_queue);
223         async_call_queue = NULL;
224         release = (gint) InterlockedCompareExchange (&busy_worker_threads, 0, -1);
225         LeaveCriticalSection (&mono_delegate_section);
226         if (job_added)
227                 ReleaseSemaphore (job_added, release, NULL);
228 }
229
230 static void
231 append_job (MonoAsyncResult *ar)
232 {
233         GList *tmp;
234
235         EnterCriticalSection (&mono_delegate_section);
236         if (async_call_queue == NULL) {
237                 async_call_queue = g_list_append (async_call_queue, ar); 
238         } else {
239                 for (tmp = async_call_queue; tmp && tmp->data != NULL; tmp = tmp->next);
240                 if (tmp == NULL) {
241                         async_call_queue = g_list_append (async_call_queue, ar); 
242                 } else {
243                         tmp->data = ar;
244                 }
245         }
246         LeaveCriticalSection (&mono_delegate_section);
247 }
248
249 static MonoAsyncResult *
250 dequeue_job (void)
251 {
252         MonoAsyncResult *ar = NULL;
253         GList *tmp, *tmp2;
254
255         EnterCriticalSection (&mono_delegate_section);
256         tmp = async_call_queue;
257         if (tmp) {
258                 ar = (MonoAsyncResult *) tmp->data;
259                 tmp->data = NULL;
260                 tmp2 = tmp;
261                 for (tmp2 = tmp; tmp2->next != NULL; tmp2 = tmp2->next);
262                 if (tmp2 != tmp) {
263                         async_call_queue = tmp->next;
264                         tmp->next = NULL;
265                         tmp2->next = tmp;
266                         tmp->prev = tmp2;
267                 }
268         }
269         LeaveCriticalSection (&mono_delegate_section);
270
271         return ar;
272 }
273
274 static void
275 async_invoke_thread (gpointer data)
276 {
277         MonoDomain *domain;
278         MonoThread *thread;
279         int workers, min;
280  
281         thread = mono_thread_current ();
282         thread->threadpool_thread = TRUE;
283         thread->state |= ThreadState_Background;
284
285         for (;;) {
286                 MonoAsyncResult *ar;
287
288                 ar = (MonoAsyncResult *) data;
289                 if (ar) {
290                         /* worker threads invokes methods in different domains,
291                          * so we need to set the right domain here */
292                         domain = ((MonoObject *)ar)->vtable->domain;
293                         if (mono_domain_set (domain, FALSE))
294                                 mono_async_invoke (ar);
295                         InterlockedDecrement (&busy_worker_threads);
296                 }
297
298                 data = dequeue_job ();
299         
300                 if (!data) {
301                         guint32 wr;
302                         int timeout = 500;
303                         guint32 start_time = GetTickCount ();
304                         
305                         do {
306                                 wr = WaitForSingleObjectEx (job_added, (guint32)timeout, TRUE);
307                                 if ((thread->state & ThreadState_StopRequested)!=0)
308                                         mono_thread_interruption_checkpoint ();
309                         
310                                 timeout -= GetTickCount () - start_time;
311                         
312                                 if (wr != WAIT_TIMEOUT)
313                                         data = dequeue_job ();
314                         }
315                         while (!data && timeout > 0);
316                 }
317
318                 if (!data) {
319                         workers = (int) InterlockedCompareExchange (&mono_worker_threads, 0, -1); 
320                         min = (int) InterlockedCompareExchange (&mono_min_worker_threads, 0, -1); 
321         
322                         while (!data && workers <= min) {
323                                 WaitForSingleObjectEx (job_added, INFINITE, TRUE);
324                                 if ((thread->state & ThreadState_StopRequested)!=0)
325                                         mono_thread_interruption_checkpoint ();
326                         
327                                 data = dequeue_job ();
328                                 workers = (int) InterlockedCompareExchange (&mono_worker_threads, 0, -1); 
329                                 min = (int) InterlockedCompareExchange (&mono_min_worker_threads, 0, -1); 
330                         }
331                 }
332         
333                 if (!data) {
334                         InterlockedDecrement (&mono_worker_threads);
335                         return;
336                 }
337                 
338                 InterlockedIncrement (&busy_worker_threads);
339         }
340
341         g_assert_not_reached ();
342 }
343
344 void
345 ves_icall_System_Threading_ThreadPool_GetAvailableThreads (gint *workerThreads, gint *completionPortThreads)
346 {
347         gint busy;
348
349         MONO_ARCH_SAVE_REGS;
350
351         busy = (gint) InterlockedCompareExchange (&busy_worker_threads, 0, -1);
352         *workerThreads = mono_max_worker_threads - busy;
353         *completionPortThreads = 0;
354 }
355
356 void
357 ves_icall_System_Threading_ThreadPool_GetMaxThreads (gint *workerThreads, gint *completionPortThreads)
358 {
359         MONO_ARCH_SAVE_REGS;
360
361         *workerThreads = mono_max_worker_threads;
362         *completionPortThreads = 0;
363 }
364
365 void
366 ves_icall_System_Threading_ThreadPool_GetMinThreads (gint *workerThreads, gint *completionPortThreads)
367 {
368         gint workers;
369
370         MONO_ARCH_SAVE_REGS;
371
372         workers = (gint) InterlockedCompareExchange (&mono_min_worker_threads, 0, -1);
373         *workerThreads = workers;
374         *completionPortThreads = 0;
375 }
376
377 MonoBoolean
378 ves_icall_System_Threading_ThreadPool_SetMinThreads (gint workerThreads, gint completionPortThreads)
379 {
380         MONO_ARCH_SAVE_REGS;
381
382         if (workerThreads < 0 || workerThreads > mono_max_worker_threads)
383                 return FALSE;
384         InterlockedExchange (&mono_min_worker_threads, workerThreads);
385         /* FIXME: should actually start the idle threads if needed */
386         return TRUE;
387 }
388
389 static void
390 overlapped_callback (guint32 error, guint32 numbytes, WapiOverlapped *overlapped)
391 {
392         MonoFSAsyncResult *ares;
393         MonoThread *thread;
394  
395         MONO_ARCH_SAVE_REGS;
396
397         ares = (MonoFSAsyncResult *) overlapped->handle1;
398         ares->completed = TRUE;
399         if (ares->bytes_read != -1)
400                 ares->bytes_read = numbytes;
401         else
402                 ares->count = numbytes;
403
404         thread = mono_thread_attach (mono_object_domain (ares));
405         if (ares->async_callback != NULL) {
406                 gpointer p [1];
407
408                 *p = ares;
409                 mono_runtime_invoke (ares->async_callback->method_info->method, NULL, p, NULL);
410         }
411
412         SetEvent (ares->wait_handle->handle);
413         mono_thread_detach (thread);
414         g_free (overlapped);
415 }
416
417 MonoBoolean
418 ves_icall_System_Threading_ThreadPool_BindHandle (gpointer handle)
419 {
420         MONO_ARCH_SAVE_REGS;
421
422 #ifdef PLATFORM_WIN32
423         return FALSE;
424 #else
425         if (!BindIoCompletionCallback (handle, overlapped_callback, 0)) {
426                 gint error = GetLastError ();
427                 MonoException *exc;
428                 gchar *msg;
429
430                 if (error == ERROR_INVALID_PARAMETER) {
431                         exc = mono_get_exception_argument (NULL, "Invalid parameter.");
432                 } else {
433                         msg = g_strdup_printf ("Win32 error %d.", error);
434                         exc = mono_exception_from_name_msg (mono_defaults.corlib,
435                                                             "System",
436                                                             "ApplicationException", msg);
437                         g_free (msg);
438                 }
439
440                 mono_raise_exception (exc);
441         }
442
443         return TRUE;
444 #endif
445 }
446