3 * Threadpool for all concurrent GC work.
5 * Copyright (C) 2015 Xamarin Inc
7 * Licensed under the MIT license. See LICENSE file in the project root for full license information.
13 #include "mono/sgen/sgen-gc.h"
14 #include "mono/sgen/sgen-thread-pool.h"
15 #include "mono/sgen/sgen-client.h"
16 #include "mono/utils/mono-os-mutex.h"
/* Mutex taken around job-queue accesses and around every wait on the two conditions below. */
18 static mono_mutex_t lock;
/* Waited on by workers in get_work (); broadcast on job enqueue, idle signal and shutdown. */
19 static mono_cond_t work_cond;
/* Signalled by workers; waited on by the job/idle wait functions and by shutdown. */
20 static mono_cond_t done_cond;
/* Number of worker threads; sgen_thread_pool_start () sets it to the largest num_threads of any context. */
22 static int threads_num;
/* Native thread ids; entry i is passed to mono_native_thread_create () for worker i. */
23 static MonoNativeThreadId threads [SGEN_THREADPOOL_MAX_NUM_THREADS];
/* Set to TRUE (under the lock) by sgen_thread_pool_shutdown (); ends the wait loop in get_work (). */
25 static volatile gboolean threadpool_shutdown;
/* Shutdown waits on done_cond until this counter reaches threads_num. */
26 static volatile int threads_finished;
/* Number of registered contexts; also the index the next created context will use. */
28 static int contexts_num;
/* Per-context callbacks, thread count, per-thread data and job queue, indexed by context id. */
29 static SgenThreadPoolContext pool_contexts [SGEN_THREADPOOL_MAX_NUM_CONTEXTS];
37 /* Assumes that the lock is held. */
/*
 * Walks the job queue of `context` in slot order looking for a job whose state
 * is STATE_WAITING and flips it to STATE_IN_PROGRESS.
 * NOTE(review): the return statements are not visible in this excerpt;
 * presumably the claimed job is returned, or NULL when nothing is waiting.
 */
38 static SgenThreadPoolJob*
39 get_job_and_set_in_progress (SgenThreadPoolContext *context)
41 for (size_t i = 0; i < context->job_queue.next_slot; ++i) {
42 SgenThreadPoolJob *job = (SgenThreadPoolJob *)context->job_queue.data [i];
43 if (job->state == STATE_WAITING) {
44 job->state = STATE_IN_PROGRESS;
51 /* Assumes that the lock is held. */
/*
 * Linear search of the job queue of `context` for the pointer `job`.
 * Callers treat a result >= 0 as the slot index of the job and a negative
 * result as "not in the queue" (the return statements are not visible here).
 */
53 find_job_in_queue (SgenThreadPoolContext *context, SgenThreadPoolJob *job)
55 for (ssize_t i = 0; i < context->job_queue.next_slot; ++i) {
56 if (context->job_queue.data [i] == job)
62 /* Assumes that the lock is held. */
/*
 * Takes a finished job out of the queue of `context` and frees it.
 * The job must be in STATE_DONE and must still be present in the queue;
 * both conditions are asserted. Its slot is set to NULL, then
 * sgen_pointer_queue_remove_nulls () is called on the queue, and finally
 * the job memory is released, so `job` must not be used afterwards.
 */
64 remove_job (SgenThreadPoolContext *context, SgenThreadPoolJob *job)
67 SGEN_ASSERT (0, job->state == STATE_DONE, "Why are we removing a job that's not done?");
68 index = find_job_in_queue (context, job);
69 SGEN_ASSERT (0, index >= 0, "Why is the job we're trying to remove not in the queue?");
70 context->job_queue.data [index] = NULL;
71 sgen_pointer_queue_remove_nulls (&context->job_queue);
72 sgen_thread_pool_job_free (job);
/*
 * Asks the context whether idle work should go on for this worker.
 * When the context has a continue_idle_job_func callback, its result is
 * returned; the callback receives `thread_data` and the index of `context`
 * inside pool_contexts (computed by pointer subtraction).
 * NOTE(review): the branch taken when no callback is set is not visible here.
 */
76 continue_idle_job (SgenThreadPoolContext *context, void *thread_data)
78 if (!context->continue_idle_job_func)
80 return context->continue_idle_job_func (thread_data, context - pool_contexts);
/*
 * Asks the context whether this worker should take work from it.
 * When the context has a should_work_func callback, its result for
 * `thread_data` is returned.
 * NOTE(review): the branch taken when no callback is set is not visible here.
 */
84 should_work (SgenThreadPoolContext *context, void *thread_data)
86 if (!context->should_work_func)
88 return context->should_work_func (thread_data);
92 * Tells whether we should lock and attempt to get work from
93 * a higher priority context.
/*
 * Only contexts with an index below `current_context` are examined, so a
 * lower index means a higher priority. For each of them the checks are:
 * the worker index must be below the num_threads of the context, the context
 * must accept work from this worker (should_work), and it must have either
 * a non-empty job queue or pending idle work (continue_idle_job).
 * The thread_datas entry for this worker is used when the array is set,
 * NULL otherwise.
 * NOTE(review): this function reads the queues without taking the lock
 * (no lock call is visible); the statements executed when each check
 * matches are elided from this excerpt.
 */
96 has_priority_work (int worker_index, int current_context)
100 for (i = 0; i < current_context; i++) {
101 SgenThreadPoolContext *context = &pool_contexts [i];
104 if (worker_index >= context->num_threads)
106 thread_data = (context->thread_datas) ? context->thread_datas [worker_index] : NULL;
107 if (!should_work (context, thread_data))
109 if (context->job_queue.next_slot > 0)
111 if (continue_idle_job (context, thread_data))
115 /* Return if job enqueued on current context. Jobs have priority over idle work */
116 if (pool_contexts [current_context].job_queue.next_slot > 0)
123 * Gets the highest priority work. If there is none, it waits
124 * for work_cond. Should always be called with lock held.
/*
 * Parameters:
 *   worker_index - index of the calling worker; contexts whose num_threads
 *                  is not above it are not served by this worker.
 *   work_context - out parameter (presumably set to the index of the context
 *                  that supplied the work; the assignment is not visible here).
 *   do_idle      - out parameter, result of continue_idle_job () for the
 *                  context examined last.
 *   job          - out parameter, a job claimed with
 *                  get_job_and_set_in_progress (), or NULL.
 * Contexts are scanned from index 0 upwards on every pass. The outer loop
 * ends once threadpool_shutdown is set; otherwise, when no context offers
 * a job or idle work, the thread blocks on work_cond (which releases and
 * re-takes the lock) and scans again.
 */
127 get_work (int worker_index, int *work_context, int *do_idle, SgenThreadPoolJob **job)
129 while (!threadpool_shutdown) {
132 for (i = 0; i < contexts_num; i++) {
133 SgenThreadPoolContext *context = &pool_contexts [i];
136 if (worker_index >= context->num_threads)
138 thread_data = (context->thread_datas) ? context->thread_datas [worker_index] : NULL;
140 if (!should_work (context, thread_data))
144 * It's important that we check the continue idle flag with the lock held.
145 * Suppose we didn't check with the lock held, and the result is FALSE. The
146 * main thread might then set continue idle and signal us before we can take
147 * the lock, and we'd lose the signal.
149 *do_idle = continue_idle_job (context, thread_data);
150 *job = get_job_and_set_in_progress (context);
152 if (*job || *do_idle) {
159 * Nothing to do on any context
160 * pthread_cond_wait() can return successfully despite the condition
161 * not being signalled, so we have to run this in a loop until we
162 * really have work to do.
164 mono_os_cond_wait (&work_cond, &lock);
/*
 * Entry point of every pool worker.
 *
 * worker_index is the index given to this worker by sgen_thread_pool_start ().
 * The worker first registers itself with the client, then runs the
 * thread_init_func of the contexts it serves, passing its thread_datas entry
 * (or NULL when the context has no thread_datas array).
 *
 * After that it repeatedly asks get_work () for something to do while holding
 * the lock, and releases the lock before doing the work:
 *   - a job: its func is called, then under the lock the job is marked
 *     STATE_DONE, removed from the queue (which frees it) and done_cond is
 *     signalled;
 *   - idle work: idle_job_func is called repeatedly for as long as
 *     continue_idle_job () says so and no higher priority work shows up,
 *     then the lock is re-taken and done_cond is signalled (any condition
 *     guarding that signal is not visible in this excerpt);
 *   - neither: this is only expected during shutdown (asserted); the worker
 *     signals done_cond under the lock and leaves.
 *
 * The value returned is 0 cast to mono_native_thread_return_t.
 */
168 static mono_native_thread_return_t
169 thread_func (int worker_index)
172 void *thread_data = NULL;
174 sgen_client_thread_register_worker ();
/* Per-thread initialisation for the contexts this worker takes part in. */
176 for (current_context = 0; current_context < contexts_num; current_context++) {
177 if (worker_index >= pool_contexts [current_context].num_threads ||
178 !pool_contexts [current_context].thread_init_func)
181 thread_data = (pool_contexts [current_context].thread_datas) ? pool_contexts [current_context].thread_datas [worker_index] : NULL;
182 pool_contexts [current_context].thread_init_func (thread_data);
187 mono_os_mutex_lock (&lock);
189 gboolean do_idle = FALSE;
190 SgenThreadPoolJob *job = NULL;
191 SgenThreadPoolContext *context = NULL;
/* get_work () takes the context index by address (int *work_context). */
193 get_work (worker_index, &current_context, &do_idle, &job);
/* On shutdown no context is selected, so context and thread_data are left alone. */
195 if (!threadpool_shutdown) {
196 context = &pool_contexts [current_context];
197 thread_data = (context->thread_datas) ? context->thread_datas [worker_index] : NULL;
/* The work itself runs without the lock held. */
200 mono_os_mutex_unlock (&lock);
203 job->func (thread_data, job);
205 mono_os_mutex_lock (&lock);
207 SGEN_ASSERT (0, job->state == STATE_IN_PROGRESS, "The job should still be in progress.");
208 job->state = STATE_DONE;
209 remove_job (context, job);
211 * Only the main GC thread will ever wait on the done condition, so we don't
214 mono_os_cond_signal (&done_cond);
215 } else if (do_idle) {
216 SGEN_ASSERT (0, context->idle_job_func, "Why do we have idle work when there's no idle job function?");
218 context->idle_job_func (thread_data);
219 do_idle = continue_idle_job (context, thread_data);
220 } while (do_idle && !has_priority_work (worker_index, current_context));
222 mono_os_mutex_lock (&lock);
225 mono_os_cond_signal (&done_cond);
227 SGEN_ASSERT (0, threadpool_shutdown, "Why did we unlock if no jobs and not shutting down?");
228 mono_os_mutex_lock (&lock);
230 mono_os_cond_signal (&done_cond);
231 mono_os_mutex_unlock (&lock);
236 return (mono_native_thread_return_t)0;
/*
 * Registers a new thread pool context in the slot at index contexts_num.
 *
 *   num_threads        - number of workers serving the context; asserted not
 *                        to exceed SGEN_THREADPOOL_MAX_NUM_THREADS.
 *   init_func          - stored as thread_init_func.
 *   idle_func          - stored as idle_job_func.
 *   continue_idle_func - stored as continue_idle_job_func.
 *   should_work_func   - stored as should_work_func.
 *   thread_datas       - stored as is; the array is not copied here.
 *
 * Asserts that a free context slot exists and initialises the job queue of
 * the context.
 * NOTE(review): the increment of contexts_num and the return statement are
 * not visible in this excerpt; presumably the new context id is returned.
 */
240 sgen_thread_pool_create_context (int num_threads, SgenThreadPoolThreadInitFunc init_func, SgenThreadPoolIdleJobFunc idle_func, SgenThreadPoolContinueIdleJobFunc continue_idle_func, SgenThreadPoolShouldWorkFunc should_work_func, void **thread_datas)
242 int context_id = contexts_num;
244 SGEN_ASSERT (0, contexts_num < SGEN_THREADPOOL_MAX_NUM_CONTEXTS, "Maximum sgen thread pool contexts reached");
246 pool_contexts [context_id].thread_init_func = init_func;
247 pool_contexts [context_id].idle_job_func = idle_func;
248 pool_contexts [context_id].continue_idle_job_func = continue_idle_func;
249 pool_contexts [context_id].should_work_func = should_work_func;
250 pool_contexts [context_id].thread_datas = thread_datas;
252 SGEN_ASSERT (0, num_threads <= SGEN_THREADPOOL_MAX_NUM_THREADS, "Maximum sgen thread pool threads exceeded");
254 pool_contexts [context_id].num_threads = num_threads;
/* context_id still equals contexts_num at this point, so this is the queue of the new context. */
256 sgen_pointer_queue_init (&pool_contexts [contexts_num].job_queue, 0);
/*
 * Starts the worker threads of the pool.
 * threads_num is raised to the largest num_threads of any registered context,
 * the lock and both conditions are initialised, the shutdown state is reset,
 * and one native thread per worker is created running thread_func.
 */
264 sgen_thread_pool_start (void)
268 for (i = 0; i < contexts_num; i++) {
269 if (threads_num < pool_contexts [i].num_threads)
270 threads_num = pool_contexts [i].num_threads;
276 mono_os_mutex_init (&lock);
277 mono_os_cond_init (&work_cond);
278 mono_os_cond_init (&done_cond);
280 threads_finished = 0;
281 threadpool_shutdown = FALSE;
/* The worker index travels to the thread through the opaque argument pointer. */
283 for (i = 0; i < threads_num; i++) {
284 mono_native_thread_create (&threads [i], thread_func, (void*)(gsize)i);
/*
 * Stops the pool and tears down its synchronisation objects.
 * Under the lock the shutdown flag is raised and every worker waiting on
 * work_cond is woken; the caller then waits on done_cond until
 * threads_finished has reached threads_num. Only after that are the lock
 * and both conditions destroyed.
 */
289 sgen_thread_pool_shutdown (void)
294 mono_os_mutex_lock (&lock);
295 threadpool_shutdown = TRUE;
296 mono_os_cond_broadcast (&work_cond);
/* Loop rather than a single wait: done_cond is also signalled for finished jobs and idle work. */
297 while (threads_finished < threads_num)
298 mono_os_cond_wait (&done_cond, &lock);
299 mono_os_mutex_unlock (&lock);
301 mono_os_mutex_destroy (&lock);
302 mono_os_cond_destroy (&work_cond);
303 mono_os_cond_destroy (&done_cond);
/*
 * Allocates a job object of `size` bytes from the sgen internal allocator
 * (memory type INTERNAL_MEM_THREAD_POOL_JOB) and puts it in STATE_WAITING.
 * NOTE(review): the lines that use `name` and `func` and the return
 * statement are not visible in this excerpt; presumably they are stored in
 * the job and the job is returned. `size` presumably covers the whole job
 * including any caller-specific payload - confirm against callers.
 */
307 sgen_thread_pool_job_alloc (const char *name, SgenThreadPoolJobFunc func, size_t size)
309 SgenThreadPoolJob *job = (SgenThreadPoolJob *)sgen_alloc_internal_dynamic (size, INTERNAL_MEM_THREAD_POOL_JOB, TRUE);
312 job->state = STATE_WAITING;
/*
 * Returns a job to the sgen internal allocator, using the size recorded in
 * the job itself (job->size) and memory type INTERNAL_MEM_THREAD_POOL_JOB.
 */
318 sgen_thread_pool_job_free (SgenThreadPoolJob *job)
320 sgen_free_internal_dynamic (job, job->size, INTERNAL_MEM_THREAD_POOL_JOB);
/*
 * Appends `job` to the job queue of context `context_id` and wakes every
 * worker waiting on work_cond. Both steps happen with the lock held.
 */
324 sgen_thread_pool_job_enqueue (int context_id, SgenThreadPoolJob *job)
326 mono_os_mutex_lock (&lock);
328 sgen_pointer_queue_add (&pool_contexts [context_id].job_queue, job);
329 mono_os_cond_broadcast (&work_cond);
331 mono_os_mutex_unlock (&lock);
/*
 * Blocks the caller until `job` is no longer in the job queue of context
 * `context_id`. The queue is re-checked under the lock after each wake-up
 * on done_cond. `job` must not be NULL (asserted).
 */
335 sgen_thread_pool_job_wait (int context_id, SgenThreadPoolJob *job)
337 SGEN_ASSERT (0, job, "Where's the job?");
339 mono_os_mutex_lock (&lock);
341 while (find_job_in_queue (&pool_contexts [context_id], job) >= 0)
342 mono_os_cond_wait (&done_cond, &lock);
344 mono_os_mutex_unlock (&lock);
/*
 * Wakes the workers when context `context_id` has idle work to do.
 * The context must have an idle_job_func (asserted). Under the lock, its
 * continue_idle_job_func is called directly with NULL thread data and the
 * context id; work_cond is broadcast only when that call returns true.
 * NOTE(review): continue_idle_job_func is called without a NULL check here.
 */
348 sgen_thread_pool_idle_signal (int context_id)
350 SGEN_ASSERT (0, pool_contexts [context_id].idle_job_func, "Why are we signaling idle without an idle function?");
352 mono_os_mutex_lock (&lock);
354 if (pool_contexts [context_id].continue_idle_job_func (NULL, context_id))
355 mono_os_cond_broadcast (&work_cond);
357 mono_os_mutex_unlock (&lock);
/*
 * Blocks the caller until the idle work of context `context_id` is over.
 * The context must have an idle_job_func (asserted). Under the lock, the
 * caller waits on done_cond for as long as continue_idle_job_func, called
 * with NULL thread data and the context id, keeps returning true.
 * NOTE(review): continue_idle_job_func is called without a NULL check here.
 */
361 sgen_thread_pool_idle_wait (int context_id)
363 SGEN_ASSERT (0, pool_contexts [context_id].idle_job_func, "Why are we waiting for idle without an idle function?");
365 mono_os_mutex_lock (&lock);
367 while (pool_contexts [context_id].continue_idle_job_func (NULL, context_id))
368 mono_os_cond_wait (&done_cond, &lock);
370 mono_os_mutex_unlock (&lock);
/*
 * Blocks the caller until the job queue of context `context_id` is empty.
 * The queue is re-checked under the lock after each wake-up on done_cond.
 */
374 sgen_thread_pool_wait_for_all_jobs (int context_id)
376 mono_os_mutex_lock (&lock);
378 while (!sgen_pointer_queue_is_empty (&pool_contexts [context_id].job_queue))
379 mono_os_cond_wait (&done_cond, &lock);
381 mono_os_mutex_unlock (&lock);
384 /* Return 0 if some_thread is not a thread pool thread, or the thread number otherwise */
386 sgen_thread_pool_is_thread_pool_thread (MonoNativeThreadId some_thread)
390 for (i = 0; i < threads_num; i++) {
391 if (some_thread == threads [i])