497a5c0d44a65a2d191c3d8a9128a5faff3b43b9
[mono.git] / mono / metadata / sgen-workers.c
1 /*
2  * Copyright 2001-2003 Ximian, Inc
3  * Copyright 2003-2010 Novell, Inc.
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining
6  * a copy of this software and associated documentation files (the
7  * "Software"), to deal in the Software without restriction, including
8  * without limitation the rights to use, copy, modify, merge, publish,
9  * distribute, sublicense, and/or sell copies of the Software, and to
10  * permit persons to whom the Software is furnished to do so, subject to
11  * the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be
14  * included in all copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
17  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
18  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
19  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
20  * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
21  * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
22  * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
23  */
24
/* Capacity (in object pointers) of each worker's stealable stack. */
#define STEALABLE_STACK_SIZE    512

/*
 * Per-thread worker state.  One instance exists per worker thread,
 * plus one (workers_gc_thread_data) for the GC thread itself.
 */
typedef struct _WorkerData WorkerData;
struct _WorkerData {
	pthread_t thread;
	void *major_collector_data;	/* opaque per-thread data owned by the major collector */

	GrayQueue private_gray_queue; /* only read/written by worker thread */

	/* Overflow work other threads may steal from.  The mutex guards
	 * the stack contents; stealable_stack_fill is also read without
	 * the lock as a racy fast-path check. */
	pthread_mutex_t stealable_stack_mutex;
	volatile int stealable_stack_fill;
	char *stealable_stack [STEALABLE_STACK_SIZE];
};

/* A parallel job.  worker_data is NULL when run inline in the
 * non-parallel case (see workers_enqueue_job). */
typedef void (*JobFunc) (WorkerData *worker_data, void *job_data);

/* Node of the global LIFO job list (pushed/popped at the head). */
typedef struct _JobQueueEntry JobQueueEntry;
struct _JobQueueEntry {
	JobFunc func;
	void *data;

	volatile JobQueueEntry *next;
};
48
static int workers_num;				/* number of worker threads */
static WorkerData *workers_data;		/* array of workers_num WorkerData */
static WorkerData workers_gc_thread_data;	/* the GC thread's own stealable-stack state */

/* Gray queue the GC thread fills to distribute work to the workers. */
static GrayQueue workers_distribute_gray_queue;

#define WORKERS_DISTRIBUTE_GRAY_QUEUE (major_collector.is_parallel ? &workers_distribute_gray_queue : &gray_queue)

static volatile gboolean workers_gc_in_progress = FALSE;	/* set by GC thread around a collection */
static volatile gboolean workers_marking = FALSE;		/* workers may drain gray queues only while set */
static gboolean workers_started = FALSE;	/* worker threads created (they persist across collections) */
static volatile int workers_num_waiting = 0;	/* how many workers sleep on workers_waiting_sem */
static MonoSemType workers_waiting_sem;
static MonoSemType workers_done_sem;		/* posted once per collection when all workers are idle */
static volatile int workers_done_posted = 0;	/* guards single posting of workers_done_sem */

static volatile int workers_job_queue_num_entries = 0;
static volatile JobQueueEntry *workers_job_queue = NULL;	/* head of the LIFO job list */
static LOCK_DECLARE (workers_job_queue_mutex);

/* Statistics, registered with mono_counters in workers_init. */
static long long stat_workers_stolen_from_self_lock;
static long long stat_workers_stolen_from_self_no_lock;
static long long stat_workers_stolen_from_others;
static long long stat_workers_num_waited;
73
74 static void
75 workers_wake_up (int max)
76 {
77         int i;
78
79         for (i = 0; i < max; ++i) {
80                 int num;
81                 do {
82                         num = workers_num_waiting;
83                         if (num == 0)
84                                 return;
85                 } while (InterlockedCompareExchange (&workers_num_waiting, num - 1, num) != num);
86                 MONO_SEM_POST (&workers_waiting_sem);
87         }
88 }
89
90 static void
91 workers_wake_up_all (void)
92 {
93         workers_wake_up (workers_num);
94 }
95
/*
 * Put the calling worker to sleep until woken via workers_wake_up.
 * The last worker to go to sleep after the collection has ended
 * (workers_gc_in_progress cleared) posts workers_done_sem exactly
 * once, which is what workers_join blocks on.
 */
static void
workers_wait (void)
{
	int num;
	++stat_workers_num_waited;
	/* Atomically bump the waiting count. */
	do {
		num = workers_num_waiting;
	} while (InterlockedCompareExchange (&workers_num_waiting, num + 1, num) != num);
	if (num + 1 == workers_num && !workers_gc_in_progress) {
		/* Make sure the done semaphore is only posted once. */
		int posted;
		do {
			posted = workers_done_posted;
			if (posted)
				break;
		} while (InterlockedCompareExchange (&workers_done_posted, 1, 0) != 0);
		if (!posted)
			MONO_SEM_POST (&workers_done_sem);
	}
	MONO_SEM_WAIT (&workers_waiting_sem);
}
117
118 static void
119 workers_enqueue_job (JobFunc func, void *data)
120 {
121         int num_entries;
122         JobQueueEntry *entry;
123
124         if (!major_collector.is_parallel) {
125                 func (NULL, data);
126                 return;
127         }
128
129         entry = mono_sgen_alloc_internal (INTERNAL_MEM_JOB_QUEUE_ENTRY);
130         entry->func = func;
131         entry->data = data;
132
133         pthread_mutex_lock (&workers_job_queue_mutex);
134         entry->next = workers_job_queue;
135         workers_job_queue = entry;
136         num_entries = ++workers_job_queue_num_entries;
137         pthread_mutex_unlock (&workers_job_queue_mutex);
138
139         workers_wake_up (num_entries);
140 }
141
142 static gboolean
143 workers_dequeue_and_do_job (WorkerData *data)
144 {
145         JobQueueEntry *entry;
146
147         g_assert (major_collector.is_parallel);
148
149         if (!workers_job_queue_num_entries)
150                 return FALSE;
151
152         pthread_mutex_lock (&workers_job_queue_mutex);
153         entry = (JobQueueEntry*)workers_job_queue;
154         if (entry) {
155                 workers_job_queue = entry->next;
156                 --workers_job_queue_num_entries;
157         }
158         pthread_mutex_unlock (&workers_job_queue_mutex);
159
160         if (!entry)
161                 return FALSE;
162
163         entry->func (data, entry->data);
164         mono_sgen_free_internal (entry, INTERNAL_MEM_JOB_QUEUE_ENTRY);
165         return TRUE;
166 }
167
/*
 * Steal half (rounded up) of VICTIM_DATA's stealable stack into DATA's
 * private gray queue.  If LOCK is set the victim's mutex is taken with
 * trylock -- contention means we give up instead of blocking.  Callers
 * pass lock == FALSE only when they already hold the victim's mutex
 * (see workers_gray_queue_share_redirect).  Stealing from oneself
 * (data == victim_data) moves entries back into the private queue.
 * Returns TRUE if any entries were transferred.
 */
static gboolean
workers_steal (WorkerData *data, WorkerData *victim_data, gboolean lock)
{
	GrayQueue *queue = &data->private_gray_queue;
	int num, n;

	g_assert (!queue->first);

	/* Racy unlocked check -- an empty stack means nothing to do. */
	if (!victim_data->stealable_stack_fill)
		return FALSE;

	if (lock && pthread_mutex_trylock (&victim_data->stealable_stack_mutex))
		return FALSE;

	n = num = (victim_data->stealable_stack_fill + 1) / 2;
	/* We're stealing num entries. */

	/* Copy the top `num` stack entries into freshly allocated gray
	 * queue sections, at most SGEN_GRAY_QUEUE_SECTION_SIZE per section. */
	while (n > 0) {
		int m = MIN (SGEN_GRAY_QUEUE_SECTION_SIZE, n);
		n -= m;

		gray_object_alloc_queue_section (queue);
		memcpy (queue->first->objects,
				victim_data->stealable_stack + victim_data->stealable_stack_fill - num + n,
				sizeof (char*) * m);
		queue->first->end = m;
	}

	victim_data->stealable_stack_fill -= num;

	if (lock)
		pthread_mutex_unlock (&victim_data->stealable_stack_mutex);

	/* Account the steal in the appropriate statistics counter. */
	if (data == victim_data) {
		if (lock)
			stat_workers_stolen_from_self_lock += num;
		else
			stat_workers_stolen_from_self_no_lock += num;
	} else {
		stat_workers_stolen_from_others += num;
	}

	return num != 0;
}
212
213 static gboolean
214 workers_get_work (WorkerData *data)
215 {
216         int i;
217
218         g_assert (gray_object_queue_is_empty (&data->private_gray_queue));
219
220         /* Try to steal from our own stack. */
221         if (workers_steal (data, data, TRUE))
222                 return TRUE;
223
224         /* Then from the GC thread's stack. */
225         if (workers_steal (data, &workers_gc_thread_data, TRUE))
226                 return TRUE;
227
228         /* Finally, from another worker. */
229         for (i = 0; i < workers_num; ++i) {
230                 WorkerData *victim_data = &workers_data [i];
231                 if (data == victim_data)
232                         continue;
233                 if (workers_steal (data, victim_data, TRUE))
234                         return TRUE;
235         }
236
237         /* Nobody to steal from */
238         g_assert (gray_object_queue_is_empty (&data->private_gray_queue));
239         return FALSE;
240 }
241
/*
 * Alloc-prepare callback of the gray queues: move sections from the
 * owner's gray queue onto its stealable stack so other workers can
 * steal them, then wake sleeping workers.  QUEUE's alloc_prepare_data
 * is the owning WorkerData.
 */
static void
workers_gray_queue_share_redirect (GrayQueue *queue)
{
	GrayQueueSection *section;
	WorkerData *data = queue->alloc_prepare_data;

	if (data->stealable_stack_fill) {
		/*
		 * There are still objects in the stealable stack, so
		 * wake up any workers that might be sleeping
		 */
		if (workers_gc_in_progress)
			workers_wake_up_all ();
		return;
	}

	/* The stealable stack is empty, so fill it. */
	pthread_mutex_lock (&data->stealable_stack_mutex);

	while (data->stealable_stack_fill < STEALABLE_STACK_SIZE &&
			(section = gray_object_dequeue_section (queue))) {
		/* Move as many entries from this section as still fit. */
		int num = MIN (section->end, STEALABLE_STACK_SIZE - data->stealable_stack_fill);

		memcpy (data->stealable_stack + data->stealable_stack_fill,
				section->objects + section->end - num,
				sizeof (char*) * num);

		section->end -= num;
		data->stealable_stack_fill += num;

		/* Partially drained sections go back onto the queue. */
		if (section->end)
			gray_object_enqueue_section (queue, section);
		else
			gray_object_free_queue_section (section);
	}

	/* If a worker's queue is now empty, take some entries straight
	 * back from its own stack; lock == FALSE because we already hold
	 * the stealable-stack mutex here. */
	if (data != &workers_gc_thread_data && gray_object_queue_is_empty (queue))
		workers_steal (data, data, FALSE);

	pthread_mutex_unlock (&data->stealable_stack_mutex);

	if (workers_gc_in_progress)
		workers_wake_up_all ();
}
286
/*
 * Main loop of each worker thread: run queued jobs, then (while
 * marking is active) drain the private gray queue or steal more work,
 * and go to sleep when there is nothing to do.  Never returns; the
 * threads persist across collections.
 */
static void*
workers_thread_func (void *data_untyped)
{
	WorkerData *data = data_untyped;

	if (major_collector.init_worker_thread)
		major_collector.init_worker_thread (data->major_collector_data);

	/* The share-redirect callback exposes our excess work for stealing. */
	gray_object_queue_init_with_alloc_prepare (&data->private_gray_queue,
			workers_gray_queue_share_redirect, data);

	for (;;) {
		gboolean did_work = FALSE;

		while (workers_dequeue_and_do_job (data)) {
			did_work = TRUE;
			/* FIXME: maybe distribute the gray queue here? */
		}

		if (workers_marking && (!gray_object_queue_is_empty (&data->private_gray_queue) || workers_get_work (data))) {
			g_assert (!gray_object_queue_is_empty (&data->private_gray_queue));

			/* Drain in batches of 32, sharing excess work in between. */
			while (!drain_gray_stack (&data->private_gray_queue, 32))
				workers_gray_queue_share_redirect (&data->private_gray_queue);
			g_assert (gray_object_queue_is_empty (&data->private_gray_queue));

			gray_object_queue_init (&data->private_gray_queue);

			did_work = TRUE;
		}

		if (!did_work)
			workers_wait ();
	}

	/* dummy return to make compilers happy */
	return NULL;
}
325
326 static void
327 workers_distribute_gray_queue_sections (void)
328 {
329         if (!major_collector.is_parallel)
330                 return;
331
332         workers_gray_queue_share_redirect (&workers_distribute_gray_queue);
333 }
334
335 static void
336 workers_init_distribute_gray_queue (void)
337 {
338         if (!major_collector.is_parallel)
339                 return;
340
341         gray_object_queue_init (&workers_distribute_gray_queue);
342 }
343
/*
 * One-time setup for NUM_WORKERS parallel workers: allocates and zeroes
 * the per-worker array, initializes the semaphores, mutexes, the
 * distribute gray queue and the statistics counters.  The threads
 * themselves are created lazily by workers_start_all_workers.  No-op
 * for non-parallel collectors.
 */
static void
workers_init (int num_workers)
{
	int i;

	if (!major_collector.is_parallel)
		return;

	//g_print ("initing %d workers\n", num_workers);

	workers_num = num_workers;

	workers_data = mono_sgen_alloc_internal_dynamic (sizeof (WorkerData) * num_workers, INTERNAL_MEM_WORKER_DATA);
	memset (workers_data, 0, sizeof (WorkerData) * num_workers);

	MONO_SEM_INIT (&workers_waiting_sem, 0);
	MONO_SEM_INIT (&workers_done_sem, 0);

	/* The GC thread shares its work via its own stealable stack. */
	gray_object_queue_init_with_alloc_prepare (&workers_distribute_gray_queue,
			workers_gray_queue_share_redirect, &workers_gc_thread_data);
	pthread_mutex_init (&workers_gc_thread_data.stealable_stack_mutex, NULL);
	workers_gc_thread_data.stealable_stack_fill = 0;

	if (major_collector.alloc_worker_data)
		workers_gc_thread_data.major_collector_data = major_collector.alloc_worker_data ();

	for (i = 0; i < workers_num; ++i) {
		/* private gray queue is inited by the thread itself */
		pthread_mutex_init (&workers_data [i].stealable_stack_mutex, NULL);
		workers_data [i].stealable_stack_fill = 0;

		if (major_collector.alloc_worker_data)
			workers_data [i].major_collector_data = major_collector.alloc_worker_data ();
	}

	LOCK_INIT (workers_job_queue_mutex);

	mono_sgen_register_fixed_internal_mem_type (INTERNAL_MEM_JOB_QUEUE_ENTRY, sizeof (JobQueueEntry));

	mono_counters_register ("Stolen from self lock", MONO_COUNTER_GC | MONO_COUNTER_LONG, &stat_workers_stolen_from_self_lock);
	mono_counters_register ("Stolen from self no lock", MONO_COUNTER_GC | MONO_COUNTER_LONG, &stat_workers_stolen_from_self_no_lock);
	mono_counters_register ("Stolen from others", MONO_COUNTER_GC | MONO_COUNTER_LONG, &stat_workers_stolen_from_others);
	mono_counters_register ("# workers waited", MONO_COUNTER_GC | MONO_COUNTER_LONG, &stat_workers_num_waited);
}
388
389 /* only the GC thread is allowed to start and join workers */
390
391 static void
392 workers_start_worker (int index)
393 {
394         g_assert (index >= 0 && index < workers_num);
395
396         g_assert (!workers_data [index].thread);
397         pthread_create (&workers_data [index].thread, NULL, workers_thread_func, &workers_data [index]);
398 }
399
/*
 * Begin a parallel collection: set the in-progress flags and either
 * wake the existing (sleeping) workers or, on the first call, create
 * the worker threads.  Must only be called from the GC thread.
 */
static void
workers_start_all_workers (void)
{
	int i;

	if (!major_collector.is_parallel)
		return;

	if (major_collector.init_worker_thread)
		major_collector.init_worker_thread (workers_gc_thread_data.major_collector_data);

	g_assert (!workers_gc_in_progress);
	/* Flags are set before waking any worker so they observe a
	 * started collection with the done-post guard reset. */
	workers_gc_in_progress = TRUE;
	workers_marking = FALSE;
	workers_done_posted = 0;

	if (workers_started) {
		/* Threads persist across collections; they must all be asleep. */
		g_assert (workers_num_waiting == workers_num);
		workers_wake_up_all ();
		return;
	}

	for (i = 0; i < workers_num; ++i)
		workers_start_worker (i);

	workers_started = TRUE;
}
427
/*
 * Allow the workers to start draining gray queues.  The flag must be
 * set before the wakeup so woken workers see marking enabled.  Must be
 * called between workers_start_all_workers and workers_join.
 */
static void
workers_start_marking (void)
{
	if (!major_collector.is_parallel)
		return;

	g_assert (workers_started && workers_gc_in_progress);
	g_assert (!workers_marking);

	workers_marking = TRUE;

	workers_wake_up_all ();
}
441
/*
 * End a parallel collection: clear the in-progress flag, wait until
 * every worker has gone back to sleep (the last one to sleep posts
 * workers_done_sem -- see workers_wait), then verify all queues and
 * stealable stacks are fully drained.  The threads are not actually
 * joined; they keep sleeping until the next collection.  Must only be
 * called from the GC thread.
 */
static void
workers_join (void)
{
	int i;

	if (!major_collector.is_parallel)
		return;

	g_assert (gray_object_queue_is_empty (&workers_gc_thread_data.private_gray_queue));
	g_assert (gray_object_queue_is_empty (&workers_distribute_gray_queue));

	g_assert (workers_gc_in_progress);
	workers_gc_in_progress = FALSE;
	if (workers_num_waiting == workers_num) {
		/*
		 * All the workers might have shut down at this point
		 * and posted the done semaphore but we don't know it
		 * yet.  It's not a big deal to wake them up again -
		 * they'll just do one iteration of their loop trying to
		 * find something to do and then go back to waiting
		 * again.
		 */
		workers_wake_up_all ();
	}
	MONO_SEM_WAIT (&workers_done_sem);
	workers_marking = FALSE;

	if (major_collector.reset_worker_data) {
		for (i = 0; i < workers_num; ++i)
			major_collector.reset_worker_data (workers_data [i].major_collector_data);
	}

	g_assert (workers_done_posted);

	g_assert (!workers_gc_thread_data.stealable_stack_fill);
	g_assert (gray_object_queue_is_empty (&workers_gc_thread_data.private_gray_queue));
	for (i = 0; i < workers_num; ++i) {
		g_assert (!workers_data [i].stealable_stack_fill);
		g_assert (gray_object_queue_is_empty (&workers_data [i].private_gray_queue));
	}
}
483
484 gboolean
485 mono_sgen_is_worker_thread (pthread_t thread)
486 {
487         int i;
488
489         if (major_collector.is_worker_thread && major_collector.is_worker_thread (thread))
490                 return TRUE;
491
492         if (!major_collector.is_parallel)
493                 return FALSE;
494
495         for (i = 0; i < workers_num; ++i) {
496                 if (workers_data [i].thread == thread)
497                         return TRUE;
498         }
499         return FALSE;
500 }