Merge pull request #615 from nealef/master
[mono.git] / mono / metadata / sgen-workers.c
1 /*
2  * sgen-workers.c: Worker threads for parallel and concurrent GC.
3  *
4  * Copyright 2001-2003 Ximian, Inc
5  * Copyright 2003-2010 Novell, Inc.
6  * Copyright (C) 2012 Xamarin Inc
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License 2.0 as published by the Free Software Foundation;
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License 2.0 along with this library; if not, write to the Free
19  * Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 #include "config.h"
23 #ifdef HAVE_SGEN_GC
24
25 #include "metadata/sgen-gc.h"
26 #include "metadata/sgen-workers.h"
27 #include "utils/mono-counters.h"
28
29 static int workers_num;
30 static WorkerData *workers_data;
31 static void *workers_gc_thread_major_collector_data = NULL;
32
33 static SgenSectionGrayQueue workers_distribute_gray_queue;
34 static gboolean workers_distribute_gray_queue_inited;
35
36 static volatile gboolean workers_marking = FALSE;
37 static gboolean workers_started = FALSE;
38
39 typedef union {
40         gint32 value;
41         struct {
42                 /*
43                  * Decremented by the main thread and incremented by
44                  * worker threads.
45                  */
46                 guint32 num_waiting : 8;
47                 /* Set by worker threads and reset by the main thread. */
48                 guint32 done_posted : 1;
49                 /* Set by the main thread. */
50                 guint32 gc_in_progress : 1;
51         } data;
52 } State;
53
54 static volatile State workers_state;
55
56 static MonoSemType workers_waiting_sem;
57 static MonoSemType workers_done_sem;
58
59 static volatile int workers_job_queue_num_entries = 0;
60 static volatile JobQueueEntry *workers_job_queue = NULL;
61 static LOCK_DECLARE (workers_job_queue_mutex);
62 static int workers_num_jobs_enqueued = 0;
63 static volatile int workers_num_jobs_finished = 0;
64
65 static long long stat_workers_stolen_from_self_lock;
66 static long long stat_workers_stolen_from_self_no_lock;
67 static long long stat_workers_stolen_from_others;
68 static long long stat_workers_num_waited;
69
70 static gboolean
71 set_state (State old_state, State new_state)
72 {
73         return InterlockedCompareExchange (&workers_state.value,
74                         new_state.value, old_state.value) == old_state.value;
75 }
76
77 static void
78 workers_wake_up (int max)
79 {
80         int i;
81
82         for (i = 0; i < max; ++i) {
83                 State old_state, new_state;
84                 do {
85                         old_state = new_state = workers_state;
86                         /*
87                          * We must not wake workers up once done has
88                          * been posted.
89                          */
90                         if (old_state.data.done_posted)
91                                 return;
92                         if (old_state.data.num_waiting == 0)
93                                 return;
94                         --new_state.data.num_waiting;
95                 } while (!set_state (old_state, new_state));
96                 MONO_SEM_POST (&workers_waiting_sem);
97         }
98 }
99
100 static void
101 workers_wake_up_all (void)
102 {
103         workers_wake_up (workers_num);
104 }
105
106 void
107 sgen_workers_wake_up_all (void)
108 {
109         g_assert (workers_state.data.gc_in_progress);
110         workers_wake_up_all ();
111 }
112
113 static void
114 workers_wait (void)
115 {
116         State old_state, new_state;
117         ++stat_workers_num_waited;
118         do {
119                 old_state = new_state = workers_state;
120                 /*
121                  * Only the last worker thread awake can set the done
122                  * posted flag, and since we're awake and haven't set
123                  * it yet, it cannot be set.
124                  */
125                 g_assert (!old_state.data.done_posted);
126                 ++new_state.data.num_waiting;
127                 /*
128                  * This is the only place where we use
129                  * workers_gc_in_progress in the worker threads.
130                  */
131                 if (new_state.data.num_waiting == workers_num && !old_state.data.gc_in_progress)
132                         new_state.data.done_posted = 1;
133         } while (!set_state (old_state, new_state));
134         mono_memory_barrier ();
135         if (new_state.data.done_posted)
136                 MONO_SEM_POST (&workers_done_sem);
137         MONO_SEM_WAIT (&workers_waiting_sem);
138 }
139
140 static gboolean
141 collection_needs_workers (void)
142 {
143         return sgen_collection_is_parallel () || sgen_collection_is_concurrent ();
144 }
145
146 void
147 sgen_workers_enqueue_job (JobFunc func, void *data)
148 {
149         int num_entries;
150         JobQueueEntry *entry;
151
152         if (!collection_needs_workers ()) {
153                 func (NULL, data);
154                 return;
155         }
156
157         g_assert (workers_state.data.gc_in_progress);
158
159         entry = sgen_alloc_internal (INTERNAL_MEM_JOB_QUEUE_ENTRY);
160         entry->func = func;
161         entry->data = data;
162
163         mono_mutex_lock (&workers_job_queue_mutex);
164         entry->next = workers_job_queue;
165         workers_job_queue = entry;
166         num_entries = ++workers_job_queue_num_entries;
167         ++workers_num_jobs_enqueued;
168         mono_mutex_unlock (&workers_job_queue_mutex);
169
170         workers_wake_up (num_entries);
171 }
172
173 void
174 sgen_workers_wait_for_jobs (void)
175 {
176         // FIXME: implement this properly
177         while (workers_num_jobs_finished < workers_num_jobs_enqueued) {
178                 State state = workers_state;
179                 g_assert (state.data.gc_in_progress);
180                 g_assert (!state.data.done_posted);
181                 if (state.data.num_waiting == workers_num)
182                         workers_wake_up_all ();
183                 g_usleep (1000);
184         }
185 }
186
187 static gboolean
188 workers_dequeue_and_do_job (WorkerData *data)
189 {
190         JobQueueEntry *entry;
191
192         /*
193          * At this point the GC might not be running anymore.  We
194          * could have been woken up by a job that was then taken by
195          * another thread, after which the collection finished, so we
196          * first have to successfully dequeue a job before doing
197          * anything assuming that the collection is still ongoing.
198          */
199
200         if (!workers_job_queue_num_entries)
201                 return FALSE;
202
203         mono_mutex_lock (&workers_job_queue_mutex);
204         entry = (JobQueueEntry*)workers_job_queue;
205         if (entry) {
206                 workers_job_queue = entry->next;
207                 --workers_job_queue_num_entries;
208         }
209         mono_mutex_unlock (&workers_job_queue_mutex);
210
211         if (!entry)
212                 return FALSE;
213
214         g_assert (collection_needs_workers ());
215
216         entry->func (data, entry->data);
217         sgen_free_internal (entry, INTERNAL_MEM_JOB_QUEUE_ENTRY);
218
219         SGEN_ATOMIC_ADD (workers_num_jobs_finished, 1);
220
221         return TRUE;
222 }
223
224 static gboolean
225 workers_steal (WorkerData *data, WorkerData *victim_data, gboolean lock)
226 {
227         SgenGrayQueue *queue = &data->private_gray_queue;
228         int num, n;
229
230         g_assert (!queue->first);
231
232         if (!victim_data->stealable_stack_fill)
233                 return FALSE;
234
235         if (lock && mono_mutex_trylock (&victim_data->stealable_stack_mutex))
236                 return FALSE;
237
238         n = num = (victim_data->stealable_stack_fill + 1) / 2;
239         /* We're stealing num entries. */
240
241         while (n > 0) {
242                 int m = MIN (SGEN_GRAY_QUEUE_SECTION_SIZE, n);
243                 n -= m;
244
245                 sgen_gray_object_alloc_queue_section (queue);
246                 memcpy (queue->first->objects,
247                                 victim_data->stealable_stack + victim_data->stealable_stack_fill - num + n,
248                                 sizeof (char*) * m);
249                 queue->first->end = m;
250         }
251
252         victim_data->stealable_stack_fill -= num;
253
254         if (lock)
255                 mono_mutex_unlock (&victim_data->stealable_stack_mutex);
256
257         if (data == victim_data) {
258                 if (lock)
259                         stat_workers_stolen_from_self_lock += num;
260                 else
261                         stat_workers_stolen_from_self_no_lock += num;
262         } else {
263                 stat_workers_stolen_from_others += num;
264         }
265
266         return num != 0;
267 }
268
269 static gboolean
270 workers_get_work (WorkerData *data)
271 {
272         SgenMajorCollector *major;
273         int i;
274
275         g_assert (sgen_gray_object_queue_is_empty (&data->private_gray_queue));
276
277         /* Try to steal from our own stack. */
278         if (workers_steal (data, data, TRUE))
279                 return TRUE;
280
281         /* From another worker. */
282         for (i = 0; i < workers_num; ++i) {
283                 WorkerData *victim_data = &workers_data [i];
284                 if (data == victim_data)
285                         continue;
286                 if (workers_steal (data, victim_data, TRUE))
287                         return TRUE;
288         }
289
290         /*
291          * If we're concurrent or parallel, from the workers
292          * distribute gray queue.
293          */
294         major = sgen_get_major_collector ();
295         if (major->is_concurrent || major->is_parallel) {
296                 GrayQueueSection *section = sgen_section_gray_queue_dequeue (&workers_distribute_gray_queue);
297                 if (section) {
298                         sgen_gray_object_enqueue_section (&data->private_gray_queue, section);
299                         return TRUE;
300                 }
301         }
302
303         /* Nobody to steal from */
304         g_assert (sgen_gray_object_queue_is_empty (&data->private_gray_queue));
305         return FALSE;
306 }
307
308 static void
309 workers_gray_queue_share_redirect (SgenGrayQueue *queue)
310 {
311         GrayQueueSection *section;
312         WorkerData *data = queue->alloc_prepare_data;
313
314         if (data->stealable_stack_fill) {
315                 /*
316                  * There are still objects in the stealable stack, so
317                  * wake up any workers that might be sleeping
318                  */
319                 if (workers_state.data.gc_in_progress)
320                         workers_wake_up_all ();
321                 return;
322         }
323
324         /* The stealable stack is empty, so fill it. */
325         mono_mutex_lock (&data->stealable_stack_mutex);
326
327         while (data->stealable_stack_fill < STEALABLE_STACK_SIZE &&
328                         (section = sgen_gray_object_dequeue_section (queue))) {
329                 int num = MIN (section->end, STEALABLE_STACK_SIZE - data->stealable_stack_fill);
330
331                 memcpy (data->stealable_stack + data->stealable_stack_fill,
332                                 section->objects + section->end - num,
333                                 sizeof (char*) * num);
334
335                 section->end -= num;
336                 data->stealable_stack_fill += num;
337
338                 if (section->end)
339                         sgen_gray_object_enqueue_section (queue, section);
340                 else
341                         sgen_gray_object_free_queue_section (section);
342         }
343
344         if (sgen_gray_object_queue_is_empty (queue))
345                 workers_steal (data, data, FALSE);
346
347         mono_mutex_unlock (&data->stealable_stack_mutex);
348
349         if (workers_state.data.gc_in_progress)
350                 workers_wake_up_all ();
351 }
352
/*
 * Sanity check installed on the gray queues when the major collector is
 * concurrent (see init_private_gray_queue () and
 * init_distribute_gray_queue ()).  Asserts that a concurrent collection is
 * in progress, that OBJ is not in the nursery and that OBJ has a vtable.
 */
static void
concurrent_enqueue_check (char *obj)
{
	g_assert (sgen_concurrent_collection_in_progress ());
	g_assert (!sgen_ptr_in_nursery (obj));
	g_assert (SGEN_LOAD_VTABLE (obj));
}
360
361 static void
362 init_private_gray_queue (WorkerData *data)
363 {
364         sgen_gray_object_queue_init_with_alloc_prepare (&data->private_gray_queue,
365                         sgen_get_major_collector ()->is_concurrent ? concurrent_enqueue_check : NULL,
366                         workers_gray_queue_share_redirect, data);
367 }
368
369 static mono_native_thread_return_t
370 workers_thread_func (void *data_untyped)
371 {
372         WorkerData *data = data_untyped;
373         SgenMajorCollector *major = sgen_get_major_collector ();
374
375         mono_thread_info_register_small_id ();
376
377         if (major->init_worker_thread)
378                 major->init_worker_thread (data->major_collector_data);
379
380         init_private_gray_queue (data);
381
382         for (;;) {
383                 gboolean did_work = FALSE;
384
385                 while (workers_dequeue_and_do_job (data)) {
386                         did_work = TRUE;
387                         /* FIXME: maybe distribute the gray queue here? */
388                 }
389
390                 if (workers_marking && (!sgen_gray_object_queue_is_empty (&data->private_gray_queue) || workers_get_work (data))) {
391                         SgenObjectOperations *ops = sgen_concurrent_collection_in_progress ()
392                                 ? &major->major_concurrent_ops
393                                 : &major->major_ops;
394                         ScanCopyContext ctx = { ops->scan_object, NULL, &data->private_gray_queue };
395
396                         g_assert (!sgen_gray_object_queue_is_empty (&data->private_gray_queue));
397
398                         while (!sgen_drain_gray_stack (32, ctx))
399                                 workers_gray_queue_share_redirect (&data->private_gray_queue);
400                         g_assert (sgen_gray_object_queue_is_empty (&data->private_gray_queue));
401
402                         init_private_gray_queue (data);
403
404                         did_work = TRUE;
405                 }
406
407                 if (!did_work)
408                         workers_wait ();
409         }
410
411         /* dummy return to make compilers happy */
412         return NULL;
413 }
414
415 static void
416 init_distribute_gray_queue (gboolean locked)
417 {
418         if (workers_distribute_gray_queue_inited) {
419                 g_assert (sgen_section_gray_queue_is_empty (&workers_distribute_gray_queue));
420                 g_assert (!workers_distribute_gray_queue.locked == !locked);
421                 return;
422         }
423
424         sgen_section_gray_queue_init (&workers_distribute_gray_queue, locked,
425                         sgen_get_major_collector ()->is_concurrent ? concurrent_enqueue_check : NULL);
426         workers_distribute_gray_queue_inited = TRUE;
427 }
428
429 void
430 sgen_workers_init_distribute_gray_queue (void)
431 {
432         if (!collection_needs_workers ())
433                 return;
434
435         init_distribute_gray_queue (sgen_get_major_collector ()->is_concurrent || sgen_get_major_collector ()->is_parallel);
436 }
437
438 void
439 sgen_workers_init (int num_workers)
440 {
441         int i;
442
443         if (!sgen_get_major_collector ()->is_parallel && !sgen_get_major_collector ()->is_concurrent)
444                 return;
445
446         //g_print ("initing %d workers\n", num_workers);
447
448         workers_num = num_workers;
449
450         workers_data = sgen_alloc_internal_dynamic (sizeof (WorkerData) * num_workers, INTERNAL_MEM_WORKER_DATA, TRUE);
451         memset (workers_data, 0, sizeof (WorkerData) * num_workers);
452
453         MONO_SEM_INIT (&workers_waiting_sem, 0);
454         MONO_SEM_INIT (&workers_done_sem, 0);
455
456         init_distribute_gray_queue (sgen_get_major_collector ()->is_concurrent || sgen_get_major_collector ()->is_parallel);
457
458         if (sgen_get_major_collector ()->alloc_worker_data)
459                 workers_gc_thread_major_collector_data = sgen_get_major_collector ()->alloc_worker_data ();
460
461         for (i = 0; i < workers_num; ++i) {
462                 /* private gray queue is inited by the thread itself */
463                 mono_mutex_init (&workers_data [i].stealable_stack_mutex);
464                 workers_data [i].stealable_stack_fill = 0;
465
466                 if (sgen_get_major_collector ()->alloc_worker_data)
467                         workers_data [i].major_collector_data = sgen_get_major_collector ()->alloc_worker_data ();
468         }
469
470         LOCK_INIT (workers_job_queue_mutex);
471
472         sgen_register_fixed_internal_mem_type (INTERNAL_MEM_JOB_QUEUE_ENTRY, sizeof (JobQueueEntry));
473
474         mono_counters_register ("Stolen from self lock", MONO_COUNTER_GC | MONO_COUNTER_LONG, &stat_workers_stolen_from_self_lock);
475         mono_counters_register ("Stolen from self no lock", MONO_COUNTER_GC | MONO_COUNTER_LONG, &stat_workers_stolen_from_self_no_lock);
476         mono_counters_register ("Stolen from others", MONO_COUNTER_GC | MONO_COUNTER_LONG, &stat_workers_stolen_from_others);
477         mono_counters_register ("# workers waited", MONO_COUNTER_GC | MONO_COUNTER_LONG, &stat_workers_num_waited);
478 }
479
480 /* only the GC thread is allowed to start and join workers */
481
482 static void
483 workers_start_worker (int index)
484 {
485         g_assert (index >= 0 && index < workers_num);
486
487         g_assert (!workers_data [index].thread);
488         mono_native_thread_create (&workers_data [index].thread, workers_thread_func, &workers_data [index]);
489 }
490
491 void
492 sgen_workers_start_all_workers (void)
493 {
494         State old_state, new_state;
495         int i;
496
497         if (!collection_needs_workers ())
498                 return;
499
500         if (sgen_get_major_collector ()->init_worker_thread)
501                 sgen_get_major_collector ()->init_worker_thread (workers_gc_thread_major_collector_data);
502
503         old_state = new_state = workers_state;
504         g_assert (!old_state.data.gc_in_progress);
505         new_state.data.gc_in_progress = TRUE;
506
507         workers_marking = FALSE;
508
509         g_assert (workers_job_queue_num_entries == 0);
510         workers_num_jobs_enqueued = 0;
511         workers_num_jobs_finished = 0;
512
513         if (workers_started) {
514                 g_assert (old_state.data.done_posted);
515                 if (old_state.data.num_waiting != workers_num) {
516                         g_error ("Expecting all %d sgen workers to be parked, but only %d are",
517                                         workers_num, old_state.data.num_waiting);
518                 }
519
520                 /* Clear the done posted flag */
521                 new_state.data.done_posted = 0;
522                 if (!set_state (old_state, new_state))
523                         g_assert_not_reached ();
524
525                 workers_wake_up_all ();
526                 return;
527         }
528
529         g_assert (!old_state.data.done_posted);
530
531         if (!set_state (old_state, new_state))
532                 g_assert_not_reached ();
533
534         for (i = 0; i < workers_num; ++i)
535                 workers_start_worker (i);
536
537         workers_started = TRUE;
538 }
539
540 gboolean
541 sgen_workers_have_started (void)
542 {
543         return workers_state.data.gc_in_progress;
544 }
545
546 void
547 sgen_workers_start_marking (void)
548 {
549         if (!collection_needs_workers ())
550                 return;
551
552         g_assert (workers_started && workers_state.data.gc_in_progress);
553         g_assert (!workers_marking);
554
555         workers_marking = TRUE;
556
557         workers_wake_up_all ();
558 }
559
560 void
561 sgen_workers_join (void)
562 {
563         State old_state, new_state;
564         int i;
565
566         if (!collection_needs_workers ())
567                 return;
568
569         do {
570                 old_state = new_state = workers_state;
571                 g_assert (old_state.data.gc_in_progress);
572                 g_assert (!old_state.data.done_posted);
573
574                 new_state.data.gc_in_progress = 0;
575         } while (!set_state (old_state, new_state));
576
577         if (new_state.data.num_waiting == workers_num) {
578                 /*
579                  * All the workers have shut down but haven't posted
580                  * the done semaphore yet, or, if we come from below,
581                  * haven't done all their work yet.
582                  *
583                  * It's not a big deal to wake them up again - they'll
584                  * just do one iteration of their loop trying to find
585                  * something to do and then go back to waiting again.
586                  */
587         reawaken:
588                 workers_wake_up_all ();
589         }
590         MONO_SEM_WAIT (&workers_done_sem);
591
592         old_state = new_state = workers_state;
593         g_assert (old_state.data.num_waiting == workers_num);
594         g_assert (old_state.data.done_posted);
595
596         if (workers_job_queue_num_entries || !sgen_section_gray_queue_is_empty (&workers_distribute_gray_queue)) {
597                 /*
598                  * There's a small race condition that we avoid here.
599                  * It's possible that a worker thread runs out of
600                  * things to do, so it goes to sleep.  Right at that
601                  * moment a new job is enqueued, but the thread is
602                  * still registered as running.  Now the threads are
603                  * joined, and we wait for the semaphore.  Only at
604                  * this point does the worker go to sleep, and posts
605                  * the semaphore, because workers_gc_in_progress is
606                  * already FALSE.  The job is still in the queue,
607                  * though.
608                  *
609                  * Clear the done posted flag.
610                  */
611                 new_state.data.done_posted = 0;
612                 if (!set_state (old_state, new_state))
613                         g_assert_not_reached ();
614                 goto reawaken;
615         }
616
617         /* At this point all the workers have stopped. */
618
619         workers_marking = FALSE;
620
621         if (sgen_get_major_collector ()->reset_worker_data) {
622                 for (i = 0; i < workers_num; ++i)
623                         sgen_get_major_collector ()->reset_worker_data (workers_data [i].major_collector_data);
624         }
625
626         g_assert (workers_job_queue_num_entries == 0);
627         g_assert (sgen_section_gray_queue_is_empty (&workers_distribute_gray_queue));
628         for (i = 0; i < workers_num; ++i) {
629                 g_assert (!workers_data [i].stealable_stack_fill);
630                 g_assert (sgen_gray_object_queue_is_empty (&workers_data [i].private_gray_queue));
631         }
632 }
633
634 gboolean
635 sgen_workers_all_done (void)
636 {
637         State state = workers_state;
638         /*
639          * Can only be called while the collection is still in
640          * progress, i.e., before done has been posted.
641          */
642         g_assert (state.data.gc_in_progress);
643         g_assert (!state.data.done_posted);
644         return state.data.num_waiting == workers_num;
645 }
646
647 gboolean
648 sgen_is_worker_thread (MonoNativeThreadId thread)
649 {
650         int i;
651
652         if (sgen_get_major_collector ()->is_worker_thread && sgen_get_major_collector ()->is_worker_thread (thread))
653                 return TRUE;
654
655         for (i = 0; i < workers_num; ++i) {
656                 if (workers_data [i].thread == thread)
657                         return TRUE;
658         }
659         return FALSE;
660 }
661
662 SgenSectionGrayQueue*
663 sgen_workers_get_distribute_section_gray_queue (void)
664 {
665         return &workers_distribute_gray_queue;
666 }
667
668 void
669 sgen_workers_reset_data (void)
670 {
671         if (sgen_get_major_collector ()->reset_worker_data)
672                 sgen_get_major_collector ()->reset_worker_data (workers_gc_thread_major_collector_data);
673 }
674 #endif