2 * Copyright 2001-2003 Ximian, Inc
3 * Copyright 2003-2010 Novell, Inc.
5 * Permission is hereby granted, free of charge, to any person obtaining
6 * a copy of this software and associated documentation files (the
7 * "Software"), to deal in the Software without restriction, including
8 * without limitation the rights to use, copy, modify, merge, publish,
9 * distribute, sublicense, and/or sell copies of the Software, and to
10 * permit persons to whom the Software is furnished to do so, subject to
11 * the following conditions:
13 * The above copyright notice and this permission notice shall be
14 * included in all copies or substantial portions of the Software.
16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
17 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
18 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
19 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
20 * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
21 * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
22 * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
25 typedef struct _WorkerData WorkerData;
/* Per-worker-thread state.  NOTE(review): this is a partial view — the
 * struct's opening/closing lines and any further members (e.g. the
 * `thread` and `is_working` fields used below) are not visible in this
 * chunk. */
28 MonoSemType start_worker_sem; /* posted by workers_start_worker to release this worker */
30 GrayQueue private_gray_queue; /* only read/written by worker thread */
31 int shared_buffer_increment; /* per-worker probe stride into workers_shared_buffer (a prime, see workers_init) */
32 int shared_buffer_index; /* next slot of workers_shared_buffer to probe */
/* Number of worker threads and their per-thread state array. */
35 static int workers_num;
36 static WorkerData *workers_data;
/* WorkerData used by the GC thread itself so it can share sections
 * through the distribute queue (wired up in workers_init). */
37 static WorkerData workers_gc_thread_data;
/* Count of workers currently working; updated with CAS in
 * workers_change_num_working. */
39 static int workers_num_working;
/* Queue the GC thread fills; overflowing sections are redirected into
 * the shared buffer by workers_gray_queue_share_redirect. */
41 static GrayQueue workers_distribute_gray_queue;
43 #define WORKERS_DISTRIBUTE_GRAY_QUEUE (major_collector.is_parallel ? &workers_distribute_gray_queue : &gray_queue)
46 * Must be a power of 2. It seems that larger values don't help much.
47 * The main reason to make this larger would be to sustain a bigger
48 * number of worker threads.
50 #define WORKERS_SHARED_BUFFER_SIZE 16
/* Fixed-size lock-free exchange buffer of gray-queue sections; slots
 * are claimed and emptied with SGEN_CAS_PTR. */
51 static GrayQueueSection *workers_shared_buffer [WORKERS_SHARED_BUFFER_SIZE];
/* Approximate occupancy of workers_shared_buffer, maintained with
 * SGEN_ATOMIC_ADD. */
52 static int workers_shared_buffer_used;
/* Distinct per-worker probe strides; being odd/prime they are coprime
 * with the power-of-two buffer size, so each worker visits every slot,
 * each in a different order. */
54 static const int workers_primes [] = { 3, 5, 7, 11, 13, 17, 23, 29 };
/* Posted once by each finishing worker; waited on in the join loop. */
56 static MonoSemType workers_done_sem;
/* HEAVY_STAT counters (registered in workers_init) for diagnosing
 * contention on the shared buffer. */
58 static long long stat_shared_buffer_insert_tries;
59 static long long stat_shared_buffer_insert_full;
60 static long long stat_shared_buffer_insert_iterations;
61 static long long stat_shared_buffer_insert_failures;
62 static long long stat_shared_buffer_remove_tries;
63 static long long stat_shared_buffer_remove_iterations;
64 static long long stat_shared_buffer_remove_empty;
/*
 * Alloc-prepare hook: drains sections from QUEUE and publishes them in
 * workers_shared_buffer so other threads can steal them.
 * NOTE(review): the return-type line, braces and a few statements of
 * this function fall in gaps of this chunk.
 */
67 workers_gray_queue_share_redirect (GrayQueue *queue)
69 GrayQueueSection *section;
70 WorkerData *data = queue->alloc_prepare_data;
/* This worker's private probe stride (a prime, see workers_init). */
71 int increment = data->shared_buffer_increment;
/* Publish every section we can pull off the queue. */
73 while ((section = gray_object_dequeue_section (queue))) {
76 HEAVY_STAT (++stat_shared_buffer_insert_tries);
/* Buffer looks full: put the section back on our own queue. */
78 if (workers_shared_buffer_used == WORKERS_SHARED_BUFFER_SIZE) {
79 HEAVY_STAT (++stat_shared_buffer_insert_full);
80 gray_object_enqueue_section (queue, section);
/* Probe the buffer for a free (NULL) slot, resuming where we left off
 * last time. */
84 index = data->shared_buffer_index;
85 for (i = 0; i < WORKERS_SHARED_BUFFER_SIZE; ++i) {
86 GrayQueueSection *old = workers_shared_buffer [index];
87 HEAVY_STAT (++stat_shared_buffer_insert_iterations);
/* CAS NULL -> section atomically claims the slot; the use of `old`
 * between these lines is not visible in this chunk. */
89 if (SGEN_CAS_PTR ((void**)&workers_shared_buffer [index], section, NULL) == NULL) {
90 SGEN_ATOMIC_ADD (workers_shared_buffer_used, 1);
91 //g_print ("thread %d put section %d\n", data - workers_data, index);
/* Buffer size is a power of two, so masking wraps the index. */
95 index = (index + increment) & (WORKERS_SHARED_BUFFER_SIZE - 1);
/* Remember where to resume probing on the next call. */
97 data->shared_buffer_index = index;
/* A full sweep found no free slot: give up and requeue locally. */
99 if (i == WORKERS_SHARED_BUFFER_SIZE) {
101 HEAVY_STAT (++stat_shared_buffer_insert_failures);
102 gray_object_enqueue_section (queue, section);
/*
 * Try to steal one section from workers_shared_buffer into DATA's
 * private gray queue, probing slots with the worker's private stride.
 * NOTE(review): the return-type line, the do-loop header and the
 * return statements fall in gaps of this chunk; presumably it returns
 * whether a section was obtained (it is used as the condition of the
 * worker's work loop) — confirm against the full file.
 */
109 workers_get_work (WorkerData *data)
112 int increment = data->shared_buffer_increment;
114 HEAVY_STAT (++stat_shared_buffer_remove_tries);
116 index = data->shared_buffer_index;
117 for (i = 0; i < WORKERS_SHARED_BUFFER_SIZE; ++i) {
118 GrayQueueSection *section;
120 HEAVY_STAT (++stat_shared_buffer_remove_iterations);
/* Re-read the slot and retry the CAS (section -> NULL) until it wins;
 * the loop's opening and the empty-slot check are not visible here. */
123 section = workers_shared_buffer [index];
126 } while (SGEN_CAS_PTR ((void**)&workers_shared_buffer [index], NULL, section) != section);
/* Slot emptied: account for it and take the section for ourselves. */
129 SGEN_ATOMIC_ADD (workers_shared_buffer_used, -1);
130 gray_object_enqueue_section (&data->private_gray_queue, section);
131 data->shared_buffer_index = index;
132 //g_print ("thread %d popped section %d\n", data - workers_data, index);
/* Wrap-around probe, same power-of-two masking as the insert path. */
136 index = (index + increment) & (WORKERS_SHARED_BUFFER_SIZE - 1);
/* Full sweep found nothing to steal. */
139 HEAVY_STAT (++stat_shared_buffer_remove_empty);
141 data->shared_buffer_index = index;
145 /* returns the new value */
/*
 * Atomically add DELTA to workers_num_working via a CAS retry loop.
 * NOTE(review): the computation of `new` (presumably old + delta), the
 * do-loop header and the return statements fall in gaps of this chunk
 * — confirm against the full file.
 */
147 workers_change_num_working (int delta)
/* Non-parallel collectors have no workers; early-out (the returned
 * value for this path is not visible here). */
151 if (!major_collector.is_parallel)
155 old = workers_num_working;
157 } while (InterlockedCompareExchange (&workers_num_working, new, old) != old);
/*
 * Worker thread entry point.  Waits to be released by the GC thread,
 * then alternates between draining its private gray queue and stealing
 * shared work until no work remains, finally signalling completion.
 * NOTE(review): the return-type line and several control-flow lines
 * (do/while headers, the sleep path) fall in gaps of this chunk.
 */
162 workers_thread_func (void *data_untyped)
164 WorkerData *data = data_untyped;
/* Thread-local allocator backing this worker's gray-queue sections. */
165 SgenInternalAllocator allocator;
167 memset (&allocator, 0, sizeof (allocator));
/* Route dequeued sections into the shared buffer via the redirect
 * hook so other threads can steal them. */
169 gray_object_queue_init_with_alloc_prepare (&data->private_gray_queue, &allocator,
170 workers_gray_queue_share_redirect, data);
173 //g_print ("worker waiting for start %d\n", data->start_worker_sem);
/* Block until workers_start_worker posts our start semaphore. */
175 MONO_SEM_WAIT (&data->start_worker_sem);
177 //g_print ("worker starting\n");
/* Work loop: drain the local queue, then try to steal more. */
181 drain_gray_stack (&data->private_gray_queue);
182 } while (workers_get_work (data));
185 * FIXME: This might never terminate with
/* We were the last one working -> collection work is done (the branch
 * body is not visible here). */
189 if (workers_change_num_working (-1) == 0)
192 /* we weren't the last one working */
193 //g_print ("sleeping\n");
195 workers_change_num_working (1);
/* Re-initialize the private queue for the next collection. */
198 gray_object_queue_init (&data->private_gray_queue, &allocator);
/* Tell the joining GC thread this worker has finished. */
200 MONO_SEM_POST (&workers_done_sem);
202 //g_print ("worker done\n");
205 /* dummy return to make compilers happy */
/*
 * Called by the GC thread to push the distribute queue's sections into
 * the shared buffer so workers can pick them up.  No-op for
 * non-parallel collectors.  NOTE(review): the return-type line and the
 * early-return body fall in gaps of this chunk.
 */
210 workers_distribute_gray_queue_sections (void)
212 if (!major_collector.is_parallel)
215 workers_gray_queue_share_redirect (&workers_distribute_gray_queue);
/*
 * One-time setup for NUM_WORKERS workers: allocates per-worker data,
 * seeds each worker's shared-buffer probe stride with a distinct prime,
 * wires the GC thread's distribute queue to the redirect hook, and
 * registers the HEAVY_STAT counters.  No-op for non-parallel
 * collectors.  NOTE(review): the return-type line and loop/brace lines
 * fall in gaps of this chunk.
 */
219 workers_init (int num_workers)
223 if (!major_collector.is_parallel)
226 //g_print ("initing %d workers\n", num_workers);
228 workers_num = num_workers;
229 workers_data = mono_sgen_alloc_internal_dynamic (sizeof (WorkerData) * num_workers, INTERNAL_MEM_WORKER_DATA);
230 MONO_SEM_INIT (&workers_done_sem, 0);
/* The GC thread probes linearly (stride 1) starting at slot 0. */
231 workers_gc_thread_data.shared_buffer_increment = 1;
232 workers_gc_thread_data.shared_buffer_index = 0;
233 gray_object_queue_init_with_alloc_prepare (&workers_distribute_gray_queue, mono_sgen_get_unmanaged_allocator (),
234 workers_gray_queue_share_redirect, &workers_gc_thread_data);
/* One distinct prime stride per worker — the primes table caps the
 * supported worker count. */
236 g_assert (num_workers <= sizeof (workers_primes) / sizeof (workers_primes [0]));
237 for (i = 0; i < workers_num; ++i) {
238 workers_data [i].shared_buffer_increment = workers_primes [i];
239 workers_data [i].shared_buffer_index = 0;
/* Expose the shared-buffer contention counters. */
242 mono_counters_register ("Shared buffer insert tries", MONO_COUNTER_GC | MONO_COUNTER_LONG, &stat_shared_buffer_insert_tries);
243 mono_counters_register ("Shared buffer insert full", MONO_COUNTER_GC | MONO_COUNTER_LONG, &stat_shared_buffer_insert_full);
244 mono_counters_register ("Shared buffer insert iterations", MONO_COUNTER_GC | MONO_COUNTER_LONG, &stat_shared_buffer_insert_iterations);
245 mono_counters_register ("Shared buffer insert failures", MONO_COUNTER_GC | MONO_COUNTER_LONG, &stat_shared_buffer_insert_failures);
246 mono_counters_register ("Shared buffer remove tries", MONO_COUNTER_GC | MONO_COUNTER_LONG, &stat_shared_buffer_remove_tries);
247 mono_counters_register ("Shared buffer remove iterations", MONO_COUNTER_GC | MONO_COUNTER_LONG, &stat_shared_buffer_remove_iterations);
248 mono_counters_register ("Shared buffer remove empty", MONO_COUNTER_GC | MONO_COUNTER_LONG, &stat_shared_buffer_remove_empty);
251 /* only the GC thread is allowed to start and join workers */
/*
 * Release worker INDEX, creating its thread lazily on first use.
 * NOTE(review): the return-type line, the early-return body and
 * closing braces fall in gaps of this chunk.
 */
254 workers_start_worker (int index)
256 g_assert (index >= 0 && index < workers_num);
/* Already running: nothing to do. */
258 if (workers_data [index].is_working)
/* First start: create the thread and its start semaphore. */
261 if (!workers_data [index].thread) {
262 //g_print ("initing thread %d\n", index);
263 MONO_SEM_INIT (&workers_data [index].start_worker_sem, 0);
264 pthread_create (&workers_data [index].thread, NULL, workers_thread_func, &workers_data [index]);
/* Release the worker — it waits on this semaphore at the top of
 * workers_thread_func. */
267 workers_data [index].is_working = TRUE;
268 MONO_SEM_POST (&workers_data [index].start_worker_sem);
269 //g_print ("posted thread start %d %d\n", index, workers_data [index].start_worker_sem);
/*
 * Start all pool workers for a collection.  num_additional_workers is
 * added to the working count beyond the pool — presumably to account
 * for non-pool participants such as the GC thread itself; confirm
 * against callers.  No-op for non-parallel collectors.
 * NOTE(review): the return-type line and early-return body fall in
 * gaps of this chunk.
 */
273 workers_start_all_workers (int num_additional_workers)
277 if (!major_collector.is_parallel)
/* No collection may already be in flight. */
280 g_assert (workers_num_working == 0);
281 workers_num_working = workers_num + num_additional_workers;
283 for (i = 0; i < workers_num; ++i)
284 workers_start_worker (i);
/*
 * NOTE(review): this function's signature falls in a gap of this
 * chunk; from the body it is the GC thread's join path (presumably
 * workers_join): it waits for each started worker to post
 * workers_done_sem, marks all workers idle, then asserts that no work
 * remains anywhere.  No-op for non-parallel collectors.
 */
292 if (!major_collector.is_parallel)
295 //g_print ("joining\n");
/* Each worker that was started posts workers_done_sem exactly once
 * (see workers_thread_func). */
296 for (i = 0; i < workers_num; ++i) {
297 if (workers_data [i].is_working)
298 MONO_SEM_WAIT (&workers_done_sem);
300 for (i = 0; i < workers_num; ++i)
301 workers_data [i].is_working = FALSE;
302 //g_print ("joined\n");
/* Sanity checks: every worker idle and the shared buffer fully
 * drained — no section left behind. */
304 g_assert (workers_num_working == 0);
305 g_assert (workers_shared_buffer_used == 0);
307 for (i = 0; i < WORKERS_SHARED_BUFFER_SIZE; ++i)
308 g_assert (!workers_shared_buffer [i]);
312 mono_sgen_is_worker_thread (pthread_t thread)
316 if (!major_collector.is_parallel)
319 for (i = 0; i < workers_num; ++i) {
320 if (workers_data [i].thread == thread)