* culture-info.h: Make defines more consistent, add calendar data
[mono.git] / mono / metadata / threadpool.c
1 /*
2  * threadpool.c: global thread pool
3  *
4  * Authors:
5  *   Dietmar Maurer (dietmar@ximian.com)
6  *   Gonzalo Paniagua Javier (gonzalo@ximian.com)
7  *
8  * (C) 2001-2003 Ximian, Inc.
9  * (c) 2004 Novell, Inc. (http://www.novell.com)
10  */
11
12 #include <config.h>
13 #include <glib.h>
14
15 #ifdef PLATFORM_WIN32
16 #define WINVER 0x0500
17 #define _WIN32_WINNT 0x0500
18 #endif
19
20 #include <mono/metadata/appdomain.h>
21 #include <mono/metadata/tabledefs.h>
22 #include <mono/metadata/threads.h>
23 #include <mono/metadata/exception.h>
24 #include <mono/metadata/file-io.h>
25 #include <mono/metadata/monitor.h>
26 #include <mono/metadata/marshal.h>
27 #include <mono/io-layer/io-layer.h>
28 #include <mono/os/gc_wrapper.h>
29
30 #include "threadpool.h"
31
/* maximum number of worker threads; no new worker is spawned at or above this */
int mono_max_worker_threads = 25; /* fixme: should be 25 per available CPU */
/*
 * minimum number of worker threads: an idle worker keeps waiting for jobs
 * instead of exiting while the worker count is at or below this value
 */
int mono_min_worker_threads = 0;

/* current number of worker threads */
static int mono_worker_threads = 0;

/* current number of busy threads */
int busy_worker_threads = 0;

/* we use this to store a reference to the AsyncResult to avoid GC */
static MonoGHashTable *ares_htable = NULL;

/*
 * semaphore released each time we append a job to the queue;
 * created together with ares_htable on the first mono_thread_pool_add () call
 */
static HANDLE job_added;
47
/* Per-call bookkeeping attached to a MonoAsyncResult through its data field. */
typedef struct {
        MonoMethodMessage *msg;         /* message describing the call to invoke */
        HANDLE             wait_event;  /* event signaled once the call has finished */
        MonoMethod        *cb_method;   /* Invoke method of the completion callback delegate, if any */
        MonoDelegate      *cb_target;   /* the completion callback delegate itself, if any */
        MonoObject        *state;       /* state object handed to mono_async_result_new () */
        MonoObject        *res;         /* value returned by mono_message_invoke () */
        MonoArray         *out_args;    /* out arguments filled in by mono_message_invoke () */
} ASyncCall;
57
/* worker thread main loop; its argument is the first job to run (may be NULL) */
static void async_invoke_thread (gpointer data);
/* adds a job to the tail of async_call_queue */
static void append_job (MonoAsyncResult *ar);

/* pending jobs; accessed while holding mono_delegate_section */
static GList *async_call_queue = NULL;
62
63 static void
64 mono_async_invoke (MonoAsyncResult *ares)
65 {
66         ASyncCall *ac = (ASyncCall *)ares->data;
67
68         ac->msg->exc = NULL;
69         ac->res = mono_message_invoke (ares->async_delegate, ac->msg, 
70                                        &ac->msg->exc, &ac->out_args);
71
72         ares->completed = 1;
73
74         /* call async callback if cb_method != null*/
75         if (ac->cb_method) {
76                 MonoObject *exc = NULL;
77                 void *pa = &ares;
78                 mono_runtime_invoke (ac->cb_method, ac->cb_target, pa, &exc);
79                 if (!ac->msg->exc)
80                         ac->msg->exc = exc;
81         }
82
83         /* notify listeners */
84         mono_monitor_try_enter ((MonoObject *) ares, INFINITE);
85         if (ares->handle != NULL) {
86                 ac->wait_event = ((MonoWaitHandle *) ares->handle)->handle;
87                 SetEvent (ac->wait_event);
88         }
89         mono_monitor_exit ((MonoObject *) ares);
90
91         mono_g_hash_table_remove (ares_htable, ares);
92 }
93
94 MonoAsyncResult *
95 mono_thread_pool_add (MonoObject *target, MonoMethodMessage *msg, MonoDelegate *async_callback,
96                       MonoObject *state)
97 {
98         MonoDomain *domain = mono_domain_get ();
99         MonoAsyncResult *ares;
100         ASyncCall *ac;
101         int busy, worker;
102
103 #ifdef HAVE_BOEHM_GC
104         ac = GC_MALLOC (sizeof (ASyncCall));
105 #else
106         /* We'll leak the event if creaated... */
107         ac = g_new0 (ASyncCall, 1);
108 #endif
109         ac->wait_event = NULL;
110         ac->msg = msg;
111         ac->state = state;
112
113         if (async_callback) {
114                 ac->cb_method = mono_get_delegate_invoke (((MonoObject *)async_callback)->vtable->klass);
115                 ac->cb_target = async_callback;
116         }
117
118         ares = mono_async_result_new (domain, NULL, ac->state, ac);
119         ares->async_delegate = target;
120
121         if (!ares_htable) {
122                 ares_htable = mono_g_hash_table_new (NULL, NULL);
123                 job_added = CreateSemaphore (NULL, 0, 0x7fffffff, NULL);
124         }
125
126         mono_g_hash_table_insert (ares_htable, ares, ares);
127
128         busy = (int) InterlockedCompareExchange (&busy_worker_threads, 0, -1);
129         worker = (int) InterlockedCompareExchange (&mono_worker_threads, 0, -1); 
130         if (worker <= ++busy &&
131             worker < mono_max_worker_threads) {
132                 InterlockedIncrement (&mono_worker_threads);
133                 InterlockedIncrement (&busy_worker_threads);
134                 mono_thread_create (domain, async_invoke_thread, ares);
135         } else {
136                 append_job (ares);
137                 ReleaseSemaphore (job_added, 1, NULL);
138         }
139
140         return ares;
141 }
142
143 MonoObject *
144 mono_thread_pool_finish (MonoAsyncResult *ares, MonoArray **out_args, MonoObject **exc)
145 {
146         ASyncCall *ac;
147
148         *exc = NULL;
149         *out_args = NULL;
150
151         /* check if already finished */
152         mono_monitor_try_enter ((MonoObject *) ares, INFINITE);
153         if (ares->endinvoke_called) {
154                 *exc = (MonoObject *)mono_exception_from_name (mono_defaults.corlib, "System", 
155                                               "InvalidOperationException");
156                 mono_monitor_exit ((MonoObject *) ares);
157                 return NULL;
158         }
159
160         ares->endinvoke_called = 1;
161         ac = (ASyncCall *)ares->data;
162
163         g_assert (ac != NULL);
164
165         /* wait until we are really finished */
166         if (!ares->completed) {
167                 if (ares->handle == NULL) {
168                         ac->wait_event = CreateEvent (NULL, TRUE, FALSE, NULL);
169                         ares->handle = (MonoObject *) mono_wait_handle_new (mono_object_domain (ares), ac->wait_event);
170                 }
171                 mono_monitor_exit ((MonoObject *) ares);
172                 WaitForSingleObject (ac->wait_event, INFINITE);
173         } else {
174                 mono_monitor_exit ((MonoObject *) ares);
175         }
176
177         *exc = ac->msg->exc;
178         *out_args = ac->out_args;
179
180         return ac->res;
181 }
182
183 void
184 mono_thread_pool_cleanup (void)
185 {
186         gint release;
187
188         EnterCriticalSection (&mono_delegate_section);
189         g_list_free (async_call_queue);
190         async_call_queue = NULL;
191         release = (gint) InterlockedCompareExchange (&busy_worker_threads, 0, -1);
192         LeaveCriticalSection (&mono_delegate_section);
193         if (job_added)
194                 ReleaseSemaphore (job_added, release, NULL);
195 }
196
197 static void
198 append_job (MonoAsyncResult *ar)
199 {
200         EnterCriticalSection (&mono_delegate_section);
201         async_call_queue = g_list_append (async_call_queue, ar); 
202         LeaveCriticalSection (&mono_delegate_section);
203 }
204
205 static MonoAsyncResult *
206 dequeue_job (void)
207 {
208         MonoAsyncResult *ar = NULL;
209         GList *tmp = NULL;
210
211         EnterCriticalSection (&mono_delegate_section);
212         if (async_call_queue) {
213                 ar = (MonoAsyncResult *)async_call_queue->data;
214                 tmp = async_call_queue;
215                 async_call_queue = g_list_remove_link (tmp, tmp); 
216         }
217         LeaveCriticalSection (&mono_delegate_section);
218         if (tmp)
219                 g_list_free_1 (tmp);
220
221         return ar;
222 }
223
224 static void
225 async_invoke_thread (gpointer data)
226 {
227         MonoDomain *domain;
228         MonoThread *thread;
229         int workers, min;
230  
231         thread = mono_thread_current ();
232         thread->threadpool_thread = TRUE;
233         thread->state |= ThreadState_Background;
234         for (;;) {
235                 MonoAsyncResult *ar;
236
237                 ar = (MonoAsyncResult *) data;
238                 if (ar) {
239                         /* worker threads invokes methods in different domains,
240                          * so we need to set the right domain here */
241                         domain = ((MonoObject *)ar)->vtable->domain;
242                         if (mono_domain_set (domain, FALSE))
243                                 mono_async_invoke (ar);
244                         InterlockedDecrement (&busy_worker_threads);
245                 }
246
247                 data = dequeue_job ();
248         
249                 if (!data && WaitForSingleObject (job_added, 500) != WAIT_TIMEOUT)
250                         data = dequeue_job ();
251                 
252                 if (!data) {
253                         workers = (int) InterlockedCompareExchange (&mono_worker_threads, 0, -1); 
254                         min = (int) InterlockedCompareExchange (&mono_min_worker_threads, 0, -1); 
255         
256                         while (!data && workers <= min) {
257                                 WaitForSingleObject (job_added, INFINITE);
258                                 data = dequeue_job ();
259                                 workers = (int) InterlockedCompareExchange (&mono_worker_threads, 0, -1); 
260                                 min = (int) InterlockedCompareExchange (&mono_min_worker_threads, 0, -1); 
261                         }
262                 }
263         
264                 if (!data) {
265                         InterlockedDecrement (&mono_worker_threads);
266                         return;
267                 }
268                 
269                 InterlockedIncrement (&busy_worker_threads);
270         }
271
272         g_assert_not_reached ();
273 }
274
275 void
276 ves_icall_System_Threading_ThreadPool_GetAvailableThreads (gint *workerThreads, gint *completionPortThreads)
277 {
278         gint busy;
279
280         MONO_ARCH_SAVE_REGS;
281
282         busy = (gint) InterlockedCompareExchange (&busy_worker_threads, 0, -1);
283         *workerThreads = mono_max_worker_threads - busy;
284         *completionPortThreads = 0;
285 }
286
287 void
288 ves_icall_System_Threading_ThreadPool_GetMaxThreads (gint *workerThreads, gint *completionPortThreads)
289 {
290         MONO_ARCH_SAVE_REGS;
291
292         *workerThreads = mono_max_worker_threads;
293         *completionPortThreads = 0;
294 }
295
296 void
297 ves_icall_System_Threading_ThreadPool_GetMinThreads (gint *workerThreads, gint *completionPortThreads)
298 {
299         gint workers;
300
301         MONO_ARCH_SAVE_REGS;
302
303         workers = (gint) InterlockedCompareExchange (&mono_min_worker_threads, 0, -1);
304         *workerThreads = workers;
305         *completionPortThreads = 0;
306 }
307
308 void
309 ves_icall_System_Threading_ThreadPool_SetMinThreads (gint workerThreads, gint completionPortThreads)
310 {
311         MONO_ARCH_SAVE_REGS;
312
313         InterlockedExchange (&mono_min_worker_threads, workerThreads);
314 }
315
316 static void
317 overlapped_callback (guint32 error, guint32 numbytes, WapiOverlapped *overlapped)
318 {
319         MonoFSAsyncResult *ares;
320         MonoThread *thread;
321  
322         MONO_ARCH_SAVE_REGS;
323
324         ares = (MonoFSAsyncResult *) overlapped->handle1;
325         ares->completed = TRUE;
326         if (ares->bytes_read != -1)
327                 ares->bytes_read = numbytes;
328         else
329                 ares->count = numbytes;
330
331         thread = mono_thread_attach (mono_object_domain (ares));
332         if (ares->async_callback != NULL) {
333                 gpointer p [1];
334
335                 *p = ares;
336                 mono_runtime_invoke (ares->async_callback->method_info->method, NULL, p, NULL);
337         }
338
339         SetEvent (ares->wait_handle->handle);
340         mono_thread_detach (thread);
341         g_free (overlapped);
342 }
343
344 MonoBoolean
345 ves_icall_System_Threading_ThreadPool_BindHandle (gpointer handle)
346 {
347         MONO_ARCH_SAVE_REGS;
348
349 #ifdef PLATFORM_WIN32
350         return FALSE;
351 #else
352         if (!BindIoCompletionCallback (handle, overlapped_callback, 0)) {
353                 gint error = GetLastError ();
354                 MonoException *exc;
355                 gchar *msg;
356
357                 if (error == ERROR_INVALID_PARAMETER) {
358                         exc = mono_get_exception_argument (NULL, "Invalid parameter.");
359                 } else {
360                         msg = g_strdup_printf ("Win32 error %d.", error);
361                         exc = mono_exception_from_name_msg (mono_defaults.corlib,
362                                                             "System",
363                                                             "ApplicationException", msg);
364                         g_free (msg);
365                 }
366
367                 mono_raise_exception (exc);
368         }
369
370         return TRUE;
371 #endif
372 }
373