2 * threadpool.c: global thread pool
5 * Dietmar Maurer (dietmar@ximian.com)
6 * Gonzalo Paniagua Javier (gonzalo@ximian.com)
8 * (C) 2001 Ximian, Inc.
14 #include <mono/metadata/appdomain.h>
15 #include <mono/metadata/tabledefs.h>
16 #include <mono/metadata/threads.h>
17 #include <mono/metadata/exception.h>
18 #include <mono/io-layer/io-layer.h>
19 #include <mono/os/gc_wrapper.h>
21 #include "threadpool.h"
23 /* maximum number of worker threads */
24 int mono_max_worker_threads = 25; /* fixme: should be 25 per available CPU */
25 /* current number of worker threads */
26 static int mono_worker_threads = 0;
28 /* current number of busy threads */
29 int busy_worker_threads = 0;
31 /* we use this to store a reference to the AsyncResult to avoid GC */
32 static MonoGHashTable *ares_htable = NULL;
35 static HANDLE job_added;
38 MonoMethodMessage *msg;
39 HANDLE wait_semaphore;
40 MonoMethod *cb_method;
41 MonoDelegate *cb_target;
47 static void async_invoke_thread (gpointer data);
48 static void append_job (MonoAsyncResult *ar);
50 static GList *async_call_queue = NULL;
53 mono_async_invoke (MonoAsyncResult *ares)
55 ASyncCall *ac = (ASyncCall *)ares->data;
58 ac->res = mono_message_invoke (ares->async_delegate, ac->msg,
59 &ac->msg->exc, &ac->out_args);
63 /* call async callback if cb_method != null*/
65 MonoObject *exc = NULL;
67 mono_runtime_invoke (ac->cb_method, ac->cb_target, pa, &exc);
72 /* notify listeners */
73 ReleaseSemaphore (ac->wait_semaphore, 0x7fffffff, NULL);
75 mono_g_hash_table_remove (ares_htable, ares);
79 mono_thread_pool_add (MonoObject *target, MonoMethodMessage *msg, MonoDelegate *async_callback,
82 MonoDomain *domain = mono_domain_get ();
83 MonoAsyncResult *ares;
88 ac = GC_MALLOC (sizeof (ASyncCall));
90 /* We'll leak the semaphore... */
91 ac = g_new0 (ASyncCall, 1);
93 ac->wait_semaphore = CreateSemaphore (NULL, 0, 0x7fffffff, NULL);
98 ac->cb_method = mono_get_delegate_invoke (((MonoObject *)async_callback)->vtable->klass);
99 ac->cb_target = async_callback;
102 ares = mono_async_result_new (domain, ac->wait_semaphore, ac->state, ac);
103 ares->async_delegate = target;
106 ares_htable = mono_g_hash_table_new (NULL, NULL);
107 job_added = CreateSemaphore (NULL, 0, 0x7fffffff, NULL);
110 mono_g_hash_table_insert (ares_htable, ares, ares);
112 busy = (int) InterlockedCompareExchange (&busy_worker_threads, 0, -1);
113 worker = (int) InterlockedCompareExchange (&mono_worker_threads, 0, -1);
114 if (worker <= ++busy &&
115 worker < mono_max_worker_threads) {
116 InterlockedIncrement (&mono_worker_threads);
117 InterlockedIncrement (&busy_worker_threads);
118 mono_thread_create (domain, async_invoke_thread, ares);
121 ReleaseSemaphore (job_added, 1, NULL);
128 mono_thread_pool_finish (MonoAsyncResult *ares, MonoArray **out_args, MonoObject **exc)
135 /* check if already finished */
136 if (ares->endinvoke_called) {
137 *exc = (MonoObject *)mono_exception_from_name (mono_defaults.corlib, "System",
138 "InvalidOperationException");
142 ares->endinvoke_called = 1;
143 ac = (ASyncCall *)ares->data;
145 g_assert (ac != NULL);
147 /* wait until we are really finished */
148 if (!ares->completed)
149 WaitForSingleObject (ac->wait_semaphore, INFINITE);
152 *out_args = ac->out_args;
158 append_job (MonoAsyncResult *ar)
160 EnterCriticalSection (&mono_delegate_section);
161 async_call_queue = g_list_append (async_call_queue, ar);
162 LeaveCriticalSection (&mono_delegate_section);
165 static MonoAsyncResult *
168 MonoAsyncResult *ar = NULL;
171 EnterCriticalSection (&mono_delegate_section);
172 if (async_call_queue) {
173 ar = (MonoAsyncResult *)async_call_queue->data;
174 tmp = async_call_queue;
175 async_call_queue = g_list_remove_link (tmp, tmp);
177 LeaveCriticalSection (&mono_delegate_section);
185 async_invoke_thread (gpointer data)
190 thread = mono_thread_current ();
191 thread->threadpool_thread = TRUE;
192 thread->state |= ThreadState_Background;
196 ar = (MonoAsyncResult *) data;
198 /* worker threads invokes methods in different domains,
199 * so we need to set the right domain here */
200 domain = ((MonoObject *)ar)->vtable->domain;
201 if (mono_domain_set (domain, FALSE))
202 mono_async_invoke (ar);
203 InterlockedDecrement (&busy_worker_threads);
206 data = dequeue_job ();
207 if (!data && WaitForSingleObject (job_added, 500) != WAIT_TIMEOUT)
208 data = dequeue_job ();
211 InterlockedDecrement (&mono_worker_threads);
214 InterlockedIncrement (&busy_worker_threads);
217 g_assert_not_reached ();