Merge pull request #2013 from BrzVlad/feature-concurrent-work
[mono.git] / mono / sgen / sgen-thread-pool.c
1 /*
2  * sgen-thread-pool.c: Threadpool for all concurrent GC work.
3  *
4  * Copyright (C) 2015 Xamarin Inc
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License 2.0 as published by the Free Software Foundation;
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License 2.0 along with this library; if not, write to the Free
17  * Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18  */
19
20 #include "config.h"
21 #ifdef HAVE_SGEN_GC
22
23 #include "mono/sgen/sgen-gc.h"
24 #include "mono/sgen/sgen-thread-pool.h"
25 #include "mono/sgen/sgen-pointer-queue.h"
26 #include "mono/utils/mono-mutex.h"
27 #ifndef SGEN_WITHOUT_MONO
28 #include "mono/utils/mono-threads.h"
29 #endif
30
31 static mono_mutex_t lock;
32 static mono_cond_t work_cond;
33 static mono_cond_t done_cond;
34
35 static MonoNativeThreadId thread;
36
37 /* Only accessed with the lock held. */
38 static SgenPointerQueue job_queue;
39
40 static SgenThreadPoolThreadInitFunc thread_init_func;
41 static SgenThreadPoolIdleJobFunc idle_job_func;
42 static SgenThreadPoolContinueIdleJobFunc continue_idle_job_func;
43
44 enum {
45         STATE_WAITING,
46         STATE_IN_PROGRESS,
47         STATE_DONE
48 };
49
50 /* Assumes that the lock is held. */
51 static SgenThreadPoolJob*
52 get_job_and_set_in_progress (void)
53 {
54         for (size_t i = 0; i < job_queue.next_slot; ++i) {
55                 SgenThreadPoolJob *job = job_queue.data [i];
56                 if (job->state == STATE_WAITING) {
57                         job->state = STATE_IN_PROGRESS;
58                         return job;
59                 }
60         }
61         return NULL;
62 }
63
64 /* Assumes that the lock is held. */
65 static ssize_t
66 find_job_in_queue (SgenThreadPoolJob *job)
67 {
68         for (ssize_t i = 0; i < job_queue.next_slot; ++i) {
69                 if (job_queue.data [i] == job)
70                         return i;
71         }
72         return -1;
73 }
74
75 /* Assumes that the lock is held. */
76 static void
77 remove_job (SgenThreadPoolJob *job)
78 {
79         ssize_t index;
80         SGEN_ASSERT (0, job->state == STATE_DONE, "Why are we removing a job that's not done?");
81         index = find_job_in_queue (job);
82         SGEN_ASSERT (0, index >= 0, "Why is the job we're trying to remove not in the queue?");
83         job_queue.data [index] = NULL;
84         sgen_pointer_queue_remove_nulls (&job_queue);
85         sgen_thread_pool_job_free (job);
86 }
87
88 static gboolean
89 continue_idle_job (void)
90 {
91         if (!continue_idle_job_func)
92                 return FALSE;
93         return continue_idle_job_func ();
94 }
95
96 static mono_native_thread_return_t
97 thread_func (void *thread_data)
98 {
99         thread_init_func (thread_data);
100
101         mono_mutex_lock (&lock);
102         for (;;) {
103                 /*
104                  * It's important that we check the continue idle flag with the lock held.
105                  * Suppose we didn't check with the lock held, and the result is FALSE.  The
106                  * main thread might then set continue idle and signal us before we can take
107                  * the lock, and we'd lose the signal.
108                  */
109                 gboolean do_idle = continue_idle_job ();
110                 SgenThreadPoolJob *job = get_job_and_set_in_progress ();
111
112                 if (!job && !do_idle) {
113                         /*
114                          * pthread_cond_wait() can return successfully despite the condition
115                          * not being signalled, so we have to run this in a loop until we
116                          * really have work to do.
117                          */
118                         mono_cond_wait (&work_cond, &lock);
119                         continue;
120                 }
121
122                 mono_mutex_unlock (&lock);
123
124                 if (job) {
125                         job->func (thread_data, job);
126
127                         mono_mutex_lock (&lock);
128
129                         SGEN_ASSERT (0, job->state == STATE_IN_PROGRESS, "The job should still be in progress.");
130                         job->state = STATE_DONE;
131                         remove_job (job);
132                         /*
133                          * Only the main GC thread will ever wait on the done condition, so we don't
134                          * have to broadcast.
135                          */
136                         mono_cond_signal (&done_cond);
137                 } else {
138                         SGEN_ASSERT (0, do_idle, "Why did we unlock if we still have to wait for idle?");
139                         SGEN_ASSERT (0, idle_job_func, "Why do we have idle work when there's no idle job function?");
140                         do {
141                                 idle_job_func (thread_data);
142                                 do_idle = continue_idle_job ();
143                         } while (do_idle && !job_queue.next_slot);
144
145                         mono_mutex_lock (&lock);
146
147                         if (!do_idle)
148                                 mono_cond_signal (&done_cond);
149                 }
150         }
151
152         return (mono_native_thread_return_t)0;
153 }
154
155 void
156 sgen_thread_pool_init (int num_threads, SgenThreadPoolThreadInitFunc init_func, SgenThreadPoolIdleJobFunc idle_func, SgenThreadPoolContinueIdleJobFunc continue_idle_func, void **thread_datas)
157 {
158         SGEN_ASSERT (0, num_threads == 1, "We only support 1 thread pool thread for now.");
159
160         mono_mutex_init (&lock);
161         mono_cond_init (&work_cond, 0);
162         mono_cond_init (&done_cond, 0);
163
164         thread_init_func = init_func;
165         idle_job_func = idle_func;
166         continue_idle_job_func = continue_idle_func;
167
168         mono_native_thread_create (&thread, thread_func, thread_datas ? thread_datas [0] : NULL);
169 }
170
171 SgenThreadPoolJob*
172 sgen_thread_pool_job_alloc (const char *name, SgenThreadPoolJobFunc func, size_t size)
173 {
174         SgenThreadPoolJob *job = sgen_alloc_internal_dynamic (size, INTERNAL_MEM_THREAD_POOL_JOB, TRUE);
175         job->name = name;
176         job->size = size;
177         job->state = STATE_WAITING;
178         job->func = func;
179         return job;
180 }
181
182 void
183 sgen_thread_pool_job_free (SgenThreadPoolJob *job)
184 {
185         sgen_free_internal_dynamic (job, job->size, INTERNAL_MEM_THREAD_POOL_JOB);
186 }
187
188 void
189 sgen_thread_pool_job_enqueue (SgenThreadPoolJob *job)
190 {
191         mono_mutex_lock (&lock);
192
193         sgen_pointer_queue_add (&job_queue, job);
194         /*
195          * FIXME: We could check whether there is a job in progress.  If there is, there's
196          * no need to signal the condition, at least as long as we have only one thread.
197          */
198         mono_cond_signal (&work_cond);
199
200         mono_mutex_unlock (&lock);
201 }
202
203 void
204 sgen_thread_pool_job_wait (SgenThreadPoolJob *job)
205 {
206         SGEN_ASSERT (0, job, "Where's the job?");
207
208         mono_mutex_lock (&lock);
209
210         while (find_job_in_queue (job) >= 0)
211                 mono_cond_wait (&done_cond, &lock);
212
213         mono_mutex_unlock (&lock);
214 }
215
216 void
217 sgen_thread_pool_idle_signal (void)
218 {
219         SGEN_ASSERT (0, idle_job_func, "Why are we signaling idle without an idle function?");
220
221         mono_mutex_lock (&lock);
222
223         if (continue_idle_job_func ())
224                 mono_cond_signal (&work_cond);
225
226         mono_mutex_unlock (&lock);
227 }
228
229 void
230 sgen_thread_pool_idle_wait (void)
231 {
232         SGEN_ASSERT (0, idle_job_func, "Why are we waiting for idle without an idle function?");
233
234         mono_mutex_lock (&lock);
235
236         while (continue_idle_job_func ())
237                 mono_cond_wait (&done_cond, &lock);
238
239         mono_mutex_unlock (&lock);
240 }
241
242 void
243 sgen_thread_pool_wait_for_all_jobs (void)
244 {
245         mono_mutex_lock (&lock);
246
247         while (!sgen_pointer_queue_is_empty (&job_queue))
248                 mono_cond_wait (&done_cond, &lock);
249
250         mono_mutex_unlock (&lock);
251 }
252
253 gboolean
254 sgen_thread_pool_is_thread_pool_thread (MonoNativeThreadId some_thread)
255 {
256         return some_thread == thread;
257 }
258
259 #endif