Some docs.
[mono.git] / mono / metadata / threadpool.c
1 /*
2  * threadpool.c: global thread pool
3  *
4  * Authors:
5  *   Dietmar Maurer (dietmar@ximian.com)
6  *   Gonzalo Paniagua Javier (gonzalo@ximian.com)
7  *
8  * (C) 2001-2003 Ximian, Inc.
9  * (c) 2004 Novell, Inc. (http://www.novell.com)
10  */
11
12 #include <config.h>
13 #include <glib.h>
14
15 #ifdef PLATFORM_WIN32
16 #define WINVER 0x0500
17 #define _WIN32_WINNT 0x0500
18 #endif
19
20 #include <mono/metadata/domain-internals.h>
21 #include <mono/metadata/tabledefs.h>
22 #include <mono/metadata/threads.h>
23 #include <mono/metadata/threads-types.h>
24 #include <mono/metadata/exception.h>
25 #include <mono/metadata/file-io.h>
26 #include <mono/metadata/monitor.h>
27 #include <mono/metadata/marshal.h>
28 #include <mono/io-layer/io-layer.h>
29 #include <mono/os/gc_wrapper.h>
30
31 #include "threadpool.h"
32
33 /* maximum number of worker threads */
34 int mono_max_worker_threads = 25; /* fixme: should be 25 per available CPU */
35 static int mono_min_worker_threads = 0;
36
37 /* current number of worker threads */
38 static int mono_worker_threads = 0;
39
40 /* current number of busy threads */
41 static int busy_worker_threads = 0;
42
43 /* we use this to store a reference to the AsyncResult to avoid GC */
44 static MonoGHashTable *ares_htable = NULL;
45
46 /* we append a job */
47 static HANDLE job_added;
48
49 typedef struct {
50         MonoMethodMessage *msg;
51         HANDLE             wait_event;
52         MonoMethod        *cb_method;
53         MonoDelegate      *cb_target;
54         MonoObject        *state;
55         MonoObject        *res;
56         MonoArray         *out_args;
57 } ASyncCall;
58
59 static void async_invoke_thread (gpointer data);
60 static void append_job (MonoAsyncResult *ar);
61
62 static GList *async_call_queue = NULL;
63
64 static void
65 mono_async_invoke (MonoAsyncResult *ares)
66 {
67         ASyncCall *ac = (ASyncCall *)ares->data;
68
69         ac->msg->exc = NULL;
70         ac->res = mono_message_invoke (ares->async_delegate, ac->msg, 
71                                        &ac->msg->exc, &ac->out_args);
72
73         ares->completed = 1;
74
75         /* call async callback if cb_method != null*/
76         if (ac->cb_method) {
77                 MonoObject *exc = NULL;
78                 void *pa = &ares;
79                 mono_runtime_invoke (ac->cb_method, ac->cb_target, pa, &exc);
80                 if (!ac->msg->exc)
81                         ac->msg->exc = exc;
82         }
83
84         /* notify listeners */
85         mono_monitor_enter ((MonoObject *) ares);
86         
87         if (ares->handle != NULL) {
88                 ac->wait_event = ((MonoWaitHandle *) ares->handle)->handle;
89                 SetEvent (ac->wait_event);
90         }
91         mono_monitor_exit ((MonoObject *) ares);
92
93         mono_g_hash_table_remove (ares_htable, ares);
94 }
95
96 MonoAsyncResult *
97 mono_thread_pool_add (MonoObject *target, MonoMethodMessage *msg, MonoDelegate *async_callback,
98                       MonoObject *state)
99 {
100         MonoDomain *domain = mono_domain_get ();
101         MonoAsyncResult *ares;
102         ASyncCall *ac;
103         int busy, worker;
104
105 #ifdef HAVE_BOEHM_GC
106         ac = GC_MALLOC (sizeof (ASyncCall));
107 #else
108         /* We'll leak the event if creaated... */
109         ac = g_new0 (ASyncCall, 1);
110 #endif
111         ac->wait_event = NULL;
112         ac->msg = msg;
113         ac->state = state;
114
115         if (async_callback) {
116                 ac->cb_method = mono_get_delegate_invoke (((MonoObject *)async_callback)->vtable->klass);
117                 ac->cb_target = async_callback;
118         }
119
120         ares = mono_async_result_new (domain, NULL, ac->state, ac);
121         ares->async_delegate = target;
122
123         if (!ares_htable) {
124                 MONO_GC_REGISTER_ROOT (ares_htable);
125                 ares_htable = mono_g_hash_table_new (NULL, NULL);
126                 job_added = CreateSemaphore (NULL, 0, 0x7fffffff, NULL);
127         }
128
129         mono_g_hash_table_insert (ares_htable, ares, ares);
130
131         busy = (int) InterlockedCompareExchange (&busy_worker_threads, 0, -1);
132         worker = (int) InterlockedCompareExchange (&mono_worker_threads, 0, -1); 
133         if (worker <= ++busy &&
134             worker < mono_max_worker_threads) {
135                 InterlockedIncrement (&mono_worker_threads);
136                 InterlockedIncrement (&busy_worker_threads);
137                 mono_thread_create (domain, async_invoke_thread, ares);
138         } else {
139                 append_job (ares);
140                 ReleaseSemaphore (job_added, 1, NULL);
141         }
142
143         return ares;
144 }
145
146 MonoObject *
147 mono_thread_pool_finish (MonoAsyncResult *ares, MonoArray **out_args, MonoObject **exc)
148 {
149         ASyncCall *ac;
150
151         *exc = NULL;
152         *out_args = NULL;
153
154         /* check if already finished */
155         mono_monitor_enter ((MonoObject *) ares);
156         
157         if (ares->endinvoke_called) {
158                 *exc = (MonoObject *)mono_exception_from_name (mono_defaults.corlib, "System", 
159                                               "InvalidOperationException");
160                 mono_monitor_exit ((MonoObject *) ares);
161                 return NULL;
162         }
163
164         ares->endinvoke_called = 1;
165         ac = (ASyncCall *)ares->data;
166
167         g_assert (ac != NULL);
168
169         /* wait until we are really finished */
170         if (!ares->completed) {
171                 if (ares->handle == NULL) {
172                         ac->wait_event = CreateEvent (NULL, TRUE, FALSE, NULL);
173                         ares->handle = (MonoObject *) mono_wait_handle_new (mono_object_domain (ares), ac->wait_event);
174                 }
175                 mono_monitor_exit ((MonoObject *) ares);
176                 WaitForSingleObjectEx (ac->wait_event, INFINITE, TRUE);
177         } else {
178                 mono_monitor_exit ((MonoObject *) ares);
179         }
180
181         *exc = ac->msg->exc;
182         *out_args = ac->out_args;
183
184         return ac->res;
185 }
186
187 void
188 mono_thread_pool_cleanup (void)
189 {
190         gint release;
191
192         EnterCriticalSection (&mono_delegate_section);
193         g_list_free (async_call_queue);
194         async_call_queue = NULL;
195         release = (gint) InterlockedCompareExchange (&busy_worker_threads, 0, -1);
196         LeaveCriticalSection (&mono_delegate_section);
197         if (job_added)
198                 ReleaseSemaphore (job_added, release, NULL);
199 }
200
201 static void
202 append_job (MonoAsyncResult *ar)
203 {
204         EnterCriticalSection (&mono_delegate_section);
205         async_call_queue = g_list_append (async_call_queue, ar); 
206         LeaveCriticalSection (&mono_delegate_section);
207 }
208
209 static MonoAsyncResult *
210 dequeue_job (void)
211 {
212         MonoAsyncResult *ar = NULL;
213         GList *tmp = NULL;
214
215         EnterCriticalSection (&mono_delegate_section);
216         if (async_call_queue) {
217                 ar = (MonoAsyncResult *)async_call_queue->data;
218                 tmp = async_call_queue;
219                 async_call_queue = g_list_remove_link (tmp, tmp); 
220         }
221         LeaveCriticalSection (&mono_delegate_section);
222         if (tmp)
223                 g_list_free_1 (tmp);
224
225         return ar;
226 }
227
228 static void
229 async_invoke_thread (gpointer data)
230 {
231         MonoDomain *domain;
232         MonoThread *thread;
233         int workers, min;
234  
235         thread = mono_thread_current ();
236         thread->threadpool_thread = TRUE;
237         thread->state |= ThreadState_Background;
238         for (;;) {
239                 MonoAsyncResult *ar;
240
241                 ar = (MonoAsyncResult *) data;
242                 if (ar) {
243                         /* worker threads invokes methods in different domains,
244                          * so we need to set the right domain here */
245                         domain = ((MonoObject *)ar)->vtable->domain;
246                         if (mono_domain_set (domain, FALSE))
247                                 mono_async_invoke (ar);
248                         InterlockedDecrement (&busy_worker_threads);
249                 }
250
251                 data = dequeue_job ();
252         
253                 if (!data) {
254                         guint32 wr;
255                         int timeout = 500;
256                         guint32 start_time = GetTickCount ();
257                         
258                         do {
259                                 wr = WaitForSingleObjectEx (job_added, (guint32)timeout, TRUE);
260                                 mono_thread_interruption_checkpoint ();
261                         
262                                 timeout -= GetTickCount () - start_time;
263                         
264                                 if (wr != WAIT_TIMEOUT)
265                                         data = dequeue_job ();
266                         }
267                         while (!data && timeout > 0);
268                 }
269                 
270                 if (!data) {
271                         workers = (int) InterlockedCompareExchange (&mono_worker_threads, 0, -1); 
272                         min = (int) InterlockedCompareExchange (&mono_min_worker_threads, 0, -1); 
273         
274                         while (!data && workers <= min) {
275                                 WaitForSingleObjectEx (job_added, INFINITE, TRUE);
276                                 mono_thread_interruption_checkpoint ();
277                         
278                                 data = dequeue_job ();
279                                 workers = (int) InterlockedCompareExchange (&mono_worker_threads, 0, -1); 
280                                 min = (int) InterlockedCompareExchange (&mono_min_worker_threads, 0, -1); 
281                         }
282                 }
283         
284                 if (!data) {
285                         InterlockedDecrement (&mono_worker_threads);
286                         return;
287                 }
288                 
289                 InterlockedIncrement (&busy_worker_threads);
290         }
291
292         g_assert_not_reached ();
293 }
294
295 void
296 ves_icall_System_Threading_ThreadPool_GetAvailableThreads (gint *workerThreads, gint *completionPortThreads)
297 {
298         gint busy;
299
300         MONO_ARCH_SAVE_REGS;
301
302         busy = (gint) InterlockedCompareExchange (&busy_worker_threads, 0, -1);
303         *workerThreads = mono_max_worker_threads - busy;
304         *completionPortThreads = 0;
305 }
306
307 void
308 ves_icall_System_Threading_ThreadPool_GetMaxThreads (gint *workerThreads, gint *completionPortThreads)
309 {
310         MONO_ARCH_SAVE_REGS;
311
312         *workerThreads = mono_max_worker_threads;
313         *completionPortThreads = 0;
314 }
315
316 void
317 ves_icall_System_Threading_ThreadPool_GetMinThreads (gint *workerThreads, gint *completionPortThreads)
318 {
319         gint workers;
320
321         MONO_ARCH_SAVE_REGS;
322
323         workers = (gint) InterlockedCompareExchange (&mono_min_worker_threads, 0, -1);
324         *workerThreads = workers;
325         *completionPortThreads = 0;
326 }
327
328 MonoBoolean
329 ves_icall_System_Threading_ThreadPool_SetMinThreads (gint workerThreads, gint completionPortThreads)
330 {
331         MONO_ARCH_SAVE_REGS;
332
333         if (workerThreads < 0 || workerThreads > mono_max_worker_threads)
334                 return FALSE;
335         InterlockedExchange (&mono_min_worker_threads, workerThreads);
336         /* FIXME: should actually start the idle threads if needed */
337         return TRUE;
338 }
339
340 static void
341 overlapped_callback (guint32 error, guint32 numbytes, WapiOverlapped *overlapped)
342 {
343         MonoFSAsyncResult *ares;
344         MonoThread *thread;
345  
346         MONO_ARCH_SAVE_REGS;
347
348         ares = (MonoFSAsyncResult *) overlapped->handle1;
349         ares->completed = TRUE;
350         if (ares->bytes_read != -1)
351                 ares->bytes_read = numbytes;
352         else
353                 ares->count = numbytes;
354
355         thread = mono_thread_attach (mono_object_domain (ares));
356         if (ares->async_callback != NULL) {
357                 gpointer p [1];
358
359                 *p = ares;
360                 mono_runtime_invoke (ares->async_callback->method_info->method, NULL, p, NULL);
361         }
362
363         SetEvent (ares->wait_handle->handle);
364         mono_thread_detach (thread);
365         g_free (overlapped);
366 }
367
368 MonoBoolean
369 ves_icall_System_Threading_ThreadPool_BindHandle (gpointer handle)
370 {
371         MONO_ARCH_SAVE_REGS;
372
373 #ifdef PLATFORM_WIN32
374         return FALSE;
375 #else
376         if (!BindIoCompletionCallback (handle, overlapped_callback, 0)) {
377                 gint error = GetLastError ();
378                 MonoException *exc;
379                 gchar *msg;
380
381                 if (error == ERROR_INVALID_PARAMETER) {
382                         exc = mono_get_exception_argument (NULL, "Invalid parameter.");
383                 } else {
384                         msg = g_strdup_printf ("Win32 error %d.", error);
385                         exc = mono_exception_from_name_msg (mono_defaults.corlib,
386                                                             "System",
387                                                             "ApplicationException", msg);
388                         g_free (msg);
389                 }
390
391                 mono_raise_exception (exc);
392         }
393
394         return TRUE;
395 #endif
396 }
397