[sgen] Shutdown thread pool worker even if it's not active
[mono.git] / mono / sgen / sgen-thread-pool.c
1 /**
2  * \file
3  * Threadpool for all concurrent GC work.
4  *
5  * Copyright (C) 2015 Xamarin Inc
6  *
7  * Licensed under the MIT license. See LICENSE file in the project root for full license information.
8  */
9
10 #include "config.h"
11 #ifdef HAVE_SGEN_GC
12
13 #include "mono/sgen/sgen-gc.h"
14 #include "mono/sgen/sgen-thread-pool.h"
15 #include "mono/sgen/sgen-pointer-queue.h"
16 #include "mono/utils/mono-os-mutex.h"
17 #ifndef SGEN_WITHOUT_MONO
18 #include "mono/utils/mono-threads.h"
19 #endif
20
21 #define MAX_NUM_THREADS 8
22
23 static mono_mutex_t lock;
24 static mono_cond_t work_cond;
25 static mono_cond_t done_cond;
26
27 static int threads_num = 0;
28 static MonoNativeThreadId threads [MAX_NUM_THREADS];
29
30 /* Only accessed with the lock held. */
31 static SgenPointerQueue job_queue;
32
33 static SgenThreadPoolThreadInitFunc thread_init_func;
34 static SgenThreadPoolIdleJobFunc idle_job_func;
35 static SgenThreadPoolContinueIdleJobFunc continue_idle_job_func;
36 static SgenThreadPoolShouldWorkFunc should_work_func;
37
38 static volatile gboolean threadpool_shutdown;
39 static volatile int threads_finished = 0;
40
41 enum {
42         STATE_WAITING,
43         STATE_IN_PROGRESS,
44         STATE_DONE
45 };
46
47 /* Assumes that the lock is held. */
48 static SgenThreadPoolJob*
49 get_job_and_set_in_progress (void)
50 {
51         for (size_t i = 0; i < job_queue.next_slot; ++i) {
52                 SgenThreadPoolJob *job = (SgenThreadPoolJob *)job_queue.data [i];
53                 if (job->state == STATE_WAITING) {
54                         job->state = STATE_IN_PROGRESS;
55                         return job;
56                 }
57         }
58         return NULL;
59 }
60
61 /* Assumes that the lock is held. */
62 static ssize_t
63 find_job_in_queue (SgenThreadPoolJob *job)
64 {
65         for (ssize_t i = 0; i < job_queue.next_slot; ++i) {
66                 if (job_queue.data [i] == job)
67                         return i;
68         }
69         return -1;
70 }
71
72 /* Assumes that the lock is held. */
73 static void
74 remove_job (SgenThreadPoolJob *job)
75 {
76         ssize_t index;
77         SGEN_ASSERT (0, job->state == STATE_DONE, "Why are we removing a job that's not done?");
78         index = find_job_in_queue (job);
79         SGEN_ASSERT (0, index >= 0, "Why is the job we're trying to remove not in the queue?");
80         job_queue.data [index] = NULL;
81         sgen_pointer_queue_remove_nulls (&job_queue);
82         sgen_thread_pool_job_free (job);
83 }
84
85 static gboolean
86 continue_idle_job (void *thread_data)
87 {
88         if (!continue_idle_job_func)
89                 return FALSE;
90         return continue_idle_job_func (thread_data);
91 }
92
93 static gboolean
94 should_work (void *thread_data)
95 {
96         if (!should_work_func)
97                 return TRUE;
98         return should_work_func (thread_data);
99 }
100
101 static mono_native_thread_return_t
102 thread_func (void *thread_data)
103 {
104         thread_init_func (thread_data);
105
106         mono_os_mutex_lock (&lock);
107         for (;;) {
108                 gboolean do_idle;
109                 SgenThreadPoolJob *job;
110
111                 if (!should_work (thread_data) && !threadpool_shutdown) {
112                         mono_os_cond_wait (&work_cond, &lock);
113                         continue;
114                 }
115                 /*
116                  * It's important that we check the continue idle flag with the lock held.
117                  * Suppose we didn't check with the lock held, and the result is FALSE.  The
118                  * main thread might then set continue idle and signal us before we can take
119                  * the lock, and we'd lose the signal.
120                  */
121                 do_idle = continue_idle_job (thread_data);
122                 job = get_job_and_set_in_progress ();
123
124                 if (!job && !do_idle && !threadpool_shutdown) {
125                         /*
126                          * pthread_cond_wait() can return successfully despite the condition
127                          * not being signalled, so we have to run this in a loop until we
128                          * really have work to do.
129                          */
130                         mono_os_cond_wait (&work_cond, &lock);
131                         continue;
132                 }
133
134                 mono_os_mutex_unlock (&lock);
135
136                 if (job) {
137                         job->func (thread_data, job);
138
139                         mono_os_mutex_lock (&lock);
140
141                         SGEN_ASSERT (0, job->state == STATE_IN_PROGRESS, "The job should still be in progress.");
142                         job->state = STATE_DONE;
143                         remove_job (job);
144                         /*
145                          * Only the main GC thread will ever wait on the done condition, so we don't
146                          * have to broadcast.
147                          */
148                         mono_os_cond_signal (&done_cond);
149                 } else if (do_idle) {
150                         SGEN_ASSERT (0, idle_job_func, "Why do we have idle work when there's no idle job function?");
151                         do {
152                                 idle_job_func (thread_data);
153                                 do_idle = continue_idle_job (thread_data);
154                         } while (do_idle && !job_queue.next_slot);
155
156                         mono_os_mutex_lock (&lock);
157
158                         if (!do_idle)
159                                 mono_os_cond_signal (&done_cond);
160                 } else {
161                         SGEN_ASSERT (0, threadpool_shutdown, "Why did we unlock if no jobs and not shutting down?");
162                         mono_os_mutex_lock (&lock);
163                         threads_finished++;
164                         mono_os_cond_signal (&done_cond);
165                         mono_os_mutex_unlock (&lock);
166                         return 0;
167                 }
168         }
169
170         return (mono_native_thread_return_t)0;
171 }
172
173 void
174 sgen_thread_pool_init (int num_threads, SgenThreadPoolThreadInitFunc init_func, SgenThreadPoolIdleJobFunc idle_func, SgenThreadPoolContinueIdleJobFunc continue_idle_func, SgenThreadPoolShouldWorkFunc should_work_func_p, void **thread_datas)
175 {
176         int i;
177
178         threads_num = (num_threads < MAX_NUM_THREADS) ? num_threads : MAX_NUM_THREADS;
179
180         mono_os_mutex_init (&lock);
181         mono_os_cond_init (&work_cond);
182         mono_os_cond_init (&done_cond);
183
184         thread_init_func = init_func;
185         idle_job_func = idle_func;
186         continue_idle_job_func = continue_idle_func;
187         should_work_func = should_work_func_p;
188
189         for (i = 0; i < threads_num; i++)
190                 mono_native_thread_create (&threads [i], thread_func, thread_datas ? thread_datas [i] : NULL);
191 }
192
193 void
194 sgen_thread_pool_shutdown (void)
195 {
196         if (!threads_num)
197                 return;
198
199         mono_os_mutex_lock (&lock);
200         threadpool_shutdown = TRUE;
201         mono_os_cond_broadcast (&work_cond);
202         while (threads_finished < threads_num)
203                 mono_os_cond_wait (&done_cond, &lock);
204         mono_os_mutex_unlock (&lock);
205
206         mono_os_mutex_destroy (&lock);
207         mono_os_cond_destroy (&work_cond);
208         mono_os_cond_destroy (&done_cond);
209 }
210
211 SgenThreadPoolJob*
212 sgen_thread_pool_job_alloc (const char *name, SgenThreadPoolJobFunc func, size_t size)
213 {
214         SgenThreadPoolJob *job = (SgenThreadPoolJob *)sgen_alloc_internal_dynamic (size, INTERNAL_MEM_THREAD_POOL_JOB, TRUE);
215         job->name = name;
216         job->size = size;
217         job->state = STATE_WAITING;
218         job->func = func;
219         return job;
220 }
221
222 void
223 sgen_thread_pool_job_free (SgenThreadPoolJob *job)
224 {
225         sgen_free_internal_dynamic (job, job->size, INTERNAL_MEM_THREAD_POOL_JOB);
226 }
227
228 void
229 sgen_thread_pool_job_enqueue (SgenThreadPoolJob *job)
230 {
231         mono_os_mutex_lock (&lock);
232
233         sgen_pointer_queue_add (&job_queue, job);
234         mono_os_cond_signal (&work_cond);
235
236         mono_os_mutex_unlock (&lock);
237 }
238
239 void
240 sgen_thread_pool_job_wait (SgenThreadPoolJob *job)
241 {
242         SGEN_ASSERT (0, job, "Where's the job?");
243
244         mono_os_mutex_lock (&lock);
245
246         while (find_job_in_queue (job) >= 0)
247                 mono_os_cond_wait (&done_cond, &lock);
248
249         mono_os_mutex_unlock (&lock);
250 }
251
252 void
253 sgen_thread_pool_idle_signal (void)
254 {
255         SGEN_ASSERT (0, idle_job_func, "Why are we signaling idle without an idle function?");
256
257         mono_os_mutex_lock (&lock);
258
259         if (continue_idle_job_func (NULL))
260                 mono_os_cond_broadcast (&work_cond);
261
262         mono_os_mutex_unlock (&lock);
263 }
264
265 void
266 sgen_thread_pool_idle_wait (void)
267 {
268         SGEN_ASSERT (0, idle_job_func, "Why are we waiting for idle without an idle function?");
269
270         mono_os_mutex_lock (&lock);
271
272         while (continue_idle_job_func (NULL))
273                 mono_os_cond_wait (&done_cond, &lock);
274
275         mono_os_mutex_unlock (&lock);
276 }
277
278 void
279 sgen_thread_pool_wait_for_all_jobs (void)
280 {
281         mono_os_mutex_lock (&lock);
282
283         while (!sgen_pointer_queue_is_empty (&job_queue))
284                 mono_os_cond_wait (&done_cond, &lock);
285
286         mono_os_mutex_unlock (&lock);
287 }
288
289 /* Return 0 if is not a thread pool thread or the thread number otherwise */
290 int
291 sgen_thread_pool_is_thread_pool_thread (MonoNativeThreadId some_thread)
292 {
293         int i;
294
295         for (i = 0; i < threads_num; i++) {
296                 if (some_thread == threads [i])
297                         return i + 1;
298         }
299
300         return 0;
301 }
302
303 #endif