2004-08-04 Bernie Solomon <bernard@ugsolutions.com>
[mono.git] / mono / metadata / threadpool.c
1 /*
2  * threadpool.c: global thread pool
3  *
4  * Authors:
5  *   Dietmar Maurer (dietmar@ximian.com)
6  *   Gonzalo Paniagua Javier (gonzalo@ximian.com)
7  *
8  * (C) 2001-2003 Ximian, Inc.
9  * (c) 2004 Novell, Inc. (http://www.novell.com)
10  */
11
12 #include <config.h>
13 #include <glib.h>
14
15 #ifdef PLATFORM_WIN32
16 #define WINVER 0x0500
17 #define _WIN32_WINNT 0x0500
18 #endif
19
20 #include <mono/metadata/domain-internals.h>
21 #include <mono/metadata/tabledefs.h>
22 #include <mono/metadata/threads.h>
23 #include <mono/metadata/threads-types.h>
24 #include <mono/metadata/exception.h>
25 #include <mono/metadata/file-io.h>
26 #include <mono/metadata/monitor.h>
27 #include <mono/metadata/marshal.h>
28 #include <mono/io-layer/io-layer.h>
29 #include <mono/os/gc_wrapper.h>
30
31 #include "threadpool.h"
32
33 /* maximum number of worker threads */
34 int mono_max_worker_threads = 25; /* fixme: should be 25 per available CPU */
35 static int mono_min_worker_threads = 0;
36
37 /* current number of worker threads */
38 static int mono_worker_threads = 0;
39
40 /* current number of busy threads */
41 static int busy_worker_threads = 0;
42
43 /* we use this to store a reference to the AsyncResult to avoid GC */
44 static MonoGHashTable *ares_htable = NULL;
45
46 /* we append a job */
47 static HANDLE job_added;
48
49 typedef struct {
50         MonoMethodMessage *msg;
51         HANDLE             wait_event;
52         MonoMethod        *cb_method;
53         MonoDelegate      *cb_target;
54         MonoObject        *state;
55         MonoObject        *res;
56         MonoArray         *out_args;
57 } ASyncCall;
58
59 static void async_invoke_thread (gpointer data);
60 static void append_job (MonoAsyncResult *ar);
61
62 static GList *async_call_queue = NULL;
63
64 static void
65 mono_async_invoke (MonoAsyncResult *ares)
66 {
67         ASyncCall *ac = (ASyncCall *)ares->data;
68
69         ac->msg->exc = NULL;
70         ac->res = mono_message_invoke (ares->async_delegate, ac->msg, 
71                                        &ac->msg->exc, &ac->out_args);
72
73         ares->completed = 1;
74
75         /* call async callback if cb_method != null*/
76         if (ac->cb_method) {
77                 MonoObject *exc = NULL;
78                 void *pa = &ares;
79                 mono_runtime_invoke (ac->cb_method, ac->cb_target, pa, &exc);
80                 if (!ac->msg->exc)
81                         ac->msg->exc = exc;
82         }
83
84         /* notify listeners */
85         if(!mono_monitor_enter ((MonoObject *) ares))
86                 return;
87         
88         if (ares->handle != NULL) {
89                 ac->wait_event = ((MonoWaitHandle *) ares->handle)->handle;
90                 SetEvent (ac->wait_event);
91         }
92         mono_monitor_exit ((MonoObject *) ares);
93
94         mono_g_hash_table_remove (ares_htable, ares);
95 }
96
97 MonoAsyncResult *
98 mono_thread_pool_add (MonoObject *target, MonoMethodMessage *msg, MonoDelegate *async_callback,
99                       MonoObject *state)
100 {
101         MonoDomain *domain = mono_domain_get ();
102         MonoAsyncResult *ares;
103         ASyncCall *ac;
104         int busy, worker;
105
106 #ifdef HAVE_BOEHM_GC
107         ac = GC_MALLOC (sizeof (ASyncCall));
108 #else
109         /* We'll leak the event if creaated... */
110         ac = g_new0 (ASyncCall, 1);
111 #endif
112         ac->wait_event = NULL;
113         ac->msg = msg;
114         ac->state = state;
115
116         if (async_callback) {
117                 ac->cb_method = mono_get_delegate_invoke (((MonoObject *)async_callback)->vtable->klass);
118                 ac->cb_target = async_callback;
119         }
120
121         ares = mono_async_result_new (domain, NULL, ac->state, ac);
122         ares->async_delegate = target;
123
124         if (!ares_htable) {
125                 MONO_GC_REGISTER_ROOT (ares_htable);
126                 ares_htable = mono_g_hash_table_new (NULL, NULL);
127                 job_added = CreateSemaphore (NULL, 0, 0x7fffffff, NULL);
128         }
129
130         mono_g_hash_table_insert (ares_htable, ares, ares);
131
132         busy = (int) InterlockedCompareExchange (&busy_worker_threads, 0, -1);
133         worker = (int) InterlockedCompareExchange (&mono_worker_threads, 0, -1); 
134         if (worker <= ++busy &&
135             worker < mono_max_worker_threads) {
136                 InterlockedIncrement (&mono_worker_threads);
137                 InterlockedIncrement (&busy_worker_threads);
138                 mono_thread_create (domain, async_invoke_thread, ares);
139         } else {
140                 append_job (ares);
141                 ReleaseSemaphore (job_added, 1, NULL);
142         }
143
144         return ares;
145 }
146
147 MonoObject *
148 mono_thread_pool_finish (MonoAsyncResult *ares, MonoArray **out_args, MonoObject **exc)
149 {
150         ASyncCall *ac;
151
152         *exc = NULL;
153         *out_args = NULL;
154
155         /* check if already finished */
156         if (!mono_monitor_enter ((MonoObject *) ares)) {
157                 return NULL;
158         }
159         
160         if (ares->endinvoke_called) {
161                 *exc = (MonoObject *)mono_exception_from_name (mono_defaults.corlib, "System", 
162                                               "InvalidOperationException");
163                 mono_monitor_exit ((MonoObject *) ares);
164                 return NULL;
165         }
166
167         ares->endinvoke_called = 1;
168         ac = (ASyncCall *)ares->data;
169
170         g_assert (ac != NULL);
171
172         /* wait until we are really finished */
173         if (!ares->completed) {
174                 if (ares->handle == NULL) {
175                         ac->wait_event = CreateEvent (NULL, TRUE, FALSE, NULL);
176                         ares->handle = (MonoObject *) mono_wait_handle_new (mono_object_domain (ares), ac->wait_event);
177                 }
178                 mono_monitor_exit ((MonoObject *) ares);
179                 WaitForSingleObjectEx (ac->wait_event, INFINITE, TRUE);
180         } else {
181                 mono_monitor_exit ((MonoObject *) ares);
182         }
183
184         *exc = ac->msg->exc;
185         *out_args = ac->out_args;
186
187         return ac->res;
188 }
189
190 void
191 mono_thread_pool_cleanup (void)
192 {
193         gint release;
194
195         EnterCriticalSection (&mono_delegate_section);
196         g_list_free (async_call_queue);
197         async_call_queue = NULL;
198         release = (gint) InterlockedCompareExchange (&busy_worker_threads, 0, -1);
199         LeaveCriticalSection (&mono_delegate_section);
200         if (job_added)
201                 ReleaseSemaphore (job_added, release, NULL);
202 }
203
204 static void
205 append_job (MonoAsyncResult *ar)
206 {
207         EnterCriticalSection (&mono_delegate_section);
208         async_call_queue = g_list_append (async_call_queue, ar); 
209         LeaveCriticalSection (&mono_delegate_section);
210 }
211
212 static MonoAsyncResult *
213 dequeue_job (void)
214 {
215         MonoAsyncResult *ar = NULL;
216         GList *tmp = NULL;
217
218         EnterCriticalSection (&mono_delegate_section);
219         if (async_call_queue) {
220                 ar = (MonoAsyncResult *)async_call_queue->data;
221                 tmp = async_call_queue;
222                 async_call_queue = g_list_remove_link (tmp, tmp); 
223         }
224         LeaveCriticalSection (&mono_delegate_section);
225         if (tmp)
226                 g_list_free_1 (tmp);
227
228         return ar;
229 }
230
231 static void
232 async_invoke_thread (gpointer data)
233 {
234         MonoDomain *domain;
235         MonoThread *thread;
236         int workers, min;
237  
238         thread = mono_thread_current ();
239         thread->threadpool_thread = TRUE;
240         thread->state |= ThreadState_Background;
241         for (;;) {
242                 MonoAsyncResult *ar;
243
244                 ar = (MonoAsyncResult *) data;
245                 if (ar) {
246                         /* worker threads invokes methods in different domains,
247                          * so we need to set the right domain here */
248                         domain = ((MonoObject *)ar)->vtable->domain;
249                         if (mono_domain_set (domain, FALSE))
250                                 mono_async_invoke (ar);
251                         InterlockedDecrement (&busy_worker_threads);
252                 }
253
254                 data = dequeue_job ();
255         
256                 if (!data) {
257                         guint32 wr;
258                         int timeout = 500;
259                         guint32 start_time = GetTickCount ();
260                         
261                         do {
262                                 wr = WaitForSingleObjectEx (job_added, (guint32)timeout, TRUE);
263                                 mono_thread_interruption_checkpoint ();
264                         
265                                 timeout -= GetTickCount () - start_time;
266                         
267                                 if (wr != WAIT_TIMEOUT)
268                                         data = dequeue_job ();
269                         }
270                         while (!data && timeout > 0);
271                 }
272                 
273                 if (!data) {
274                         workers = (int) InterlockedCompareExchange (&mono_worker_threads, 0, -1); 
275                         min = (int) InterlockedCompareExchange (&mono_min_worker_threads, 0, -1); 
276         
277                         while (!data && workers <= min) {
278                                 WaitForSingleObjectEx (job_added, INFINITE, TRUE);
279                                 mono_thread_interruption_checkpoint ();
280                         
281                                 data = dequeue_job ();
282                                 workers = (int) InterlockedCompareExchange (&mono_worker_threads, 0, -1); 
283                                 min = (int) InterlockedCompareExchange (&mono_min_worker_threads, 0, -1); 
284                         }
285                 }
286         
287                 if (!data) {
288                         InterlockedDecrement (&mono_worker_threads);
289                         return;
290                 }
291                 
292                 InterlockedIncrement (&busy_worker_threads);
293         }
294
295         g_assert_not_reached ();
296 }
297
298 void
299 ves_icall_System_Threading_ThreadPool_GetAvailableThreads (gint *workerThreads, gint *completionPortThreads)
300 {
301         gint busy;
302
303         MONO_ARCH_SAVE_REGS;
304
305         busy = (gint) InterlockedCompareExchange (&busy_worker_threads, 0, -1);
306         *workerThreads = mono_max_worker_threads - busy;
307         *completionPortThreads = 0;
308 }
309
310 void
311 ves_icall_System_Threading_ThreadPool_GetMaxThreads (gint *workerThreads, gint *completionPortThreads)
312 {
313         MONO_ARCH_SAVE_REGS;
314
315         *workerThreads = mono_max_worker_threads;
316         *completionPortThreads = 0;
317 }
318
319 void
320 ves_icall_System_Threading_ThreadPool_GetMinThreads (gint *workerThreads, gint *completionPortThreads)
321 {
322         gint workers;
323
324         MONO_ARCH_SAVE_REGS;
325
326         workers = (gint) InterlockedCompareExchange (&mono_min_worker_threads, 0, -1);
327         *workerThreads = workers;
328         *completionPortThreads = 0;
329 }
330
331 MonoBoolean
332 ves_icall_System_Threading_ThreadPool_SetMinThreads (gint workerThreads, gint completionPortThreads)
333 {
334         MONO_ARCH_SAVE_REGS;
335
336         if (workerThreads < 0 || workerThreads > mono_max_worker_threads)
337                 return FALSE;
338         InterlockedExchange (&mono_min_worker_threads, workerThreads);
339         /* FIXME: should actually start the idle threads if needed */
340         return TRUE;
341 }
342
343 static void
344 overlapped_callback (guint32 error, guint32 numbytes, WapiOverlapped *overlapped)
345 {
346         MonoFSAsyncResult *ares;
347         MonoThread *thread;
348  
349         MONO_ARCH_SAVE_REGS;
350
351         ares = (MonoFSAsyncResult *) overlapped->handle1;
352         ares->completed = TRUE;
353         if (ares->bytes_read != -1)
354                 ares->bytes_read = numbytes;
355         else
356                 ares->count = numbytes;
357
358         thread = mono_thread_attach (mono_object_domain (ares));
359         if (ares->async_callback != NULL) {
360                 gpointer p [1];
361
362                 *p = ares;
363                 mono_runtime_invoke (ares->async_callback->method_info->method, NULL, p, NULL);
364         }
365
366         SetEvent (ares->wait_handle->handle);
367         mono_thread_detach (thread);
368         g_free (overlapped);
369 }
370
371 MonoBoolean
372 ves_icall_System_Threading_ThreadPool_BindHandle (gpointer handle)
373 {
374         MONO_ARCH_SAVE_REGS;
375
376 #ifdef PLATFORM_WIN32
377         return FALSE;
378 #else
379         if (!BindIoCompletionCallback (handle, overlapped_callback, 0)) {
380                 gint error = GetLastError ();
381                 MonoException *exc;
382                 gchar *msg;
383
384                 if (error == ERROR_INVALID_PARAMETER) {
385                         exc = mono_get_exception_argument (NULL, "Invalid parameter.");
386                 } else {
387                         msg = g_strdup_printf ("Win32 error %d.", error);
388                         exc = mono_exception_from_name_msg (mono_defaults.corlib,
389                                                             "System",
390                                                             "ApplicationException", msg);
391                         g_free (msg);
392                 }
393
394                 mono_raise_exception (exc);
395         }
396
397         return TRUE;
398 #endif
399 }
400