[sgen] Avoid waiting for workers to report their own finish
[mono.git] mono/sgen/sgen-workers.c
/**
 * \file
 * Worker threads for parallel and concurrent GC.
 *
 * Copyright 2001-2003 Ximian, Inc
 * Copyright 2003-2010 Novell, Inc.
 * Copyright (C) 2012 Xamarin Inc
 *
 * Licensed under the MIT license. See LICENSE file in the project root for full license information.
 */

#include "config.h"
#ifdef HAVE_SGEN_GC

#include <string.h>

#include "mono/sgen/sgen-gc.h"
#include "mono/sgen/sgen-workers.h"
#include "mono/sgen/sgen-thread-pool.h"
#include "mono/utils/mono-membar.h"
#include "mono/sgen/sgen-client.h"

static WorkerContext worker_contexts [GENERATION_MAX];

/*
 * Allowed transitions:
 *
 * | from \ to          | NOT WORKING | WORKING | WORK ENQUEUED |
 * |--------------------+-------------+---------+---------------|
 * | NOT WORKING        | -           | -       | main / worker |
 * | WORKING            | worker      | -       | main / worker |
 * | WORK ENQUEUED      | -           | worker  | -             |
 *
 * The WORK ENQUEUED state guarantees that the worker thread will inspect the queue again at
 * least once.  Only after looking at the queue will it go back to WORKING, and then,
 * eventually, to NOT WORKING.  After enqueuing work the main thread transitions the state
 * to WORK ENQUEUED.  Signalling the worker thread to wake up is only necessary if the old
 * state was NOT WORKING.
 */

enum {
        STATE_NOT_WORKING,
        STATE_WORKING,
        STATE_WORK_ENQUEUED
};
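
/*
 * For illustration, a typical cycle for one worker, following the table
 * above: the main thread moves it NOT WORKING -> WORK ENQUEUED (and signals
 * the pool, since the old state was NOT WORKING); the worker re-checks its
 * queue and moves WORK ENQUEUED -> WORKING (see marker_idle_func ()); once it
 * runs out of work to drain or steal, it moves WORKING -> NOT WORKING (see
 * worker_try_finish ()).
 */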

#define SGEN_WORKER_MIN_SECTIONS_SIGNAL 4

static guint64 stat_workers_num_finished;

static gboolean
set_state (WorkerData *data, State old_state, State new_state)
{
        SGEN_ASSERT (0, old_state != new_state, "Why are we transitioning to the same state?");
        if (new_state == STATE_NOT_WORKING)
                SGEN_ASSERT (0, old_state == STATE_WORKING, "We can only transition to NOT WORKING from WORKING");
        else if (new_state == STATE_WORKING)
                SGEN_ASSERT (0, old_state == STATE_WORK_ENQUEUED, "We can only transition to WORKING from WORK ENQUEUED");

        return InterlockedCompareExchange (&data->state, new_state, old_state) == old_state;
}
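
/*
 * Note that set_state () fails if another thread changed the state first.
 * Callers that can race (sgen_workers_ensure_awake () and worker_try_finish ()
 * below) re-read the state and retry the compare-and-swap in a loop.
 */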

static gboolean
state_is_working_or_enqueued (State state)
{
        return state == STATE_WORKING || state == STATE_WORK_ENQUEUED;
}

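/*
 * Transitions every active worker to WORK ENQUEUED and signals the thread
 * pool if any of them was NOT WORKING. All callers hold finished_lock around
 * this call, so the set of awake workers cannot change under us.
 */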
static void
sgen_workers_ensure_awake (WorkerContext *context)
{
        int i;
        gboolean need_signal = FALSE;

        /*
         * All workers are awakened; make sure we reset the parallel context.
         * We call this function only when starting the workers, so nobody is running,
         * or when the last worker is enqueuing preclean work. In both cases we can't
         * have a worker running with a nopar context, which means it is safe.
         */
        context->idle_func_object_ops = (context->active_workers_num > 1) ? context->idle_func_object_ops_par : context->idle_func_object_ops_nopar;
        context->workers_finished = FALSE;

        for (i = 0; i < context->active_workers_num; i++) {
                State old_state;
                gboolean did_set_state;

                do {
                        old_state = context->workers_data [i].state;

                        if (old_state == STATE_WORK_ENQUEUED)
                                break;

                        did_set_state = set_state (&context->workers_data [i], old_state, STATE_WORK_ENQUEUED);
                } while (!did_set_state);

                if (!state_is_working_or_enqueued (old_state))
                        need_signal = TRUE;
        }

        if (need_signal)
                sgen_thread_pool_idle_signal (context->thread_pool_context);
}

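/*
 * Called by a worker once it can find no more work to drain or steal. If we
 * are the last worker still working, we run the finish callback (which can
 * enqueue preclean jobs) and re-awaken everybody; if we are second to last,
 * we downgrade the scan context to the non-parallel version to speed up the
 * remaining worker.
 */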
static void
worker_try_finish (WorkerData *data)
{
        State old_state;
        int i, working = 0;
        WorkerContext *context = data->context;

        ++stat_workers_num_finished;

        mono_os_mutex_lock (&context->finished_lock);

        for (i = 0; i < context->active_workers_num; i++) {
                if (state_is_working_or_enqueued (context->workers_data [i].state))
                        working++;
        }

        if (working == 1) {
                SgenWorkersFinishCallback callback = context->finish_callback;
                SGEN_ASSERT (0, context->idle_func_object_ops == context->idle_func_object_ops_nopar, "Why are we finishing with parallel context");
                /* We are the last one left. Enqueue the preclean job if we have one and wake everybody up */
                SGEN_ASSERT (0, data->state != STATE_NOT_WORKING, "How did we get from doing idle work to NOT WORKING without setting it ourselves?");
                if (callback) {
                        context->finish_callback = NULL;
                        callback ();
                        context->worker_awakenings = 0;
                        /* Make sure each worker has a chance of seeing the enqueued jobs */
                        sgen_workers_ensure_awake (context);
                        SGEN_ASSERT (0, data->state == STATE_WORK_ENQUEUED, "Why did we fail to set our own state to ENQUEUED");
                        goto work_available;
                }
        }

        do {
                old_state = data->state;

                SGEN_ASSERT (0, old_state != STATE_NOT_WORKING, "How did we get from doing idle work to NOT WORKING without setting it ourselves?");
                if (old_state == STATE_WORK_ENQUEUED)
                        goto work_available;
                SGEN_ASSERT (0, old_state == STATE_WORKING, "What other possibility is there?");
        } while (!set_state (data, old_state, STATE_NOT_WORKING));

        /*
         * If we are second to last to finish, we set the scan context to the non-parallel
         * version so we can speed up the last worker. This helps us maintain the same level
         * of performance as non-parallel mode even if we fail to distribute work properly.
         */
        if (working == 2)
                context->idle_func_object_ops = context->idle_func_object_ops_nopar;

        context->workers_finished = TRUE;
        mono_os_mutex_unlock (&context->finished_lock);

        binary_protocol_worker_finish (sgen_timestamp (), context->forced_stop);

        sgen_gray_object_queue_trim_free_list (&data->private_gray_queue);
        return;

work_available:
        mono_os_mutex_unlock (&context->finished_lock);
}

void
sgen_workers_enqueue_job (int generation, SgenThreadPoolJob *job, gboolean enqueue)
{
        if (!enqueue) {
                job->func (NULL, job);
                sgen_thread_pool_job_free (job);
                return;
        }

        sgen_thread_pool_job_enqueue (worker_contexts [generation].thread_pool_context, job);
}

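/*
 * Try to refill our private gray queue with a section from the context's
 * shared distribute gray queue.
 */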
static gboolean
workers_get_work (WorkerData *data)
{
        SgenMajorCollector *major = sgen_get_major_collector ();
        SgenMinorCollector *minor = sgen_get_minor_collector ();
        GrayQueueSection *section;

        g_assert (sgen_gray_object_queue_is_empty (&data->private_gray_queue));
        g_assert (major->is_concurrent || minor->is_parallel);

        section = sgen_section_gray_queue_dequeue (&data->context->workers_distribute_gray_queue);
        if (section) {
                sgen_gray_object_enqueue_section (&data->private_gray_queue, section, major->is_parallel);
                return TRUE;
        }

        /* The distribute queue is empty */
        g_assert (sgen_gray_object_queue_is_empty (&data->private_gray_queue));
        return FALSE;
}

static gboolean
workers_steal_work (WorkerData *data)
{
        SgenMajorCollector *major = sgen_get_major_collector ();
        SgenMinorCollector *minor = sgen_get_minor_collector ();
        int generation = sgen_get_current_collection_generation ();
        GrayQueueSection *section = NULL;
        WorkerContext *context = data->context;
        int i, current_worker;

        if ((generation == GENERATION_OLD && !major->is_parallel) ||
                        (generation == GENERATION_NURSERY && !minor->is_parallel))
                return FALSE;

        /* If we're parallel, steal from other workers' private gray queues */
        g_assert (sgen_gray_object_queue_is_empty (&data->private_gray_queue));

        current_worker = (int) (data - context->workers_data);

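        /*
         * Walk the other workers round-robin, starting right after ourselves:
         * e.g. with four active workers and current_worker == 2, the victims
         * are tried in the order 3, 0, 1.
         */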
        for (i = 1; i < context->active_workers_num && !section; i++) {
                int steal_worker = (current_worker + i) % context->active_workers_num;
                if (state_is_working_or_enqueued (context->workers_data [steal_worker].state))
                        section = sgen_gray_object_steal_section (&context->workers_data [steal_worker].private_gray_queue);
        }

        if (section) {
                sgen_gray_object_enqueue_section (&data->private_gray_queue, section, TRUE);
                return TRUE;
        }

        /* Nobody to steal from */
        g_assert (sgen_gray_object_queue_is_empty (&data->private_gray_queue));
        return FALSE;
}

static void
concurrent_enqueue_check (GCObject *obj)
{
        g_assert (sgen_concurrent_collection_in_progress ());
        g_assert (!sgen_ptr_in_nursery (obj));
        g_assert (SGEN_LOAD_VTABLE (obj));
}

static void
init_private_gray_queue (WorkerData *data)
{
        sgen_gray_object_queue_init (&data->private_gray_queue,
                        sgen_get_major_collector ()->is_concurrent ? concurrent_enqueue_check : NULL,
                        FALSE);
}

static void
thread_pool_init_func (void *data_untyped)
{
        WorkerData *data = (WorkerData *)data_untyped;
        SgenMajorCollector *major = sgen_get_major_collector ();
        SgenMinorCollector *minor = sgen_get_minor_collector ();

        if (!major->is_concurrent && !minor->is_parallel)
                return;

        init_private_gray_queue (data);

        /* Separate WorkerData structures for the same thread share the free_block_lists */
        if (major->is_parallel || minor->is_parallel)
                major->init_block_free_lists (&data->free_block_lists);
}

static gboolean
sgen_workers_are_working (WorkerContext *context)
{
        int i;

        for (i = 0; i < context->active_workers_num; i++) {
                if (state_is_working_or_enqueued (context->workers_data [i].state))
                        return TRUE;
        }
        return FALSE;
}

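/*
 * Thread pool callback deciding whether a worker should keep idling. With
 * per-worker data we check that worker's own state; without it, we report
 * whether any worker of the context that uses the given thread pool context
 * is still working.
 */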
static gboolean
continue_idle_func (void *data_untyped, int thread_pool_context)
{
        if (data_untyped)
                return state_is_working_or_enqueued (((WorkerData*)data_untyped)->state);

        /* Return whether any thread is working in this context */
        if (worker_contexts [GENERATION_NURSERY].workers_num && worker_contexts [GENERATION_NURSERY].thread_pool_context == thread_pool_context)
                return sgen_workers_are_working (&worker_contexts [GENERATION_NURSERY]);
        if (worker_contexts [GENERATION_OLD].workers_num && worker_contexts [GENERATION_OLD].thread_pool_context == thread_pool_context)
                return sgen_workers_are_working (&worker_contexts [GENERATION_OLD]);

        g_assert_not_reached ();
        return FALSE;
}

static gboolean
should_work_func (void *data_untyped)
{
        WorkerData *data = (WorkerData*)data_untyped;
        WorkerContext *context = data->context;
        int current_worker = (int) (data - context->workers_data);

        return context->started && current_worker < context->active_workers_num && state_is_working_or_enqueued (data->state);
}

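/*
 * The idle function run by each worker: drain the private gray queue, refill
 * it from the distribute queue or by stealing, and try to finish when no work
 * is left. If our queue grows large while other workers have already
 * finished, re-awaken them so they can steal from us.
 */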
static void
marker_idle_func (void *data_untyped)
{
        WorkerData *data = (WorkerData *)data_untyped;
        WorkerContext *context = data->context;

        SGEN_ASSERT (0, continue_idle_func (data_untyped, context->thread_pool_context), "Why are we called when we're not supposed to work?");

        if (data->state == STATE_WORK_ENQUEUED) {
                set_state (data, STATE_WORK_ENQUEUED, STATE_WORKING);
                SGEN_ASSERT (0, data->state != STATE_NOT_WORKING, "How did we get from WORK ENQUEUED to NOT WORKING?");
        }

        if (!context->forced_stop && (!sgen_gray_object_queue_is_empty (&data->private_gray_queue) || workers_get_work (data) || workers_steal_work (data))) {
                ScanCopyContext ctx = CONTEXT_FROM_OBJECT_OPERATIONS (context->idle_func_object_ops, &data->private_gray_queue);

                SGEN_ASSERT (0, !sgen_gray_object_queue_is_empty (&data->private_gray_queue), "How is our gray queue empty if we just got work?");

                sgen_drain_gray_stack (ctx);

                if (data->private_gray_queue.num_sections >= SGEN_WORKER_MIN_SECTIONS_SIGNAL
                                && context->workers_finished && context->worker_awakenings < context->active_workers_num) {
                        /* We bound the number of worker awakenings just to be sure */
                        context->worker_awakenings++;
                        mono_os_mutex_lock (&context->finished_lock);
                        sgen_workers_ensure_awake (context);
                        mono_os_mutex_unlock (&context->finished_lock);
                }
        } else {
                worker_try_finish (data);
        }
}

static void
init_distribute_gray_queue (WorkerContext *context)
{
        sgen_section_gray_queue_init (&context->workers_distribute_gray_queue, TRUE,
                        sgen_get_major_collector ()->is_concurrent ? concurrent_enqueue_check : NULL);
}

void
sgen_workers_create_context (int generation, int num_workers)
{
        static gboolean stat_inited = FALSE;
        int i;
        WorkerData **workers_data_ptrs = (WorkerData**)sgen_alloc_internal_dynamic (num_workers * sizeof(WorkerData*), INTERNAL_MEM_WORKER_DATA, TRUE);
        WorkerContext *context = &worker_contexts [generation];

        SGEN_ASSERT (0, !context->workers_num, "We can't init the worker context for a generation twice");

        mono_os_mutex_init (&context->finished_lock);

        context->generation = generation;
        context->workers_num = num_workers;
        context->active_workers_num = num_workers;

        context->workers_data = (WorkerData *)sgen_alloc_internal_dynamic (sizeof (WorkerData) * num_workers, INTERNAL_MEM_WORKER_DATA, TRUE);
        memset (context->workers_data, 0, sizeof (WorkerData) * num_workers);

        init_distribute_gray_queue (context);

        for (i = 0; i < num_workers; ++i) {
                workers_data_ptrs [i] = &context->workers_data [i];
                context->workers_data [i].context = context;
        }

        context->thread_pool_context = sgen_thread_pool_create_context (num_workers, thread_pool_init_func, marker_idle_func, continue_idle_func, should_work_func, (void**)workers_data_ptrs);

        if (!stat_inited) {
                mono_counters_register ("# workers finished", MONO_COUNTER_GC | MONO_COUNTER_ULONG, &stat_workers_num_finished);
                stat_inited = TRUE;
        }
}

/* This is called with the thread pool lock held, so no context switch can happen */
static gboolean
continue_idle_wait (int calling_context, int *threads_context)
{
        WorkerContext *context;
        int i;

        if (worker_contexts [GENERATION_OLD].workers_num && calling_context == worker_contexts [GENERATION_OLD].thread_pool_context)
                context = &worker_contexts [GENERATION_OLD];
        else if (worker_contexts [GENERATION_NURSERY].workers_num && calling_context == worker_contexts [GENERATION_NURSERY].thread_pool_context)
                context = &worker_contexts [GENERATION_NURSERY];
        else
                g_assert_not_reached ();

        /*
         * We assume there are no pending jobs, since this is called only after
         * we waited for all the jobs.
         */
        for (i = 0; i < context->active_workers_num; i++) {
                if (threads_context [i] == calling_context)
                        return TRUE;
        }

        if (sgen_workers_have_idle_work (context->generation) && !context->forced_stop)
                return TRUE;

        /*
         * At this point there are no jobs to be done, and no objects to be scanned
         * in the gray queues. We can simply asynchronously finish all the workers
         * from the context that were not finished already (due to being stuck working
         * in another context).
         */

        for (i = 0; i < context->active_workers_num; i++) {
                if (context->workers_data [i].state == STATE_WORK_ENQUEUED)
                        set_state (&context->workers_data [i], STATE_WORK_ENQUEUED, STATE_WORKING);
                if (context->workers_data [i].state == STATE_WORKING)
                        worker_try_finish (&context->workers_data [i]);
        }

        return FALSE;
}

void
sgen_workers_stop_all_workers (int generation)
{
        WorkerContext *context = &worker_contexts [generation];

        mono_os_mutex_lock (&context->finished_lock);
        context->finish_callback = NULL;
        mono_os_mutex_unlock (&context->finished_lock);

        context->forced_stop = TRUE;

        sgen_thread_pool_wait_for_all_jobs (context->thread_pool_context);
        sgen_thread_pool_idle_wait (context->thread_pool_context, continue_idle_wait);
        SGEN_ASSERT (0, !sgen_workers_are_working (context), "Can only signal enqueue work when in no work state");

        context->started = FALSE;
}

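/*
 * Limit how many of the created workers take part in the next cycle; passing
 * 0 restores the full set. The limit cannot exceed the number of workers the
 * context was created with.
 */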
void
sgen_workers_set_num_active_workers (int generation, int num_workers)
{
        WorkerContext *context = &worker_contexts [generation];
        if (num_workers) {
                SGEN_ASSERT (0, num_workers <= context->workers_num, "We can't start more workers than we initialized");
                context->active_workers_num = num_workers;
        } else {
                context->active_workers_num = context->workers_num;
        }
}

void
sgen_workers_start_all_workers (int generation, SgenObjectOperations *object_ops_nopar, SgenObjectOperations *object_ops_par, SgenWorkersFinishCallback callback)
{
        WorkerContext *context = &worker_contexts [generation];
        SGEN_ASSERT (0, !context->started, "Why are we starting to work without finishing the previous cycle?");

        context->idle_func_object_ops_par = object_ops_par;
        context->idle_func_object_ops_nopar = object_ops_nopar;
        context->forced_stop = FALSE;
        context->finish_callback = callback;
        context->worker_awakenings = 0;
        context->started = TRUE;
        mono_memory_write_barrier ();

        /*
         * We expect workers to start finishing only after all of them have been awakened.
         * Otherwise we might think that we have fewer workers and use the wrong context.
         */
        mono_os_mutex_lock (&context->finished_lock);
        sgen_workers_ensure_awake (context);
        mono_os_mutex_unlock (&context->finished_lock);
}

void
sgen_workers_join (int generation)
{
        WorkerContext *context = &worker_contexts [generation];
        int i;

        SGEN_ASSERT (0, !context->finish_callback, "Why are we joining concurrent mark early?");

        sgen_thread_pool_wait_for_all_jobs (context->thread_pool_context);
        sgen_thread_pool_idle_wait (context->thread_pool_context, continue_idle_wait);
        SGEN_ASSERT (0, !sgen_workers_are_working (context), "Can only signal enqueue work when in no work state");

        /* At this point all the workers have stopped. */

        SGEN_ASSERT (0, sgen_section_gray_queue_is_empty (&context->workers_distribute_gray_queue), "Why is there still work left to do?");
        for (i = 0; i < context->active_workers_num; ++i)
                SGEN_ASSERT (0, sgen_gray_object_queue_is_empty (&context->workers_data [i].private_gray_queue), "Why is there still work left to do?");

        context->started = FALSE;
}

/*
 * Can only be called if the workers are not working in the
 * context and there are no pending jobs.
 */
gboolean
sgen_workers_have_idle_work (int generation)
{
        WorkerContext *context = &worker_contexts [generation];
        int i;

        if (!sgen_section_gray_queue_is_empty (&context->workers_distribute_gray_queue))
                return TRUE;

        for (i = 0; i < context->active_workers_num; ++i) {
                if (!sgen_gray_object_queue_is_empty (&context->workers_data [i].private_gray_queue))
                        return TRUE;
        }

        return FALSE;
}

gboolean
sgen_workers_all_done (void)
{
        if (worker_contexts [GENERATION_NURSERY].workers_num && sgen_workers_are_working (&worker_contexts [GENERATION_NURSERY]))
                return FALSE;
        if (worker_contexts [GENERATION_OLD].workers_num && sgen_workers_are_working (&worker_contexts [GENERATION_OLD]))
                return FALSE;

        return TRUE;
}

void
sgen_workers_assert_gray_queue_is_empty (int generation)
{
        SGEN_ASSERT (0, sgen_section_gray_queue_is_empty (&worker_contexts [generation].workers_distribute_gray_queue), "Why is the workers' gray queue not empty?");
}

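/*
 * Spread the sections of the given gray queue and move them all into the
 * context's distribute gray queue, from which the workers will pick them up.
 * The workers must not be running yet.
 */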
void
sgen_workers_take_from_queue (int generation, SgenGrayQueue *queue)
{
        WorkerContext *context = &worker_contexts [generation];

        sgen_gray_object_spread (queue, sgen_workers_get_job_split_count (generation));

        for (;;) {
                GrayQueueSection *section = sgen_gray_object_dequeue_section (queue);
                if (!section)
                        break;
                sgen_section_gray_queue_enqueue (&context->workers_distribute_gray_queue, section);
        }

        SGEN_ASSERT (0, !sgen_workers_are_working (context), "We should fully populate the distribute gray queue before we start the workers");
}

SgenObjectOperations*
sgen_workers_get_idle_func_object_ops (WorkerData *worker)
{
        g_assert (worker->context->idle_func_object_ops);
        return worker->context->idle_func_object_ops;
}

/*
 * If we have a single worker, splitting into multiple jobs makes no sense. With
 * more than one worker, we split into a larger number of jobs so that, in case
 * the work load is uneven, a worker that finishes quickly can take up more jobs
 * than another one. We use four jobs per active worker, e.g. 16 jobs for four
 * workers.
 *
 * We also return 1 if there is no worker context for that generation.
 */
int
sgen_workers_get_job_split_count (int generation)
{
        return (worker_contexts [generation].active_workers_num > 1) ? worker_contexts [generation].active_workers_num * 4 : 1;
}


void
sgen_workers_foreach (int generation, SgenWorkerCallback callback)
{
        WorkerContext *context = &worker_contexts [generation];
        int i;

        for (i = 0; i < context->workers_num; i++)
                callback (&context->workers_data [i]);
}

gboolean
sgen_workers_is_worker_thread (MonoNativeThreadId id)
{
        return sgen_thread_pool_is_thread_pool_thread (id);
}

#endif