/**
 * \file
 * Worker threads for parallel and concurrent GC.
 *
 * Copyright 2001-2003 Ximian, Inc
 * Copyright 2003-2010 Novell, Inc.
 * Copyright (C) 2012 Xamarin Inc
 *
 * Licensed under the MIT license. See LICENSE file in the project root for full license information.
 */

#include "config.h"
#ifdef HAVE_SGEN_GC

#include <string.h>

#include "mono/sgen/sgen-gc.h"
#include "mono/sgen/sgen-workers.h"
#include "mono/sgen/sgen-thread-pool.h"
#include "mono/utils/mono-membar.h"
#include "mono/sgen/sgen-client.h"

static WorkerContext worker_contexts [GENERATION_MAX];

/*
 * Allowed transitions:
 *
 * | from \ to          | NOT WORKING | WORKING | WORK ENQUEUED |
 * |--------------------+-------------+---------+---------------+
 * | NOT WORKING        | -           | -       | main / worker |
 * | WORKING            | worker      | -       | main / worker |
 * | WORK ENQUEUED      | -           | worker  | -             |
 *
 * The WORK ENQUEUED state guarantees that the worker thread will inspect the queue again at
 * least once.  Only after looking at the queue will it go back to WORKING, and then,
 * eventually, to NOT WORKING.  After enqueuing work the main thread transitions the state
 * to WORK ENQUEUED.  Signalling the worker thread to wake up is only necessary if the old
 * state was NOT WORKING.
 */

enum {
	STATE_NOT_WORKING,
	STATE_WORKING,
	STATE_WORK_ENQUEUED
};

#define SGEN_WORKER_MIN_SECTIONS_SIGNAL 4

static guint64 stat_workers_num_finished;

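/*
 * Atomically transition a worker's state from old_state to new_state, enforcing
 * the transition table above. Returns TRUE if the compare-and-swap succeeded.
 */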
static gboolean
set_state (WorkerData *data, State old_state, State new_state)
{
	SGEN_ASSERT (0, old_state != new_state, "Why are we transitioning to the same state?");
	if (new_state == STATE_NOT_WORKING)
		SGEN_ASSERT (0, old_state == STATE_WORKING, "We can only transition to NOT WORKING from WORKING");
	else if (new_state == STATE_WORKING)
		SGEN_ASSERT (0, old_state == STATE_WORK_ENQUEUED, "We can only transition to WORKING from WORK ENQUEUED");
	if (new_state == STATE_NOT_WORKING || new_state == STATE_WORKING)
		SGEN_ASSERT (6, sgen_thread_pool_is_thread_pool_thread (mono_native_thread_id_get ()), "Only the worker thread is allowed to transition to NOT_WORKING or WORKING");

	return InterlockedCompareExchange (&data->state, new_state, old_state) == old_state;
}

static gboolean
state_is_working_or_enqueued (State state)
{
	return state == STATE_WORKING || state == STATE_WORK_ENQUEUED;
}

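/*
 * Transition every active worker to WORK ENQUEUED and signal the thread pool
 * if any of them was previously not working.
 */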
static void
sgen_workers_ensure_awake (WorkerContext *context)
{
	int i;
	gboolean need_signal = FALSE;

	/*
	 * All workers are being awakened, so make sure we reset the parallel context.
	 * We call this function only when starting the workers, so nobody is running,
	 * or when the last worker is enqueuing preclean work. In both cases no worker
	 * can be running with the nopar context, so resetting it is safe.
	 */
	context->idle_func_object_ops = (context->active_workers_num > 1) ? context->idle_func_object_ops_par : context->idle_func_object_ops_nopar;
	context->workers_finished = FALSE;

	for (i = 0; i < context->active_workers_num; i++) {
		State old_state;
		gboolean did_set_state;

		do {
			old_state = context->workers_data [i].state;

			if (old_state == STATE_WORK_ENQUEUED)
				break;

			did_set_state = set_state (&context->workers_data [i], old_state, STATE_WORK_ENQUEUED);
		} while (!did_set_state);

		if (!state_is_working_or_enqueued (old_state))
			need_signal = TRUE;
	}

	if (need_signal)
		sgen_thread_pool_idle_signal (context->thread_pool_context);
}

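/*
 * Called by a worker when it runs out of work. If it is the last active worker
 * and a finish callback is set, the callback (which enqueues preclean work) is
 * invoked and all workers are awakened again; otherwise the worker transitions
 * to NOT WORKING, and the second-to-last worker to finish downgrades the context
 * to the non-parallel object ops.
 */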
static void
worker_try_finish (WorkerData *data)
{
	State old_state;
	int i, working = 0;
	WorkerContext *context = data->context;

	++stat_workers_num_finished;

	mono_os_mutex_lock (&context->finished_lock);

	for (i = 0; i < context->active_workers_num; i++) {
		if (state_is_working_or_enqueued (context->workers_data [i].state))
			working++;
	}

	if (working == 1) {
		SgenWorkersFinishCallback callback = context->finish_callback;
		SGEN_ASSERT (0, context->idle_func_object_ops == context->idle_func_object_ops_nopar, "Why are we finishing with parallel context");
		/* We are the last one left. Enqueue the preclean job if we have one and wake everybody up. */
		SGEN_ASSERT (0, data->state != STATE_NOT_WORKING, "How did we get from doing idle work to NOT WORKING without setting it ourselves?");
		if (callback) {
			context->finish_callback = NULL;
			callback ();
			context->worker_awakenings = 0;
			/* Make sure each worker has a chance of seeing the enqueued jobs */
			sgen_workers_ensure_awake (context);
			SGEN_ASSERT (0, data->state == STATE_WORK_ENQUEUED, "Why did we fail to set our own state to ENQUEUED");
			goto work_available;
		}
	}

	do {
		old_state = data->state;

		SGEN_ASSERT (0, old_state != STATE_NOT_WORKING, "How did we get from doing idle work to NOT WORKING without setting it ourselves?");
		if (old_state == STATE_WORK_ENQUEUED)
			goto work_available;
		SGEN_ASSERT (0, old_state == STATE_WORKING, "What other possibility is there?");
	} while (!set_state (data, old_state, STATE_NOT_WORKING));

	/*
	 * If we are the second-to-last worker to finish, we set the scan context to the
	 * non-parallel version so we can speed up the last worker. This helps us maintain
	 * the same level of performance as non-parallel mode even if we fail to distribute
	 * work properly.
	 */
	if (working == 2)
		context->idle_func_object_ops = context->idle_func_object_ops_nopar;

	context->workers_finished = TRUE;
	mono_os_mutex_unlock (&context->finished_lock);

	binary_protocol_worker_finish (sgen_timestamp (), context->forced_stop);

	sgen_gray_object_queue_trim_free_list (&data->private_gray_queue);
	return;

work_available:
	mono_os_mutex_unlock (&context->finished_lock);
}

void
sgen_workers_enqueue_job (int generation, SgenThreadPoolJob *job, gboolean enqueue)
{
	if (!enqueue) {
		job->func (NULL, job);
		sgen_thread_pool_job_free (job);
		return;
	}

	sgen_thread_pool_job_enqueue (worker_contexts [generation].thread_pool_context, job);
}

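/*
 * Try to move a gray queue section from the context's distribute gray queue into
 * this worker's private gray queue. Returns TRUE if a section was obtained.
 */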
static gboolean
workers_get_work (WorkerData *data)
{
	SgenMajorCollector *major = sgen_get_major_collector ();
	SgenMinorCollector *minor = sgen_get_minor_collector ();
	GrayQueueSection *section;

	g_assert (sgen_gray_object_queue_is_empty (&data->private_gray_queue));
	g_assert (major->is_concurrent || minor->is_parallel);

	section = sgen_section_gray_queue_dequeue (&data->context->workers_distribute_gray_queue);
	if (section) {
		sgen_gray_object_enqueue_section (&data->private_gray_queue, section, major->is_parallel);
		return TRUE;
	}

	/* The distribute gray queue is empty */
	g_assert (sgen_gray_object_queue_is_empty (&data->private_gray_queue));
	return FALSE;
}

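/*
 * In parallel mode, try to steal a gray queue section from another active worker's
 * private gray queue, scanning the workers that follow us in round-robin order.
 * Returns TRUE if a section was stolen.
 */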
static gboolean
workers_steal_work (WorkerData *data)
{
	SgenMajorCollector *major = sgen_get_major_collector ();
	SgenMinorCollector *minor = sgen_get_minor_collector ();
	int generation = sgen_get_current_collection_generation ();
	GrayQueueSection *section = NULL;
	WorkerContext *context = data->context;
	int i, current_worker;

	if ((generation == GENERATION_OLD && !major->is_parallel) ||
			(generation == GENERATION_NURSERY && !minor->is_parallel))
		return FALSE;

	/* If we're parallel, steal from other workers' private gray queues */
	g_assert (sgen_gray_object_queue_is_empty (&data->private_gray_queue));

	current_worker = (int) (data - context->workers_data);

	for (i = 1; i < context->active_workers_num && !section; i++) {
		int steal_worker = (current_worker + i) % context->active_workers_num;
		if (state_is_working_or_enqueued (context->workers_data [steal_worker].state))
			section = sgen_gray_object_steal_section (&context->workers_data [steal_worker].private_gray_queue);
	}

	if (section) {
		sgen_gray_object_enqueue_section (&data->private_gray_queue, section, TRUE);
		return TRUE;
	}

	/* Nobody to steal from */
	g_assert (sgen_gray_object_queue_is_empty (&data->private_gray_queue));
	return FALSE;
}

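/*
 * Sanity checks run on every object enqueued on a gray queue while a concurrent
 * collection is in progress.
 */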
static void
concurrent_enqueue_check (GCObject *obj)
{
	g_assert (sgen_concurrent_collection_in_progress ());
	g_assert (!sgen_ptr_in_nursery (obj));
	g_assert (SGEN_LOAD_VTABLE (obj));
}

static void
init_private_gray_queue (WorkerData *data)
{
	sgen_gray_object_queue_init (&data->private_gray_queue,
			sgen_get_major_collector ()->is_concurrent ? concurrent_enqueue_check : NULL,
			FALSE);
}

static void
thread_pool_init_func (void *data_untyped)
{
	WorkerData *data = (WorkerData *)data_untyped;
	SgenMajorCollector *major = sgen_get_major_collector ();
	SgenMinorCollector *minor = sgen_get_minor_collector ();

	if (!major->is_concurrent && !minor->is_parallel)
		return;

	init_private_gray_queue (data);

	/* Separate WorkerData structures for the same thread share their free_block_lists */
	if (major->is_parallel || minor->is_parallel)
		major->init_block_free_lists (&data->free_block_lists);
}

static gboolean
sgen_workers_are_working (WorkerContext *context)
{
	int i;

	for (i = 0; i < context->active_workers_num; i++) {
		if (state_is_working_or_enqueued (context->workers_data [i].state))
			return TRUE;
	}
	return FALSE;
}

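/*
 * Thread pool callback: returns whether idle work should continue, either for a
 * specific worker (when data_untyped is set) or for any worker of the context
 * associated with thread_pool_context.
 */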
static gboolean
continue_idle_func (void *data_untyped, int thread_pool_context)
{
	if (data_untyped)
		return state_is_working_or_enqueued (((WorkerData*)data_untyped)->state);

	/* Return whether any of the threads in the context is working */
	if (worker_contexts [GENERATION_NURSERY].workers_num && worker_contexts [GENERATION_NURSERY].thread_pool_context == thread_pool_context)
		return sgen_workers_are_working (&worker_contexts [GENERATION_NURSERY]);
	if (worker_contexts [GENERATION_OLD].workers_num && worker_contexts [GENERATION_OLD].thread_pool_context == thread_pool_context)
		return sgen_workers_are_working (&worker_contexts [GENERATION_OLD]);

	g_assert_not_reached ();
	return FALSE;
}

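/*
 * Thread pool callback: a worker only runs the idle function if its context has
 * been started, it falls within the active worker count and it has work pending.
 */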
static gboolean
should_work_func (void *data_untyped)
{
	WorkerData *data = (WorkerData*)data_untyped;
	WorkerContext *context = data->context;
	int current_worker = (int) (data - context->workers_data);

	return context->started && current_worker < context->active_workers_num && state_is_working_or_enqueued (data->state);
}

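/*
 * The idle function run by each worker: drain the private gray queue, refilling it
 * from the distribute gray queue or by stealing from other workers; once no work is
 * left (or a forced stop was requested), try to finish.
 */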
static void
marker_idle_func (void *data_untyped)
{
	WorkerData *data = (WorkerData *)data_untyped;
	WorkerContext *context = data->context;

	SGEN_ASSERT (0, continue_idle_func (data_untyped, context->thread_pool_context), "Why are we called when we're not supposed to work?");

	if (data->state == STATE_WORK_ENQUEUED) {
		set_state (data, STATE_WORK_ENQUEUED, STATE_WORKING);
		SGEN_ASSERT (0, data->state != STATE_NOT_WORKING, "How did we get from WORK ENQUEUED to NOT WORKING?");
	}

	if (!context->forced_stop && (!sgen_gray_object_queue_is_empty (&data->private_gray_queue) || workers_get_work (data) || workers_steal_work (data))) {
		ScanCopyContext ctx = CONTEXT_FROM_OBJECT_OPERATIONS (context->idle_func_object_ops, &data->private_gray_queue);

		SGEN_ASSERT (0, !sgen_gray_object_queue_is_empty (&data->private_gray_queue), "How is our gray queue empty if we just got work?");

		sgen_drain_gray_stack (ctx);

		if (data->private_gray_queue.num_sections >= SGEN_WORKER_MIN_SECTIONS_SIGNAL
				&& context->workers_finished && context->worker_awakenings < context->active_workers_num) {
			/* We bound the number of worker awakenings just to be sure */
			context->worker_awakenings++;
			mono_os_mutex_lock (&context->finished_lock);
			sgen_workers_ensure_awake (context);
			mono_os_mutex_unlock (&context->finished_lock);
		}
	} else {
		worker_try_finish (data);
	}
}

static void
init_distribute_gray_queue (WorkerContext *context)
{
	sgen_section_gray_queue_init (&context->workers_distribute_gray_queue, TRUE,
			sgen_get_major_collector ()->is_concurrent ? concurrent_enqueue_check : NULL);
}

void
sgen_workers_create_context (int generation, int num_workers)
{
	static gboolean stat_inited = FALSE;
	int i;
	WorkerData **workers_data_ptrs = (WorkerData**)sgen_alloc_internal_dynamic (num_workers * sizeof(WorkerData*), INTERNAL_MEM_WORKER_DATA, TRUE);
	WorkerContext *context = &worker_contexts [generation];

	SGEN_ASSERT (0, !context->workers_num, "We can't init the worker context for a generation twice");

	mono_os_mutex_init (&context->finished_lock);

	context->generation = generation;
	context->workers_num = num_workers;
	context->active_workers_num = num_workers;

	context->workers_data = (WorkerData *)sgen_alloc_internal_dynamic (sizeof (WorkerData) * num_workers, INTERNAL_MEM_WORKER_DATA, TRUE);
	memset (context->workers_data, 0, sizeof (WorkerData) * num_workers);

	init_distribute_gray_queue (context);

	for (i = 0; i < num_workers; ++i) {
		workers_data_ptrs [i] = &context->workers_data [i];
		context->workers_data [i].context = context;
	}

	context->thread_pool_context = sgen_thread_pool_create_context (num_workers, thread_pool_init_func, marker_idle_func, continue_idle_func, should_work_func, (void**)workers_data_ptrs);

	if (!stat_inited) {
		mono_counters_register ("# workers finished", MONO_COUNTER_GC | MONO_COUNTER_ULONG, &stat_workers_num_finished);
		stat_inited = TRUE;
	}
}

void
sgen_workers_stop_all_workers (int generation)
{
	WorkerContext *context = &worker_contexts [generation];

	mono_os_mutex_lock (&context->finished_lock);
	context->finish_callback = NULL;
	mono_os_mutex_unlock (&context->finished_lock);

	context->forced_stop = TRUE;

	sgen_thread_pool_wait_for_all_jobs (context->thread_pool_context);
	sgen_thread_pool_idle_wait (context->thread_pool_context);
	SGEN_ASSERT (0, !sgen_workers_are_working (context), "Can only signal enqueue work when in no work state");

	context->started = FALSE;
}

void
sgen_workers_set_num_active_workers (int generation, int num_workers)
{
	WorkerContext *context = &worker_contexts [generation];
	if (num_workers) {
		SGEN_ASSERT (0, num_workers <= context->workers_num, "We can't start more workers than we initialized");
		context->active_workers_num = num_workers;
	} else {
		context->active_workers_num = context->workers_num;
	}
}

void
sgen_workers_start_all_workers (int generation, SgenObjectOperations *object_ops_nopar, SgenObjectOperations *object_ops_par, SgenWorkersFinishCallback callback)
{
	WorkerContext *context = &worker_contexts [generation];
	SGEN_ASSERT (0, !context->started, "Why are we starting to work without finishing previous cycle");

	context->idle_func_object_ops_par = object_ops_par;
	context->idle_func_object_ops_nopar = object_ops_nopar;
	context->forced_stop = FALSE;
	context->finish_callback = callback;
	context->worker_awakenings = 0;
	context->started = TRUE;
	mono_memory_write_barrier ();

	/*
	 * We expect workers to start finishing only after all of them have been awakened.
	 * Otherwise we might think that we have fewer workers and use the wrong context.
	 */
	mono_os_mutex_lock (&context->finished_lock);
	sgen_workers_ensure_awake (context);
	mono_os_mutex_unlock (&context->finished_lock);
}

void
sgen_workers_join (int generation)
{
	WorkerContext *context = &worker_contexts [generation];
	int i;

	SGEN_ASSERT (0, !context->finish_callback, "Why are we joining concurrent mark early");
	/*
	 * It might be the case that a worker didn't get to run anything
	 * in this context, because it was stuck working on a long job
	 * in another context. In this case its state is active (WORK_ENQUEUED)
	 * and we need to wait for it to finish itself.
	 * FIXME Avoid having to wait for the worker to report its own finish.
	 */

	sgen_thread_pool_wait_for_all_jobs (context->thread_pool_context);
	sgen_thread_pool_idle_wait (context->thread_pool_context);
	SGEN_ASSERT (0, !sgen_workers_are_working (context), "Can only signal enqueue work when in no work state");

	/* At this point all the workers have stopped. */

	SGEN_ASSERT (0, sgen_section_gray_queue_is_empty (&context->workers_distribute_gray_queue), "Why is there still work left to do?");
	for (i = 0; i < context->active_workers_num; ++i)
		SGEN_ASSERT (0, sgen_gray_object_queue_is_empty (&context->workers_data [i].private_gray_queue), "Why is there still work left to do?");

	context->started = FALSE;
}

/*
 * Can only be called if the workers are stopped.
 * If we're stopped, there are also no pending jobs.
 */
gboolean
sgen_workers_have_idle_work (int generation)
{
	WorkerContext *context = &worker_contexts [generation];
	int i;

	SGEN_ASSERT (0, context->forced_stop && !sgen_workers_are_working (context), "Checking for idle work should only happen if the workers are stopped.");

	if (!sgen_section_gray_queue_is_empty (&context->workers_distribute_gray_queue))
		return TRUE;

	for (i = 0; i < context->active_workers_num; ++i) {
		if (!sgen_gray_object_queue_is_empty (&context->workers_data [i].private_gray_queue))
			return TRUE;
	}

	return FALSE;
}

gboolean
sgen_workers_all_done (void)
{
	if (worker_contexts [GENERATION_NURSERY].workers_num && sgen_workers_are_working (&worker_contexts [GENERATION_NURSERY]))
		return FALSE;
	if (worker_contexts [GENERATION_OLD].workers_num && sgen_workers_are_working (&worker_contexts [GENERATION_OLD]))
		return FALSE;

	return TRUE;
}

void
sgen_workers_assert_gray_queue_is_empty (int generation)
{
	SGEN_ASSERT (0, sgen_section_gray_queue_is_empty (&worker_contexts [generation].workers_distribute_gray_queue), "Why is the workers gray queue not empty?");
}

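/*
 * Split the given gray queue into smaller sections and move them all to the
 * context's distribute gray queue, from which the workers will pick them up.
 * Must be called before the workers are started.
 */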
void
sgen_workers_take_from_queue (int generation, SgenGrayQueue *queue)
{
	WorkerContext *context = &worker_contexts [generation];

	sgen_gray_object_spread (queue, sgen_workers_get_job_split_count (generation));

	for (;;) {
		GrayQueueSection *section = sgen_gray_object_dequeue_section (queue);
		if (!section)
			break;
		sgen_section_gray_queue_enqueue (&context->workers_distribute_gray_queue, section);
	}

	SGEN_ASSERT (0, !sgen_workers_are_working (context), "We should fully populate the distribute gray queue before we start the workers");
}

SgenObjectOperations*
sgen_workers_get_idle_func_object_ops (WorkerData *worker)
{
	g_assert (worker->context->idle_func_object_ops);
	return worker->context->idle_func_object_ops;
}

/*
 * If we have a single worker, splitting into multiple jobs makes no sense. With
 * more than one worker, we split into a larger number of jobs so that, in case
 * the work load is uneven, a worker that finished quickly can take up more jobs
 * than another one.
 *
 * We also return 1 if there is no worker context for that generation.
 */
int
sgen_workers_get_job_split_count (int generation)
{
	return (worker_contexts [generation].active_workers_num > 1) ? worker_contexts [generation].active_workers_num * 4 : 1;
}

void
sgen_workers_foreach (int generation, SgenWorkerCallback callback)
{
	WorkerContext *context = &worker_contexts [generation];
	int i;

	for (i = 0; i < context->workers_num; i++)
		callback (&context->workers_data [i]);
}

gboolean
sgen_workers_is_worker_thread (MonoNativeThreadId id)
{
	return sgen_thread_pool_is_thread_pool_thread (id);
}

#endif