Merge pull request #2820 from kumpera/license-change-rebased
[mono.git] / mono / sgen / sgen-thread-pool.c
1 /*
2  * sgen-thread-pool.c: Threadpool for all concurrent GC work.
3  *
4  * Copyright (C) 2015 Xamarin Inc
5  *
6  * Licensed under the MIT license. See LICENSE file in the project root for full license information.
7  */
8
9 #include "config.h"
10 #ifdef HAVE_SGEN_GC
11
12 #include "mono/sgen/sgen-gc.h"
13 #include "mono/sgen/sgen-thread-pool.h"
14 #include "mono/sgen/sgen-pointer-queue.h"
15 #include "mono/utils/mono-os-mutex.h"
16 #ifndef SGEN_WITHOUT_MONO
17 #include "mono/utils/mono-threads.h"
18 #endif
19
20 static mono_mutex_t lock;
21 static mono_cond_t work_cond;
22 static mono_cond_t done_cond;
23
24 static MonoNativeThreadId thread;
25
26 /* Only accessed with the lock held. */
27 static SgenPointerQueue job_queue;
28
29 static SgenThreadPoolThreadInitFunc thread_init_func;
30 static SgenThreadPoolIdleJobFunc idle_job_func;
31 static SgenThreadPoolContinueIdleJobFunc continue_idle_job_func;
32
33 static volatile gboolean threadpool_shutdown;
34 static volatile gboolean thread_finished;
35
36 enum {
37         STATE_WAITING,
38         STATE_IN_PROGRESS,
39         STATE_DONE
40 };
41
42 /* Assumes that the lock is held. */
43 static SgenThreadPoolJob*
44 get_job_and_set_in_progress (void)
45 {
46         for (size_t i = 0; i < job_queue.next_slot; ++i) {
47                 SgenThreadPoolJob *job = (SgenThreadPoolJob *)job_queue.data [i];
48                 if (job->state == STATE_WAITING) {
49                         job->state = STATE_IN_PROGRESS;
50                         return job;
51                 }
52         }
53         return NULL;
54 }
55
56 /* Assumes that the lock is held. */
57 static ssize_t
58 find_job_in_queue (SgenThreadPoolJob *job)
59 {
60         for (ssize_t i = 0; i < job_queue.next_slot; ++i) {
61                 if (job_queue.data [i] == job)
62                         return i;
63         }
64         return -1;
65 }
66
67 /* Assumes that the lock is held. */
68 static void
69 remove_job (SgenThreadPoolJob *job)
70 {
71         ssize_t index;
72         SGEN_ASSERT (0, job->state == STATE_DONE, "Why are we removing a job that's not done?");
73         index = find_job_in_queue (job);
74         SGEN_ASSERT (0, index >= 0, "Why is the job we're trying to remove not in the queue?");
75         job_queue.data [index] = NULL;
76         sgen_pointer_queue_remove_nulls (&job_queue);
77         sgen_thread_pool_job_free (job);
78 }
79
80 static gboolean
81 continue_idle_job (void)
82 {
83         if (!continue_idle_job_func)
84                 return FALSE;
85         return continue_idle_job_func ();
86 }
87
88 static mono_native_thread_return_t
89 thread_func (void *thread_data)
90 {
91         thread_init_func (thread_data);
92
93         mono_os_mutex_lock (&lock);
94         for (;;) {
95                 /*
96                  * It's important that we check the continue idle flag with the lock held.
97                  * Suppose we didn't check with the lock held, and the result is FALSE.  The
98                  * main thread might then set continue idle and signal us before we can take
99                  * the lock, and we'd lose the signal.
100                  */
101                 gboolean do_idle = continue_idle_job ();
102                 SgenThreadPoolJob *job = get_job_and_set_in_progress ();
103
104                 if (!job && !do_idle && !threadpool_shutdown) {
105                         /*
106                          * pthread_cond_wait() can return successfully despite the condition
107                          * not being signalled, so we have to run this in a loop until we
108                          * really have work to do.
109                          */
110                         mono_os_cond_wait (&work_cond, &lock);
111                         continue;
112                 }
113
114                 mono_os_mutex_unlock (&lock);
115
116                 if (job) {
117                         job->func (thread_data, job);
118
119                         mono_os_mutex_lock (&lock);
120
121                         SGEN_ASSERT (0, job->state == STATE_IN_PROGRESS, "The job should still be in progress.");
122                         job->state = STATE_DONE;
123                         remove_job (job);
124                         /*
125                          * Only the main GC thread will ever wait on the done condition, so we don't
126                          * have to broadcast.
127                          */
128                         mono_os_cond_signal (&done_cond);
129                 } else if (do_idle) {
130                         SGEN_ASSERT (0, idle_job_func, "Why do we have idle work when there's no idle job function?");
131                         do {
132                                 idle_job_func (thread_data);
133                                 do_idle = continue_idle_job ();
134                         } while (do_idle && !job_queue.next_slot);
135
136                         mono_os_mutex_lock (&lock);
137
138                         if (!do_idle)
139                                 mono_os_cond_signal (&done_cond);
140                 } else {
141                         SGEN_ASSERT (0, threadpool_shutdown, "Why did we unlock if no jobs and not shutting down?");
142                         mono_os_mutex_lock (&lock);
143                         thread_finished = TRUE;
144                         mono_os_cond_signal (&done_cond);
145                         mono_os_mutex_unlock (&lock);
146                         return 0;
147                 }
148         }
149
150         return (mono_native_thread_return_t)0;
151 }
152
153 void
154 sgen_thread_pool_init (int num_threads, SgenThreadPoolThreadInitFunc init_func, SgenThreadPoolIdleJobFunc idle_func, SgenThreadPoolContinueIdleJobFunc continue_idle_func, void **thread_datas)
155 {
156         SGEN_ASSERT (0, num_threads == 1, "We only support 1 thread pool thread for now.");
157
158         mono_os_mutex_init (&lock);
159         mono_os_cond_init (&work_cond);
160         mono_os_cond_init (&done_cond);
161
162         thread_init_func = init_func;
163         idle_job_func = idle_func;
164         continue_idle_job_func = continue_idle_func;
165
166         mono_native_thread_create (&thread, thread_func, thread_datas ? thread_datas [0] : NULL);
167 }
168
169 void
170 sgen_thread_pool_shutdown (void)
171 {
172         if (!thread)
173                 return;
174
175         mono_os_mutex_lock (&lock);
176         threadpool_shutdown = TRUE;
177         mono_os_cond_signal (&work_cond);
178         while (!thread_finished)
179                 mono_os_cond_wait (&done_cond, &lock);
180         mono_os_mutex_unlock (&lock);
181
182         mono_os_mutex_destroy (&lock);
183         mono_os_cond_destroy (&work_cond);
184         mono_os_cond_destroy (&done_cond);
185 }
186
187 SgenThreadPoolJob*
188 sgen_thread_pool_job_alloc (const char *name, SgenThreadPoolJobFunc func, size_t size)
189 {
190         SgenThreadPoolJob *job = (SgenThreadPoolJob *)sgen_alloc_internal_dynamic (size, INTERNAL_MEM_THREAD_POOL_JOB, TRUE);
191         job->name = name;
192         job->size = size;
193         job->state = STATE_WAITING;
194         job->func = func;
195         return job;
196 }
197
198 void
199 sgen_thread_pool_job_free (SgenThreadPoolJob *job)
200 {
201         sgen_free_internal_dynamic (job, job->size, INTERNAL_MEM_THREAD_POOL_JOB);
202 }
203
204 void
205 sgen_thread_pool_job_enqueue (SgenThreadPoolJob *job)
206 {
207         mono_os_mutex_lock (&lock);
208
209         sgen_pointer_queue_add (&job_queue, job);
210         /*
211          * FIXME: We could check whether there is a job in progress.  If there is, there's
212          * no need to signal the condition, at least as long as we have only one thread.
213          */
214         mono_os_cond_signal (&work_cond);
215
216         mono_os_mutex_unlock (&lock);
217 }
218
219 void
220 sgen_thread_pool_job_wait (SgenThreadPoolJob *job)
221 {
222         SGEN_ASSERT (0, job, "Where's the job?");
223
224         mono_os_mutex_lock (&lock);
225
226         while (find_job_in_queue (job) >= 0)
227                 mono_os_cond_wait (&done_cond, &lock);
228
229         mono_os_mutex_unlock (&lock);
230 }
231
232 void
233 sgen_thread_pool_idle_signal (void)
234 {
235         SGEN_ASSERT (0, idle_job_func, "Why are we signaling idle without an idle function?");
236
237         mono_os_mutex_lock (&lock);
238
239         if (continue_idle_job_func ())
240                 mono_os_cond_signal (&work_cond);
241
242         mono_os_mutex_unlock (&lock);
243 }
244
245 void
246 sgen_thread_pool_idle_wait (void)
247 {
248         SGEN_ASSERT (0, idle_job_func, "Why are we waiting for idle without an idle function?");
249
250         mono_os_mutex_lock (&lock);
251
252         while (continue_idle_job_func ())
253                 mono_os_cond_wait (&done_cond, &lock);
254
255         mono_os_mutex_unlock (&lock);
256 }
257
258 void
259 sgen_thread_pool_wait_for_all_jobs (void)
260 {
261         mono_os_mutex_lock (&lock);
262
263         while (!sgen_pointer_queue_is_empty (&job_queue))
264                 mono_os_cond_wait (&done_cond, &lock);
265
266         mono_os_mutex_unlock (&lock);
267 }
268
269 gboolean
270 sgen_thread_pool_is_thread_pool_thread (MonoNativeThreadId some_thread)
271 {
272         return some_thread == thread;
273 }
274
275 #endif