Merge pull request #5714 from alexischr/update_bockbuild
[mono.git] / mono / sgen / sgen-thread-pool.c
1 /**
2  * \file
3  * Threadpool for all concurrent GC work.
4  *
5  * Copyright (C) 2015 Xamarin Inc
6  *
7  * Licensed under the MIT license. See LICENSE file in the project root for full license information.
8  */
9
10 #include "config.h"
11 #ifdef HAVE_SGEN_GC
12
13 #include "mono/sgen/sgen-gc.h"
14 #include "mono/sgen/sgen-thread-pool.h"
15 #include "mono/sgen/sgen-client.h"
16 #include "mono/utils/mono-os-mutex.h"
17
18 static mono_mutex_t lock;
19 static mono_cond_t work_cond;
20 static mono_cond_t done_cond;
21
22 static int threads_num;
23 static MonoNativeThreadId threads [SGEN_THREADPOOL_MAX_NUM_THREADS];
24 static int threads_context [SGEN_THREADPOOL_MAX_NUM_THREADS];
25
26 static volatile gboolean threadpool_shutdown;
27 static volatile int threads_finished;
28
29 static int contexts_num;
30 static SgenThreadPoolContext pool_contexts [SGEN_THREADPOOL_MAX_NUM_CONTEXTS];
31
32 enum {
33         STATE_WAITING,
34         STATE_IN_PROGRESS,
35         STATE_DONE
36 };
37
38 /* Assumes that the lock is held. */
39 static SgenThreadPoolJob*
40 get_job_and_set_in_progress (SgenThreadPoolContext *context)
41 {
42         for (size_t i = 0; i < context->job_queue.next_slot; ++i) {
43                 SgenThreadPoolJob *job = (SgenThreadPoolJob *)context->job_queue.data [i];
44                 if (job->state == STATE_WAITING) {
45                         job->state = STATE_IN_PROGRESS;
46                         return job;
47                 }
48         }
49         return NULL;
50 }
51
52 /* Assumes that the lock is held. */
53 static ssize_t
54 find_job_in_queue (SgenThreadPoolContext *context, SgenThreadPoolJob *job)
55 {
56         for (ssize_t i = 0; i < context->job_queue.next_slot; ++i) {
57                 if (context->job_queue.data [i] == job)
58                         return i;
59         }
60         return -1;
61 }
62
63 /* Assumes that the lock is held. */
64 static void
65 remove_job (SgenThreadPoolContext *context, SgenThreadPoolJob *job)
66 {
67         ssize_t index;
68         SGEN_ASSERT (0, job->state == STATE_DONE, "Why are we removing a job that's not done?");
69         index = find_job_in_queue (context, job);
70         SGEN_ASSERT (0, index >= 0, "Why is the job we're trying to remove not in the queue?");
71         context->job_queue.data [index] = NULL;
72         sgen_pointer_queue_remove_nulls (&context->job_queue);
73         sgen_thread_pool_job_free (job);
74 }
75
76 static gboolean
77 continue_idle_job (SgenThreadPoolContext *context, void *thread_data)
78 {
79         if (!context->continue_idle_job_func)
80                 return FALSE;
81         return context->continue_idle_job_func (thread_data, context - pool_contexts);
82 }
83
84 static gboolean
85 should_work (SgenThreadPoolContext *context, void *thread_data)
86 {
87         if (!context->should_work_func)
88                 return TRUE;
89         return context->should_work_func (thread_data);
90 }
91
92 /*
93  * Tells whether we should lock and attempt to get work from
94  * a higher priority context.
95  */
96 static gboolean
97 has_priority_work (int worker_index, int current_context)
98 {
99         int i;
100
101         for (i = 0; i < current_context; i++) {
102                 SgenThreadPoolContext *context = &pool_contexts [i];
103                 void *thread_data;
104
105                 if (worker_index >= context->num_threads)
106                         continue;
107                 thread_data = (context->thread_datas) ? context->thread_datas [worker_index] : NULL;
108                 if (!should_work (context, thread_data))
109                         continue;
110                 if (context->job_queue.next_slot > 0)
111                         return TRUE;
112                 if (continue_idle_job (context, thread_data))
113                         return TRUE;
114         }
115
116         /* Return if job enqueued on current context. Jobs have priority over idle work */
117         if (pool_contexts [current_context].job_queue.next_slot > 0)
118                 return TRUE;
119
120         return FALSE;
121 }
122
123 /*
124  * Gets the highest priority work. If there is none, it waits
125  * for work_cond. Should always be called with lock held.
126  */
127 static void
128 get_work (int worker_index, int *work_context, int *do_idle, SgenThreadPoolJob **job)
129 {
130         while (!threadpool_shutdown) {
131                 int i;
132
133                 for (i = 0; i < contexts_num; i++) {
134                         SgenThreadPoolContext *context = &pool_contexts [i];
135                         void *thread_data;
136
137                         if (worker_index >= context->num_threads)
138                                 continue;
139                         thread_data = (context->thread_datas) ? context->thread_datas [worker_index] : NULL;
140
141                         if (!should_work (context, thread_data))
142                                 continue;
143
144                         /*
145                          * It's important that we check the continue idle flag with the lock held.
146                          * Suppose we didn't check with the lock held, and the result is FALSE.  The
147                          * main thread might then set continue idle and signal us before we can take
148                          * the lock, and we'd lose the signal.
149                          */
150                         *do_idle = continue_idle_job (context, thread_data);
151                         *job = get_job_and_set_in_progress (context);
152
153                         if (*job || *do_idle) {
154                                 *work_context = i;
155                                 return;
156                         }
157                 }
158
159                 /*
160                  * Nothing to do on any context
161                  * pthread_cond_wait() can return successfully despite the condition
162                  * not being signalled, so we have to run this in a loop until we
163                  * really have work to do.
164                  */
165                 mono_os_cond_wait (&work_cond, &lock);
166         }
167 }
168
169 static mono_native_thread_return_t
170 thread_func (void *data)
171 {
172         int worker_index = (int)(gsize)data;
173         int current_context;
174         void *thread_data = NULL;
175
176         sgen_client_thread_register_worker ();
177
178         for (current_context = 0; current_context < contexts_num; current_context++) {
179                 if (worker_index >= pool_contexts [current_context].num_threads ||
180                                 !pool_contexts [current_context].thread_init_func)
181                         break;
182
183                 thread_data = (pool_contexts [current_context].thread_datas) ? pool_contexts [current_context].thread_datas [worker_index] : NULL;
184                 pool_contexts [current_context].thread_init_func (thread_data);
185         }
186
187         current_context = 0;
188
189         mono_os_mutex_lock (&lock);
190         for (;;) {
191                 gboolean do_idle = FALSE;
192                 SgenThreadPoolJob *job = NULL;
193                 SgenThreadPoolContext *context = NULL;
194
195                 threads_context [worker_index] = -1;
196                 get_work (worker_index, &current_context, &do_idle, &job);
197                 threads_context [worker_index] = current_context;
198
199                 if (!threadpool_shutdown) {
200                         context = &pool_contexts [current_context];
201                         thread_data = (context->thread_datas) ? context->thread_datas [worker_index] : NULL;
202                 }
203
204                 mono_os_mutex_unlock (&lock);
205
206                 if (job) {
207                         job->func (thread_data, job);
208
209                         mono_os_mutex_lock (&lock);
210
211                         SGEN_ASSERT (0, job->state == STATE_IN_PROGRESS, "The job should still be in progress.");
212                         job->state = STATE_DONE;
213                         remove_job (context, job);
214                         /*
215                          * Only the main GC thread will ever wait on the done condition, so we don't
216                          * have to broadcast.
217                          */
218                         mono_os_cond_signal (&done_cond);
219                 } else if (do_idle) {
220                         SGEN_ASSERT (0, context->idle_job_func, "Why do we have idle work when there's no idle job function?");
221                         do {
222                                 context->idle_job_func (thread_data);
223                                 do_idle = continue_idle_job (context, thread_data);
224                         } while (do_idle && !has_priority_work (worker_index, current_context));
225
226                         mono_os_mutex_lock (&lock);
227
228                         if (!do_idle)
229                                 mono_os_cond_signal (&done_cond);
230                 } else {
231                         SGEN_ASSERT (0, threadpool_shutdown, "Why did we unlock if no jobs and not shutting down?");
232                         mono_os_mutex_lock (&lock);
233                         threads_finished++;
234                         mono_os_cond_signal (&done_cond);
235                         mono_os_mutex_unlock (&lock);
236                         return 0;
237                 }
238         }
239
240         return (mono_native_thread_return_t)0;
241 }
242
243 int
244 sgen_thread_pool_create_context (int num_threads, SgenThreadPoolThreadInitFunc init_func, SgenThreadPoolIdleJobFunc idle_func, SgenThreadPoolContinueIdleJobFunc continue_idle_func, SgenThreadPoolShouldWorkFunc should_work_func, void **thread_datas)
245 {
246         int context_id = contexts_num;
247
248         SGEN_ASSERT (0, contexts_num < SGEN_THREADPOOL_MAX_NUM_CONTEXTS, "Maximum sgen thread pool contexts reached");
249
250         pool_contexts [context_id].thread_init_func = init_func;
251         pool_contexts [context_id].idle_job_func = idle_func;
252         pool_contexts [context_id].continue_idle_job_func = continue_idle_func;
253         pool_contexts [context_id].should_work_func = should_work_func;
254         pool_contexts [context_id].thread_datas = thread_datas;
255
256         SGEN_ASSERT (0, num_threads <= SGEN_THREADPOOL_MAX_NUM_THREADS, "Maximum sgen thread pool threads exceeded");
257
258         pool_contexts [context_id].num_threads = num_threads;
259
260         sgen_pointer_queue_init (&pool_contexts [contexts_num].job_queue, 0);
261
262         contexts_num++;
263
264         return context_id;
265 }
266
267 void
268 sgen_thread_pool_start (void)
269 {
270         int i;
271
272         for (i = 0; i < contexts_num; i++) {
273                 if (threads_num < pool_contexts [i].num_threads)
274                         threads_num = pool_contexts [i].num_threads;
275         }
276
277         if (!threads_num)
278                 return;
279
280         mono_os_mutex_init (&lock);
281         mono_os_cond_init (&work_cond);
282         mono_os_cond_init (&done_cond);
283
284         threads_finished = 0;
285         threadpool_shutdown = FALSE;
286
287         for (i = 0; i < threads_num; i++) {
288                 mono_native_thread_create (&threads [i], thread_func, (void*)(gsize)i);
289         }
290 }
291
292 void
293 sgen_thread_pool_shutdown (void)
294 {
295         if (!threads_num)
296                 return;
297
298         mono_os_mutex_lock (&lock);
299         threadpool_shutdown = TRUE;
300         mono_os_cond_broadcast (&work_cond);
301         while (threads_finished < threads_num)
302                 mono_os_cond_wait (&done_cond, &lock);
303         mono_os_mutex_unlock (&lock);
304
305         mono_os_mutex_destroy (&lock);
306         mono_os_cond_destroy (&work_cond);
307         mono_os_cond_destroy (&done_cond);
308
309         for (int i = 0; i < threads_num; i++) {
310                 mono_threads_add_joinable_thread ((gpointer)threads [i]);
311         }
312 }
313
314 SgenThreadPoolJob*
315 sgen_thread_pool_job_alloc (const char *name, SgenThreadPoolJobFunc func, size_t size)
316 {
317         SgenThreadPoolJob *job = (SgenThreadPoolJob *)sgen_alloc_internal_dynamic (size, INTERNAL_MEM_THREAD_POOL_JOB, TRUE);
318         job->name = name;
319         job->size = size;
320         job->state = STATE_WAITING;
321         job->func = func;
322         return job;
323 }
324
325 void
326 sgen_thread_pool_job_free (SgenThreadPoolJob *job)
327 {
328         sgen_free_internal_dynamic (job, job->size, INTERNAL_MEM_THREAD_POOL_JOB);
329 }
330
331 void
332 sgen_thread_pool_job_enqueue (int context_id, SgenThreadPoolJob *job)
333 {
334         mono_os_mutex_lock (&lock);
335
336         sgen_pointer_queue_add (&pool_contexts [context_id].job_queue, job);
337         mono_os_cond_broadcast (&work_cond);
338
339         mono_os_mutex_unlock (&lock);
340 }
341
342 void
343 sgen_thread_pool_job_wait (int context_id, SgenThreadPoolJob *job)
344 {
345         SGEN_ASSERT (0, job, "Where's the job?");
346
347         mono_os_mutex_lock (&lock);
348
349         while (find_job_in_queue (&pool_contexts [context_id], job) >= 0)
350                 mono_os_cond_wait (&done_cond, &lock);
351
352         mono_os_mutex_unlock (&lock);
353 }
354
355 void
356 sgen_thread_pool_idle_signal (int context_id)
357 {
358         SGEN_ASSERT (0, pool_contexts [context_id].idle_job_func, "Why are we signaling idle without an idle function?");
359
360         mono_os_mutex_lock (&lock);
361
362         if (pool_contexts [context_id].continue_idle_job_func (NULL, context_id))
363                 mono_os_cond_broadcast (&work_cond);
364
365         mono_os_mutex_unlock (&lock);
366 }
367
368 void
369 sgen_thread_pool_idle_wait (int context_id, SgenThreadPoolContinueIdleWaitFunc continue_wait)
370 {
371         SGEN_ASSERT (0, pool_contexts [context_id].idle_job_func, "Why are we waiting for idle without an idle function?");
372
373         mono_os_mutex_lock (&lock);
374
375         while (continue_wait (context_id, threads_context))
376                 mono_os_cond_wait (&done_cond, &lock);
377
378         mono_os_mutex_unlock (&lock);
379 }
380
381 void
382 sgen_thread_pool_wait_for_all_jobs (int context_id)
383 {
384         mono_os_mutex_lock (&lock);
385
386         while (!sgen_pointer_queue_is_empty (&pool_contexts [context_id].job_queue))
387                 mono_os_cond_wait (&done_cond, &lock);
388
389         mono_os_mutex_unlock (&lock);
390 }
391
392 /* Return 0 if is not a thread pool thread or the thread number otherwise */
393 int
394 sgen_thread_pool_is_thread_pool_thread (MonoNativeThreadId some_thread)
395 {
396         int i;
397
398         for (i = 0; i < threads_num; i++) {
399                 if (some_thread == threads [i])
400                         return i + 1;
401         }
402
403         return 0;
404 }
405
406 #endif