[sgen] Remove redundant code
[mono.git] / mono / sgen / sgen-workers.c
1 /**
2  * \file
3  * Worker threads for parallel and concurrent GC.
4  *
5  * Copyright 2001-2003 Ximian, Inc
6  * Copyright 2003-2010 Novell, Inc.
7  * Copyright (C) 2012 Xamarin Inc
8  *
9  * Licensed under the MIT license. See LICENSE file in the project root for full license information.
10  */
11
12 #include "config.h"
13 #ifdef HAVE_SGEN_GC
14
15 #include <string.h>
16
17 #include "mono/sgen/sgen-gc.h"
18 #include "mono/sgen/sgen-workers.h"
19 #include "mono/sgen/sgen-thread-pool.h"
20 #include "mono/utils/mono-membar.h"
21 #include "mono/sgen/sgen-client.h"
22
23 static int workers_num;
24 static int active_workers_num;
25 static volatile gboolean started;
26 static volatile gboolean forced_stop;
27 static WorkerData *workers_data;
28 static SgenWorkerCallback worker_init_cb;
29
30 static SgenThreadPool pool_inst;
31 static SgenThreadPool *pool; /* null if we're not using workers */
32
33 /*
34  * When using multiple workers, we need to have the last worker
35  * enqueue the preclean jobs (if there are any). This lock ensures
36  * that when the last worker takes it, all the other workers have
37  * gracefully finished, so it can restart them.
38  */
39 static mono_mutex_t finished_lock;
40 static volatile gboolean workers_finished;
41 static int worker_awakenings;
42
43 static SgenSectionGrayQueue workers_distribute_gray_queue;
44
45 /*
46  * Allowed transitions:
47  *
48  * | from \ to          | NOT WORKING | WORKING | WORK ENQUEUED |
49  * |--------------------+-------------+---------+---------------+
50  * | NOT WORKING        | -           | -       | main / worker |
51  * | WORKING            | worker      | -       | main / worker |
52  * | WORK ENQUEUED      | -           | worker  | -             |
53  *
54  * The WORK ENQUEUED state guarantees that the worker thread will inspect the queue again at
55  * least once.  Only after looking at the queue will it go back to WORKING, and then,
56  * eventually, to NOT WORKING.  After enqueuing work the main thread transitions the state
57  * to WORK ENQUEUED.  Signalling the worker thread to wake up is only necessary if the old
58  * state was NOT WORKING.
59  */
60
61 enum {
62         STATE_NOT_WORKING,
63         STATE_WORKING,
64         STATE_WORK_ENQUEUED
65 };
66
67 #define SGEN_WORKER_MIN_SECTIONS_SIGNAL 4
68
69 typedef gint32 State;
70
71 static SgenObjectOperations * volatile idle_func_object_ops;
72 static SgenObjectOperations *idle_func_object_ops_par, *idle_func_object_ops_nopar;
73 /*
74  * finished_callback is called only when the workers finish work normally (when they
75  * are not forced to finish). The callback is used to enqueue preclean jobs.
76  */
77 static volatile SgenWorkersFinishCallback finish_callback;
78
79 static guint64 stat_workers_num_finished;
80
81 static gboolean
82 set_state (WorkerData *data, State old_state, State new_state)
83 {
84         SGEN_ASSERT (0, old_state != new_state, "Why are we transitioning to the same state?");
85         if (new_state == STATE_NOT_WORKING)
86                 SGEN_ASSERT (0, old_state == STATE_WORKING, "We can only transition to NOT WORKING from WORKING");
87         else if (new_state == STATE_WORKING)
88                 SGEN_ASSERT (0, old_state == STATE_WORK_ENQUEUED, "We can only transition to WORKING from WORK ENQUEUED");
89         if (new_state == STATE_NOT_WORKING || new_state == STATE_WORKING)
90                 SGEN_ASSERT (6, sgen_thread_pool_is_thread_pool_thread (pool, mono_native_thread_id_get ()), "Only the worker thread is allowed to transition to NOT_WORKING or WORKING");
91
92         return InterlockedCompareExchange (&data->state, new_state, old_state) == old_state;
93 }
94
95 static gboolean
96 state_is_working_or_enqueued (State state)
97 {
98         return state == STATE_WORKING || state == STATE_WORK_ENQUEUED;
99 }
100
101 static void
102 sgen_workers_ensure_awake (void)
103 {
104         int i;
105         gboolean need_signal = FALSE;
106
107         /*
108          * All workers are awaken, make sure we reset the parallel context.
109          * We call this function only when starting the workers so nobody is running,
110          * or when the last worker is enqueuing preclean work. In both cases we can't
111          * have a worker working using a nopar context, which means it is safe.
112          */
113         idle_func_object_ops = (active_workers_num > 1) ? idle_func_object_ops_par : idle_func_object_ops_nopar;
114         workers_finished = FALSE;
115
116         for (i = 0; i < active_workers_num; i++) {
117                 State old_state;
118                 gboolean did_set_state;
119
120                 do {
121                         old_state = workers_data [i].state;
122
123                         if (old_state == STATE_WORK_ENQUEUED)
124                                 break;
125
126                         did_set_state = set_state (&workers_data [i], old_state, STATE_WORK_ENQUEUED);
127                 } while (!did_set_state);
128
129                 if (!state_is_working_or_enqueued (old_state))
130                         need_signal = TRUE;
131         }
132
133         if (need_signal)
134                 sgen_thread_pool_idle_signal (pool);
135 }
136
137 static void
138 worker_try_finish (WorkerData *data)
139 {
140         State old_state;
141         int i, working = 0;
142
143         ++stat_workers_num_finished;
144
145         mono_os_mutex_lock (&finished_lock);
146
147         for (i = 0; i < active_workers_num; i++) {
148                 if (state_is_working_or_enqueued (workers_data [i].state))
149                         working++;
150         }
151
152         if (working == 1) {
153                 SgenWorkersFinishCallback callback = finish_callback;
154                 SGEN_ASSERT (0, idle_func_object_ops == idle_func_object_ops_nopar, "Why are we finishing with parallel context");
155                 /* We are the last one left. Enqueue preclean job if we have one and awake everybody */
156                 SGEN_ASSERT (0, data->state != STATE_NOT_WORKING, "How did we get from doing idle work to NOT WORKING without setting it ourselves?");
157                 if (callback) {
158                         finish_callback = NULL;
159                         callback ();
160                         worker_awakenings = 0;
161                         /* Make sure each worker has a chance of seeing the enqueued jobs */
162                         sgen_workers_ensure_awake ();
163                         SGEN_ASSERT (0, data->state == STATE_WORK_ENQUEUED, "Why did we fail to set our own state to ENQUEUED");
164                         goto work_available;
165                 }
166         }
167
168         do {
169                 old_state = data->state;
170
171                 SGEN_ASSERT (0, old_state != STATE_NOT_WORKING, "How did we get from doing idle work to NOT WORKING without setting it ourselves?");
172                 if (old_state == STATE_WORK_ENQUEUED)
173                         goto work_available;
174                 SGEN_ASSERT (0, old_state == STATE_WORKING, "What other possibility is there?");
175         } while (!set_state (data, old_state, STATE_NOT_WORKING));
176
177         /*
178          * If we are second to last to finish, we set the scan context to the non-parallel
179          * version so we can speed up the last worker. This helps us maintain same level
180          * of performance as non-parallel mode even if we fail to distribute work properly.
181          */
182         if (working == 2)
183                 idle_func_object_ops = idle_func_object_ops_nopar;
184
185         workers_finished = TRUE;
186         mono_os_mutex_unlock (&finished_lock);
187
188         binary_protocol_worker_finish (sgen_timestamp (), forced_stop);
189
190         sgen_gray_object_queue_trim_free_list (&data->private_gray_queue);
191         return;
192
193 work_available:
194         mono_os_mutex_unlock (&finished_lock);
195 }
196
197 void
198 sgen_workers_enqueue_job (SgenThreadPoolJob *job, gboolean enqueue)
199 {
200         if (!enqueue) {
201                 job->func (NULL, job);
202                 sgen_thread_pool_job_free (job);
203                 return;
204         }
205
206         sgen_thread_pool_job_enqueue (pool, job);
207 }
208
209 static gboolean
210 workers_get_work (WorkerData *data)
211 {
212         SgenMajorCollector *major = sgen_get_major_collector ();
213         SgenMinorCollector *minor = sgen_get_minor_collector ();
214         GrayQueueSection *section;
215
216         g_assert (sgen_gray_object_queue_is_empty (&data->private_gray_queue));
217         g_assert (major->is_concurrent || minor->is_parallel);
218
219         section = sgen_section_gray_queue_dequeue (&workers_distribute_gray_queue);
220         if (section) {
221                 sgen_gray_object_enqueue_section (&data->private_gray_queue, section, major->is_parallel);
222                 return TRUE;
223         }
224
225         /* Nobody to steal from */
226         g_assert (sgen_gray_object_queue_is_empty (&data->private_gray_queue));
227         return FALSE;
228 }
229
230 static gboolean
231 workers_steal_work (WorkerData *data)
232 {
233         SgenMajorCollector *major = sgen_get_major_collector ();
234         SgenMinorCollector *minor = sgen_get_minor_collector ();
235         int generation = sgen_get_current_collection_generation ();
236         GrayQueueSection *section = NULL;
237         int i, current_worker;
238
239         if ((generation == GENERATION_OLD && !major->is_parallel) ||
240                         (generation == GENERATION_NURSERY && !minor->is_parallel))
241                 return FALSE;
242
243         /* If we're parallel, steal from other workers' private gray queues  */
244         g_assert (sgen_gray_object_queue_is_empty (&data->private_gray_queue));
245
246         current_worker = (int) (data - workers_data);
247
248         for (i = 1; i < active_workers_num && !section; i++) {
249                 int steal_worker = (current_worker + i) % active_workers_num;
250                 if (state_is_working_or_enqueued (workers_data [steal_worker].state))
251                         section = sgen_gray_object_steal_section (&workers_data [steal_worker].private_gray_queue);
252         }
253
254         if (section) {
255                 sgen_gray_object_enqueue_section (&data->private_gray_queue, section, TRUE);
256                 return TRUE;
257         }
258
259         /* Nobody to steal from */
260         g_assert (sgen_gray_object_queue_is_empty (&data->private_gray_queue));
261         return FALSE;
262 }
263
264 static void
265 concurrent_enqueue_check (GCObject *obj)
266 {
267         g_assert (sgen_concurrent_collection_in_progress ());
268         g_assert (!sgen_ptr_in_nursery (obj));
269         g_assert (SGEN_LOAD_VTABLE (obj));
270 }
271
272 static void
273 init_private_gray_queue (WorkerData *data)
274 {
275         sgen_gray_object_queue_init (&data->private_gray_queue,
276                         sgen_get_major_collector ()->is_concurrent ? concurrent_enqueue_check : NULL,
277                         FALSE);
278 }
279
280 static void
281 thread_pool_init_func (void *data_untyped)
282 {
283         WorkerData *data = (WorkerData *)data_untyped;
284         SgenMajorCollector *major = sgen_get_major_collector ();
285         SgenMinorCollector *minor = sgen_get_minor_collector ();
286
287         sgen_client_thread_register_worker ();
288
289         if (!major->is_concurrent && !minor->is_parallel)
290                 return;
291
292         init_private_gray_queue (data);
293
294         if (worker_init_cb)
295                 worker_init_cb (data);
296 }
297
298 static gboolean
299 continue_idle_func (void *data_untyped)
300 {
301         if (data_untyped) {
302                 WorkerData *data = (WorkerData *)data_untyped;
303                 return state_is_working_or_enqueued (data->state);
304         } else {
305                 /* Return if any of the threads is working */
306                 return !sgen_workers_all_done ();
307         }
308 }
309
310 static gboolean
311 should_work_func (void *data_untyped)
312 {
313         WorkerData *data = (WorkerData*)data_untyped;
314         int current_worker = (int) (data - workers_data);
315
316         return started && current_worker < active_workers_num && state_is_working_or_enqueued (data->state);
317 }
318
319 static void
320 marker_idle_func (void *data_untyped)
321 {
322         WorkerData *data = (WorkerData *)data_untyped;
323
324         SGEN_ASSERT (0, continue_idle_func (data_untyped), "Why are we called when we're not supposed to work?");
325
326         if (data->state == STATE_WORK_ENQUEUED) {
327                 set_state (data, STATE_WORK_ENQUEUED, STATE_WORKING);
328                 SGEN_ASSERT (0, data->state != STATE_NOT_WORKING, "How did we get from WORK ENQUEUED to NOT WORKING?");
329         }
330
331         if (!forced_stop && (!sgen_gray_object_queue_is_empty (&data->private_gray_queue) || workers_get_work (data) || workers_steal_work (data))) {
332                 ScanCopyContext ctx = CONTEXT_FROM_OBJECT_OPERATIONS (idle_func_object_ops, &data->private_gray_queue);
333
334                 SGEN_ASSERT (0, !sgen_gray_object_queue_is_empty (&data->private_gray_queue), "How is our gray queue empty if we just got work?");
335
336                 sgen_drain_gray_stack (ctx);
337
338                 if (data->private_gray_queue.num_sections >= SGEN_WORKER_MIN_SECTIONS_SIGNAL
339                                 && workers_finished && worker_awakenings < active_workers_num) {
340                         /* We bound the number of worker awakenings just to be sure */
341                         worker_awakenings++;
342                         mono_os_mutex_lock (&finished_lock);
343                         sgen_workers_ensure_awake ();
344                         mono_os_mutex_unlock (&finished_lock);
345                 }
346         } else {
347                 worker_try_finish (data);
348         }
349 }
350
351 static void
352 init_distribute_gray_queue (void)
353 {
354         sgen_section_gray_queue_init (&workers_distribute_gray_queue, TRUE,
355                         sgen_get_major_collector ()->is_concurrent ? concurrent_enqueue_check : NULL);
356 }
357
358 void
359 sgen_workers_init (int num_workers, SgenWorkerCallback callback)
360 {
361         int i;
362         WorkerData **workers_data_ptrs = (WorkerData**)alloca(num_workers * sizeof(WorkerData*));
363
364         mono_os_mutex_init (&finished_lock);
365         //g_print ("initing %d workers\n", num_workers);
366
367         workers_num = num_workers;
368         active_workers_num = num_workers;
369
370         workers_data = (WorkerData *)sgen_alloc_internal_dynamic (sizeof (WorkerData) * num_workers, INTERNAL_MEM_WORKER_DATA, TRUE);
371         memset (workers_data, 0, sizeof (WorkerData) * num_workers);
372
373         init_distribute_gray_queue ();
374
375         for (i = 0; i < num_workers; ++i)
376                 workers_data_ptrs [i] = &workers_data [i];
377
378         worker_init_cb = callback;
379
380         pool = &pool_inst;
381         sgen_thread_pool_init (pool, num_workers, thread_pool_init_func, marker_idle_func, continue_idle_func, should_work_func, (SgenThreadPoolData**)workers_data_ptrs);
382
383         mono_counters_register ("# workers finished", MONO_COUNTER_GC | MONO_COUNTER_ULONG, &stat_workers_num_finished);
384 }
385
386 void
387 sgen_workers_shutdown (void)
388 {
389         if (pool)
390                 sgen_thread_pool_shutdown (pool);
391 }
392
393 void
394 sgen_workers_stop_all_workers (void)
395 {
396         finish_callback = NULL;
397         mono_memory_write_barrier ();
398         forced_stop = TRUE;
399
400         sgen_thread_pool_wait_for_all_jobs (pool);
401         sgen_thread_pool_idle_wait (pool);
402         SGEN_ASSERT (0, sgen_workers_all_done (), "Can only signal enqueue work when in no work state");
403
404         started = FALSE;
405 }
406
407 void
408 sgen_workers_set_num_active_workers (int num_workers)
409 {
410         if (num_workers) {
411                 SGEN_ASSERT (0, active_workers_num <= workers_num, "We can't start more workers than we initialized");
412                 active_workers_num = num_workers;
413         } else {
414                 active_workers_num = workers_num;
415         }
416 }
417
418 void
419 sgen_workers_start_all_workers (SgenObjectOperations *object_ops_nopar, SgenObjectOperations *object_ops_par, SgenWorkersFinishCallback callback)
420 {
421         SGEN_ASSERT (0, !started, "Why are we starting to work without finishing previous cycle");
422
423         idle_func_object_ops_par = object_ops_par;
424         idle_func_object_ops_nopar = object_ops_nopar;
425         forced_stop = FALSE;
426         finish_callback = callback;
427         worker_awakenings = 0;
428         started = TRUE;
429         mono_memory_write_barrier ();
430
431         /*
432          * We expect workers to start finishing only after all of them were awaken.
433          * Otherwise we might think that we have fewer workers and use wrong context.
434          */
435         mono_os_mutex_lock (&finished_lock);
436         sgen_workers_ensure_awake ();
437         mono_os_mutex_unlock (&finished_lock);
438 }
439
440 void
441 sgen_workers_join (void)
442 {
443         int i;
444
445         sgen_thread_pool_wait_for_all_jobs (pool);
446         sgen_thread_pool_idle_wait (pool);
447         SGEN_ASSERT (0, sgen_workers_all_done (), "Can only signal enqueue work when in no work state");
448
449         /* At this point all the workers have stopped. */
450
451         SGEN_ASSERT (0, sgen_section_gray_queue_is_empty (&workers_distribute_gray_queue), "Why is there still work left to do?");
452         for (i = 0; i < active_workers_num; ++i)
453                 SGEN_ASSERT (0, sgen_gray_object_queue_is_empty (&workers_data [i].private_gray_queue), "Why is there still work left to do?");
454
455         started = FALSE;
456 }
457
458 /*
459  * Can only be called if the workers are stopped.
460  * If we're stopped, there are also no pending jobs.
461  */
462 gboolean
463 sgen_workers_have_idle_work (void)
464 {
465         int i;
466
467         SGEN_ASSERT (0, forced_stop && sgen_workers_all_done (), "Checking for idle work should only happen if the workers are stopped.");
468
469         if (!sgen_section_gray_queue_is_empty (&workers_distribute_gray_queue))
470                 return TRUE;
471
472         for (i = 0; i < active_workers_num; ++i) {
473                 if (!sgen_gray_object_queue_is_empty (&workers_data [i].private_gray_queue))
474                         return TRUE;
475         }
476
477         return FALSE;
478 }
479
480 gboolean
481 sgen_workers_all_done (void)
482 {
483         int i;
484
485         for (i = 0; i < active_workers_num; i++) {
486                 if (state_is_working_or_enqueued (workers_data [i].state))
487                         return FALSE;
488         }
489         return TRUE;
490 }
491
492 /* Must only be used for debugging */
493 gboolean
494 sgen_workers_are_working (void)
495 {
496         return !sgen_workers_all_done ();
497 }
498
499 void
500 sgen_workers_assert_gray_queue_is_empty (void)
501 {
502         SGEN_ASSERT (0, sgen_section_gray_queue_is_empty (&workers_distribute_gray_queue), "Why is the workers gray queue not empty?");
503 }
504
505 void
506 sgen_workers_take_from_queue (SgenGrayQueue *queue)
507 {
508         sgen_gray_object_spread (queue, sgen_workers_get_job_split_count ());
509
510         for (;;) {
511                 GrayQueueSection *section = sgen_gray_object_dequeue_section (queue);
512                 if (!section)
513                         break;
514                 sgen_section_gray_queue_enqueue (&workers_distribute_gray_queue, section);
515         }
516
517         SGEN_ASSERT (0, !sgen_workers_are_working (), "We should fully populate the distribute gray queue before we start the workers");
518 }
519
520 SgenObjectOperations*
521 sgen_workers_get_idle_func_object_ops (void)
522 {
523         g_assert (idle_func_object_ops);
524         return idle_func_object_ops;
525 }
526
527 /*
528  * If we have a single worker, splitting into multiple jobs makes no sense. With
529  * more than one worker, we split into a larger number of jobs so that, in case
530  * the work load is uneven, a worker that finished quickly can take up more jobs
531  * than another one.
532  */
533 int
534 sgen_workers_get_job_split_count (void)
535 {
536         return (active_workers_num > 1) ? active_workers_num * 4 : 1;
537 }
538
539 void
540 sgen_workers_foreach (SgenWorkerCallback callback)
541 {
542         int i;
543
544         for (i = 0; i < workers_num; i++)
545                 callback (&workers_data [i]);
546 }
547
548 gboolean
549 sgen_workers_is_worker_thread (MonoNativeThreadId id)
550 {
551         if (!pool)
552                 return FALSE;
553         return sgen_thread_pool_is_thread_pool_thread (pool, id);
554 }
555
556 #endif