2 * threadpool.c: global thread pool
5 * Dietmar Maurer (dietmar@ximian.com)
6 * Gonzalo Paniagua Javier (gonzalo@ximian.com)
8 * (C) 2001-2003 Ximian, Inc.
9 * (c) 2004 Novell, Inc. (http://www.novell.com)
17 #define _WIN32_WINNT 0x0500
20 #include <mono/metadata/appdomain.h>
21 #include <mono/metadata/tabledefs.h>
22 #include <mono/metadata/threads.h>
23 #include <mono/metadata/exception.h>
24 #include <mono/metadata/file-io.h>
25 #include <mono/metadata/monitor.h>
26 #include <mono/metadata/marshal.h>
27 #include <mono/io-layer/io-layer.h>
28 #include <mono/os/gc_wrapper.h>
30 #include "threadpool.h"
32 /* maximum number of worker threads */
33 int mono_max_worker_threads = 25; /* fixme: should be 25 per available CPU */
34 int mono_min_worker_threads = 0;
36 /* current number of worker threads */
37 static int mono_worker_threads = 0;
39 /* current number of busy threads */
40 int busy_worker_threads = 0;
42 /* we use this to store a reference to the AsyncResult to avoid GC */
43 static MonoGHashTable *ares_htable = NULL;
46 static HANDLE job_added;
49 MonoMethodMessage *msg;
51 MonoMethod *cb_method;
52 MonoDelegate *cb_target;
58 static void async_invoke_thread (gpointer data);
59 static void append_job (MonoAsyncResult *ar);
61 static GList *async_call_queue = NULL;
64 mono_async_invoke (MonoAsyncResult *ares)
66 ASyncCall *ac = (ASyncCall *)ares->data;
69 ac->res = mono_message_invoke (ares->async_delegate, ac->msg,
70 &ac->msg->exc, &ac->out_args);
74 /* call async callback if cb_method != null*/
76 MonoObject *exc = NULL;
78 mono_runtime_invoke (ac->cb_method, ac->cb_target, pa, &exc);
83 /* notify listeners */
84 mono_monitor_try_enter ((MonoObject *) ares, INFINITE);
85 if (ares->handle != NULL) {
86 ac->wait_event = ((MonoWaitHandle *) ares->handle)->handle;
87 SetEvent (ac->wait_event);
89 mono_monitor_exit ((MonoObject *) ares);
91 mono_g_hash_table_remove (ares_htable, ares);
95 mono_thread_pool_add (MonoObject *target, MonoMethodMessage *msg, MonoDelegate *async_callback,
98 MonoDomain *domain = mono_domain_get ();
99 MonoAsyncResult *ares;
104 ac = GC_MALLOC (sizeof (ASyncCall));
106 /* We'll leak the event if creaated... */
107 ac = g_new0 (ASyncCall, 1);
109 ac->wait_event = NULL;
113 if (async_callback) {
114 ac->cb_method = mono_get_delegate_invoke (((MonoObject *)async_callback)->vtable->klass);
115 ac->cb_target = async_callback;
118 ares = mono_async_result_new (domain, NULL, ac->state, ac);
119 ares->async_delegate = target;
122 ares_htable = mono_g_hash_table_new (NULL, NULL);
123 job_added = CreateSemaphore (NULL, 0, 0x7fffffff, NULL);
126 mono_g_hash_table_insert (ares_htable, ares, ares);
128 busy = (int) InterlockedCompareExchange (&busy_worker_threads, 0, -1);
129 worker = (int) InterlockedCompareExchange (&mono_worker_threads, 0, -1);
130 if (worker <= ++busy &&
131 worker < mono_max_worker_threads) {
132 InterlockedIncrement (&mono_worker_threads);
133 InterlockedIncrement (&busy_worker_threads);
134 mono_thread_create (domain, async_invoke_thread, ares);
137 ReleaseSemaphore (job_added, 1, NULL);
144 mono_thread_pool_finish (MonoAsyncResult *ares, MonoArray **out_args, MonoObject **exc)
151 /* check if already finished */
152 mono_monitor_try_enter ((MonoObject *) ares, INFINITE);
153 if (ares->endinvoke_called) {
154 *exc = (MonoObject *)mono_exception_from_name (mono_defaults.corlib, "System",
155 "InvalidOperationException");
156 mono_monitor_exit ((MonoObject *) ares);
160 ares->endinvoke_called = 1;
161 ac = (ASyncCall *)ares->data;
163 g_assert (ac != NULL);
165 /* wait until we are really finished */
166 if (!ares->completed) {
167 if (ares->handle == NULL) {
168 ac->wait_event = CreateEvent (NULL, TRUE, FALSE, NULL);
169 ares->handle = (MonoObject *) mono_wait_handle_new (mono_object_domain (ares), ac->wait_event);
171 mono_monitor_exit ((MonoObject *) ares);
172 WaitForSingleObject (ac->wait_event, INFINITE);
174 mono_monitor_exit ((MonoObject *) ares);
178 *out_args = ac->out_args;
184 mono_thread_pool_cleanup (void)
188 EnterCriticalSection (&mono_delegate_section);
189 g_list_free (async_call_queue);
190 async_call_queue = NULL;
191 release = (gint) InterlockedCompareExchange (&busy_worker_threads, 0, -1);
192 LeaveCriticalSection (&mono_delegate_section);
194 ReleaseSemaphore (job_added, release, NULL);
198 append_job (MonoAsyncResult *ar)
200 EnterCriticalSection (&mono_delegate_section);
201 async_call_queue = g_list_append (async_call_queue, ar);
202 LeaveCriticalSection (&mono_delegate_section);
205 static MonoAsyncResult *
208 MonoAsyncResult *ar = NULL;
211 EnterCriticalSection (&mono_delegate_section);
212 if (async_call_queue) {
213 ar = (MonoAsyncResult *)async_call_queue->data;
214 tmp = async_call_queue;
215 async_call_queue = g_list_remove_link (tmp, tmp);
217 LeaveCriticalSection (&mono_delegate_section);
225 async_invoke_thread (gpointer data)
231 thread = mono_thread_current ();
232 thread->threadpool_thread = TRUE;
233 thread->state |= ThreadState_Background;
237 ar = (MonoAsyncResult *) data;
239 /* worker threads invokes methods in different domains,
240 * so we need to set the right domain here */
241 domain = ((MonoObject *)ar)->vtable->domain;
242 if (mono_domain_set (domain, FALSE))
243 mono_async_invoke (ar);
244 InterlockedDecrement (&busy_worker_threads);
247 data = dequeue_job ();
249 if (!data && WaitForSingleObject (job_added, 500) != WAIT_TIMEOUT)
250 data = dequeue_job ();
253 workers = (int) InterlockedCompareExchange (&mono_worker_threads, 0, -1);
254 min = (int) InterlockedCompareExchange (&mono_min_worker_threads, 0, -1);
256 while (!data && workers <= min) {
257 WaitForSingleObject (job_added, INFINITE);
258 data = dequeue_job ();
259 workers = (int) InterlockedCompareExchange (&mono_worker_threads, 0, -1);
260 min = (int) InterlockedCompareExchange (&mono_min_worker_threads, 0, -1);
265 InterlockedDecrement (&mono_worker_threads);
269 InterlockedIncrement (&busy_worker_threads);
272 g_assert_not_reached ();
276 ves_icall_System_Threading_ThreadPool_GetAvailableThreads (gint *workerThreads, gint *completionPortThreads)
282 busy = (gint) InterlockedCompareExchange (&busy_worker_threads, 0, -1);
283 *workerThreads = mono_max_worker_threads - busy;
284 *completionPortThreads = 0;
288 ves_icall_System_Threading_ThreadPool_GetMaxThreads (gint *workerThreads, gint *completionPortThreads)
292 *workerThreads = mono_max_worker_threads;
293 *completionPortThreads = 0;
297 ves_icall_System_Threading_ThreadPool_GetMinThreads (gint *workerThreads, gint *completionPortThreads)
303 workers = (gint) InterlockedCompareExchange (&mono_min_worker_threads, 0, -1);
304 *workerThreads = workers;
305 *completionPortThreads = 0;
309 ves_icall_System_Threading_ThreadPool_SetMinThreads (gint workerThreads, gint completionPortThreads)
313 InterlockedExchange (&mono_min_worker_threads, workerThreads);
317 overlapped_callback (guint32 error, guint32 numbytes, WapiOverlapped *overlapped)
319 MonoFSAsyncResult *ares;
324 ares = (MonoFSAsyncResult *) overlapped->handle1;
325 ares->completed = TRUE;
326 if (ares->bytes_read != -1)
327 ares->bytes_read = numbytes;
329 ares->count = numbytes;
331 thread = mono_thread_attach (mono_object_domain (ares));
332 if (ares->async_callback != NULL) {
336 mono_runtime_invoke (ares->async_callback->method_info->method, NULL, p, NULL);
339 SetEvent (ares->wait_handle->handle);
340 mono_thread_detach (thread);
345 ves_icall_System_Threading_ThreadPool_BindHandle (gpointer handle)
349 #ifdef PLATFORM_WIN32
352 if (!BindIoCompletionCallback (handle, overlapped_callback, 0)) {
353 gint error = GetLastError ();
357 if (error == ERROR_INVALID_PARAMETER) {
358 exc = mono_get_exception_argument (NULL, "Invalid parameter.");
360 msg = g_strdup_printf ("Win32 error %d.", error);
361 exc = mono_exception_from_name_msg (mono_defaults.corlib,
363 "ApplicationException", msg);
367 mono_raise_exception (exc);