[sgen] Add worker index to the binary protocol entries
[mono.git] / mono / sgen / sgen-thread-pool.c
1 /*
2  * sgen-thread-pool.c: Threadpool for all concurrent GC work.
3  *
4  * Copyright (C) 2015 Xamarin Inc
5  *
6  * Licensed under the MIT license. See LICENSE file in the project root for full license information.
7  */
8
9 #include "config.h"
10 #ifdef HAVE_SGEN_GC
11
12 #include "mono/sgen/sgen-gc.h"
13 #include "mono/sgen/sgen-thread-pool.h"
14 #include "mono/sgen/sgen-pointer-queue.h"
15 #include "mono/utils/mono-os-mutex.h"
16 #ifndef SGEN_WITHOUT_MONO
17 #include "mono/utils/mono-threads.h"
18 #endif
19
20 #define MAX_NUM_THREADS 8
21
22 static mono_mutex_t lock;
23 static mono_cond_t work_cond;
24 static mono_cond_t done_cond;
25
26 static int threads_num = 0;
27 static MonoNativeThreadId threads [MAX_NUM_THREADS];
28
29 /* Only accessed with the lock held. */
30 static SgenPointerQueue job_queue;
31
32 static SgenThreadPoolThreadInitFunc thread_init_func;
33 static SgenThreadPoolIdleJobFunc idle_job_func;
34 static SgenThreadPoolContinueIdleJobFunc continue_idle_job_func;
35
36 static volatile gboolean threadpool_shutdown;
37 static volatile int threads_finished = 0;
38
39 enum {
40         STATE_WAITING,
41         STATE_IN_PROGRESS,
42         STATE_DONE
43 };
44
45 /* Assumes that the lock is held. */
46 static SgenThreadPoolJob*
47 get_job_and_set_in_progress (void)
48 {
49         for (size_t i = 0; i < job_queue.next_slot; ++i) {
50                 SgenThreadPoolJob *job = (SgenThreadPoolJob *)job_queue.data [i];
51                 if (job->state == STATE_WAITING) {
52                         job->state = STATE_IN_PROGRESS;
53                         return job;
54                 }
55         }
56         return NULL;
57 }
58
59 /* Assumes that the lock is held. */
60 static ssize_t
61 find_job_in_queue (SgenThreadPoolJob *job)
62 {
63         for (ssize_t i = 0; i < job_queue.next_slot; ++i) {
64                 if (job_queue.data [i] == job)
65                         return i;
66         }
67         return -1;
68 }
69
70 /* Assumes that the lock is held. */
71 static void
72 remove_job (SgenThreadPoolJob *job)
73 {
74         ssize_t index;
75         SGEN_ASSERT (0, job->state == STATE_DONE, "Why are we removing a job that's not done?");
76         index = find_job_in_queue (job);
77         SGEN_ASSERT (0, index >= 0, "Why is the job we're trying to remove not in the queue?");
78         job_queue.data [index] = NULL;
79         sgen_pointer_queue_remove_nulls (&job_queue);
80         sgen_thread_pool_job_free (job);
81 }
82
83 static gboolean
84 continue_idle_job (void *thread_data)
85 {
86         if (!continue_idle_job_func)
87                 return FALSE;
88         return continue_idle_job_func (thread_data);
89 }
90
91 static mono_native_thread_return_t
92 thread_func (void *thread_data)
93 {
94         thread_init_func (thread_data);
95
96         mono_os_mutex_lock (&lock);
97         for (;;) {
98                 /*
99                  * It's important that we check the continue idle flag with the lock held.
100                  * Suppose we didn't check with the lock held, and the result is FALSE.  The
101                  * main thread might then set continue idle and signal us before we can take
102                  * the lock, and we'd lose the signal.
103                  */
104                 gboolean do_idle = continue_idle_job (thread_data);
105                 SgenThreadPoolJob *job = get_job_and_set_in_progress ();
106
107                 if (!job && !do_idle && !threadpool_shutdown) {
108                         /*
109                          * pthread_cond_wait() can return successfully despite the condition
110                          * not being signalled, so we have to run this in a loop until we
111                          * really have work to do.
112                          */
113                         mono_os_cond_wait (&work_cond, &lock);
114                         continue;
115                 }
116
117                 mono_os_mutex_unlock (&lock);
118
119                 if (job) {
120                         job->func (thread_data, job);
121
122                         mono_os_mutex_lock (&lock);
123
124                         SGEN_ASSERT (0, job->state == STATE_IN_PROGRESS, "The job should still be in progress.");
125                         job->state = STATE_DONE;
126                         remove_job (job);
127                         /*
128                          * Only the main GC thread will ever wait on the done condition, so we don't
129                          * have to broadcast.
130                          */
131                         mono_os_cond_signal (&done_cond);
132                 } else if (do_idle) {
133                         SGEN_ASSERT (0, idle_job_func, "Why do we have idle work when there's no idle job function?");
134                         do {
135                                 idle_job_func (thread_data);
136                                 do_idle = continue_idle_job (thread_data);
137                         } while (do_idle && !job_queue.next_slot);
138
139                         mono_os_mutex_lock (&lock);
140
141                         if (!do_idle)
142                                 mono_os_cond_signal (&done_cond);
143                 } else {
144                         SGEN_ASSERT (0, threadpool_shutdown, "Why did we unlock if no jobs and not shutting down?");
145                         mono_os_mutex_lock (&lock);
146                         threads_finished++;
147                         mono_os_cond_signal (&done_cond);
148                         mono_os_mutex_unlock (&lock);
149                         return 0;
150                 }
151         }
152
153         return (mono_native_thread_return_t)0;
154 }
155
156 void
157 sgen_thread_pool_init (int num_threads, SgenThreadPoolThreadInitFunc init_func, SgenThreadPoolIdleJobFunc idle_func, SgenThreadPoolContinueIdleJobFunc continue_idle_func, void **thread_datas)
158 {
159         int i;
160
161         threads_num = (num_threads < MAX_NUM_THREADS) ? num_threads : MAX_NUM_THREADS;
162
163         mono_os_mutex_init (&lock);
164         mono_os_cond_init (&work_cond);
165         mono_os_cond_init (&done_cond);
166
167         thread_init_func = init_func;
168         idle_job_func = idle_func;
169         continue_idle_job_func = continue_idle_func;
170
171         for (i = 0; i < threads_num; i++)
172                 mono_native_thread_create (&threads [i], thread_func, thread_datas ? thread_datas [i] : NULL);
173 }
174
175 void
176 sgen_thread_pool_shutdown (void)
177 {
178         if (!threads_num)
179                 return;
180
181         mono_os_mutex_lock (&lock);
182         threadpool_shutdown = TRUE;
183         mono_os_cond_broadcast (&work_cond);
184         while (threads_finished < threads_num)
185                 mono_os_cond_wait (&done_cond, &lock);
186         mono_os_mutex_unlock (&lock);
187
188         mono_os_mutex_destroy (&lock);
189         mono_os_cond_destroy (&work_cond);
190         mono_os_cond_destroy (&done_cond);
191 }
192
193 SgenThreadPoolJob*
194 sgen_thread_pool_job_alloc (const char *name, SgenThreadPoolJobFunc func, size_t size)
195 {
196         SgenThreadPoolJob *job = (SgenThreadPoolJob *)sgen_alloc_internal_dynamic (size, INTERNAL_MEM_THREAD_POOL_JOB, TRUE);
197         job->name = name;
198         job->size = size;
199         job->state = STATE_WAITING;
200         job->func = func;
201         return job;
202 }
203
204 void
205 sgen_thread_pool_job_free (SgenThreadPoolJob *job)
206 {
207         sgen_free_internal_dynamic (job, job->size, INTERNAL_MEM_THREAD_POOL_JOB);
208 }
209
210 void
211 sgen_thread_pool_job_enqueue (SgenThreadPoolJob *job)
212 {
213         mono_os_mutex_lock (&lock);
214
215         sgen_pointer_queue_add (&job_queue, job);
216         mono_os_cond_signal (&work_cond);
217
218         mono_os_mutex_unlock (&lock);
219 }
220
221 void
222 sgen_thread_pool_job_wait (SgenThreadPoolJob *job)
223 {
224         SGEN_ASSERT (0, job, "Where's the job?");
225
226         mono_os_mutex_lock (&lock);
227
228         while (find_job_in_queue (job) >= 0)
229                 mono_os_cond_wait (&done_cond, &lock);
230
231         mono_os_mutex_unlock (&lock);
232 }
233
234 void
235 sgen_thread_pool_idle_signal (void)
236 {
237         SGEN_ASSERT (0, idle_job_func, "Why are we signaling idle without an idle function?");
238
239         mono_os_mutex_lock (&lock);
240
241         if (continue_idle_job_func (NULL))
242                 mono_os_cond_broadcast (&work_cond);
243
244         mono_os_mutex_unlock (&lock);
245 }
246
247 void
248 sgen_thread_pool_idle_wait (void)
249 {
250         SGEN_ASSERT (0, idle_job_func, "Why are we waiting for idle without an idle function?");
251
252         mono_os_mutex_lock (&lock);
253
254         while (continue_idle_job_func (NULL))
255                 mono_os_cond_wait (&done_cond, &lock);
256
257         mono_os_mutex_unlock (&lock);
258 }
259
260 void
261 sgen_thread_pool_wait_for_all_jobs (void)
262 {
263         mono_os_mutex_lock (&lock);
264
265         while (!sgen_pointer_queue_is_empty (&job_queue))
266                 mono_os_cond_wait (&done_cond, &lock);
267
268         mono_os_mutex_unlock (&lock);
269 }
270
271 /* Return 0 if is not a thread pool thread or the thread number otherwise */
272 int
273 sgen_thread_pool_is_thread_pool_thread (MonoNativeThreadId some_thread)
274 {
275         int i;
276
277         for (i = 0; i < threads_num; i++) {
278                 if (some_thread == threads [i])
279                         return i + 1;
280         }
281
282         return 0;
283 }
284
285 #endif