2003-08-27 Zoltan Varga <vargaz@freemail.hu>
[mono.git] / mono / metadata / threadpool.c
1 /*
2  * threadpool.c: global thread pool
3  *
4  * Authors:
5  *   Dietmar Maurer (dietmar@ximian.com)
6  *
7  * (C) 2001 Ximian, Inc.
8  */
9
10 #include <config.h>
11 #include <glib.h>
12
13 #include <mono/metadata/appdomain.h>
14 #include <mono/metadata/tabledefs.h>
15 #include <mono/metadata/threads.h>
16 #include <mono/metadata/exception.h>
17 #include <mono/io-layer/io-layer.h>
18 #include <mono/os/gc_wrapper.h>
19
20 #include "threadpool.h"
21
22 /* maximum number of worker threads */
23 int mono_max_worker_threads = 25; /* fixme: should be 25 per available CPU */
24 /* current number of worker threads */
25 int mono_worker_threads = 0;
26
27 /* current number of busy threads */
28 static int busy_worker_threads = 0;
29
30 /* we use this to store a reference to the AsyncResult to avoid GC */
31 static MonoGHashTable *ares_htable = NULL;
32
33 typedef struct {
34         MonoMethodMessage *msg;
35         HANDLE             wait_semaphore;
36         MonoMethod        *cb_method;
37         MonoDelegate      *cb_target;
38         MonoObject        *state;
39         MonoObject        *res;
40         MonoArray         *out_args;
41 } ASyncCall;
42
43 static void async_invoke_thread (void);
44
45 static GList *async_call_queue = NULL;
46
47 static void
48 mono_async_invoke (MonoAsyncResult *ares)
49 {
50         ASyncCall *ac = (ASyncCall *)ares->data;
51
52         ac->msg->exc = NULL;
53         ac->res = mono_message_invoke (ares->async_delegate, ac->msg, 
54                                        &ac->msg->exc, &ac->out_args);
55
56         ares->completed = 1;
57                 
58         /* notify listeners */
59         ReleaseSemaphore (ac->wait_semaphore, 0x7fffffff, NULL);
60
61         /* call async callback if cb_method != null*/
62         if (ac->cb_method) {
63                 MonoObject *exc = NULL;
64                 void *pa = &ares;
65                 mono_runtime_invoke (ac->cb_method, ac->cb_target, pa, &exc);
66                 if (!ac->msg->exc)
67                         ac->msg->exc = exc;
68         }
69
70         mono_g_hash_table_remove (ares_htable, ares);
71 }
72
73 MonoAsyncResult *
74 mono_thread_pool_add (MonoObject *target, MonoMethodMessage *msg, MonoDelegate *async_callback,
75                       MonoObject *state)
76 {
77         MonoDomain *domain = mono_domain_get ();
78         MonoAsyncResult *ares;
79         ASyncCall *ac;
80
81 #ifdef HAVE_BOEHM_GC
82         ac = GC_MALLOC (sizeof (ASyncCall));
83 #else
84         /* We'll leak the semaphore... */
85         ac = g_new0 (ASyncCall, 1);
86 #endif
87         ac->wait_semaphore = CreateSemaphore (NULL, 0, 0x7fffffff, NULL);       
88         ac->msg = msg;
89         ac->state = state;
90
91         if (async_callback) {
92                 ac->cb_method = mono_get_delegate_invoke (((MonoObject *)async_callback)->vtable->klass);
93                 ac->cb_target = async_callback;
94         }
95
96         if (!ares_htable)
97                 ares_htable = mono_g_hash_table_new (NULL, NULL);
98
99         ares = mono_async_result_new (domain, ac->wait_semaphore, ac->state, ac);
100         ares->async_delegate = target;
101
102         mono_g_hash_table_insert (ares_htable, ares, ares);
103
104         EnterCriticalSection (&mono_delegate_section);  
105         async_call_queue = g_list_append (async_call_queue, ares); 
106         ReleaseSemaphore (mono_delegate_semaphore, 1, NULL);
107
108         if (mono_worker_threads <= busy_worker_threads &&
109             mono_worker_threads < mono_max_worker_threads) {
110                 mono_worker_threads++;
111                 mono_thread_create (domain, async_invoke_thread, NULL);
112         }
113         LeaveCriticalSection (&mono_delegate_section);
114
115         return ares;
116 }
117
118 MonoObject *
119 mono_thread_pool_finish (MonoAsyncResult *ares, MonoArray **out_args, MonoObject **exc)
120 {
121         ASyncCall *ac;
122         GList *l;
123
124         *exc = NULL;
125         *out_args = NULL;
126
127         EnterCriticalSection (&mono_delegate_section);  
128         /* check if already finished */
129         if (ares->endinvoke_called) {
130                 *exc = (MonoObject *)mono_exception_from_name (mono_defaults.corlib, "System", 
131                                               "InvalidOperationException");
132                 LeaveCriticalSection (&mono_delegate_section);
133                 return NULL;
134         }
135
136         ares->endinvoke_called = 1;
137         ac = (ASyncCall *)ares->data;
138
139         g_assert (ac != NULL);
140
141         if ((l = g_list_find (async_call_queue, ares))) {
142                 async_call_queue = g_list_remove_link (async_call_queue, l);
143         }       
144         
145         LeaveCriticalSection (&mono_delegate_section);
146
147         if (l) {
148                 g_list_free_1 (l);
149                 mono_async_invoke (ares);
150         }
151                 
152         
153         /* wait until we are really finished */
154         WaitForSingleObject (ac->wait_semaphore, INFINITE);
155
156         *exc = ac->msg->exc;
157         *out_args = ac->out_args;
158
159
160         return ac->res;
161 }
162
163 static void
164 async_invoke_thread ()
165 {
166         MonoDomain *domain;
167         MonoThread *thread;
168  
169         thread = mono_thread_current ();
170         thread->threadpool_thread = TRUE;
171         for (;;) {
172                 MonoAsyncResult *ar;
173
174                 if (WaitForSingleObject (mono_delegate_semaphore, 500) == WAIT_TIMEOUT) {
175                         EnterCriticalSection (&mono_delegate_section);
176                         mono_worker_threads--;
177                         LeaveCriticalSection (&mono_delegate_section);
178                         ExitThread (0);
179                 }
180                 
181                 ar = NULL;
182                 EnterCriticalSection (&mono_delegate_section);
183                 
184                 if (async_call_queue) {
185                         GList *tmp;
186
187                         ar = (MonoAsyncResult *)async_call_queue->data;
188                         tmp = async_call_queue;
189                         async_call_queue = g_list_remove_link (async_call_queue, async_call_queue); 
190                         g_list_free_1 (tmp);
191                 }
192
193                 if (!ar) {
194                         LeaveCriticalSection (&mono_delegate_section);
195                         continue;
196                 }
197                 
198                 busy_worker_threads++;
199
200                 LeaveCriticalSection (&mono_delegate_section);
201                 /* worker threads invokes methods in different domains,
202                  * so we need to set the right domain here */
203                 domain = ((MonoObject *)ar)->vtable->domain;
204                 mono_domain_set (domain);
205
206                 mono_async_invoke (ar);
207
208                 EnterCriticalSection (&mono_delegate_section);
209                 busy_worker_threads--;
210                 LeaveCriticalSection (&mono_delegate_section);
211
212         }
213
214         g_assert_not_reached ();
215 }