/*
 * sgen-workers.c: Worker threads for parallel and concurrent GC.
 *
 * Copyright 2001-2003 Ximian, Inc
 * Copyright 2003-2010 Novell, Inc.
 * Copyright (C) 2012 Xamarin Inc
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Library General Public
 * License 2.0 as published by the Free Software Foundation;
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Library General Public License for more details.
 *
 * You should have received a copy of the GNU Library General Public
 * License 2.0 along with this library; if not, write to the Free
 * Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */

#include "config.h"
#ifdef HAVE_SGEN_GC

#include "metadata/sgen-gc.h"
#include "metadata/sgen-workers.h"
#include "utils/mono-counters.h"

static int workers_num;
static WorkerData *workers_data;
static void *workers_gc_thread_major_collector_data = NULL;

static SgenSectionGrayQueue workers_distribute_gray_queue;
static gboolean workers_distribute_gray_queue_inited;

static volatile gboolean workers_marking = FALSE;
static gboolean workers_started = FALSE;

typedef union {
        gint32 value;
        struct {
                /*
                 * Decremented by the main thread and incremented by
                 * worker threads.
                 */
                guint32 num_waiting : 8;
                /* Set by worker threads and reset by the main thread. */
                guint32 done_posted : 1;
                /* Set by the main thread. */
                guint32 gc_in_progress : 1;
        } data;
} State;
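
/*
 * Because all three fields fit in a single 32-bit value, a complete
 * state transition can be performed with one compare-and-swap on
 * State.value (see set_state () below) instead of taking a lock.
 */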

static volatile State workers_state;

static MonoSemType workers_waiting_sem;
static MonoSemType workers_done_sem;

static volatile int workers_job_queue_num_entries = 0;
static volatile JobQueueEntry *workers_job_queue = NULL;
static LOCK_DECLARE (workers_job_queue_mutex);
static int workers_num_jobs_enqueued = 0;
static volatile int workers_num_jobs_finished = 0;

static long long stat_workers_stolen_from_self_lock;
static long long stat_workers_stolen_from_self_no_lock;
static long long stat_workers_stolen_from_others;
static long long stat_workers_num_waited;

static gboolean
set_state (State old_state, State new_state)
{
        return InterlockedCompareExchange (&workers_state.value,
                        new_state.value, old_state.value) == old_state.value;
}
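
/*
 * Illustrative sketch only (hypothetical example_state_transition, kept
 * under #if 0, never compiled): the read-modify-CAS retry pattern that
 * every caller of set_state () below follows.  Copy the current state,
 * modify the copy, and retry until the CAS succeeds.
 */
#if 0
static void
example_state_transition (void)
{
        State old_state, new_state;
        do {
                old_state = new_state = workers_state;
                /* modify the copy, e.g. park this worker */
                ++new_state.data.num_waiting;
        } while (!set_state (old_state, new_state));
}
#endif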

static void
workers_wake_up (int max)
{
        int i;

        for (i = 0; i < max; ++i) {
                State old_state, new_state;
                do {
                        old_state = new_state = workers_state;
                        /*
                         * We must not wake workers up once done has
                         * been posted.
                         */
                        if (old_state.data.done_posted)
                                return;
                        if (old_state.data.num_waiting == 0)
                                return;
                        --new_state.data.num_waiting;
                } while (!set_state (old_state, new_state));
                MONO_SEM_POST (&workers_waiting_sem);
        }
}

static void
workers_wake_up_all (void)
{
        workers_wake_up (workers_num);
}

void
sgen_workers_wake_up_all (void)
{
        g_assert (workers_state.data.gc_in_progress);
        workers_wake_up_all ();
}

static void
workers_wait (void)
{
        State old_state, new_state;
        ++stat_workers_num_waited;
        do {
                old_state = new_state = workers_state;
                /*
                 * Only the last worker thread awake can set the done
                 * posted flag, and since we're awake and haven't set
                 * it yet, it cannot be set.
                 */
                g_assert (!old_state.data.done_posted);
                ++new_state.data.num_waiting;
                /*
                 * Only post done if every worker is parked and the
                 * main thread has already cleared gc_in_progress.
                 */
                if (new_state.data.num_waiting == workers_num && !old_state.data.gc_in_progress)
                        new_state.data.done_posted = 1;
        } while (!set_state (old_state, new_state));
        mono_memory_barrier ();
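        /*
         * If this was the last worker to park after the main thread
         * cleared gc_in_progress, signal the done semaphore that
         * sgen_workers_join () is blocked on.
         */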
        if (new_state.data.done_posted)
                MONO_SEM_POST (&workers_done_sem);
        MONO_SEM_WAIT (&workers_waiting_sem);
}

static gboolean
collection_needs_workers (void)
{
        return sgen_collection_is_parallel () || sgen_collection_is_concurrent ();
}

void
sgen_workers_enqueue_job (JobFunc func, void *data)
{
        int num_entries;
        JobQueueEntry *entry;

        if (!collection_needs_workers ()) {
                func (NULL, data);
                return;
        }

        g_assert (workers_state.data.gc_in_progress);

        entry = sgen_alloc_internal (INTERNAL_MEM_JOB_QUEUE_ENTRY);
        entry->func = func;
        entry->data = data;

        mono_mutex_lock (&workers_job_queue_mutex);
        entry->next = workers_job_queue;
        workers_job_queue = entry;
        num_entries = ++workers_job_queue_num_entries;
        ++workers_num_jobs_enqueued;
        mono_mutex_unlock (&workers_job_queue_mutex);

        workers_wake_up (num_entries);
}
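
/*
 * Usage sketch only (hypothetical example_job_func / example_enqueue,
 * kept under #if 0, never compiled): how a collector-side caller might
 * hand work to the workers.  The JobFunc signature is assumed from its
 * use above: the job receives the executing worker's WorkerData (or NULL
 * when run inline because no workers are needed) and the opaque job data.
 */
#if 0
static void
example_job_func (WorkerData *worker_data, void *job_data)
{
        /* do the work described by job_data, pushing gray objects into
           worker_data->private_gray_queue when run on a worker thread */
}

static void
example_enqueue (void *job_data)
{
        sgen_workers_enqueue_job (example_job_func, job_data);
}
#endif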

void
sgen_workers_wait_for_jobs (void)
{
        // FIXME: implement this properly
        while (workers_num_jobs_finished < workers_num_jobs_enqueued) {
                State state = workers_state;
                g_assert (state.data.gc_in_progress);
                g_assert (!state.data.done_posted);
                if (state.data.num_waiting == workers_num)
                        workers_wake_up_all ();
                g_usleep (1000);
        }
}

static gboolean
workers_dequeue_and_do_job (WorkerData *data)
{
        JobQueueEntry *entry;

        /*
         * At this point the GC might not be running anymore.  We
         * could have been woken up by a job that was then taken by
         * another thread, after which the collection finished, so we
         * first have to successfully dequeue a job before doing
         * anything that assumes the collection is still ongoing.
         */

        if (!workers_job_queue_num_entries)
                return FALSE;

        mono_mutex_lock (&workers_job_queue_mutex);
        entry = (JobQueueEntry*)workers_job_queue;
        if (entry) {
                workers_job_queue = entry->next;
                --workers_job_queue_num_entries;
        }
        mono_mutex_unlock (&workers_job_queue_mutex);

        if (!entry)
                return FALSE;

        g_assert (collection_needs_workers ());

        entry->func (data, entry->data);
        sgen_free_internal (entry, INTERNAL_MEM_JOB_QUEUE_ENTRY);

        SGEN_ATOMIC_ADD (workers_num_jobs_finished, 1);

        return TRUE;
}

static gboolean
workers_steal (WorkerData *data, WorkerData *victim_data, gboolean lock)
{
        SgenGrayQueue *queue = &data->private_gray_queue;
        int num, n;

        g_assert (!queue->first);

        if (!victim_data->stealable_stack_fill)
                return FALSE;

        if (lock && mono_mutex_trylock (&victim_data->stealable_stack_mutex))
                return FALSE;

        n = num = (victim_data->stealable_stack_fill + 1) / 2;
        /* We're stealing num entries. */

        while (n > 0) {
                int m = MIN (SGEN_GRAY_QUEUE_SECTION_SIZE, n);
                n -= m;

                sgen_gray_object_alloc_queue_section (queue);
                memcpy (queue->first->objects,
                                victim_data->stealable_stack + victim_data->stealable_stack_fill - num + n,
                                sizeof (char*) * m);
                queue->first->size = m;

                /*
                 * DO NOT move this cursor update outside the loop.
                 * Doing so triggers an "assert not reached" in sgen-scan-object.h: we use queue->cursor
                 * to compute the size of the first section during section allocation (via alloc_prepare_func
                 * -> workers_gray_queue_share_redirect -> sgen_gray_object_dequeue_section), and that size
                 * would come out as 0, because queue->cursor would still be pointing at
                 * queue->first->objects [-1], thus losing objects in the gray queue.
                 */
                queue->cursor = (char**)queue->first->objects + queue->first->size - 1;
        }

        victim_data->stealable_stack_fill -= num;

        if (lock)
                mono_mutex_unlock (&victim_data->stealable_stack_mutex);

        if (data == victim_data) {
                if (lock)
                        stat_workers_stolen_from_self_lock += num;
                else
                        stat_workers_stolen_from_self_no_lock += num;
        } else {
                stat_workers_stolen_from_others += num;
        }

        return num != 0;
}
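
/*
 * Worked example: with stealable_stack_fill == 7 the thief takes
 * num = (7 + 1) / 2 = 4 entries, copied from the top of the victim's
 * stack in SGEN_GRAY_QUEUE_SECTION_SIZE-sized chunks, and the victim
 * is left with the remaining 3.
 */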

static gboolean
workers_get_work (WorkerData *data)
{
        SgenMajorCollector *major;
        int i;

        g_assert (sgen_gray_object_queue_is_empty (&data->private_gray_queue));

        /* Try to steal from our own stack. */
        if (workers_steal (data, data, TRUE))
                return TRUE;

        /* From another worker. */
        for (i = data->index + 1; i < workers_num + data->index; ++i) {
                WorkerData *victim_data = &workers_data [i % workers_num];
                g_assert (data != victim_data);
                if (workers_steal (data, victim_data, TRUE))
                        return TRUE;
        }

        /*
         * If we're concurrent or parallel, from the workers
         * distribute gray queue.
         */
        major = sgen_get_major_collector ();
        if (major->is_concurrent || major->is_parallel) {
                GrayQueueSection *section = sgen_section_gray_queue_dequeue (&workers_distribute_gray_queue);
                if (section) {
                        sgen_gray_object_enqueue_section (&data->private_gray_queue, section);
                        return TRUE;
                }
        }

        /* Nobody to steal from. */
        g_assert (sgen_gray_object_queue_is_empty (&data->private_gray_queue));
        return FALSE;
}

static void
workers_gray_queue_share_redirect (SgenGrayQueue *queue)
{
        GrayQueueSection *section;
        WorkerData *data = queue->alloc_prepare_data;

        if (data->stealable_stack_fill) {
                /*
                 * There are still objects in the stealable stack, so
                 * wake up any workers that might be sleeping.
                 */
                if (workers_state.data.gc_in_progress)
                        workers_wake_up_all ();
                return;
        }

        /* The stealable stack is empty, so fill it. */
        mono_mutex_lock (&data->stealable_stack_mutex);

        while (data->stealable_stack_fill < STEALABLE_STACK_SIZE &&
                        (section = sgen_gray_object_dequeue_section (queue))) {
                int num = MIN (section->size, STEALABLE_STACK_SIZE - data->stealable_stack_fill);

                memcpy (data->stealable_stack + data->stealable_stack_fill,
                                section->objects + section->size - num,
                                sizeof (char*) * num);

                section->size -= num;
                data->stealable_stack_fill += num;

                if (section->size)
                        sgen_gray_object_enqueue_section (queue, section);
                else
                        sgen_gray_object_free_queue_section (section);
        }

        if (sgen_gray_object_queue_is_empty (queue))
                workers_steal (data, data, FALSE);

        mono_mutex_unlock (&data->stealable_stack_mutex);

        if (workers_state.data.gc_in_progress)
                workers_wake_up_all ();
}

static void
concurrent_enqueue_check (char *obj)
{
        g_assert (sgen_concurrent_collection_in_progress ());
        g_assert (!sgen_ptr_in_nursery (obj));
        g_assert (SGEN_LOAD_VTABLE (obj));
}

static void
init_private_gray_queue (WorkerData *data)
{
        sgen_gray_object_queue_init_with_alloc_prepare (&data->private_gray_queue,
                        sgen_get_major_collector ()->is_concurrent ? concurrent_enqueue_check : NULL,
                        workers_gray_queue_share_redirect, data);
}

static mono_native_thread_return_t
workers_thread_func (void *data_untyped)
{
        WorkerData *data = data_untyped;
        SgenMajorCollector *major = sgen_get_major_collector ();

        mono_thread_info_register_small_id ();

        if (major->init_worker_thread)
                major->init_worker_thread (data->major_collector_data);

        init_private_gray_queue (data);

        for (;;) {
                gboolean did_work = FALSE;

                while (workers_dequeue_and_do_job (data)) {
                        did_work = TRUE;
                        /* FIXME: maybe distribute the gray queue here? */
                }

                if (workers_marking && (!sgen_gray_object_queue_is_empty (&data->private_gray_queue) || workers_get_work (data))) {
                        SgenObjectOperations *ops = sgen_concurrent_collection_in_progress ()
                                ? &major->major_concurrent_ops
                                : &major->major_ops;
                        ScanCopyContext ctx = { ops->scan_object, NULL, &data->private_gray_queue };

                        g_assert (!sgen_gray_object_queue_is_empty (&data->private_gray_queue));

                        while (!sgen_drain_gray_stack (32, ctx))
                                workers_gray_queue_share_redirect (&data->private_gray_queue);
                        g_assert (sgen_gray_object_queue_is_empty (&data->private_gray_queue));

                        init_private_gray_queue (data);

                        did_work = TRUE;
                }

                if (!did_work)
                        workers_wait ();
        }

        /* dummy return to make compilers happy */
        return NULL;
}

static void
init_distribute_gray_queue (gboolean locked)
{
        if (workers_distribute_gray_queue_inited) {
                g_assert (sgen_section_gray_queue_is_empty (&workers_distribute_gray_queue));
                g_assert (!workers_distribute_gray_queue.locked == !locked);
                return;
        }

        sgen_section_gray_queue_init (&workers_distribute_gray_queue, locked,
                        sgen_get_major_collector ()->is_concurrent ? concurrent_enqueue_check : NULL);
        workers_distribute_gray_queue_inited = TRUE;
}

void
sgen_workers_init_distribute_gray_queue (void)
{
        if (!collection_needs_workers ())
                return;

        init_distribute_gray_queue (sgen_get_major_collector ()->is_concurrent || sgen_get_major_collector ()->is_parallel);
}

void
sgen_workers_init (int num_workers)
{
        int i;

        if (!sgen_get_major_collector ()->is_parallel && !sgen_get_major_collector ()->is_concurrent)
                return;

        //g_print ("initing %d workers\n", num_workers);

        workers_num = num_workers;

        workers_data = sgen_alloc_internal_dynamic (sizeof (WorkerData) * num_workers, INTERNAL_MEM_WORKER_DATA, TRUE);
        memset (workers_data, 0, sizeof (WorkerData) * num_workers);

        MONO_SEM_INIT (&workers_waiting_sem, 0);
        MONO_SEM_INIT (&workers_done_sem, 0);

        init_distribute_gray_queue (sgen_get_major_collector ()->is_concurrent || sgen_get_major_collector ()->is_parallel);

        if (sgen_get_major_collector ()->alloc_worker_data)
                workers_gc_thread_major_collector_data = sgen_get_major_collector ()->alloc_worker_data ();

        for (i = 0; i < workers_num; ++i) {
                workers_data [i].index = i;

                /* The private gray queue is initialized by the worker thread itself. */
                mono_mutex_init (&workers_data [i].stealable_stack_mutex);
                workers_data [i].stealable_stack_fill = 0;

                if (sgen_get_major_collector ()->alloc_worker_data)
                        workers_data [i].major_collector_data = sgen_get_major_collector ()->alloc_worker_data ();
        }

        LOCK_INIT (workers_job_queue_mutex);

        sgen_register_fixed_internal_mem_type (INTERNAL_MEM_JOB_QUEUE_ENTRY, sizeof (JobQueueEntry));

        mono_counters_register ("Stolen from self lock", MONO_COUNTER_GC | MONO_COUNTER_LONG, &stat_workers_stolen_from_self_lock);
        mono_counters_register ("Stolen from self no lock", MONO_COUNTER_GC | MONO_COUNTER_LONG, &stat_workers_stolen_from_self_no_lock);
        mono_counters_register ("Stolen from others", MONO_COUNTER_GC | MONO_COUNTER_LONG, &stat_workers_stolen_from_others);
        mono_counters_register ("# workers waited", MONO_COUNTER_GC | MONO_COUNTER_LONG, &stat_workers_num_waited);
}
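
/*
 * Lifecycle sketch only (hypothetical example_* names, kept under #if 0,
 * never compiled), pieced together from the assertions in the entry
 * points below: the GC thread initializes the workers once, then for
 * each parallel/concurrent collection starts them, optionally enqueues
 * jobs and starts marking, and finally joins.
 */
#if 0
static void
example_noop_job (WorkerData *worker_data, void *job_data)
{
        /* a job would do its work here */
}

static void
example_parallel_collection (void)
{
        sgen_workers_init (4);                     /* once, at GC initialization */

        sgen_workers_start_all_workers ();         /* sets gc_in_progress */
        sgen_workers_enqueue_job (example_noop_job, NULL);
        sgen_workers_start_marking ();             /* lets workers drain gray queues */
        sgen_workers_join ();                      /* blocks until every worker is parked */
}
#endif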

/* only the GC thread is allowed to start and join workers */

static void
workers_start_worker (int index)
{
        g_assert (index >= 0 && index < workers_num);

        g_assert (!workers_data [index].thread);
        mono_native_thread_create (&workers_data [index].thread, workers_thread_func, &workers_data [index]);
}

void
sgen_workers_start_all_workers (void)
{
        State old_state, new_state;
        int i;

        if (!collection_needs_workers ())
                return;

        if (sgen_get_major_collector ()->init_worker_thread)
                sgen_get_major_collector ()->init_worker_thread (workers_gc_thread_major_collector_data);

        old_state = new_state = workers_state;
        g_assert (!old_state.data.gc_in_progress);
        new_state.data.gc_in_progress = TRUE;

        workers_marking = FALSE;

        g_assert (workers_job_queue_num_entries == 0);
        workers_num_jobs_enqueued = 0;
        workers_num_jobs_finished = 0;

        if (workers_started) {
                g_assert (old_state.data.done_posted);
                if (old_state.data.num_waiting != workers_num) {
                        g_error ("Expecting all %d sgen workers to be parked, but only %d are",
                                        workers_num, old_state.data.num_waiting);
                }

                /* Clear the done posted flag */
                new_state.data.done_posted = 0;
                if (!set_state (old_state, new_state))
                        g_assert_not_reached ();

                workers_wake_up_all ();
                return;
        }

        g_assert (!old_state.data.done_posted);

        if (!set_state (old_state, new_state))
                g_assert_not_reached ();

        for (i = 0; i < workers_num; ++i)
                workers_start_worker (i);

        workers_started = TRUE;
}

gboolean
sgen_workers_have_started (void)
{
        return workers_state.data.gc_in_progress;
}

void
sgen_workers_start_marking (void)
{
        if (!collection_needs_workers ())
                return;

        g_assert (workers_started && workers_state.data.gc_in_progress);
        g_assert (!workers_marking);

        workers_marking = TRUE;

        workers_wake_up_all ();
}

void
sgen_workers_join (void)
{
        State old_state, new_state;
        int i;

        if (!collection_needs_workers ())
                return;

        do {
                old_state = new_state = workers_state;
                g_assert (old_state.data.gc_in_progress);
                g_assert (!old_state.data.done_posted);

                new_state.data.gc_in_progress = 0;
        } while (!set_state (old_state, new_state));

        if (new_state.data.num_waiting == workers_num) {
                /*
                 * All the workers have shut down but haven't posted
                 * the done semaphore yet, or, if we come from below,
                 * haven't done all their work yet.
                 *
                 * It's not a big deal to wake them up again - they'll
                 * just do one iteration of their loop trying to find
                 * something to do and then go back to waiting again.
                 */
        reawaken:
                workers_wake_up_all ();
        }
        MONO_SEM_WAIT (&workers_done_sem);

        old_state = new_state = workers_state;
        g_assert (old_state.data.num_waiting == workers_num);
        g_assert (old_state.data.done_posted);

        if (workers_job_queue_num_entries || !sgen_section_gray_queue_is_empty (&workers_distribute_gray_queue)) {
                /*
                 * There's a small race condition that we avoid here.
                 * It's possible that a worker thread runs out of
                 * things to do, so it goes to sleep.  Right at that
                 * moment a new job is enqueued, but the thread is
                 * still registered as running.  Now the threads are
                 * joined, and we wait for the semaphore.  Only at
                 * this point does the worker go to sleep and post
                 * the semaphore, because the gc_in_progress flag has
                 * already been cleared.  The job is still in the
                 * queue, though.
                 *
                 * Clear the done posted flag.
                 */
                new_state.data.done_posted = 0;
                if (!set_state (old_state, new_state))
                        g_assert_not_reached ();
                goto reawaken;
        }

        /* At this point all the workers have stopped. */

        workers_marking = FALSE;

        if (sgen_get_major_collector ()->reset_worker_data) {
                for (i = 0; i < workers_num; ++i)
                        sgen_get_major_collector ()->reset_worker_data (workers_data [i].major_collector_data);
        }

        g_assert (workers_job_queue_num_entries == 0);
        g_assert (sgen_section_gray_queue_is_empty (&workers_distribute_gray_queue));
        for (i = 0; i < workers_num; ++i) {
                g_assert (!workers_data [i].stealable_stack_fill);
                g_assert (sgen_gray_object_queue_is_empty (&workers_data [i].private_gray_queue));
        }
}

gboolean
sgen_workers_all_done (void)
{
        State state = workers_state;
        /*
         * Can only be called while the collection is still in
         * progress, i.e., before done has been posted.
         */
        g_assert (state.data.gc_in_progress);
        g_assert (!state.data.done_posted);
        return state.data.num_waiting == workers_num;
}

gboolean
sgen_is_worker_thread (MonoNativeThreadId thread)
{
        int i;

        if (sgen_get_major_collector ()->is_worker_thread && sgen_get_major_collector ()->is_worker_thread (thread))
                return TRUE;

        for (i = 0; i < workers_num; ++i) {
                if (workers_data [i].thread == thread)
                        return TRUE;
        }
        return FALSE;
}

SgenSectionGrayQueue*
sgen_workers_get_distribute_section_gray_queue (void)
{
        return &workers_distribute_gray_queue;
}

void
sgen_workers_reset_data (void)
{
        if (sgen_get_major_collector ()->reset_worker_data)
                sgen_get_major_collector ()->reset_worker_data (workers_gc_thread_major_collector_data);
}
#endif