2004-06-05 Atsushi Enomoto <atsushi@ximian.com>
[mono.git] / mono / metadata / threadpool.c
1 /*
2  * threadpool.c: global thread pool
3  *
4  * Authors:
5  *   Dietmar Maurer (dietmar@ximian.com)
6  *   Gonzalo Paniagua Javier (gonzalo@ximian.com)
7  *
8  * (C) 2001-2003 Ximian, Inc.
9  * (c) 2004 Novell, Inc. (http://www.novell.com)
10  */
11
12 #include <config.h>
13 #include <glib.h>
14
15 #ifdef PLATFORM_WIN32
16 #define WINVER 0x0500
17 #define _WIN32_WINNT 0x0500
18 #endif
19
20 #include <mono/metadata/appdomain.h>
21 #include <mono/metadata/tabledefs.h>
22 #include <mono/metadata/threads.h>
23 #include <mono/metadata/exception.h>
24 #include <mono/metadata/file-io.h>
25 #include <mono/metadata/monitor.h>
26 #include <mono/metadata/marshal.h>
27 #include <mono/io-layer/io-layer.h>
28 #include <mono/os/gc_wrapper.h>
29
30 #include "threadpool.h"
31
32 /* maximum number of worker threads */
33 int mono_max_worker_threads = 25; /* fixme: should be 25 per available CPU */
34 int mono_min_worker_threads = 0;
35
36 /* current number of worker threads */
37 static int mono_worker_threads = 0;
38
39 /* current number of busy threads */
40 int busy_worker_threads = 0;
41
42 /* we use this to store a reference to the AsyncResult to avoid GC */
43 static MonoGHashTable *ares_htable = NULL;
44
45 /* we append a job */
46 static HANDLE job_added;
47
48 typedef struct {
49         MonoMethodMessage *msg;
50         HANDLE             wait_event;
51         MonoMethod        *cb_method;
52         MonoDelegate      *cb_target;
53         MonoObject        *state;
54         MonoObject        *res;
55         MonoArray         *out_args;
56 } ASyncCall;
57
58 static void async_invoke_thread (gpointer data);
59 static void append_job (MonoAsyncResult *ar);
60
61 static GList *async_call_queue = NULL;
62
63 static void
64 mono_async_invoke (MonoAsyncResult *ares)
65 {
66         ASyncCall *ac = (ASyncCall *)ares->data;
67
68         ac->msg->exc = NULL;
69         ac->res = mono_message_invoke (ares->async_delegate, ac->msg, 
70                                        &ac->msg->exc, &ac->out_args);
71
72         ares->completed = 1;
73
74         /* call async callback if cb_method != null*/
75         if (ac->cb_method) {
76                 MonoObject *exc = NULL;
77                 void *pa = &ares;
78                 mono_runtime_invoke (ac->cb_method, ac->cb_target, pa, &exc);
79                 if (!ac->msg->exc)
80                         ac->msg->exc = exc;
81         }
82
83         /* notify listeners */
84         if(!mono_monitor_try_enter ((MonoObject *) ares, INFINITE))
85                 return;
86         
87         if (ares->handle != NULL) {
88                 ac->wait_event = ((MonoWaitHandle *) ares->handle)->handle;
89                 SetEvent (ac->wait_event);
90         }
91         mono_monitor_exit ((MonoObject *) ares);
92
93         mono_g_hash_table_remove (ares_htable, ares);
94 }
95
96 MonoAsyncResult *
97 mono_thread_pool_add (MonoObject *target, MonoMethodMessage *msg, MonoDelegate *async_callback,
98                       MonoObject *state)
99 {
100         MonoDomain *domain = mono_domain_get ();
101         MonoAsyncResult *ares;
102         ASyncCall *ac;
103         int busy, worker;
104
105 #ifdef HAVE_BOEHM_GC
106         ac = GC_MALLOC (sizeof (ASyncCall));
107 #else
108         /* We'll leak the event if creaated... */
109         ac = g_new0 (ASyncCall, 1);
110 #endif
111         ac->wait_event = NULL;
112         ac->msg = msg;
113         ac->state = state;
114
115         if (async_callback) {
116                 ac->cb_method = mono_get_delegate_invoke (((MonoObject *)async_callback)->vtable->klass);
117                 ac->cb_target = async_callback;
118         }
119
120         ares = mono_async_result_new (domain, NULL, ac->state, ac);
121         ares->async_delegate = target;
122
123         if (!ares_htable) {
124                 ares_htable = mono_g_hash_table_new (NULL, NULL);
125                 job_added = CreateSemaphore (NULL, 0, 0x7fffffff, NULL);
126         }
127
128         mono_g_hash_table_insert (ares_htable, ares, ares);
129
130         busy = (int) InterlockedCompareExchange (&busy_worker_threads, 0, -1);
131         worker = (int) InterlockedCompareExchange (&mono_worker_threads, 0, -1); 
132         if (worker <= ++busy &&
133             worker < mono_max_worker_threads) {
134                 InterlockedIncrement (&mono_worker_threads);
135                 InterlockedIncrement (&busy_worker_threads);
136                 mono_thread_create (domain, async_invoke_thread, ares);
137         } else {
138                 append_job (ares);
139                 ReleaseSemaphore (job_added, 1, NULL);
140         }
141
142         return ares;
143 }
144
145 MonoObject *
146 mono_thread_pool_finish (MonoAsyncResult *ares, MonoArray **out_args, MonoObject **exc)
147 {
148         ASyncCall *ac;
149
150         *exc = NULL;
151         *out_args = NULL;
152
153         /* check if already finished */
154         if (!mono_monitor_try_enter ((MonoObject *) ares, INFINITE)) {
155                 return NULL;
156         }
157         
158         if (ares->endinvoke_called) {
159                 *exc = (MonoObject *)mono_exception_from_name (mono_defaults.corlib, "System", 
160                                               "InvalidOperationException");
161                 mono_monitor_exit ((MonoObject *) ares);
162                 return NULL;
163         }
164
165         ares->endinvoke_called = 1;
166         ac = (ASyncCall *)ares->data;
167
168         g_assert (ac != NULL);
169
170         /* wait until we are really finished */
171         if (!ares->completed) {
172                 if (ares->handle == NULL) {
173                         ac->wait_event = CreateEvent (NULL, TRUE, FALSE, NULL);
174                         ares->handle = (MonoObject *) mono_wait_handle_new (mono_object_domain (ares), ac->wait_event);
175                 }
176                 mono_monitor_exit ((MonoObject *) ares);
177                 WaitForSingleObjectEx (ac->wait_event, INFINITE, TRUE);
178         } else {
179                 mono_monitor_exit ((MonoObject *) ares);
180         }
181
182         *exc = ac->msg->exc;
183         *out_args = ac->out_args;
184
185         return ac->res;
186 }
187
188 void
189 mono_thread_pool_cleanup (void)
190 {
191         gint release;
192
193         EnterCriticalSection (&mono_delegate_section);
194         g_list_free (async_call_queue);
195         async_call_queue = NULL;
196         release = (gint) InterlockedCompareExchange (&busy_worker_threads, 0, -1);
197         LeaveCriticalSection (&mono_delegate_section);
198         if (job_added)
199                 ReleaseSemaphore (job_added, release, NULL);
200 }
201
202 static void
203 append_job (MonoAsyncResult *ar)
204 {
205         EnterCriticalSection (&mono_delegate_section);
206         async_call_queue = g_list_append (async_call_queue, ar); 
207         LeaveCriticalSection (&mono_delegate_section);
208 }
209
210 static MonoAsyncResult *
211 dequeue_job (void)
212 {
213         MonoAsyncResult *ar = NULL;
214         GList *tmp = NULL;
215
216         EnterCriticalSection (&mono_delegate_section);
217         if (async_call_queue) {
218                 ar = (MonoAsyncResult *)async_call_queue->data;
219                 tmp = async_call_queue;
220                 async_call_queue = g_list_remove_link (tmp, tmp); 
221         }
222         LeaveCriticalSection (&mono_delegate_section);
223         if (tmp)
224                 g_list_free_1 (tmp);
225
226         return ar;
227 }
228
229 static void
230 async_invoke_thread (gpointer data)
231 {
232         MonoDomain *domain;
233         MonoThread *thread;
234         int workers, min;
235  
236         thread = mono_thread_current ();
237         thread->threadpool_thread = TRUE;
238         thread->state |= ThreadState_Background;
239         for (;;) {
240                 MonoAsyncResult *ar;
241
242                 ar = (MonoAsyncResult *) data;
243                 if (ar) {
244                         /* worker threads invokes methods in different domains,
245                          * so we need to set the right domain here */
246                         domain = ((MonoObject *)ar)->vtable->domain;
247                         if (mono_domain_set (domain, FALSE))
248                                 mono_async_invoke (ar);
249                         InterlockedDecrement (&busy_worker_threads);
250                 }
251
252                 data = dequeue_job ();
253         
254                 if (!data) {
255                         guint32 wr;
256                         int timeout = 500;
257                         guint32 start_time = GetTickCount ();
258                         
259                         do {
260                                 wr = WaitForSingleObjectEx (job_added, (guint32)timeout, TRUE);
261                                 mono_thread_interruption_checkpoint ();
262                         
263                                 timeout -= GetTickCount () - start_time;
264                         
265                                 if (wr != WAIT_TIMEOUT)
266                                         data = dequeue_job ();
267                         }
268                         while (!data && timeout > 0);
269                 }
270                 
271                 if (!data) {
272                         workers = (int) InterlockedCompareExchange (&mono_worker_threads, 0, -1); 
273                         min = (int) InterlockedCompareExchange (&mono_min_worker_threads, 0, -1); 
274         
275                         while (!data && workers <= min) {
276                                 WaitForSingleObjectEx (job_added, INFINITE, TRUE);
277                                 mono_thread_interruption_checkpoint ();
278                         
279                                 data = dequeue_job ();
280                                 workers = (int) InterlockedCompareExchange (&mono_worker_threads, 0, -1); 
281                                 min = (int) InterlockedCompareExchange (&mono_min_worker_threads, 0, -1); 
282                         }
283                 }
284         
285                 if (!data) {
286                         InterlockedDecrement (&mono_worker_threads);
287                         return;
288                 }
289                 
290                 InterlockedIncrement (&busy_worker_threads);
291         }
292
293         g_assert_not_reached ();
294 }
295
296 void
297 ves_icall_System_Threading_ThreadPool_GetAvailableThreads (gint *workerThreads, gint *completionPortThreads)
298 {
299         gint busy;
300
301         MONO_ARCH_SAVE_REGS;
302
303         busy = (gint) InterlockedCompareExchange (&busy_worker_threads, 0, -1);
304         *workerThreads = mono_max_worker_threads - busy;
305         *completionPortThreads = 0;
306 }
307
308 void
309 ves_icall_System_Threading_ThreadPool_GetMaxThreads (gint *workerThreads, gint *completionPortThreads)
310 {
311         MONO_ARCH_SAVE_REGS;
312
313         *workerThreads = mono_max_worker_threads;
314         *completionPortThreads = 0;
315 }
316
317 void
318 ves_icall_System_Threading_ThreadPool_GetMinThreads (gint *workerThreads, gint *completionPortThreads)
319 {
320         gint workers;
321
322         MONO_ARCH_SAVE_REGS;
323
324         workers = (gint) InterlockedCompareExchange (&mono_min_worker_threads, 0, -1);
325         *workerThreads = workers;
326         *completionPortThreads = 0;
327 }
328
329 void
330 ves_icall_System_Threading_ThreadPool_SetMinThreads (gint workerThreads, gint completionPortThreads)
331 {
332         MONO_ARCH_SAVE_REGS;
333
334         InterlockedExchange (&mono_min_worker_threads, workerThreads);
335 }
336
337 static void
338 overlapped_callback (guint32 error, guint32 numbytes, WapiOverlapped *overlapped)
339 {
340         MonoFSAsyncResult *ares;
341         MonoThread *thread;
342  
343         MONO_ARCH_SAVE_REGS;
344
345         ares = (MonoFSAsyncResult *) overlapped->handle1;
346         ares->completed = TRUE;
347         if (ares->bytes_read != -1)
348                 ares->bytes_read = numbytes;
349         else
350                 ares->count = numbytes;
351
352         thread = mono_thread_attach (mono_object_domain (ares));
353         if (ares->async_callback != NULL) {
354                 gpointer p [1];
355
356                 *p = ares;
357                 mono_runtime_invoke (ares->async_callback->method_info->method, NULL, p, NULL);
358         }
359
360         SetEvent (ares->wait_handle->handle);
361         mono_thread_detach (thread);
362         g_free (overlapped);
363 }
364
365 MonoBoolean
366 ves_icall_System_Threading_ThreadPool_BindHandle (gpointer handle)
367 {
368         MONO_ARCH_SAVE_REGS;
369
370 #ifdef PLATFORM_WIN32
371         return FALSE;
372 #else
373         if (!BindIoCompletionCallback (handle, overlapped_callback, 0)) {
374                 gint error = GetLastError ();
375                 MonoException *exc;
376                 gchar *msg;
377
378                 if (error == ERROR_INVALID_PARAMETER) {
379                         exc = mono_get_exception_argument (NULL, "Invalid parameter.");
380                 } else {
381                         msg = g_strdup_printf ("Win32 error %d.", error);
382                         exc = mono_exception_from_name_msg (mono_defaults.corlib,
383                                                             "System",
384                                                             "ApplicationException", msg);
385                         g_free (msg);
386                 }
387
388                 mono_raise_exception (exc);
389         }
390
391         return TRUE;
392 #endif
393 }
394