6a03bc3d69d164b5cd949b0b71760fc84f41d04b
[mono.git] / mono / sgen / sgen-thread-pool.c
1 /**
2  * \file
3  * Threadpool for all concurrent GC work.
4  *
5  * Copyright (C) 2015 Xamarin Inc
6  *
7  * Licensed under the MIT license. See LICENSE file in the project root for full license information.
8  */
9
10 #include "config.h"
11 #ifdef HAVE_SGEN_GC
12
13 #include "mono/sgen/sgen-gc.h"
14 #include "mono/sgen/sgen-thread-pool.h"
15 #include "mono/sgen/sgen-client.h"
16 #include "mono/utils/mono-os-mutex.h"
17
18 static mono_mutex_t lock;
19 static mono_cond_t work_cond;
20 static mono_cond_t done_cond;
21
22 static int threads_num;
23 static MonoNativeThreadId threads [SGEN_THREADPOOL_MAX_NUM_THREADS];
24
25 static volatile gboolean threadpool_shutdown;
26 static volatile int threads_finished;
27
28 static int contexts_num;
29 static SgenThreadPoolContext pool_contexts [SGEN_THREADPOOL_MAX_NUM_CONTEXTS];
30
31 enum {
32         STATE_WAITING,
33         STATE_IN_PROGRESS,
34         STATE_DONE
35 };
36
37 /* Assumes that the lock is held. */
38 static SgenThreadPoolJob*
39 get_job_and_set_in_progress (SgenThreadPoolContext *context)
40 {
41         for (size_t i = 0; i < context->job_queue.next_slot; ++i) {
42                 SgenThreadPoolJob *job = (SgenThreadPoolJob *)context->job_queue.data [i];
43                 if (job->state == STATE_WAITING) {
44                         job->state = STATE_IN_PROGRESS;
45                         return job;
46                 }
47         }
48         return NULL;
49 }
50
51 /* Assumes that the lock is held. */
52 static ssize_t
53 find_job_in_queue (SgenThreadPoolContext *context, SgenThreadPoolJob *job)
54 {
55         for (ssize_t i = 0; i < context->job_queue.next_slot; ++i) {
56                 if (context->job_queue.data [i] == job)
57                         return i;
58         }
59         return -1;
60 }
61
62 /* Assumes that the lock is held. */
63 static void
64 remove_job (SgenThreadPoolContext *context, SgenThreadPoolJob *job)
65 {
66         ssize_t index;
67         SGEN_ASSERT (0, job->state == STATE_DONE, "Why are we removing a job that's not done?");
68         index = find_job_in_queue (context, job);
69         SGEN_ASSERT (0, index >= 0, "Why is the job we're trying to remove not in the queue?");
70         context->job_queue.data [index] = NULL;
71         sgen_pointer_queue_remove_nulls (&context->job_queue);
72         sgen_thread_pool_job_free (job);
73 }
74
75 static gboolean
76 continue_idle_job (SgenThreadPoolContext *context, void *thread_data)
77 {
78         if (!context->continue_idle_job_func)
79                 return FALSE;
80         return context->continue_idle_job_func (thread_data, context - pool_contexts);
81 }
82
83 static gboolean
84 should_work (SgenThreadPoolContext *context, void *thread_data)
85 {
86         if (!context->should_work_func)
87                 return TRUE;
88         return context->should_work_func (thread_data);
89 }
90
91 /*
92  * Tells whether we should lock and attempt to get work from
93  * a higher priority context.
94  */
95 static gboolean
96 has_priority_work (int worker_index, int current_context)
97 {
98         int i;
99
100         for (i = 0; i < current_context; i++) {
101                 SgenThreadPoolContext *context = &pool_contexts [i];
102                 void *thread_data;
103
104                 if (worker_index >= context->num_threads)
105                         continue;
106                 thread_data = (context->thread_datas) ? context->thread_datas [worker_index] : NULL;
107                 if (!should_work (context, thread_data))
108                         continue;
109                 if (context->job_queue.next_slot > 0)
110                         return TRUE;
111                 if (continue_idle_job (context, thread_data))
112                         return TRUE;
113         }
114
115         /* Return if job enqueued on current context. Jobs have priority over idle work */
116         if (pool_contexts [current_context].job_queue.next_slot > 0)
117                 return TRUE;
118
119         return FALSE;
120 }
121
122 /*
123  * Gets the highest priority work. If there is none, it waits
124  * for work_cond. Should always be called with lock held.
125  */
126 static void
127 get_work (int worker_index, int *work_context, int *do_idle, SgenThreadPoolJob **job)
128 {
129         while (!threadpool_shutdown) {
130                 int i;
131
132                 for (i = 0; i < contexts_num; i++) {
133                         SgenThreadPoolContext *context = &pool_contexts [i];
134                         void *thread_data;
135
136                         if (worker_index >= context->num_threads)
137                                 continue;
138                         thread_data = (context->thread_datas) ? context->thread_datas [worker_index] : NULL;
139
140                         if (!should_work (context, thread_data))
141                                 continue;
142
143                         /*
144                          * It's important that we check the continue idle flag with the lock held.
145                          * Suppose we didn't check with the lock held, and the result is FALSE.  The
146                          * main thread might then set continue idle and signal us before we can take
147                          * the lock, and we'd lose the signal.
148                          */
149                         *do_idle = continue_idle_job (context, thread_data);
150                         *job = get_job_and_set_in_progress (context);
151
152                         if (*job || *do_idle) {
153                                 *work_context = i;
154                                 return;
155                         }
156                 }
157
158                 /*
159                  * Nothing to do on any context
160                  * pthread_cond_wait() can return successfully despite the condition
161                  * not being signalled, so we have to run this in a loop until we
162                  * really have work to do.
163                  */
164                 mono_os_cond_wait (&work_cond, &lock);
165         }
166 }
167
168 static mono_native_thread_return_t
169 thread_func (int worker_index)
170 {
171         int current_context;
172         void *thread_data = NULL;
173
174         sgen_client_thread_register_worker ();
175
176         for (current_context = 0; current_context < contexts_num; current_context++) {
177                 if (worker_index >= pool_contexts [current_context].num_threads ||
178                                 !pool_contexts [current_context].thread_init_func)
179                         break;
180
181                 thread_data = (pool_contexts [current_context].thread_datas) ? pool_contexts [current_context].thread_datas [worker_index] : NULL;
182                 pool_contexts [current_context].thread_init_func (thread_data);
183         }
184
185         current_context = 0;
186
187         mono_os_mutex_lock (&lock);
188         for (;;) {
189                 gboolean do_idle = FALSE;
190                 SgenThreadPoolJob *job = NULL;
191                 SgenThreadPoolContext *context = NULL;
192
193                 get_work (worker_index, &current_context, &do_idle, &job);
194
195                 if (!threadpool_shutdown) {
196                         context = &pool_contexts [current_context];
197                         thread_data = (context->thread_datas) ? context->thread_datas [worker_index] : NULL;
198                 }
199
200                 mono_os_mutex_unlock (&lock);
201
202                 if (job) {
203                         job->func (thread_data, job);
204
205                         mono_os_mutex_lock (&lock);
206
207                         SGEN_ASSERT (0, job->state == STATE_IN_PROGRESS, "The job should still be in progress.");
208                         job->state = STATE_DONE;
209                         remove_job (context, job);
210                         /*
211                          * Only the main GC thread will ever wait on the done condition, so we don't
212                          * have to broadcast.
213                          */
214                         mono_os_cond_signal (&done_cond);
215                 } else if (do_idle) {
216                         SGEN_ASSERT (0, context->idle_job_func, "Why do we have idle work when there's no idle job function?");
217                         do {
218                                 context->idle_job_func (thread_data);
219                                 do_idle = continue_idle_job (context, thread_data);
220                         } while (do_idle && !has_priority_work (worker_index, current_context));
221
222                         mono_os_mutex_lock (&lock);
223
224                         if (!do_idle)
225                                 mono_os_cond_signal (&done_cond);
226                 } else {
227                         SGEN_ASSERT (0, threadpool_shutdown, "Why did we unlock if no jobs and not shutting down?");
228                         mono_os_mutex_lock (&lock);
229                         threads_finished++;
230                         mono_os_cond_signal (&done_cond);
231                         mono_os_mutex_unlock (&lock);
232                         return 0;
233                 }
234         }
235
236         return (mono_native_thread_return_t)0;
237 }
238
239 int
240 sgen_thread_pool_create_context (int num_threads, SgenThreadPoolThreadInitFunc init_func, SgenThreadPoolIdleJobFunc idle_func, SgenThreadPoolContinueIdleJobFunc continue_idle_func, SgenThreadPoolShouldWorkFunc should_work_func, void **thread_datas)
241 {
242         int context_id = contexts_num;
243
244         SGEN_ASSERT (0, contexts_num < SGEN_THREADPOOL_MAX_NUM_CONTEXTS, "Maximum sgen thread pool contexts reached");
245
246         pool_contexts [context_id].thread_init_func = init_func;
247         pool_contexts [context_id].idle_job_func = idle_func;
248         pool_contexts [context_id].continue_idle_job_func = continue_idle_func;
249         pool_contexts [context_id].should_work_func = should_work_func;
250         pool_contexts [context_id].thread_datas = thread_datas;
251
252         SGEN_ASSERT (0, num_threads <= SGEN_THREADPOOL_MAX_NUM_THREADS, "Maximum sgen thread pool threads exceeded");
253
254         pool_contexts [context_id].num_threads = num_threads;
255
256         sgen_pointer_queue_init (&pool_contexts [contexts_num].job_queue, 0);
257
258         contexts_num++;
259
260         return context_id;
261 }
262
263 void
264 sgen_thread_pool_start (void)
265 {
266         int i;
267
268         for (i = 0; i < contexts_num; i++) {
269                 if (threads_num < pool_contexts [i].num_threads)
270                         threads_num = pool_contexts [i].num_threads;
271         }
272
273         if (!threads_num)
274                 return;
275
276         mono_os_mutex_init (&lock);
277         mono_os_cond_init (&work_cond);
278         mono_os_cond_init (&done_cond);
279
280         threads_finished = 0;
281         threadpool_shutdown = FALSE;
282
283         for (i = 0; i < threads_num; i++) {
284                 mono_native_thread_create (&threads [i], thread_func, (void*)(gsize)i);
285         }
286 }
287
288 void
289 sgen_thread_pool_shutdown (void)
290 {
291         if (!threads_num)
292                 return;
293
294         mono_os_mutex_lock (&lock);
295         threadpool_shutdown = TRUE;
296         mono_os_cond_broadcast (&work_cond);
297         while (threads_finished < threads_num)
298                 mono_os_cond_wait (&done_cond, &lock);
299         mono_os_mutex_unlock (&lock);
300
301         mono_os_mutex_destroy (&lock);
302         mono_os_cond_destroy (&work_cond);
303         mono_os_cond_destroy (&done_cond);
304 }
305
306 SgenThreadPoolJob*
307 sgen_thread_pool_job_alloc (const char *name, SgenThreadPoolJobFunc func, size_t size)
308 {
309         SgenThreadPoolJob *job = (SgenThreadPoolJob *)sgen_alloc_internal_dynamic (size, INTERNAL_MEM_THREAD_POOL_JOB, TRUE);
310         job->name = name;
311         job->size = size;
312         job->state = STATE_WAITING;
313         job->func = func;
314         return job;
315 }
316
317 void
318 sgen_thread_pool_job_free (SgenThreadPoolJob *job)
319 {
320         sgen_free_internal_dynamic (job, job->size, INTERNAL_MEM_THREAD_POOL_JOB);
321 }
322
323 void
324 sgen_thread_pool_job_enqueue (int context_id, SgenThreadPoolJob *job)
325 {
326         mono_os_mutex_lock (&lock);
327
328         sgen_pointer_queue_add (&pool_contexts [context_id].job_queue, job);
329         mono_os_cond_broadcast (&work_cond);
330
331         mono_os_mutex_unlock (&lock);
332 }
333
334 void
335 sgen_thread_pool_job_wait (int context_id, SgenThreadPoolJob *job)
336 {
337         SGEN_ASSERT (0, job, "Where's the job?");
338
339         mono_os_mutex_lock (&lock);
340
341         while (find_job_in_queue (&pool_contexts [context_id], job) >= 0)
342                 mono_os_cond_wait (&done_cond, &lock);
343
344         mono_os_mutex_unlock (&lock);
345 }
346
347 void
348 sgen_thread_pool_idle_signal (int context_id)
349 {
350         SGEN_ASSERT (0, pool_contexts [context_id].idle_job_func, "Why are we signaling idle without an idle function?");
351
352         mono_os_mutex_lock (&lock);
353
354         if (pool_contexts [context_id].continue_idle_job_func (NULL, context_id))
355                 mono_os_cond_broadcast (&work_cond);
356
357         mono_os_mutex_unlock (&lock);
358 }
359
360 void
361 sgen_thread_pool_idle_wait (int context_id)
362 {
363         SGEN_ASSERT (0, pool_contexts [context_id].idle_job_func, "Why are we waiting for idle without an idle function?");
364
365         mono_os_mutex_lock (&lock);
366
367         while (pool_contexts [context_id].continue_idle_job_func (NULL, context_id))
368                 mono_os_cond_wait (&done_cond, &lock);
369
370         mono_os_mutex_unlock (&lock);
371 }
372
373 void
374 sgen_thread_pool_wait_for_all_jobs (int context_id)
375 {
376         mono_os_mutex_lock (&lock);
377
378         while (!sgen_pointer_queue_is_empty (&pool_contexts [context_id].job_queue))
379                 mono_os_cond_wait (&done_cond, &lock);
380
381         mono_os_mutex_unlock (&lock);
382 }
383
384 /* Return 0 if is not a thread pool thread or the thread number otherwise */
385 int
386 sgen_thread_pool_is_thread_pool_thread (MonoNativeThreadId some_thread)
387 {
388         int i;
389
390         for (i = 0; i < threads_num; i++) {
391                 if (some_thread == threads [i])
392                         return i + 1;
393         }
394
395         return 0;
396 }
397
398 #endif