2004-01-19 Zoltan Varga <vargaz@freemail.hu>
[mono.git] / mono / metadata / threadpool.c
1 /*
2  * threadpool.c: global thread pool
3  *
4  * Authors:
5  *   Dietmar Maurer (dietmar@ximian.com)
6  *   Gonzalo Paniagua Javier (gonzalo@ximian.com)
7  *
8  * (C) 2001 Ximian, Inc.
9  */
10
11 #include <config.h>
12 #include <glib.h>
13
14 #include <mono/metadata/appdomain.h>
15 #include <mono/metadata/tabledefs.h>
16 #include <mono/metadata/threads.h>
17 #include <mono/metadata/exception.h>
18 #include <mono/io-layer/io-layer.h>
19 #include <mono/os/gc_wrapper.h>
20
21 #include "threadpool.h"
22
23 /* maximum number of worker threads */
24 int mono_max_worker_threads = 25; /* fixme: should be 25 per available CPU */
25 /* current number of worker threads */
26 static int mono_worker_threads = 0;
27
28 /* current number of busy threads */
29 int busy_worker_threads = 0;
30
31 /* we use this to store a reference to the AsyncResult to avoid GC */
32 static MonoGHashTable *ares_htable = NULL;
33
34 /* we append a job */
35 static HANDLE job_added;
36
37 typedef struct {
38         MonoMethodMessage *msg;
39         HANDLE             wait_semaphore;
40         MonoMethod        *cb_method;
41         MonoDelegate      *cb_target;
42         MonoObject        *state;
43         MonoObject        *res;
44         MonoArray         *out_args;
45 } ASyncCall;
46
47 static void async_invoke_thread (gpointer data);
48 static void append_job (MonoAsyncResult *ar);
49
50 static GList *async_call_queue = NULL;
51
52 static void
53 mono_async_invoke (MonoAsyncResult *ares)
54 {
55         ASyncCall *ac = (ASyncCall *)ares->data;
56
57         ac->msg->exc = NULL;
58         ac->res = mono_message_invoke (ares->async_delegate, ac->msg, 
59                                        &ac->msg->exc, &ac->out_args);
60
61         ares->completed = 1;
62
63         /* call async callback if cb_method != null*/
64         if (ac->cb_method) {
65                 MonoObject *exc = NULL;
66                 void *pa = &ares;
67                 mono_runtime_invoke (ac->cb_method, ac->cb_target, pa, &exc);
68                 if (!ac->msg->exc)
69                         ac->msg->exc = exc;
70         }
71
72         /* notify listeners */
73         ReleaseSemaphore (ac->wait_semaphore, 0x7fffffff, NULL);
74
75         mono_g_hash_table_remove (ares_htable, ares);
76 }
77
78 MonoAsyncResult *
79 mono_thread_pool_add (MonoObject *target, MonoMethodMessage *msg, MonoDelegate *async_callback,
80                       MonoObject *state)
81 {
82         MonoDomain *domain = mono_domain_get ();
83         MonoAsyncResult *ares;
84         ASyncCall *ac;
85         int busy, worker;
86
87 #ifdef HAVE_BOEHM_GC
88         ac = GC_MALLOC (sizeof (ASyncCall));
89 #else
90         /* We'll leak the semaphore... */
91         ac = g_new0 (ASyncCall, 1);
92 #endif
93         ac->wait_semaphore = CreateSemaphore (NULL, 0, 0x7fffffff, NULL);       
94         ac->msg = msg;
95         ac->state = state;
96
97         if (async_callback) {
98                 ac->cb_method = mono_get_delegate_invoke (((MonoObject *)async_callback)->vtable->klass);
99                 ac->cb_target = async_callback;
100         }
101
102         ares = mono_async_result_new (domain, ac->wait_semaphore, ac->state, ac);
103         ares->async_delegate = target;
104
105         if (!ares_htable) {
106                 ares_htable = mono_g_hash_table_new (NULL, NULL);
107                 job_added = CreateSemaphore (NULL, 0, 0x7fffffff, NULL);
108         }
109
110         mono_g_hash_table_insert (ares_htable, ares, ares);
111
112         busy = (int) InterlockedCompareExchange (&busy_worker_threads, 0, -1);
113         worker = (int) InterlockedCompareExchange (&mono_worker_threads, 0, -1); 
114         if (worker <= ++busy &&
115             worker < mono_max_worker_threads) {
116                 InterlockedIncrement (&mono_worker_threads);
117                 InterlockedIncrement (&busy_worker_threads);
118                 mono_thread_create (domain, async_invoke_thread, ares);
119         } else {
120                 append_job (ares);
121                 ReleaseSemaphore (job_added, 1, NULL);
122         }
123
124         return ares;
125 }
126
127 MonoObject *
128 mono_thread_pool_finish (MonoAsyncResult *ares, MonoArray **out_args, MonoObject **exc)
129 {
130         ASyncCall *ac;
131
132         *exc = NULL;
133         *out_args = NULL;
134
135         /* check if already finished */
136         if (ares->endinvoke_called) {
137                 *exc = (MonoObject *)mono_exception_from_name (mono_defaults.corlib, "System", 
138                                               "InvalidOperationException");
139                 return NULL;
140         }
141
142         ares->endinvoke_called = 1;
143         ac = (ASyncCall *)ares->data;
144
145         g_assert (ac != NULL);
146
147         /* wait until we are really finished */
148         if (!ares->completed)
149                 WaitForSingleObject (ac->wait_semaphore, INFINITE);
150
151         *exc = ac->msg->exc;
152         *out_args = ac->out_args;
153
154         return ac->res;
155 }
156
157 static void
158 append_job (MonoAsyncResult *ar)
159 {
160         EnterCriticalSection (&mono_delegate_section);
161         async_call_queue = g_list_append (async_call_queue, ar); 
162         LeaveCriticalSection (&mono_delegate_section);
163 }
164
165 static MonoAsyncResult *
166 dequeue_job (void)
167 {
168         MonoAsyncResult *ar = NULL;
169         GList *tmp = NULL;
170
171         EnterCriticalSection (&mono_delegate_section);
172         if (async_call_queue) {
173                 ar = (MonoAsyncResult *)async_call_queue->data;
174                 tmp = async_call_queue;
175                 async_call_queue = g_list_remove_link (tmp, tmp); 
176         }
177         LeaveCriticalSection (&mono_delegate_section);
178         if (tmp)
179                 g_list_free_1 (tmp);
180
181         return ar;
182 }
183
184 static void
185 async_invoke_thread (gpointer data)
186 {
187         MonoDomain *domain;
188         MonoThread *thread;
189  
190         thread = mono_thread_current ();
191         thread->threadpool_thread = TRUE;
192         thread->state |= ThreadState_Background;
193         for (;;) {
194                 MonoAsyncResult *ar;
195
196                 ar = (MonoAsyncResult *) data;
197                 if (ar) {
198                         /* worker threads invokes methods in different domains,
199                          * so we need to set the right domain here */
200                         domain = ((MonoObject *)ar)->vtable->domain;
201                         if (mono_domain_set (domain, FALSE))
202                                 mono_async_invoke (ar);
203                         InterlockedDecrement (&busy_worker_threads);
204                 }
205
206                 data = dequeue_job ();
207                 if (!data && WaitForSingleObject (job_added, 500) != WAIT_TIMEOUT)
208                         data = dequeue_job ();
209
210                 if (!data) {
211                         InterlockedDecrement (&mono_worker_threads);
212                         ExitThread (0);
213                 }
214                 InterlockedIncrement (&busy_worker_threads);
215         }
216
217         g_assert_not_reached ();
218 }
219