Merge pull request #3806 from BrzVlad/feature-parallel-gc-final
[mono.git] / mono / sgen / sgen-thread-pool.c
1 /*
2  * sgen-thread-pool.c: Threadpool for all concurrent GC work.
3  *
4  * Copyright (C) 2015 Xamarin Inc
5  *
6  * Licensed under the MIT license. See LICENSE file in the project root for full license information.
7  */
8
9 #include "config.h"
10 #ifdef HAVE_SGEN_GC
11
12 #include "mono/sgen/sgen-gc.h"
13 #include "mono/sgen/sgen-thread-pool.h"
14 #include "mono/sgen/sgen-pointer-queue.h"
15 #include "mono/utils/mono-os-mutex.h"
16 #ifndef SGEN_WITHOUT_MONO
17 #include "mono/utils/mono-threads.h"
18 #endif
19
20 #define MAX_NUM_THREADS 8
21
22 static mono_mutex_t lock;
23 static mono_cond_t work_cond;
24 static mono_cond_t done_cond;
25
26 static int threads_num = 0;
27 static MonoNativeThreadId threads [MAX_NUM_THREADS];
28
29 /* Only accessed with the lock held. */
30 static SgenPointerQueue job_queue;
31
32 static SgenThreadPoolThreadInitFunc thread_init_func;
33 static SgenThreadPoolIdleJobFunc idle_job_func;
34 static SgenThreadPoolContinueIdleJobFunc continue_idle_job_func;
35 static SgenThreadPoolShouldWorkFunc should_work_func;
36
37 static volatile gboolean threadpool_shutdown;
38 static volatile int threads_finished = 0;
39
40 enum {
41         STATE_WAITING,
42         STATE_IN_PROGRESS,
43         STATE_DONE
44 };
45
46 /* Assumes that the lock is held. */
47 static SgenThreadPoolJob*
48 get_job_and_set_in_progress (void)
49 {
50         for (size_t i = 0; i < job_queue.next_slot; ++i) {
51                 SgenThreadPoolJob *job = (SgenThreadPoolJob *)job_queue.data [i];
52                 if (job->state == STATE_WAITING) {
53                         job->state = STATE_IN_PROGRESS;
54                         return job;
55                 }
56         }
57         return NULL;
58 }
59
60 /* Assumes that the lock is held. */
61 static ssize_t
62 find_job_in_queue (SgenThreadPoolJob *job)
63 {
64         for (ssize_t i = 0; i < job_queue.next_slot; ++i) {
65                 if (job_queue.data [i] == job)
66                         return i;
67         }
68         return -1;
69 }
70
71 /* Assumes that the lock is held. */
72 static void
73 remove_job (SgenThreadPoolJob *job)
74 {
75         ssize_t index;
76         SGEN_ASSERT (0, job->state == STATE_DONE, "Why are we removing a job that's not done?");
77         index = find_job_in_queue (job);
78         SGEN_ASSERT (0, index >= 0, "Why is the job we're trying to remove not in the queue?");
79         job_queue.data [index] = NULL;
80         sgen_pointer_queue_remove_nulls (&job_queue);
81         sgen_thread_pool_job_free (job);
82 }
83
84 static gboolean
85 continue_idle_job (void *thread_data)
86 {
87         if (!continue_idle_job_func)
88                 return FALSE;
89         return continue_idle_job_func (thread_data);
90 }
91
92 static gboolean
93 should_work (void *thread_data)
94 {
95         if (!should_work_func)
96                 return TRUE;
97         return should_work_func (thread_data);
98 }
99
100 static mono_native_thread_return_t
101 thread_func (void *thread_data)
102 {
103         thread_init_func (thread_data);
104
105         mono_os_mutex_lock (&lock);
106         for (;;) {
107                 gboolean do_idle;
108                 SgenThreadPoolJob *job;
109
110                 if (!should_work (thread_data)) {
111                         mono_os_cond_wait (&work_cond, &lock);
112                         continue;
113                 }
114                 /*
115                  * It's important that we check the continue idle flag with the lock held.
116                  * Suppose we didn't check with the lock held, and the result is FALSE.  The
117                  * main thread might then set continue idle and signal us before we can take
118                  * the lock, and we'd lose the signal.
119                  */
120                 do_idle = continue_idle_job (thread_data);
121                 job = get_job_and_set_in_progress ();
122
123                 if (!job && !do_idle && !threadpool_shutdown) {
124                         /*
125                          * pthread_cond_wait() can return successfully despite the condition
126                          * not being signalled, so we have to run this in a loop until we
127                          * really have work to do.
128                          */
129                         mono_os_cond_wait (&work_cond, &lock);
130                         continue;
131                 }
132
133                 mono_os_mutex_unlock (&lock);
134
135                 if (job) {
136                         job->func (thread_data, job);
137
138                         mono_os_mutex_lock (&lock);
139
140                         SGEN_ASSERT (0, job->state == STATE_IN_PROGRESS, "The job should still be in progress.");
141                         job->state = STATE_DONE;
142                         remove_job (job);
143                         /*
144                          * Only the main GC thread will ever wait on the done condition, so we don't
145                          * have to broadcast.
146                          */
147                         mono_os_cond_signal (&done_cond);
148                 } else if (do_idle) {
149                         SGEN_ASSERT (0, idle_job_func, "Why do we have idle work when there's no idle job function?");
150                         do {
151                                 idle_job_func (thread_data);
152                                 do_idle = continue_idle_job (thread_data);
153                         } while (do_idle && !job_queue.next_slot);
154
155                         mono_os_mutex_lock (&lock);
156
157                         if (!do_idle)
158                                 mono_os_cond_signal (&done_cond);
159                 } else {
160                         SGEN_ASSERT (0, threadpool_shutdown, "Why did we unlock if no jobs and not shutting down?");
161                         mono_os_mutex_lock (&lock);
162                         threads_finished++;
163                         mono_os_cond_signal (&done_cond);
164                         mono_os_mutex_unlock (&lock);
165                         return 0;
166                 }
167         }
168
169         return (mono_native_thread_return_t)0;
170 }
171
172 void
173 sgen_thread_pool_init (int num_threads, SgenThreadPoolThreadInitFunc init_func, SgenThreadPoolIdleJobFunc idle_func, SgenThreadPoolContinueIdleJobFunc continue_idle_func, SgenThreadPoolShouldWorkFunc should_work_func_p, void **thread_datas)
174 {
175         int i;
176
177         threads_num = (num_threads < MAX_NUM_THREADS) ? num_threads : MAX_NUM_THREADS;
178
179         mono_os_mutex_init (&lock);
180         mono_os_cond_init (&work_cond);
181         mono_os_cond_init (&done_cond);
182
183         thread_init_func = init_func;
184         idle_job_func = idle_func;
185         continue_idle_job_func = continue_idle_func;
186         should_work_func = should_work_func_p;
187
188         for (i = 0; i < threads_num; i++)
189                 mono_native_thread_create (&threads [i], thread_func, thread_datas ? thread_datas [i] : NULL);
190 }
191
192 void
193 sgen_thread_pool_shutdown (void)
194 {
195         if (!threads_num)
196                 return;
197
198         mono_os_mutex_lock (&lock);
199         threadpool_shutdown = TRUE;
200         mono_os_cond_broadcast (&work_cond);
201         while (threads_finished < threads_num)
202                 mono_os_cond_wait (&done_cond, &lock);
203         mono_os_mutex_unlock (&lock);
204
205         mono_os_mutex_destroy (&lock);
206         mono_os_cond_destroy (&work_cond);
207         mono_os_cond_destroy (&done_cond);
208 }
209
210 SgenThreadPoolJob*
211 sgen_thread_pool_job_alloc (const char *name, SgenThreadPoolJobFunc func, size_t size)
212 {
213         SgenThreadPoolJob *job = (SgenThreadPoolJob *)sgen_alloc_internal_dynamic (size, INTERNAL_MEM_THREAD_POOL_JOB, TRUE);
214         job->name = name;
215         job->size = size;
216         job->state = STATE_WAITING;
217         job->func = func;
218         return job;
219 }
220
221 void
222 sgen_thread_pool_job_free (SgenThreadPoolJob *job)
223 {
224         sgen_free_internal_dynamic (job, job->size, INTERNAL_MEM_THREAD_POOL_JOB);
225 }
226
227 void
228 sgen_thread_pool_job_enqueue (SgenThreadPoolJob *job)
229 {
230         mono_os_mutex_lock (&lock);
231
232         sgen_pointer_queue_add (&job_queue, job);
233         mono_os_cond_signal (&work_cond);
234
235         mono_os_mutex_unlock (&lock);
236 }
237
238 void
239 sgen_thread_pool_job_wait (SgenThreadPoolJob *job)
240 {
241         SGEN_ASSERT (0, job, "Where's the job?");
242
243         mono_os_mutex_lock (&lock);
244
245         while (find_job_in_queue (job) >= 0)
246                 mono_os_cond_wait (&done_cond, &lock);
247
248         mono_os_mutex_unlock (&lock);
249 }
250
251 void
252 sgen_thread_pool_idle_signal (void)
253 {
254         SGEN_ASSERT (0, idle_job_func, "Why are we signaling idle without an idle function?");
255
256         mono_os_mutex_lock (&lock);
257
258         if (continue_idle_job_func (NULL))
259                 mono_os_cond_broadcast (&work_cond);
260
261         mono_os_mutex_unlock (&lock);
262 }
263
264 void
265 sgen_thread_pool_idle_wait (void)
266 {
267         SGEN_ASSERT (0, idle_job_func, "Why are we waiting for idle without an idle function?");
268
269         mono_os_mutex_lock (&lock);
270
271         while (continue_idle_job_func (NULL))
272                 mono_os_cond_wait (&done_cond, &lock);
273
274         mono_os_mutex_unlock (&lock);
275 }
276
277 void
278 sgen_thread_pool_wait_for_all_jobs (void)
279 {
280         mono_os_mutex_lock (&lock);
281
282         while (!sgen_pointer_queue_is_empty (&job_queue))
283                 mono_os_cond_wait (&done_cond, &lock);
284
285         mono_os_mutex_unlock (&lock);
286 }
287
288 /* Return 0 if is not a thread pool thread or the thread number otherwise */
289 int
290 sgen_thread_pool_is_thread_pool_thread (MonoNativeThreadId some_thread)
291 {
292         int i;
293
294         for (i = 0; i < threads_num; i++) {
295                 if (some_thread == threads [i])
296                         return i + 1;
297         }
298
299         return 0;
300 }
301
302 #endif