2 * threadpool.c: global thread pool
5 * Dietmar Maurer (dietmar@ximian.com)
6 * Gonzalo Paniagua Javier (gonzalo@ximian.com)
8 * (C) 2001-2003 Ximian, Inc.
9 * (c) 2004 Novell, Inc. (http://www.novell.com)
17 #define _WIN32_WINNT 0x0500
20 #include <mono/metadata/domain-internals.h>
21 #include <mono/metadata/tabledefs.h>
22 #include <mono/metadata/threads.h>
23 #include <mono/metadata/threads-types.h>
24 #include <mono/metadata/exception.h>
25 #include <mono/metadata/file-io.h>
26 #include <mono/metadata/monitor.h>
27 #include <mono/metadata/marshal.h>
28 #include <mono/io-layer/io-layer.h>
29 #include <mono/os/gc_wrapper.h>
31 #include "threadpool.h"
33 /* maximum number of worker threads */
34 int mono_max_worker_threads = 25; /* per available CPU? */
/* minimum number of worker threads; accessed with Interlocked* calls and
 * changed by ves_icall_System_Threading_ThreadPool_SetMinThreads */
35 static int mono_min_worker_threads = 0;
37 /* current number of worker threads */
38 static int mono_worker_threads = 0;
40 /* current number of busy threads */
41 static int busy_worker_threads = 0;
43 /* we use this to store a reference to the AsyncResult to avoid GC */
44 static MonoGHashTable *ares_htable = NULL;
/* taken around the ares_htable insert in mono_thread_pool_add and the
 * removal in mono_async_invoke */
46 static CRITICAL_SECTION ares_lock;
/* semaphore released by mono_thread_pool_add and mono_thread_pool_cleanup;
 * worker threads wait on it in async_invoke_thread */
49 static HANDLE job_added;
/* message describing the call; handed to mono_message_invoke, which also
 * stores the resulting exception in msg->exc */
52 MonoMethodMessage *msg;
/* method and target passed to mono_runtime_invoke for the completion
 * callback; filled in from the async_callback delegate by mono_thread_pool_add */
54 MonoMethod *cb_method;
55 MonoDelegate *cb_target;
/* start routine of the worker threads; data is the MonoAsyncResult the
 * new thread runs first */
61 static void async_invoke_thread (gpointer data);
/* adds a pending call to async_call_queue */
62 static void append_job (MonoAsyncResult *ar);
/* list of pending calls; accessed with mono_delegate_section held */
64 static GList *async_call_queue = NULL;
/*
 * mono_async_invoke:
 * @ares: async result of the call to run; ares->data is its ASyncCall
 *
 * Runs the delegate stored in ares->async_delegate through
 * mono_message_invoke, keeping the return value in ac->res, the exception
 * in ac->msg->exc and the out arguments in ac->out_args.  Afterwards the
 * completion callback is invoked, the wait event of @ares (if a handle
 * exists) is signalled and @ares is removed from ares_htable.
 */
67 mono_async_invoke (MonoAsyncResult *ares)
69 ASyncCall *ac = (ASyncCall *)ares->data;
72 ac->res = mono_message_invoke (ares->async_delegate, ac->msg,
73 &ac->msg->exc, &ac->out_args);
77 /* call async callback if cb_method != null*/
79 MonoObject *exc = NULL;
/* an exception raised by the callback is collected in the local exc */
81 mono_runtime_invoke (ac->cb_method, ac->cb_target, pa, &exc);
86 /* notify listeners */
/* the monitor of ares is taken while the handle is inspected and
 * signalled; mono_thread_pool_finish creates the handle under the same
 * monitor */
87 if(!mono_monitor_enter ((MonoObject *) ares))
90 if (ares->handle != NULL) {
91 ac->wait_event = ((MonoWaitHandle *) ares->handle)->handle;
92 SetEvent (ac->wait_event);
94 mono_monitor_exit ((MonoObject *) ares);
/* the call has run: drop the entry mono_thread_pool_add inserted to keep
 * ares referenced */
96 EnterCriticalSection (&ares_lock);
97 mono_g_hash_table_remove (ares_htable, ares);
98 LeaveCriticalSection (&ares_lock);
/*
 * mono_thread_pool_add:
 * @target: delegate to run asynchronously; stored in ares->async_delegate
 * @msg: presumably the message later passed to mono_message_invoke -- TODO confirm
 * @async_callback: optional completion delegate; when set, its Invoke
 *   method and the delegate itself are recorded in the ASyncCall
 *
 * Creates the ASyncCall and MonoAsyncResult for an asynchronous call,
 * records the result in ares_htable and either starts a new worker thread
 * for it or signals job_added.
 */
103 mono_thread_pool_add (MonoObject *target, MonoMethodMessage *msg, MonoDelegate *async_callback,
106 MonoDomain *domain = mono_domain_get ();
107 MonoAsyncResult *ares;
/* NOTE(review): ac is assigned by GC_MALLOC and again by g_new0 below;
 * presumably these are alternative preprocessor branches -- confirm */
112 ac = GC_MALLOC (sizeof (ASyncCall));
114 /* We'll leak the event if created... */
115 ac = g_new0 (ASyncCall, 1);
117 ac->wait_event = NULL;
121 if (async_callback) {
122 ac->cb_method = mono_get_delegate_invoke (((MonoObject *)async_callback)->vtable->klass);
123 ac->cb_target = async_callback;
126 ares = mono_async_result_new (domain, NULL, ac->state, ac);
127 ares->async_delegate = target;
/* pool state setup: the table lock, the table itself and the job
 * semaphore (initial count 0, maximum count 0x7fffffff) */
130 InitializeCriticalSection (&ares_lock);
131 ares_htable = mono_g_hash_table_new (NULL, NULL);
132 job_added = CreateSemaphore (NULL, 0, 0x7fffffff, NULL);
/* keep ares referenced until mono_async_invoke removes it again */
135 EnterCriticalSection (&ares_lock);
136 mono_g_hash_table_insert (ares_htable, ares, ares);
137 LeaveCriticalSection (&ares_lock);
/* InterlockedCompareExchange with comparand -1 serves as an atomic read:
 * the counter would only be replaced (by 0) if it were -1, and the
 * previous value is returned either way */
139 busy = (int) InterlockedCompareExchange (&busy_worker_threads, 0, -1);
140 worker = (int) InterlockedCompareExchange (&mono_worker_threads, 0, -1);
/* when counting this job leaves no worker free and the maximum is not yet
 * reached, account for a new busy worker and start it with ares as its
 * first job */
141 if (worker <= ++busy &&
142 worker < mono_max_worker_threads) {
143 InterlockedIncrement (&mono_worker_threads);
144 InterlockedIncrement (&busy_worker_threads);
145 mono_thread_create (domain, async_invoke_thread, ares);
/* raise the semaphore count by one for a thread waiting on job_added */
148 ReleaseSemaphore (job_added, 1, NULL);
/*
 * mono_thread_pool_finish:
 * @ares: async result of the call being ended
 * @out_args: receives ac->out_args, the out arguments of the call
 * @exc: receives an InvalidOperationException when ares->endinvoke_called
 *   was already set
 *
 * Marks @ares as ended and, if the call has not completed yet, waits for
 * its wait event.  The state of @ares is examined with its monitor held.
 */
155 mono_thread_pool_finish (MonoAsyncResult *ares, MonoArray **out_args, MonoObject **exc)
162 /* check if already finished */
163 if (!mono_monitor_enter ((MonoObject *) ares)) {
/* ending the same call twice is reported through *exc */
167 if (ares->endinvoke_called) {
168 *exc = (MonoObject *)mono_exception_from_name (mono_defaults.corlib, "System",
169 "InvalidOperationException");
170 mono_monitor_exit ((MonoObject *) ares);
174 ares->endinvoke_called = 1;
175 ac = (ASyncCall *)ares->data;
177 g_assert (ac != NULL);
179 /* wait until we are really finished */
180 if (!ares->completed) {
/* no handle yet: create a manual-reset, initially unsignalled event and
 * publish it through ares->handle so mono_async_invoke can signal it */
181 if (ares->handle == NULL) {
182 ac->wait_event = CreateEvent (NULL, TRUE, FALSE, NULL);
183 ares->handle = (MonoObject *) mono_wait_handle_new (mono_object_domain (ares), ac->wait_event);
/* NOTE(review): if ares->handle already existed, ac->wait_event is only
 * assigned by mono_async_invoke and may still be the NULL stored by
 * mono_thread_pool_add -- verify the wait below uses a valid handle */
/* the monitor is released before the alertable, untimed wait */
185 mono_monitor_exit ((MonoObject *) ares);
186 WaitForSingleObjectEx (ac->wait_event, INFINITE, TRUE);
188 mono_monitor_exit ((MonoObject *) ares);
192 *out_args = ac->out_args;
/*
 * mono_thread_pool_cleanup:
 *
 * Discards the queue of pending calls and releases job_added once per
 * busy worker thread counted at that moment.
 */
198 mono_thread_pool_cleanup (void)
/* the queue is freed and the busy count sampled under the queue lock */
202 EnterCriticalSection (&mono_delegate_section);
203 g_list_free (async_call_queue);
204 async_call_queue = NULL;
/* comparand -1 makes this an atomic read of busy_worker_threads */
205 release = (gint) InterlockedCompareExchange (&busy_worker_threads, 0, -1);
206 LeaveCriticalSection (&mono_delegate_section);
208 ReleaseSemaphore (job_added, release, NULL);
/*
 * append_job:
 * @ar: pending call to add to async_call_queue
 *
 * Adds @ar to the queue with mono_delegate_section held.
 */
212 append_job (MonoAsyncResult *ar)
216 EnterCriticalSection (&mono_delegate_section);
217 if (async_call_queue == NULL) {
218 async_call_queue = g_list_append (async_call_queue, ar);
/* advance tmp to the first node whose data is NULL, or to NULL when
 * every node is in use (the loop body is empty) */
220 for (tmp = async_call_queue; tmp && tmp->data != NULL; tmp = tmp->next);
/* NOTE(review): presumably a node found above is reused and this append
 * only happens when none was found -- confirm */
222 async_call_queue = g_list_append (async_call_queue, ar);
227 LeaveCriticalSection (&mono_delegate_section);
/*
 * Takes the call stored in the head node of async_call_queue, with
 * mono_delegate_section held.
 * NOTE(review): presumably this is dequeue_job, as called from
 * async_invoke_thread, and it yields NULL when nothing is queued -- confirm
 */
230 static MonoAsyncResult *
233 MonoAsyncResult *ar = NULL;
236 EnterCriticalSection (&mono_delegate_section);
237 tmp = async_call_queue;
239 ar = (MonoAsyncResult *) tmp->data;
/* walk tmp2 to the last node of the list (the loop body is empty) */
242 for (tmp2 = tmp; tmp2->next != NULL; tmp2 = tmp2->next);
/* the node after the old head becomes the new head */
244 async_call_queue = tmp->next;
250 LeaveCriticalSection (&mono_delegate_section);
/*
 * async_invoke_thread:
 * @data: the MonoAsyncResult this worker runs first
 *
 * Start routine of a pool worker thread.  It runs a call, then looks for
 * further calls with dequeue_job, waiting on job_added while none is
 * available, and maintains the busy_worker_threads and mono_worker_threads
 * counters as it goes.
 */
256 async_invoke_thread (gpointer data)
/* flag the current thread as a thread pool thread and as a background thread */
262 thread = mono_thread_current ();
263 thread->threadpool_thread = TRUE;
264 thread->state |= ThreadState_Background;
268 ar = (MonoAsyncResult *) data;
270 /* worker threads invoke methods in different domains,
271 * so we need to set the right domain here */
272 domain = ((MonoObject *)ar)->vtable->domain;
/* the call only runs if switching to its domain succeeded */
273 if (mono_domain_set (domain, FALSE))
274 mono_async_invoke (ar);
/* this worker is no longer busy */
275 InterlockedDecrement (&busy_worker_threads);
278 data = dequeue_job ();
/* nothing queued: wait on job_added with a timeout, subtracting the
 * elapsed tick count from the remaining time on each round */
283 guint32 start_time = GetTickCount ();
286 wr = WaitForSingleObjectEx (job_added, (guint32)timeout, TRUE);
287 mono_thread_interruption_checkpoint ();
289 timeout -= GetTickCount () - start_time;
291 if (wr != WAIT_TIMEOUT)
292 data = dequeue_job ();
294 while (!data && timeout > 0);
/* atomic reads (comparand -1) of the current and minimum worker counts */
298 workers = (int) InterlockedCompareExchange (&mono_worker_threads, 0, -1);
299 min = (int) InterlockedCompareExchange (&mono_min_worker_threads, 0, -1);
/* while the pool is not above its minimum size, keep waiting for a job
 * without a timeout */
301 while (!data && workers <= min) {
302 WaitForSingleObjectEx (job_added, INFINITE, TRUE);
303 mono_thread_interruption_checkpoint ();
305 data = dequeue_job ();
306 workers = (int) InterlockedCompareExchange (&mono_worker_threads, 0, -1);
307 min = (int) InterlockedCompareExchange (&mono_min_worker_threads, 0, -1);
/* NOTE(review): presumably reached only when no job was found and the
 * worker is about to exit -- confirm */
312 InterlockedDecrement (&mono_worker_threads);
/* a job was obtained: count this worker as busy again */
316 InterlockedIncrement (&busy_worker_threads);
319 g_assert_not_reached ();
/*
 * ves_icall_System_Threading_ThreadPool_GetAvailableThreads:
 * @workerThreads: receives mono_max_worker_threads minus the number of
 *   busy worker threads
 * @completionPortThreads: always receives 0
 */
323 ves_icall_System_Threading_ThreadPool_GetAvailableThreads (gint *workerThreads, gint *completionPortThreads)
/* comparand -1 makes this an atomic read of busy_worker_threads */
329 busy = (gint) InterlockedCompareExchange (&busy_worker_threads, 0, -1);
330 *workerThreads = mono_max_worker_threads - busy;
331 *completionPortThreads = 0;
/*
 * ves_icall_System_Threading_ThreadPool_GetMaxThreads:
 * @workerThreads: receives mono_max_worker_threads
 * @completionPortThreads: always receives 0
 */
335 ves_icall_System_Threading_ThreadPool_GetMaxThreads (gint *workerThreads, gint *completionPortThreads)
339 *workerThreads = mono_max_worker_threads;
340 *completionPortThreads = 0;
/*
 * ves_icall_System_Threading_ThreadPool_GetMinThreads:
 * @workerThreads: receives the current value of mono_min_worker_threads
 * @completionPortThreads: always receives 0
 */
344 ves_icall_System_Threading_ThreadPool_GetMinThreads (gint *workerThreads, gint *completionPortThreads)
/* comparand -1 makes this an atomic read of mono_min_worker_threads */
350 workers = (gint) InterlockedCompareExchange (&mono_min_worker_threads, 0, -1);
351 *workerThreads = workers;
352 *completionPortThreads = 0;
/*
 * ves_icall_System_Threading_ThreadPool_SetMinThreads:
 * @workerThreads: new minimum number of worker threads; checked against
 *   the range 0 .. mono_max_worker_threads
 * @completionPortThreads: not used by the statements below
 *
 * Atomically stores @workerThreads in mono_min_worker_threads.
 * NOTE(review): presumably an out-of-range value is rejected without
 * changing the minimum -- confirm
 */
356 ves_icall_System_Threading_ThreadPool_SetMinThreads (gint workerThreads, gint completionPortThreads)
360 if (workerThreads < 0 || workerThreads > mono_max_worker_threads)
362 InterlockedExchange (&mono_min_worker_threads, workerThreads);
363 /* FIXME: should actually start the idle threads if needed */
/*
 * overlapped_callback:
 * @error: error code of the finished operation; not used by the
 *   statements below
 * @numbytes: number of bytes transferred; stored in the async result
 * @overlapped: overlapped structure whose handle1 field holds the
 *   MonoFSAsyncResult of the operation
 *
 * Completion routine registered with BindIoCompletionCallback.  Marks the
 * result completed, records the byte count, invokes the user callback if
 * there is one and signals the result's wait handle.
 */
368 overlapped_callback (guint32 error, guint32 numbytes, WapiOverlapped *overlapped)
370 MonoFSAsyncResult *ares;
375 ares = (MonoFSAsyncResult *) overlapped->handle1;
376 ares->completed = TRUE;
/* NOTE(review): presumably bytes_read == -1 marks a write, in which case
 * only count is updated -- confirm */
377 if (ares->bytes_read != -1)
378 ares->bytes_read = numbytes;
380 ares->count = numbytes;
/* attach this thread to the domain of ares for the managed callback and
 * detach it again once the event has been set */
382 thread = mono_thread_attach (mono_object_domain (ares));
383 if (ares->async_callback != NULL) {
387 mono_runtime_invoke (ares->async_callback->method_info->method, NULL, p, NULL);
390 SetEvent (ares->wait_handle->handle);
391 mono_thread_detach (thread);
396 ves_icall_System_Threading_ThreadPool_BindHandle (gpointer handle)
400 #ifdef PLATFORM_WIN32
403 if (!BindIoCompletionCallback (handle, overlapped_callback, 0)) {
404 gint error = GetLastError ();
408 if (error == ERROR_INVALID_PARAMETER) {
409 exc = mono_get_exception_argument (NULL, "Invalid parameter.");
411 msg = g_strdup_printf ("Win32 error %d.", error);
412 exc = mono_exception_from_name_msg (mono_defaults.corlib,
414 "ApplicationException", msg);
418 mono_raise_exception (exc);