[sgen] Split up concurrent sweep from worker logic
[mono.git] / mono / sgen / sgen-thread-pool.c
1 /**
2  * \file
3  * Threadpool for all concurrent GC work.
4  *
5  * Copyright (C) 2015 Xamarin Inc
6  *
7  * Licensed under the MIT license. See LICENSE file in the project root for full license information.
8  */
9
10 #include "config.h"
11 #ifdef HAVE_SGEN_GC
12
13 #include "mono/sgen/sgen-gc.h"
14 #include "mono/sgen/sgen-thread-pool.h"
15 #include "mono/utils/mono-os-mutex.h"
16
17 enum {
18         STATE_WAITING,
19         STATE_IN_PROGRESS,
20         STATE_DONE
21 };
22
23 /* Assumes that the lock is held. */
24 static SgenThreadPoolJob*
25 get_job_and_set_in_progress (SgenThreadPool *pool)
26 {
27         for (size_t i = 0; i < pool->job_queue.next_slot; ++i) {
28                 SgenThreadPoolJob *job = (SgenThreadPoolJob *)pool->job_queue.data [i];
29                 if (job->state == STATE_WAITING) {
30                         job->state = STATE_IN_PROGRESS;
31                         return job;
32                 }
33         }
34         return NULL;
35 }
36
37 /* Assumes that the lock is held. */
38 static ssize_t
39 find_job_in_queue (SgenThreadPool *pool, SgenThreadPoolJob *job)
40 {
41         for (ssize_t i = 0; i < pool->job_queue.next_slot; ++i) {
42                 if (pool->job_queue.data [i] == job)
43                         return i;
44         }
45         return -1;
46 }
47
48 /* Assumes that the lock is held. */
49 static void
50 remove_job (SgenThreadPool *pool, SgenThreadPoolJob *job)
51 {
52         ssize_t index;
53         SGEN_ASSERT (0, job->state == STATE_DONE, "Why are we removing a job that's not done?");
54         index = find_job_in_queue (pool, job);
55         SGEN_ASSERT (0, index >= 0, "Why is the job we're trying to remove not in the queue?");
56         pool->job_queue.data [index] = NULL;
57         sgen_pointer_queue_remove_nulls (&pool->job_queue);
58         sgen_thread_pool_job_free (job);
59 }
60
61 static gboolean
62 continue_idle_job (SgenThreadPool *pool, void *thread_data)
63 {
64         if (!pool->continue_idle_job_func)
65                 return FALSE;
66         return pool->continue_idle_job_func (thread_data);
67 }
68
69 static gboolean
70 should_work (SgenThreadPool *pool, void *thread_data)
71 {
72         if (!pool->should_work_func)
73                 return TRUE;
74         return pool->should_work_func (thread_data);
75 }
76
77 static mono_native_thread_return_t
78 thread_func (SgenThreadPoolData *thread_data)
79 {
80         SgenThreadPool *pool = thread_data->pool;
81
82         pool->thread_init_func (thread_data);
83
84         mono_os_mutex_lock (&pool->lock);
85         for (;;) {
86                 gboolean do_idle;
87                 SgenThreadPoolJob *job;
88
89                 if (!should_work (pool, thread_data) && !pool->threadpool_shutdown) {
90                         mono_os_cond_wait (&pool->work_cond, &pool->lock);
91                         continue;
92                 }
93                 /*
94                  * It's important that we check the continue idle flag with the lock held.
95                  * Suppose we didn't check with the lock held, and the result is FALSE.  The
96                  * main thread might then set continue idle and signal us before we can take
97                  * the lock, and we'd lose the signal.
98                  */
99                 do_idle = continue_idle_job (pool, thread_data);
100                 job = get_job_and_set_in_progress (pool);
101
102                 if (!job && !do_idle && !pool->threadpool_shutdown) {
103                         /*
104                          * pthread_cond_wait() can return successfully despite the condition
105                          * not being signalled, so we have to run this in a loop until we
106                          * really have work to do.
107                          */
108                         mono_os_cond_wait (&pool->work_cond, &pool->lock);
109                         continue;
110                 }
111
112                 mono_os_mutex_unlock (&pool->lock);
113
114                 if (job) {
115                         job->func (thread_data, job);
116
117                         mono_os_mutex_lock (&pool->lock);
118
119                         SGEN_ASSERT (0, job->state == STATE_IN_PROGRESS, "The job should still be in progress.");
120                         job->state = STATE_DONE;
121                         remove_job (pool, job);
122                         /*
123                          * Only the main GC thread will ever wait on the done condition, so we don't
124                          * have to broadcast.
125                          */
126                         mono_os_cond_signal (&pool->done_cond);
127                 } else if (do_idle) {
128                         SGEN_ASSERT (0, pool->idle_job_func, "Why do we have idle work when there's no idle job function?");
129                         do {
130                                 pool->idle_job_func (thread_data);
131                                 do_idle = continue_idle_job (pool, thread_data);
132                         } while (do_idle && !pool->job_queue.next_slot);
133
134                         mono_os_mutex_lock (&pool->lock);
135
136                         if (!do_idle)
137                                 mono_os_cond_signal (&pool->done_cond);
138                 } else {
139                         SGEN_ASSERT (0, pool->threadpool_shutdown, "Why did we unlock if no jobs and not shutting down?");
140                         mono_os_mutex_lock (&pool->lock);
141                         pool->threads_finished++;
142                         mono_os_cond_signal (&pool->done_cond);
143                         mono_os_mutex_unlock (&pool->lock);
144                         return 0;
145                 }
146         }
147
148         return (mono_native_thread_return_t)0;
149 }
150
151 void
152 sgen_thread_pool_init (SgenThreadPool *pool, int num_threads, SgenThreadPoolThreadInitFunc init_func, SgenThreadPoolIdleJobFunc idle_func, SgenThreadPoolContinueIdleJobFunc continue_idle_func, SgenThreadPoolShouldWorkFunc should_work_func_p, SgenThreadPoolData **thread_datas)
153 {
154         int i;
155
156         SGEN_ASSERT (0, num_threads > 0, "Why are we creating a threadpool with no threads?");
157
158         pool->threads_num = (num_threads < MAX_NUM_THREADS) ? num_threads : MAX_NUM_THREADS;
159
160         mono_os_mutex_init (&pool->lock);
161         mono_os_cond_init (&pool->work_cond);
162         mono_os_cond_init (&pool->done_cond);
163
164         pool->thread_init_func = init_func;
165         pool->idle_job_func = idle_func;
166         pool->continue_idle_job_func = continue_idle_func;
167         pool->should_work_func = should_work_func_p;
168
169         sgen_pointer_queue_init (&pool->job_queue, 0);
170         pool->threads_finished = 0;
171         pool->threadpool_shutdown = FALSE;
172
173         for (i = 0; i < pool->threads_num; i++) {
174                 thread_datas [i]->pool = pool;
175                 mono_native_thread_create (&pool->threads [i], thread_func, thread_datas [i]);
176         }
177 }
178
179 void
180 sgen_thread_pool_shutdown (SgenThreadPool *pool)
181 {
182         if (!pool)
183                 return;
184
185         mono_os_mutex_lock (&pool->lock);
186         pool->threadpool_shutdown = TRUE;
187         mono_os_cond_broadcast (&pool->work_cond);
188         while (pool->threads_finished < pool->threads_num)
189                 mono_os_cond_wait (&pool->done_cond, &pool->lock);
190         mono_os_mutex_unlock (&pool->lock);
191
192         mono_os_mutex_destroy (&pool->lock);
193         mono_os_cond_destroy (&pool->work_cond);
194         mono_os_cond_destroy (&pool->done_cond);
195 }
196
197 SgenThreadPoolJob*
198 sgen_thread_pool_job_alloc (const char *name, SgenThreadPoolJobFunc func, size_t size)
199 {
200         SgenThreadPoolJob *job = (SgenThreadPoolJob *)sgen_alloc_internal_dynamic (size, INTERNAL_MEM_THREAD_POOL_JOB, TRUE);
201         job->name = name;
202         job->size = size;
203         job->state = STATE_WAITING;
204         job->func = func;
205         return job;
206 }
207
208 void
209 sgen_thread_pool_job_free (SgenThreadPoolJob *job)
210 {
211         sgen_free_internal_dynamic (job, job->size, INTERNAL_MEM_THREAD_POOL_JOB);
212 }
213
214 void
215 sgen_thread_pool_job_enqueue (SgenThreadPool *pool, SgenThreadPoolJob *job)
216 {
217         mono_os_mutex_lock (&pool->lock);
218
219         sgen_pointer_queue_add (&pool->job_queue, job);
220         mono_os_cond_signal (&pool->work_cond);
221
222         mono_os_mutex_unlock (&pool->lock);
223 }
224
225 void
226 sgen_thread_pool_job_wait (SgenThreadPool *pool, SgenThreadPoolJob *job)
227 {
228         SGEN_ASSERT (0, job, "Where's the job?");
229
230         mono_os_mutex_lock (&pool->lock);
231
232         while (find_job_in_queue (pool, job) >= 0)
233                 mono_os_cond_wait (&pool->done_cond, &pool->lock);
234
235         mono_os_mutex_unlock (&pool->lock);
236 }
237
238 void
239 sgen_thread_pool_idle_signal (SgenThreadPool *pool)
240 {
241         SGEN_ASSERT (0, pool->idle_job_func, "Why are we signaling idle without an idle function?");
242
243         mono_os_mutex_lock (&pool->lock);
244
245         if (pool->continue_idle_job_func (NULL))
246                 mono_os_cond_broadcast (&pool->work_cond);
247
248         mono_os_mutex_unlock (&pool->lock);
249 }
250
251 void
252 sgen_thread_pool_idle_wait (SgenThreadPool *pool)
253 {
254         SGEN_ASSERT (0, pool->idle_job_func, "Why are we waiting for idle without an idle function?");
255
256         mono_os_mutex_lock (&pool->lock);
257
258         while (pool->continue_idle_job_func (NULL))
259                 mono_os_cond_wait (&pool->done_cond, &pool->lock);
260
261         mono_os_mutex_unlock (&pool->lock);
262 }
263
264 void
265 sgen_thread_pool_wait_for_all_jobs (SgenThreadPool *pool)
266 {
267         mono_os_mutex_lock (&pool->lock);
268
269         while (!sgen_pointer_queue_is_empty (&pool->job_queue))
270                 mono_os_cond_wait (&pool->done_cond, &pool->lock);
271
272         mono_os_mutex_unlock (&pool->lock);
273 }
274
275 /* Return 0 if is not a thread pool thread or the thread number otherwise */
276 int
277 sgen_thread_pool_is_thread_pool_thread (SgenThreadPool *pool, MonoNativeThreadId some_thread)
278 {
279         int i;
280
281         if (!pool)
282                 return 0;
283
284         for (i = 0; i < pool->threads_num; i++) {
285                 if (some_thread == pool->threads [i])
286                         return i + 1;
287         }
288
289         return 0;
290 }
291
292 #endif