This commit was manufactured by cvs2svn to create branch 'mono-1-0'.
[mono.git] / mono / metadata / threadpool.c
1 /*
2  * threadpool.c: global thread pool
3  *
4  * Authors:
5  *   Dietmar Maurer (dietmar@ximian.com)
6  *   Gonzalo Paniagua Javier (gonzalo@ximian.com)
7  *
8  * (C) 2001-2003 Ximian, Inc.
9  * (c) 2004 Novell, Inc. (http://www.novell.com)
10  */
11
12 #include <config.h>
13 #include <glib.h>
14
15 #ifdef PLATFORM_WIN32
16 #define WINVER 0x0500
17 #define _WIN32_WINNT 0x0500
18 #endif
19
20 #include <mono/metadata/domain-internals.h>
21 #include <mono/metadata/tabledefs.h>
22 #include <mono/metadata/threads.h>
23 #include <mono/metadata/threads-types.h>
24 #include <mono/metadata/exception.h>
25 #include <mono/metadata/file-io.h>
26 #include <mono/metadata/monitor.h>
27 #include <mono/metadata/marshal.h>
28 #include <mono/io-layer/io-layer.h>
29 #include <mono/os/gc_wrapper.h>
30
31 #include "threadpool.h"
32
33 /* maximum number of worker threads */
34 int mono_max_worker_threads = 25; /* per available CPU? */
35 static int mono_min_worker_threads = 0;
36
37 /* current number of worker threads */
38 static int mono_worker_threads = 0;
39
40 /* current number of busy threads */
41 static int busy_worker_threads = 0;
42
43 /* we use this to store a reference to the AsyncResult to avoid GC */
44 static MonoGHashTable *ares_htable = NULL;
45
46 static CRITICAL_SECTION ares_lock;
47
48 /* we append a job */
49 static HANDLE job_added;
50
51 typedef struct {
52         MonoMethodMessage *msg;
53         HANDLE             wait_event;
54         MonoMethod        *cb_method;
55         MonoDelegate      *cb_target;
56         MonoObject        *state;
57         MonoObject        *res;
58         MonoArray         *out_args;
59 } ASyncCall;
60
61 static void async_invoke_thread (gpointer data);
62 static void append_job (MonoAsyncResult *ar);
63
64 static GList *async_call_queue = NULL;
65
66 static void
67 mono_async_invoke (MonoAsyncResult *ares)
68 {
69         ASyncCall *ac = (ASyncCall *)ares->data;
70
71         ac->msg->exc = NULL;
72         ac->res = mono_message_invoke (ares->async_delegate, ac->msg, 
73                                        &ac->msg->exc, &ac->out_args);
74
75         ares->completed = 1;
76
77         /* call async callback if cb_method != null*/
78         if (ac->cb_method) {
79                 MonoObject *exc = NULL;
80                 void *pa = &ares;
81                 mono_runtime_invoke (ac->cb_method, ac->cb_target, pa, &exc);
82                 if (!ac->msg->exc)
83                         ac->msg->exc = exc;
84         }
85
86         /* notify listeners */
87         if(!mono_monitor_enter ((MonoObject *) ares))
88                 return;
89         
90         if (ares->handle != NULL) {
91                 ac->wait_event = ((MonoWaitHandle *) ares->handle)->handle;
92                 SetEvent (ac->wait_event);
93         }
94         mono_monitor_exit ((MonoObject *) ares);
95
96         EnterCriticalSection (&ares_lock);
97         mono_g_hash_table_remove (ares_htable, ares);
98         LeaveCriticalSection (&ares_lock);
99 }
100
101
102 MonoAsyncResult *
103 mono_thread_pool_add (MonoObject *target, MonoMethodMessage *msg, MonoDelegate *async_callback,
104                       MonoObject *state)
105 {
106         MonoDomain *domain = mono_domain_get ();
107         MonoAsyncResult *ares;
108         ASyncCall *ac;
109         int busy, worker;
110
111 #ifdef HAVE_BOEHM_GC
112         ac = GC_MALLOC (sizeof (ASyncCall));
113 #else
114         /* We'll leak the event if creaated... */
115         ac = g_new0 (ASyncCall, 1);
116 #endif
117         ac->wait_event = NULL;
118         ac->msg = msg;
119         ac->state = state;
120
121         if (async_callback) {
122                 ac->cb_method = mono_get_delegate_invoke (((MonoObject *)async_callback)->vtable->klass);
123                 ac->cb_target = async_callback;
124         }
125
126         ares = mono_async_result_new (domain, NULL, ac->state, ac);
127         ares->async_delegate = target;
128
129         if (!ares_htable) {
130                 InitializeCriticalSection (&ares_lock);
131                 ares_htable = mono_g_hash_table_new (NULL, NULL);
132                 job_added = CreateSemaphore (NULL, 0, 0x7fffffff, NULL);
133         }
134
135         EnterCriticalSection (&ares_lock);
136         mono_g_hash_table_insert (ares_htable, ares, ares);
137         LeaveCriticalSection (&ares_lock);
138
139         busy = (int) InterlockedCompareExchange (&busy_worker_threads, 0, -1);
140         worker = (int) InterlockedCompareExchange (&mono_worker_threads, 0, -1); 
141         if (worker <= ++busy &&
142             worker < mono_max_worker_threads) {
143                 InterlockedIncrement (&mono_worker_threads);
144                 InterlockedIncrement (&busy_worker_threads);
145                 mono_thread_create (domain, async_invoke_thread, ares);
146         } else {
147                 append_job (ares);
148                 ReleaseSemaphore (job_added, 1, NULL);
149         }
150
151         return ares;
152 }
153
154 MonoObject *
155 mono_thread_pool_finish (MonoAsyncResult *ares, MonoArray **out_args, MonoObject **exc)
156 {
157         ASyncCall *ac;
158
159         *exc = NULL;
160         *out_args = NULL;
161
162         /* check if already finished */
163         if (!mono_monitor_enter ((MonoObject *) ares)) {
164                 return NULL;
165         }
166         
167         if (ares->endinvoke_called) {
168                 *exc = (MonoObject *)mono_exception_from_name (mono_defaults.corlib, "System", 
169                                               "InvalidOperationException");
170                 mono_monitor_exit ((MonoObject *) ares);
171                 return NULL;
172         }
173
174         ares->endinvoke_called = 1;
175         ac = (ASyncCall *)ares->data;
176
177         g_assert (ac != NULL);
178
179         /* wait until we are really finished */
180         if (!ares->completed) {
181                 if (ares->handle == NULL) {
182                         ac->wait_event = CreateEvent (NULL, TRUE, FALSE, NULL);
183                         ares->handle = (MonoObject *) mono_wait_handle_new (mono_object_domain (ares), ac->wait_event);
184                 }
185                 mono_monitor_exit ((MonoObject *) ares);
186                 WaitForSingleObjectEx (ac->wait_event, INFINITE, TRUE);
187         } else {
188                 mono_monitor_exit ((MonoObject *) ares);
189         }
190
191         *exc = ac->msg->exc;
192         *out_args = ac->out_args;
193
194         return ac->res;
195 }
196
197 void
198 mono_thread_pool_cleanup (void)
199 {
200         gint release;
201
202         EnterCriticalSection (&mono_delegate_section);
203         g_list_free (async_call_queue);
204         async_call_queue = NULL;
205         release = (gint) InterlockedCompareExchange (&busy_worker_threads, 0, -1);
206         LeaveCriticalSection (&mono_delegate_section);
207         if (job_added)
208                 ReleaseSemaphore (job_added, release, NULL);
209 }
210
211 static void
212 append_job (MonoAsyncResult *ar)
213 {
214         GList *tmp;
215
216         EnterCriticalSection (&mono_delegate_section);
217         if (async_call_queue == NULL) {
218                 async_call_queue = g_list_append (async_call_queue, ar); 
219         } else {
220                 for (tmp = async_call_queue; tmp && tmp->data != NULL; tmp = tmp->next);
221                 if (tmp == NULL) {
222                         async_call_queue = g_list_append (async_call_queue, ar); 
223                 } else {
224                         tmp->data = ar;
225                 }
226         }
227         LeaveCriticalSection (&mono_delegate_section);
228 }
229
230 static MonoAsyncResult *
231 dequeue_job (void)
232 {
233         MonoAsyncResult *ar = NULL;
234         GList *tmp, *tmp2;
235
236         EnterCriticalSection (&mono_delegate_section);
237         tmp = async_call_queue;
238         if (tmp) {
239                 ar = (MonoAsyncResult *) tmp->data;
240                 tmp->data = NULL;
241                 tmp2 = tmp;
242                 for (tmp2 = tmp; tmp2->next != NULL; tmp2 = tmp2->next);
243                 if (tmp2 != tmp) {
244                         async_call_queue = tmp->next;
245                         tmp->next = NULL;
246                         tmp2->next = tmp;
247                         tmp->prev = tmp2;
248                 }
249         }
250         LeaveCriticalSection (&mono_delegate_section);
251
252         return ar;
253 }
254
255 static void
256 async_invoke_thread (gpointer data)
257 {
258         MonoDomain *domain;
259         MonoThread *thread;
260         int workers, min;
261  
262         thread = mono_thread_current ();
263         thread->threadpool_thread = TRUE;
264         thread->state |= ThreadState_Background;
265         for (;;) {
266                 MonoAsyncResult *ar;
267
268                 ar = (MonoAsyncResult *) data;
269                 if (ar) {
270                         /* worker threads invokes methods in different domains,
271                          * so we need to set the right domain here */
272                         domain = ((MonoObject *)ar)->vtable->domain;
273                         if (mono_domain_set (domain, FALSE))
274                                 mono_async_invoke (ar);
275                         InterlockedDecrement (&busy_worker_threads);
276                 }
277
278                 data = dequeue_job ();
279         
280                 if (!data) {
281                         guint32 wr;
282                         int timeout = 500;
283                         guint32 start_time = GetTickCount ();
284                         
285                         do {
286                                 wr = WaitForSingleObjectEx (job_added, (guint32)timeout, TRUE);
287                                 mono_thread_interruption_checkpoint ();
288                         
289                                 timeout -= GetTickCount () - start_time;
290                         
291                                 if (wr != WAIT_TIMEOUT)
292                                         data = dequeue_job ();
293                         }
294                         while (!data && timeout > 0);
295                 }
296
297                 if (!data) {
298                         workers = (int) InterlockedCompareExchange (&mono_worker_threads, 0, -1); 
299                         min = (int) InterlockedCompareExchange (&mono_min_worker_threads, 0, -1); 
300         
301                         while (!data && workers <= min) {
302                                 WaitForSingleObjectEx (job_added, INFINITE, TRUE);
303                                 mono_thread_interruption_checkpoint ();
304                         
305                                 data = dequeue_job ();
306                                 workers = (int) InterlockedCompareExchange (&mono_worker_threads, 0, -1); 
307                                 min = (int) InterlockedCompareExchange (&mono_min_worker_threads, 0, -1); 
308                         }
309                 }
310         
311                 if (!data) {
312                         InterlockedDecrement (&mono_worker_threads);
313                         return;
314                 }
315                 
316                 InterlockedIncrement (&busy_worker_threads);
317         }
318
319         g_assert_not_reached ();
320 }
321
322 void
323 ves_icall_System_Threading_ThreadPool_GetAvailableThreads (gint *workerThreads, gint *completionPortThreads)
324 {
325         gint busy;
326
327         MONO_ARCH_SAVE_REGS;
328
329         busy = (gint) InterlockedCompareExchange (&busy_worker_threads, 0, -1);
330         *workerThreads = mono_max_worker_threads - busy;
331         *completionPortThreads = 0;
332 }
333
334 void
335 ves_icall_System_Threading_ThreadPool_GetMaxThreads (gint *workerThreads, gint *completionPortThreads)
336 {
337         MONO_ARCH_SAVE_REGS;
338
339         *workerThreads = mono_max_worker_threads;
340         *completionPortThreads = 0;
341 }
342
343 void
344 ves_icall_System_Threading_ThreadPool_GetMinThreads (gint *workerThreads, gint *completionPortThreads)
345 {
346         gint workers;
347
348         MONO_ARCH_SAVE_REGS;
349
350         workers = (gint) InterlockedCompareExchange (&mono_min_worker_threads, 0, -1);
351         *workerThreads = workers;
352         *completionPortThreads = 0;
353 }
354
355 MonoBoolean
356 ves_icall_System_Threading_ThreadPool_SetMinThreads (gint workerThreads, gint completionPortThreads)
357 {
358         MONO_ARCH_SAVE_REGS;
359
360         if (workerThreads < 0 || workerThreads > mono_max_worker_threads)
361                 return FALSE;
362         InterlockedExchange (&mono_min_worker_threads, workerThreads);
363         /* FIXME: should actually start the idle threads if needed */
364         return TRUE;
365 }
366
367 static void
368 overlapped_callback (guint32 error, guint32 numbytes, WapiOverlapped *overlapped)
369 {
370         MonoFSAsyncResult *ares;
371         MonoThread *thread;
372  
373         MONO_ARCH_SAVE_REGS;
374
375         ares = (MonoFSAsyncResult *) overlapped->handle1;
376         ares->completed = TRUE;
377         if (ares->bytes_read != -1)
378                 ares->bytes_read = numbytes;
379         else
380                 ares->count = numbytes;
381
382         thread = mono_thread_attach (mono_object_domain (ares));
383         if (ares->async_callback != NULL) {
384                 gpointer p [1];
385
386                 *p = ares;
387                 mono_runtime_invoke (ares->async_callback->method_info->method, NULL, p, NULL);
388         }
389
390         SetEvent (ares->wait_handle->handle);
391         mono_thread_detach (thread);
392         g_free (overlapped);
393 }
394
395 MonoBoolean
396 ves_icall_System_Threading_ThreadPool_BindHandle (gpointer handle)
397 {
398         MONO_ARCH_SAVE_REGS;
399
400 #ifdef PLATFORM_WIN32
401         return FALSE;
402 #else
403         if (!BindIoCompletionCallback (handle, overlapped_callback, 0)) {
404                 gint error = GetLastError ();
405                 MonoException *exc;
406                 gchar *msg;
407
408                 if (error == ERROR_INVALID_PARAMETER) {
409                         exc = mono_get_exception_argument (NULL, "Invalid parameter.");
410                 } else {
411                         msg = g_strdup_printf ("Win32 error %d.", error);
412                         exc = mono_exception_from_name_msg (mono_defaults.corlib,
413                                                             "System",
414                                                             "ApplicationException", msg);
415                         g_free (msg);
416                 }
417
418                 mono_raise_exception (exc);
419         }
420
421         return TRUE;
422 #endif
423 }
424