2003-06-17 Zoltan Varga <vargaz@freemail.hu>
[mono.git] / mono / metadata / threadpool.c
1 /*
2  * threadpool.c: global thread pool
3  *
4  * Authors:
5  *   Dietmar Maurer (dietmar@ximian.com)
6  *
7  * (C) 2001 Ximian, Inc.
8  */
9
10 #include <config.h>
11 #include <glib.h>
12
13 #include <mono/metadata/appdomain.h>
14 #include <mono/metadata/tabledefs.h>
15 #include <mono/metadata/threads.h>
16 #include <mono/metadata/exception.h>
17 #include <mono/io-layer/io-layer.h>
18 #include <mono/os/gc_wrapper.h>
19
20 #include "threadpool.h"
21
22 /* FIXME:
23  * - worker threads need to be initialized correctly.
24  * - worker threads should be domain specific
25  */
26
27 /* maximum number of worker threads */
28 int mono_worker_threads = 1;
29
30 static int workers = 0;
31
32 typedef struct {
33         MonoMethodMessage *msg;
34         HANDLE             wait_semaphore;
35         MonoMethod        *cb_method;
36         MonoDelegate      *cb_target;
37         MonoObject        *state;
38         MonoObject        *res;
39         MonoArray         *out_args;
40 } ASyncCall;
41
42 static void async_invoke_thread (void);
43
44 static GList *async_call_queue = NULL;
45
46 static void
47 mono_async_invoke (MonoAsyncResult *ares)
48 {
49         ASyncCall *ac = (ASyncCall *)ares->data;
50
51         ac->msg->exc = NULL;
52         ac->res = mono_message_invoke (ares->async_delegate, ac->msg, 
53                                        &ac->msg->exc, &ac->out_args);
54
55         ares->completed = 1;
56                 
57         /* notify listeners */
58         ReleaseSemaphore (ac->wait_semaphore, 0x7fffffff, NULL);
59
60         /* call async callback if cb_method != null*/
61         if (ac->cb_method) {
62                 MonoObject *exc = NULL;
63                 void *pa = &ares;
64                 mono_runtime_invoke (ac->cb_method, ac->cb_target, pa, &exc);
65                 if (!ac->msg->exc)
66                         ac->msg->exc = exc;
67         }
68 }
69
70 #ifdef HAVE_BOEHM_GC
71 static void
72 async_call_finalizer (void *o, void *data)
73 {
74         ASyncCall *ac = o;
75
76         CloseHandle (ac->wait_semaphore);
77 }
78 #endif
79
80 MonoAsyncResult *
81 mono_thread_pool_add (MonoObject *target, MonoMethodMessage *msg, MonoDelegate *async_callback,
82                       MonoObject *state)
83 {
84         MonoDomain *domain = mono_domain_get ();
85         MonoAsyncResult *ares;
86         ASyncCall *ac;
87
88 #ifdef HAVE_BOEHM_GC
89         ac = GC_MALLOC (sizeof (ASyncCall));
90         GC_REGISTER_FINALIZER_NO_ORDER (ac, async_call_finalizer, NULL, NULL, NULL);
91 #else
92         /* We'll leak the semaphore... */
93         ac = g_new0 (ASyncCall, 1);
94 #endif
95         ac->wait_semaphore = CreateSemaphore (NULL, 0, 0x7fffffff, NULL);       
96         ac->msg = msg;
97         ac->state = state;
98
99         if (async_callback) {
100                 ac->cb_method = mono_get_delegate_invoke (((MonoObject *)async_callback)->vtable->klass);
101                 ac->cb_target = async_callback;
102         }
103
104         ares = mono_async_result_new (domain, ac->wait_semaphore, ac->state, ac);
105         ares->async_delegate = target;
106
107         EnterCriticalSection (&mono_delegate_section);  
108         async_call_queue = g_list_append (async_call_queue, ares); 
109         ReleaseSemaphore (mono_delegate_semaphore, 1, NULL);
110
111         if (workers == 0) {
112                 workers++;
113                 mono_thread_create (domain, async_invoke_thread, NULL);
114         }
115         LeaveCriticalSection (&mono_delegate_section);
116
117         return ares;
118 }
119
120 MonoObject *
121 mono_thread_pool_finish (MonoAsyncResult *ares, MonoArray **out_args, MonoObject **exc)
122 {
123         ASyncCall *ac;
124         GList *l;
125
126         *exc = NULL;
127         *out_args = NULL;
128
129         EnterCriticalSection (&mono_delegate_section);  
130         /* check if already finished */
131         if (ares->endinvoke_called) {
132                 *exc = (MonoObject *)mono_exception_from_name (mono_defaults.corlib, "System", 
133                                               "InvalidOperationException");
134                 LeaveCriticalSection (&mono_delegate_section);
135                 return NULL;
136         }
137
138         ares->endinvoke_called = 1;
139         ac = (ASyncCall *)ares->data;
140
141         g_assert (ac != NULL);
142
143         if ((l = g_list_find (async_call_queue, ares))) {
144                 async_call_queue = g_list_remove_link (async_call_queue, l);
145                 mono_async_invoke (ares);
146         }               
147         LeaveCriticalSection (&mono_delegate_section);
148         
149         /* wait until we are really finished */
150         WaitForSingleObject (ac->wait_semaphore, INFINITE);
151
152         *exc = ac->msg->exc;
153         *out_args = ac->out_args;
154
155         return ac->res;
156 }
157
158 static void
159 async_invoke_thread ()
160 {
161         MonoDomain *domain;
162  
163         for (;;) {
164                 MonoAsyncResult *ar;
165                 gboolean new_worker = FALSE;
166
167                 if (WaitForSingleObject (mono_delegate_semaphore, 500) == WAIT_TIMEOUT) {
168                         EnterCriticalSection (&mono_delegate_section);
169                         workers--;
170                         LeaveCriticalSection (&mono_delegate_section);
171                         ExitThread (0);
172                 }
173                 
174                 ar = NULL;
175                 EnterCriticalSection (&mono_delegate_section);
176                 
177                 if (async_call_queue) {
178                         if ((g_list_length (async_call_queue) > 1) && 
179                             (workers < mono_worker_threads)) {
180                                 new_worker = TRUE;
181                                 workers++;
182                         }
183
184                         ar = (MonoAsyncResult *)async_call_queue->data;
185                         async_call_queue = g_list_remove_link (async_call_queue, async_call_queue); 
186
187                 }
188
189                 LeaveCriticalSection (&mono_delegate_section);
190
191                 if (!ar)
192                         continue;
193                 
194                 /* worker threads invokes methods in different domains,
195                  * so we need to set the right domain here */
196                 domain = ((MonoObject *)ar)->vtable->domain;
197                 mono_domain_set (domain);
198
199                 if (new_worker)
200                         mono_thread_create (domain, async_invoke_thread, NULL);
201
202                 mono_async_invoke (ar);
203         }
204
205         g_assert_not_reached ();
206 }