mono/sgen/sgen-workers.c
/**
 * \file
 * Worker threads for parallel and concurrent GC.
 *
 * Copyright 2001-2003 Ximian, Inc
 * Copyright 2003-2010 Novell, Inc.
 * Copyright (C) 2012 Xamarin Inc
 *
 * Licensed under the MIT license. See LICENSE file in the project root for full license information.
 */

#include "config.h"
#ifdef HAVE_SGEN_GC

#include <string.h>

#include "mono/sgen/sgen-gc.h"
#include "mono/sgen/sgen-workers.h"
#include "mono/sgen/sgen-thread-pool.h"
#include "mono/utils/mono-membar.h"
#include "mono/sgen/sgen-client.h"

static WorkerContext worker_contexts [GENERATION_MAX];

/*
 * Allowed transitions:
 *
 * | from \ to          | NOT WORKING | WORKING | WORK ENQUEUED |
 * |--------------------+-------------+---------+---------------|
 * | NOT WORKING        | -           | -       | main / worker |
 * | WORKING            | worker      | -       | main / worker |
 * | WORK ENQUEUED      | -           | worker  | -             |
 *
 * The WORK ENQUEUED state guarantees that the worker thread will inspect the queue again at
 * least once.  Only after looking at the queue will it go back to WORKING, and then,
 * eventually, to NOT WORKING.  After enqueuing work the main thread transitions the state
 * to WORK ENQUEUED.  Signalling the worker thread to wake up is only necessary if the old
 * state was NOT WORKING.
 */

enum {
	STATE_NOT_WORKING,
	STATE_WORKING,
	STATE_WORK_ENQUEUED
};

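/*
 * Minimum number of sections a worker must have accumulated in its private gray
 * queue before it re-awakens workers that have already finished, so they can
 * steal some of the load (see marker_idle_func).
 */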
#define SGEN_WORKER_MIN_SECTIONS_SIGNAL 4

static guint64 stat_workers_num_finished;

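/*
 * Atomically transition a worker from old_state to new_state.  The asserts
 * enforce the transition table above.  Returns TRUE if the CAS succeeded.
 */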
static gboolean
set_state (WorkerData *data, State old_state, State new_state)
{
	SGEN_ASSERT (0, old_state != new_state, "Why are we transitioning to the same state?");
	if (new_state == STATE_NOT_WORKING)
		SGEN_ASSERT (0, old_state == STATE_WORKING, "We can only transition to NOT WORKING from WORKING");
	else if (new_state == STATE_WORKING)
		SGEN_ASSERT (0, old_state == STATE_WORK_ENQUEUED, "We can only transition to WORKING from WORK ENQUEUED");

	return InterlockedCompareExchange (&data->state, new_state, old_state) == old_state;
}

static gboolean
state_is_working_or_enqueued (State state)
{
	return state == STATE_WORKING || state == STATE_WORK_ENQUEUED;
}

static void
sgen_workers_ensure_awake (WorkerContext *context)
{
	int i;
	gboolean need_signal = FALSE;

	/*
	 * All workers are woken up, so make sure we reset the parallel context.
	 * We call this function only when starting the workers, so nobody is running,
	 * or when the last worker is enqueuing preclean work. In both cases we can't
	 * have a worker working using a nopar context, which means this is safe.
	 */
	context->idle_func_object_ops = (context->active_workers_num > 1) ? context->idle_func_object_ops_par : context->idle_func_object_ops_nopar;
	context->workers_finished = FALSE;

	for (i = 0; i < context->active_workers_num; i++) {
		State old_state;
		gboolean did_set_state;

		do {
			old_state = context->workers_data [i].state;

			if (old_state == STATE_WORK_ENQUEUED)
				break;

			did_set_state = set_state (&context->workers_data [i], old_state, STATE_WORK_ENQUEUED);

			if (did_set_state && old_state == STATE_NOT_WORKING)
				context->workers_data [i].last_start = sgen_timestamp ();
		} while (!did_set_state);

		if (!state_is_working_or_enqueued (old_state))
			need_signal = TRUE;
	}

	if (need_signal)
		sgen_thread_pool_idle_signal (context->thread_pool_context);
}

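/*
 * Called by a worker when it runs out of work.  If it is the last one still
 * working, it runs the finish callback (which enqueues preclean work) and wakes
 * everybody up again; otherwise it flips itself to NOT WORKING and records its
 * timing stats.  The second-to-last worker downgrades the context to the
 * non-parallel scan functions to speed up the last one.
 */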
static void
worker_try_finish (WorkerData *data)
{
	State old_state;
	int i, working = 0;
	WorkerContext *context = data->context;
	gint64 last_start = data->last_start;

	++stat_workers_num_finished;

	mono_os_mutex_lock (&context->finished_lock);

	for (i = 0; i < context->active_workers_num; i++) {
		if (state_is_working_or_enqueued (context->workers_data [i].state))
			working++;
	}

	if (working == 1) {
		SgenWorkersFinishCallback callback = context->finish_callback;
		SGEN_ASSERT (0, context->idle_func_object_ops == context->idle_func_object_ops_nopar, "Why are we finishing with parallel context");
		/* We are the last one left. Enqueue preclean job if we have one and wake everybody */
		SGEN_ASSERT (0, data->state != STATE_NOT_WORKING, "How did we get from doing idle work to NOT WORKING without setting it ourselves?");
		if (callback) {
			context->finish_callback = NULL;
			callback ();
			context->worker_awakenings = 0;
			/* Make sure each worker has a chance of seeing the enqueued jobs */
			sgen_workers_ensure_awake (context);
			SGEN_ASSERT (0, data->state == STATE_WORK_ENQUEUED, "Why did we fail to set our own state to ENQUEUED");

			/*
			 * Log so we can get the duration of the normal concurrent M&S phase.
			 * Worker indexes are 1-based, since 0 is logically considered the GC thread.
			 */
			binary_protocol_worker_finish_stats (data - &context->workers_data [0] + 1, context->generation, context->forced_stop, data->major_scan_time, data->los_scan_time, data->total_time + sgen_timestamp () - last_start);
			goto work_available;
		}
	}

	do {
		old_state = data->state;

		SGEN_ASSERT (0, old_state != STATE_NOT_WORKING, "How did we get from doing idle work to NOT WORKING without setting it ourselves?");
		if (old_state == STATE_WORK_ENQUEUED)
			goto work_available;
		SGEN_ASSERT (0, old_state == STATE_WORKING, "What other possibility is there?");
	} while (!set_state (data, old_state, STATE_NOT_WORKING));

	/*
	 * If we are second to last to finish, we set the scan context to the non-parallel
	 * version so we can speed up the last worker. This helps us maintain the same level
	 * of performance as non-parallel mode even if we fail to distribute work properly.
	 */
	if (working == 2)
		context->idle_func_object_ops = context->idle_func_object_ops_nopar;

	context->workers_finished = TRUE;
	mono_os_mutex_unlock (&context->finished_lock);

	data->total_time += (sgen_timestamp () - last_start);
	binary_protocol_worker_finish_stats (data - &context->workers_data [0] + 1, context->generation, context->forced_stop, data->major_scan_time, data->los_scan_time, data->total_time);

	binary_protocol_worker_finish (sgen_timestamp (), context->forced_stop);

	sgen_gray_object_queue_trim_free_list (&data->private_gray_queue);
	return;

work_available:
	mono_os_mutex_unlock (&context->finished_lock);
}

void
sgen_workers_enqueue_job (int generation, SgenThreadPoolJob *job, gboolean enqueue)
{
	if (!enqueue) {
		job->func (NULL, job);
		sgen_thread_pool_job_free (job);
		return;
	}

	sgen_thread_pool_job_enqueue (worker_contexts [generation].thread_pool_context, job);
}

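/*
 * Try to refill the worker's private gray queue with a section from the
 * context's shared distribute queue.  Returns TRUE if work was obtained.
 */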
static gboolean
workers_get_work (WorkerData *data)
{
	SgenMajorCollector *major = sgen_get_major_collector ();
	SgenMinorCollector *minor = sgen_get_minor_collector ();
	GrayQueueSection *section;

	g_assert (sgen_gray_object_queue_is_empty (&data->private_gray_queue));
	g_assert (major->is_concurrent || minor->is_parallel);

	section = sgen_section_gray_queue_dequeue (&data->context->workers_distribute_gray_queue);
	if (section) {
		sgen_gray_object_enqueue_section (&data->private_gray_queue, section, major->is_parallel);
		return TRUE;
	}

	/* The distribute queue is empty */
	g_assert (sgen_gray_object_queue_is_empty (&data->private_gray_queue));
	return FALSE;
}

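/*
 * In parallel collections, try to steal a gray queue section from another
 * active worker's private queue.  Returns TRUE if a section was stolen, FALSE
 * if there was nothing to steal or the collection is not parallel.
 */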
static gboolean
workers_steal_work (WorkerData *data)
{
	SgenMajorCollector *major = sgen_get_major_collector ();
	SgenMinorCollector *minor = sgen_get_minor_collector ();
	int generation = sgen_get_current_collection_generation ();
	GrayQueueSection *section = NULL;
	WorkerContext *context = data->context;
	int i, current_worker;

	if ((generation == GENERATION_OLD && !major->is_parallel) ||
			(generation == GENERATION_NURSERY && !minor->is_parallel))
		return FALSE;

	/* If we're parallel, steal from other workers' private gray queues */
	g_assert (sgen_gray_object_queue_is_empty (&data->private_gray_queue));

	current_worker = (int) (data - context->workers_data);

	for (i = 1; i < context->active_workers_num && !section; i++) {
		int steal_worker = (current_worker + i) % context->active_workers_num;
		if (state_is_working_or_enqueued (context->workers_data [steal_worker].state))
			section = sgen_gray_object_steal_section (&context->workers_data [steal_worker].private_gray_queue);
	}

	if (section) {
		sgen_gray_object_enqueue_section (&data->private_gray_queue, section, TRUE);
		return TRUE;
	}

	/* Nobody to steal from */
	g_assert (sgen_gray_object_queue_is_empty (&data->private_gray_queue));
	return FALSE;
}

static void
concurrent_enqueue_check (GCObject *obj)
{
	g_assert (sgen_concurrent_collection_in_progress ());
	g_assert (!sgen_ptr_in_nursery (obj));
	g_assert (SGEN_LOAD_VTABLE (obj));
}

static void
init_private_gray_queue (WorkerData *data)
{
	sgen_gray_object_queue_init (&data->private_gray_queue,
			sgen_get_major_collector ()->is_concurrent ? concurrent_enqueue_check : NULL,
			FALSE);
}

static void
thread_pool_init_func (void *data_untyped)
{
	WorkerData *data = (WorkerData *)data_untyped;
	SgenMajorCollector *major = sgen_get_major_collector ();
	SgenMinorCollector *minor = sgen_get_minor_collector ();

	if (!major->is_concurrent && !minor->is_parallel)
		return;

	init_private_gray_queue (data);

	/* Separate WorkerData instances for the same thread share the free_block_lists */
	if (major->is_parallel || minor->is_parallel)
		major->init_block_free_lists (&data->free_block_lists);
}

static gboolean
sgen_workers_are_working (WorkerContext *context)
{
	int i;

	for (i = 0; i < context->active_workers_num; i++) {
		if (state_is_working_or_enqueued (context->workers_data [i].state))
			return TRUE;
	}
	return FALSE;
}

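/*
 * Thread pool callback deciding whether a worker should keep running the idle
 * function.  With per-worker data, check that worker's state; without it, check
 * whether any worker of the context that owns thread_pool_context is working.
 */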
static gboolean
continue_idle_func (void *data_untyped, int thread_pool_context)
{
	if (data_untyped)
		return state_is_working_or_enqueued (((WorkerData*)data_untyped)->state);

	/* Return whether any of the threads is working in the context */
	if (worker_contexts [GENERATION_NURSERY].workers_num && worker_contexts [GENERATION_NURSERY].thread_pool_context == thread_pool_context)
		return sgen_workers_are_working (&worker_contexts [GENERATION_NURSERY]);
	if (worker_contexts [GENERATION_OLD].workers_num && worker_contexts [GENERATION_OLD].thread_pool_context == thread_pool_context)
		return sgen_workers_are_working (&worker_contexts [GENERATION_OLD]);

	g_assert_not_reached ();
	return FALSE;
}

static gboolean
should_work_func (void *data_untyped)
{
	WorkerData *data = (WorkerData*)data_untyped;
	WorkerContext *context = data->context;
	int current_worker = (int) (data - context->workers_data);

	return context->started && current_worker < context->active_workers_num && state_is_working_or_enqueued (data->state);
}

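/*
 * The idle function each worker runs: drain the private gray queue, refilling
 * it from the distribute queue or by stealing from other workers, and try to
 * finish when no work is left.  A worker that accumulates enough sections while
 * others have already finished wakes them up again to rebalance the load.
 */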
static void
marker_idle_func (void *data_untyped)
{
	WorkerData *data = (WorkerData *)data_untyped;
	WorkerContext *context = data->context;

	SGEN_ASSERT (0, continue_idle_func (data_untyped, context->thread_pool_context), "Why are we called when we're not supposed to work?");

	if (data->state == STATE_WORK_ENQUEUED) {
		set_state (data, STATE_WORK_ENQUEUED, STATE_WORKING);
		SGEN_ASSERT (0, data->state != STATE_NOT_WORKING, "How did we get from WORK ENQUEUED to NOT WORKING?");
	}

	if (!context->forced_stop && (!sgen_gray_object_queue_is_empty (&data->private_gray_queue) || workers_get_work (data) || workers_steal_work (data))) {
		ScanCopyContext ctx = CONTEXT_FROM_OBJECT_OPERATIONS (context->idle_func_object_ops, &data->private_gray_queue);

		SGEN_ASSERT (0, !sgen_gray_object_queue_is_empty (&data->private_gray_queue), "How is our gray queue empty if we just got work?");

		sgen_drain_gray_stack (ctx);

		if (data->private_gray_queue.num_sections >= SGEN_WORKER_MIN_SECTIONS_SIGNAL
				&& context->workers_finished && context->worker_awakenings < context->active_workers_num) {
			/* We bound the number of worker awakenings just to be sure */
			context->worker_awakenings++;
			mono_os_mutex_lock (&context->finished_lock);
			sgen_workers_ensure_awake (context);
			mono_os_mutex_unlock (&context->finished_lock);
		}
	} else {
		worker_try_finish (data);
	}
}

static void
init_distribute_gray_queue (WorkerContext *context)
{
	sgen_section_gray_queue_init (&context->workers_distribute_gray_queue, TRUE,
			sgen_get_major_collector ()->is_concurrent ? concurrent_enqueue_check : NULL);
}

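/*
 * One-time setup of the worker context for a generation: allocates the
 * per-worker data, initializes the shared distribute gray queue, and creates
 * the thread pool context with our idle/continue/should-work callbacks.
 */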
void
sgen_workers_create_context (int generation, int num_workers)
{
	static gboolean stat_inited = FALSE;
	int i;
	WorkerData **workers_data_ptrs = (WorkerData**)sgen_alloc_internal_dynamic (num_workers * sizeof (WorkerData*), INTERNAL_MEM_WORKER_DATA, TRUE);
	WorkerContext *context = &worker_contexts [generation];

	SGEN_ASSERT (0, !context->workers_num, "We can't init the worker context for a generation twice");

	mono_os_mutex_init (&context->finished_lock);

	context->generation = generation;
	context->workers_num = num_workers;
	context->active_workers_num = num_workers;

	context->workers_data = (WorkerData *)sgen_alloc_internal_dynamic (sizeof (WorkerData) * num_workers, INTERNAL_MEM_WORKER_DATA, TRUE);
	memset (context->workers_data, 0, sizeof (WorkerData) * num_workers);

	init_distribute_gray_queue (context);

	for (i = 0; i < num_workers; ++i) {
		workers_data_ptrs [i] = &context->workers_data [i];
		context->workers_data [i].context = context;
	}

	context->thread_pool_context = sgen_thread_pool_create_context (num_workers, thread_pool_init_func, marker_idle_func, continue_idle_func, should_work_func, (void**)workers_data_ptrs);

	if (!stat_inited) {
		mono_counters_register ("# workers finished", MONO_COUNTER_GC | MONO_COUNTER_ULONG, &stat_workers_num_finished);
		stat_inited = TRUE;
	}
}

/* This is called with the thread pool lock held, so no context switch can happen. */
static gboolean
continue_idle_wait (int calling_context, int *threads_context)
{
	WorkerContext *context;
	int i;

	if (worker_contexts [GENERATION_OLD].workers_num && calling_context == worker_contexts [GENERATION_OLD].thread_pool_context)
		context = &worker_contexts [GENERATION_OLD];
	else if (worker_contexts [GENERATION_NURSERY].workers_num && calling_context == worker_contexts [GENERATION_NURSERY].thread_pool_context)
		context = &worker_contexts [GENERATION_NURSERY];
	else
		g_assert_not_reached ();

	/*
	 * We assume there are no pending jobs, since this is called only after
	 * we waited for all the jobs.
	 */
	for (i = 0; i < context->active_workers_num; i++) {
		if (threads_context [i] == calling_context)
			return TRUE;
	}

	if (sgen_workers_have_idle_work (context->generation) && !context->forced_stop)
		return TRUE;

	/*
	 * At this point there are no jobs to be done, and no objects to be scanned
	 * in the gray queues. We can simply asynchronously finish all the workers
	 * from the context that were not finished already (due to being stuck working
	 * in another context).
	 */
	for (i = 0; i < context->active_workers_num; i++) {
		if (context->workers_data [i].state == STATE_WORK_ENQUEUED)
			set_state (&context->workers_data [i], STATE_WORK_ENQUEUED, STATE_WORKING);
		if (context->workers_data [i].state == STATE_WORKING)
			worker_try_finish (&context->workers_data [i]);
	}

	return FALSE;
}

void
sgen_workers_stop_all_workers (int generation)
{
	WorkerContext *context = &worker_contexts [generation];

	mono_os_mutex_lock (&context->finished_lock);
	context->finish_callback = NULL;
	mono_os_mutex_unlock (&context->finished_lock);

	context->forced_stop = TRUE;

	sgen_thread_pool_wait_for_all_jobs (context->thread_pool_context);
	sgen_thread_pool_idle_wait (context->thread_pool_context, continue_idle_wait);
	SGEN_ASSERT (0, !sgen_workers_are_working (context), "Can only signal enqueue work when in no work state");

	context->started = FALSE;
}

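/*
 * Limit how many of the created workers take part in the next cycle.
 * Passing 0 restores the full set of workers.
 */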
void
sgen_workers_set_num_active_workers (int generation, int num_workers)
{
	WorkerContext *context = &worker_contexts [generation];
	if (num_workers) {
		SGEN_ASSERT (0, num_workers <= context->workers_num, "We can't start more workers than we initialized");
		context->active_workers_num = num_workers;
	} else {
		context->active_workers_num = context->workers_num;
	}
}

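/*
 * Kick off a collection cycle: install the scan/copy operations used by the
 * idle function, reset the per-worker stats, and wake up all active workers.
 * The optional callback runs on the last worker to finish and is used to
 * enqueue preclean work.
 */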
void
sgen_workers_start_all_workers (int generation, SgenObjectOperations *object_ops_nopar, SgenObjectOperations *object_ops_par, SgenWorkersFinishCallback callback)
{
	WorkerContext *context = &worker_contexts [generation];
	int i;
	SGEN_ASSERT (0, !context->started, "Why are we starting to work without finishing the previous cycle?");

	context->idle_func_object_ops_par = object_ops_par;
	context->idle_func_object_ops_nopar = object_ops_nopar;
	context->forced_stop = FALSE;
	context->finish_callback = callback;
	context->worker_awakenings = 0;
	context->started = TRUE;

	for (i = 0; i < context->active_workers_num; i++) {
		context->workers_data [i].major_scan_time = 0;
		context->workers_data [i].los_scan_time = 0;
		context->workers_data [i].total_time = 0;
		context->workers_data [i].last_start = 0;
	}
	mono_memory_write_barrier ();

	/*
	 * We expect workers to start finishing only after all of them were awakened.
	 * Otherwise we might think that we have fewer workers and use the wrong context.
	 */
	mono_os_mutex_lock (&context->finished_lock);
	sgen_workers_ensure_awake (context);
	mono_os_mutex_unlock (&context->finished_lock);
}

void
sgen_workers_join (int generation)
{
	WorkerContext *context = &worker_contexts [generation];
	int i;

	SGEN_ASSERT (0, !context->finish_callback, "Why are we joining concurrent mark early");

	sgen_thread_pool_wait_for_all_jobs (context->thread_pool_context);
	sgen_thread_pool_idle_wait (context->thread_pool_context, continue_idle_wait);
	SGEN_ASSERT (0, !sgen_workers_are_working (context), "Can only signal enqueue work when in no work state");

	/* At this point all the workers have stopped. */

	SGEN_ASSERT (0, sgen_section_gray_queue_is_empty (&context->workers_distribute_gray_queue), "Why is there still work left to do?");
	for (i = 0; i < context->active_workers_num; ++i)
		SGEN_ASSERT (0, sgen_gray_object_queue_is_empty (&context->workers_data [i].private_gray_queue), "Why is there still work left to do?");

	context->started = FALSE;
}

/*
 * Can only be called if the workers are not working in the
 * context and there are no pending jobs.
 */
gboolean
sgen_workers_have_idle_work (int generation)
{
	WorkerContext *context = &worker_contexts [generation];
	int i;

	if (!sgen_section_gray_queue_is_empty (&context->workers_distribute_gray_queue))
		return TRUE;

	for (i = 0; i < context->active_workers_num; ++i) {
		if (!sgen_gray_object_queue_is_empty (&context->workers_data [i].private_gray_queue))
			return TRUE;
	}

	return FALSE;
}

gboolean
sgen_workers_all_done (void)
{
	if (worker_contexts [GENERATION_NURSERY].workers_num && sgen_workers_are_working (&worker_contexts [GENERATION_NURSERY]))
		return FALSE;
	if (worker_contexts [GENERATION_OLD].workers_num && sgen_workers_are_working (&worker_contexts [GENERATION_OLD]))
		return FALSE;

	return TRUE;
}

void
sgen_workers_assert_gray_queue_is_empty (int generation)
{
	SGEN_ASSERT (0, sgen_section_gray_queue_is_empty (&worker_contexts [generation].workers_distribute_gray_queue), "Why is the workers gray queue not empty?");
}

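/*
 * Move all sections of the given gray queue into the context's distribute gray
 * queue, splitting them first so the load can be spread across the workers.
 * Must be called before the workers are started.
 */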
void
sgen_workers_take_from_queue (int generation, SgenGrayQueue *queue)
{
	WorkerContext *context = &worker_contexts [generation];

	sgen_gray_object_spread (queue, sgen_workers_get_job_split_count (generation));

	for (;;) {
		GrayQueueSection *section = sgen_gray_object_dequeue_section (queue);
		if (!section)
			break;
		sgen_section_gray_queue_enqueue (&context->workers_distribute_gray_queue, section);
	}

	SGEN_ASSERT (0, !sgen_workers_are_working (context), "We should fully populate the distribute gray queue before we start the workers");
}

SgenObjectOperations*
sgen_workers_get_idle_func_object_ops (WorkerData *worker)
{
	g_assert (worker->context->idle_func_object_ops);
	return worker->context->idle_func_object_ops;
}

/*
 * If we have a single worker, splitting into multiple jobs makes no sense. With
 * more than one worker, we split into a larger number of jobs so that, in case
 * the work load is uneven, a worker that finished quickly can take up more jobs
 * than another one.
 *
 * We also return 1 if there is no worker context for that generation.
 */
int
sgen_workers_get_job_split_count (int generation)
{
	return (worker_contexts [generation].active_workers_num > 1) ? worker_contexts [generation].active_workers_num * 4 : 1;
}

void
sgen_workers_foreach (int generation, SgenWorkerCallback callback)
{
	WorkerContext *context = &worker_contexts [generation];
	int i;

	for (i = 0; i < context->workers_num; i++)
		callback (&context->workers_data [i]);
}

gboolean
sgen_workers_is_worker_thread (MonoNativeThreadId id)
{
	return sgen_thread_pool_is_thread_pool_thread (id);
}

#endif