[sgen] Compact allocated block list
[mono.git] / mono / sgen / sgen-thread-pool.c
1 /**
2  * \file
3  * Threadpool for all concurrent GC work.
4  *
5  * Copyright (C) 2015 Xamarin Inc
6  *
7  * Licensed under the MIT license. See LICENSE file in the project root for full license information.
8  */
9
10 #include "config.h"
11 #ifdef HAVE_SGEN_GC
12
13 #include "mono/sgen/sgen-gc.h"
14 #include "mono/sgen/sgen-thread-pool.h"
15 #include "mono/sgen/sgen-client.h"
16 #include "mono/utils/mono-os-mutex.h"
17
18 static mono_mutex_t lock;
19 static mono_cond_t work_cond;
20 static mono_cond_t done_cond;
21
22 static int threads_num;
23 static MonoNativeThreadId threads [SGEN_THREADPOOL_MAX_NUM_THREADS];
24 static int threads_context [SGEN_THREADPOOL_MAX_NUM_THREADS];
25
26 static volatile gboolean threadpool_shutdown;
27 static volatile int threads_finished;
28
29 static int contexts_num;
30 static SgenThreadPoolContext pool_contexts [SGEN_THREADPOOL_MAX_NUM_CONTEXTS];
31
32 enum {
33         STATE_WAITING,
34         STATE_IN_PROGRESS,
35         STATE_DONE
36 };
37
38 /* Assumes that the lock is held. */
39 static SgenThreadPoolJob*
40 get_job_and_set_in_progress (SgenThreadPoolContext *context)
41 {
42         for (size_t i = 0; i < context->job_queue.next_slot; ++i) {
43                 SgenThreadPoolJob *job = (SgenThreadPoolJob *)context->job_queue.data [i];
44                 if (job->state == STATE_WAITING) {
45                         job->state = STATE_IN_PROGRESS;
46                         return job;
47                 }
48         }
49         return NULL;
50 }
51
52 /* Assumes that the lock is held. */
53 static ssize_t
54 find_job_in_queue (SgenThreadPoolContext *context, SgenThreadPoolJob *job)
55 {
56         for (ssize_t i = 0; i < context->job_queue.next_slot; ++i) {
57                 if (context->job_queue.data [i] == job)
58                         return i;
59         }
60         return -1;
61 }
62
63 /* Assumes that the lock is held. */
64 static void
65 remove_job (SgenThreadPoolContext *context, SgenThreadPoolJob *job)
66 {
67         ssize_t index;
68         SGEN_ASSERT (0, job->state == STATE_DONE, "Why are we removing a job that's not done?");
69         index = find_job_in_queue (context, job);
70         SGEN_ASSERT (0, index >= 0, "Why is the job we're trying to remove not in the queue?");
71         context->job_queue.data [index] = NULL;
72         sgen_pointer_queue_remove_nulls (&context->job_queue);
73         sgen_thread_pool_job_free (job);
74 }
75
76 static gboolean
77 continue_idle_job (SgenThreadPoolContext *context, void *thread_data)
78 {
79         if (!context->continue_idle_job_func)
80                 return FALSE;
81         return context->continue_idle_job_func (thread_data, context - pool_contexts);
82 }
83
84 static gboolean
85 should_work (SgenThreadPoolContext *context, void *thread_data)
86 {
87         if (!context->should_work_func)
88                 return TRUE;
89         return context->should_work_func (thread_data);
90 }
91
92 /*
93  * Tells whether we should lock and attempt to get work from
94  * a higher priority context.
95  */
96 static gboolean
97 has_priority_work (int worker_index, int current_context)
98 {
99         int i;
100
101         for (i = 0; i < current_context; i++) {
102                 SgenThreadPoolContext *context = &pool_contexts [i];
103                 void *thread_data;
104
105                 if (worker_index >= context->num_threads)
106                         continue;
107                 thread_data = (context->thread_datas) ? context->thread_datas [worker_index] : NULL;
108                 if (!should_work (context, thread_data))
109                         continue;
110                 if (context->job_queue.next_slot > 0)
111                         return TRUE;
112                 if (continue_idle_job (context, thread_data))
113                         return TRUE;
114         }
115
116         /* Return if job enqueued on current context. Jobs have priority over idle work */
117         if (pool_contexts [current_context].job_queue.next_slot > 0)
118                 return TRUE;
119
120         return FALSE;
121 }
122
123 /*
124  * Gets the highest priority work. If there is none, it waits
125  * for work_cond. Should always be called with lock held.
126  */
127 static void
128 get_work (int worker_index, int *work_context, int *do_idle, SgenThreadPoolJob **job)
129 {
130         while (!threadpool_shutdown) {
131                 int i;
132
133                 for (i = 0; i < contexts_num; i++) {
134                         SgenThreadPoolContext *context = &pool_contexts [i];
135                         void *thread_data;
136
137                         if (worker_index >= context->num_threads)
138                                 continue;
139                         thread_data = (context->thread_datas) ? context->thread_datas [worker_index] : NULL;
140
141                         if (!should_work (context, thread_data))
142                                 continue;
143
144                         /*
145                          * It's important that we check the continue idle flag with the lock held.
146                          * Suppose we didn't check with the lock held, and the result is FALSE.  The
147                          * main thread might then set continue idle and signal us before we can take
148                          * the lock, and we'd lose the signal.
149                          */
150                         *do_idle = continue_idle_job (context, thread_data);
151                         *job = get_job_and_set_in_progress (context);
152
153                         if (*job || *do_idle) {
154                                 *work_context = i;
155                                 return;
156                         }
157                 }
158
159                 /*
160                  * Nothing to do on any context
161                  * pthread_cond_wait() can return successfully despite the condition
162                  * not being signalled, so we have to run this in a loop until we
163                  * really have work to do.
164                  */
165                 mono_os_cond_wait (&work_cond, &lock);
166         }
167 }
168
169 static mono_native_thread_return_t
170 thread_func (int worker_index)
171 {
172         int current_context;
173         void *thread_data = NULL;
174
175         sgen_client_thread_register_worker ();
176
177         for (current_context = 0; current_context < contexts_num; current_context++) {
178                 if (worker_index >= pool_contexts [current_context].num_threads ||
179                                 !pool_contexts [current_context].thread_init_func)
180                         break;
181
182                 thread_data = (pool_contexts [current_context].thread_datas) ? pool_contexts [current_context].thread_datas [worker_index] : NULL;
183                 pool_contexts [current_context].thread_init_func (thread_data);
184         }
185
186         current_context = 0;
187
188         mono_os_mutex_lock (&lock);
189         for (;;) {
190                 gboolean do_idle = FALSE;
191                 SgenThreadPoolJob *job = NULL;
192                 SgenThreadPoolContext *context = NULL;
193
194                 threads_context [worker_index] = -1;
195                 get_work (worker_index, &current_context, &do_idle, &job);
196                 threads_context [worker_index] = current_context;
197
198                 if (!threadpool_shutdown) {
199                         context = &pool_contexts [current_context];
200                         thread_data = (context->thread_datas) ? context->thread_datas [worker_index] : NULL;
201                 }
202
203                 mono_os_mutex_unlock (&lock);
204
205                 if (job) {
206                         job->func (thread_data, job);
207
208                         mono_os_mutex_lock (&lock);
209
210                         SGEN_ASSERT (0, job->state == STATE_IN_PROGRESS, "The job should still be in progress.");
211                         job->state = STATE_DONE;
212                         remove_job (context, job);
213                         /*
214                          * Only the main GC thread will ever wait on the done condition, so we don't
215                          * have to broadcast.
216                          */
217                         mono_os_cond_signal (&done_cond);
218                 } else if (do_idle) {
219                         SGEN_ASSERT (0, context->idle_job_func, "Why do we have idle work when there's no idle job function?");
220                         do {
221                                 context->idle_job_func (thread_data);
222                                 do_idle = continue_idle_job (context, thread_data);
223                         } while (do_idle && !has_priority_work (worker_index, current_context));
224
225                         mono_os_mutex_lock (&lock);
226
227                         if (!do_idle)
228                                 mono_os_cond_signal (&done_cond);
229                 } else {
230                         SGEN_ASSERT (0, threadpool_shutdown, "Why did we unlock if no jobs and not shutting down?");
231                         mono_os_mutex_lock (&lock);
232                         threads_finished++;
233                         mono_os_cond_signal (&done_cond);
234                         mono_os_mutex_unlock (&lock);
235                         return 0;
236                 }
237         }
238
239         return (mono_native_thread_return_t)0;
240 }
241
242 int
243 sgen_thread_pool_create_context (int num_threads, SgenThreadPoolThreadInitFunc init_func, SgenThreadPoolIdleJobFunc idle_func, SgenThreadPoolContinueIdleJobFunc continue_idle_func, SgenThreadPoolShouldWorkFunc should_work_func, void **thread_datas)
244 {
245         int context_id = contexts_num;
246
247         SGEN_ASSERT (0, contexts_num < SGEN_THREADPOOL_MAX_NUM_CONTEXTS, "Maximum sgen thread pool contexts reached");
248
249         pool_contexts [context_id].thread_init_func = init_func;
250         pool_contexts [context_id].idle_job_func = idle_func;
251         pool_contexts [context_id].continue_idle_job_func = continue_idle_func;
252         pool_contexts [context_id].should_work_func = should_work_func;
253         pool_contexts [context_id].thread_datas = thread_datas;
254
255         SGEN_ASSERT (0, num_threads <= SGEN_THREADPOOL_MAX_NUM_THREADS, "Maximum sgen thread pool threads exceeded");
256
257         pool_contexts [context_id].num_threads = num_threads;
258
259         sgen_pointer_queue_init (&pool_contexts [contexts_num].job_queue, 0);
260
261         contexts_num++;
262
263         return context_id;
264 }
265
266 void
267 sgen_thread_pool_start (void)
268 {
269         int i;
270
271         for (i = 0; i < contexts_num; i++) {
272                 if (threads_num < pool_contexts [i].num_threads)
273                         threads_num = pool_contexts [i].num_threads;
274         }
275
276         if (!threads_num)
277                 return;
278
279         mono_os_mutex_init (&lock);
280         mono_os_cond_init (&work_cond);
281         mono_os_cond_init (&done_cond);
282
283         threads_finished = 0;
284         threadpool_shutdown = FALSE;
285
286         for (i = 0; i < threads_num; i++) {
287                 mono_native_thread_create (&threads [i], thread_func, (void*)(gsize)i);
288         }
289 }
290
291 void
292 sgen_thread_pool_shutdown (void)
293 {
294         if (!threads_num)
295                 return;
296
297         mono_os_mutex_lock (&lock);
298         threadpool_shutdown = TRUE;
299         mono_os_cond_broadcast (&work_cond);
300         while (threads_finished < threads_num)
301                 mono_os_cond_wait (&done_cond, &lock);
302         mono_os_mutex_unlock (&lock);
303
304         mono_os_mutex_destroy (&lock);
305         mono_os_cond_destroy (&work_cond);
306         mono_os_cond_destroy (&done_cond);
307 }
308
309 SgenThreadPoolJob*
310 sgen_thread_pool_job_alloc (const char *name, SgenThreadPoolJobFunc func, size_t size)
311 {
312         SgenThreadPoolJob *job = (SgenThreadPoolJob *)sgen_alloc_internal_dynamic (size, INTERNAL_MEM_THREAD_POOL_JOB, TRUE);
313         job->name = name;
314         job->size = size;
315         job->state = STATE_WAITING;
316         job->func = func;
317         return job;
318 }
319
320 void
321 sgen_thread_pool_job_free (SgenThreadPoolJob *job)
322 {
323         sgen_free_internal_dynamic (job, job->size, INTERNAL_MEM_THREAD_POOL_JOB);
324 }
325
326 void
327 sgen_thread_pool_job_enqueue (int context_id, SgenThreadPoolJob *job)
328 {
329         mono_os_mutex_lock (&lock);
330
331         sgen_pointer_queue_add (&pool_contexts [context_id].job_queue, job);
332         mono_os_cond_broadcast (&work_cond);
333
334         mono_os_mutex_unlock (&lock);
335 }
336
337 void
338 sgen_thread_pool_job_wait (int context_id, SgenThreadPoolJob *job)
339 {
340         SGEN_ASSERT (0, job, "Where's the job?");
341
342         mono_os_mutex_lock (&lock);
343
344         while (find_job_in_queue (&pool_contexts [context_id], job) >= 0)
345                 mono_os_cond_wait (&done_cond, &lock);
346
347         mono_os_mutex_unlock (&lock);
348 }
349
350 void
351 sgen_thread_pool_idle_signal (int context_id)
352 {
353         SGEN_ASSERT (0, pool_contexts [context_id].idle_job_func, "Why are we signaling idle without an idle function?");
354
355         mono_os_mutex_lock (&lock);
356
357         if (pool_contexts [context_id].continue_idle_job_func (NULL, context_id))
358                 mono_os_cond_broadcast (&work_cond);
359
360         mono_os_mutex_unlock (&lock);
361 }
362
363 void
364 sgen_thread_pool_idle_wait (int context_id, SgenThreadPoolContinueIdleWaitFunc continue_wait)
365 {
366         SGEN_ASSERT (0, pool_contexts [context_id].idle_job_func, "Why are we waiting for idle without an idle function?");
367
368         mono_os_mutex_lock (&lock);
369
370         while (continue_wait (context_id, threads_context))
371                 mono_os_cond_wait (&done_cond, &lock);
372
373         mono_os_mutex_unlock (&lock);
374 }
375
376 void
377 sgen_thread_pool_wait_for_all_jobs (int context_id)
378 {
379         mono_os_mutex_lock (&lock);
380
381         while (!sgen_pointer_queue_is_empty (&pool_contexts [context_id].job_queue))
382                 mono_os_cond_wait (&done_cond, &lock);
383
384         mono_os_mutex_unlock (&lock);
385 }
386
387 /* Return 0 if is not a thread pool thread or the thread number otherwise */
388 int
389 sgen_thread_pool_is_thread_pool_thread (MonoNativeThreadId some_thread)
390 {
391         int i;
392
393         for (i = 0; i < threads_num; i++) {
394                 if (some_thread == threads [i])
395                         return i + 1;
396         }
397
398         return 0;
399 }
400
401 #endif