gint64 as_gint64;
} ThreadPoolWorkerCounter;
-typedef MonoInternalThread ThreadPoolWorkerThread;
-
typedef struct {
MonoRefCount ref;
ThreadPoolWorkerCounter counters;
- GPtrArray *threads; // ThreadPoolWorkerThread* []
- MonoCoopMutex threads_lock; /* protect access to working_threads and parked_threads */
+ MonoCoopMutex parked_threads_lock;
gint32 parked_threads_count;
MonoCoopCond parked_threads_cond;
- MonoCoopCond threads_exit_cond;
ThreadPoolWorkItem *work_items; // ThreadPoolWorkItem []
gint32 work_items_count;
static void
destroy (gpointer data)
{
-#if 0
- mono_coop_mutex_destroy (&worker.threads_lock);
+ /* Runs when the last reference to the global `worker` is dropped (it is
+  * registered via mono_refcount_init); frees the remaining sync primitives
+  * and the CPU-usage buffer. This teardown used to be disabled with #if 0
+  * because threads could still race with it — presumably the refcount now
+  * guarantees no thread can reach these objects here, so it is enabled. */
+ mono_coop_mutex_destroy (&worker.parked_threads_lock);
mono_coop_cond_destroy (&worker.parked_threads_cond);
mono_coop_mutex_destroy (&worker.work_items_lock);
mono_coop_mutex_destroy (&worker.heuristic_lock);
g_free (worker.cpu_usage_state);
-#endif
}
void
mono_refcount_init (&worker, destroy);
- worker.threads = g_ptr_array_new ();
- mono_coop_mutex_init (&worker.threads_lock);
+ mono_coop_mutex_init (&worker.parked_threads_lock);
worker.parked_threads_count = 0;
mono_coop_cond_init (&worker.parked_threads_cond);
- mono_coop_cond_init (&worker.threads_exit_cond);
/* worker.work_items_size is inited to 0 */
mono_coop_mutex_init (&worker.work_items_lock);
void
mono_threadpool_worker_cleanup (void)
{
- MonoInternalThread *current;
-
- /* we make the assumption along the code that we are
- * cleaning up only if the runtime is shutting down */
- g_assert (mono_runtime_is_shutting_down ());
-
- current = mono_thread_internal_current ();
-
- while (worker.monitor_status != MONITOR_STATUS_NOT_RUNNING)
- mono_thread_info_sleep (1, NULL);
-
- mono_coop_mutex_lock (&worker.threads_lock);
-
- /* unpark all worker.parked_threads */
- mono_coop_cond_broadcast (&worker.parked_threads_cond);
-
-#if 0
- for (;;) {
- ThreadPoolWorkerCounter counter;
-
- counter = COUNTER_READ ();
- if (counter._.starting + counter._.working + counter._.parked == 0)
- break;
-
- if (counter._.starting + counter._.working + counter._.parked == 1) {
- if (worker.threads->len == 1 && g_ptr_array_index (worker.threads, 0) == current) {
- /* We are waiting on ourselves */
- break;
- }
- }
-
- mono_coop_cond_wait (&worker.threads_exit_cond, &worker.threads_lock);
- }
-#endif
-
- mono_coop_mutex_unlock (&worker.threads_lock);
-
+ /* Cleanup no longer polls/waits for pool threads to exit: every live
+  * worker/monitor thread holds its own reference (mono_refcount_tryinc),
+  * so dropping the initial reference here lets destroy() run only after
+  * the last such thread has released its reference. */
mono_refcount_dec (&worker);
}
void
mono_threadpool_worker_enqueue (MonoThreadPoolWorkerCallback callback, gpointer data)
{
+ /* Hold a temporary reference across the push so the pool cannot be
+  * destroyed mid-call. If tryinc fails the pool is already tearing
+  * down and the work item is silently dropped — intentional during
+  * shutdown (there is no worker left to run it). */
+ if (!mono_refcount_tryinc (&worker))
+ return;
+
work_item_push (callback, data);
worker_request ();
+
+ mono_refcount_dec (&worker);
}
static void
worker_wait_interrupt (gpointer unused)
{
- mono_coop_mutex_lock (&worker.threads_lock);
- mono_coop_cond_signal (&worker.parked_threads_cond);
- mono_coop_mutex_unlock (&worker.threads_lock);
+ /* Interrupt callback installed while a worker is parked (see worker_park).
+ * If the runtime is not shutting down, we are not using this mechanism to wake up an unparked thread, and if the
+ * runtime is shutting down, then we need to wake up ALL the threads.
+ * It might be a bit wasteful, but I witnessed shutdown hang where the main thread would abort and then wait for all
+ * background threads to exit (see mono_thread_manage). This would go wrong because not all threadpool threads would
+ * be unparked. It would end up getting unstuck because of the timeout, but that would delay shutdown by 5-60s. */
+ if (!mono_runtime_is_shutting_down ())
+ return;
+
+ /* tryinc guards against the pool being destroyed concurrently; if it
+  * fails there is nothing left to wake. */
+ if (!mono_refcount_tryinc (&worker))
+ return;
+
+ mono_coop_mutex_lock (&worker.parked_threads_lock);
+ mono_coop_cond_broadcast (&worker.parked_threads_cond);
+ mono_coop_mutex_unlock (&worker.parked_threads_lock);
mono_refcount_dec (&worker);
}
worker_park (void)
{
gboolean timeout = FALSE;
+ /* set when the coop wait is interrupted (e.g. by a thread abort) */
+ gboolean interrupted = FALSE;
- mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] worker parking", mono_native_thread_id_get ());
+ mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] worker parking", mono_native_thread_id_get ());
- mono_coop_mutex_lock (&worker.threads_lock);
+ /* parked_threads_lock protects parked_threads_count and the cond wait */
+ mono_coop_mutex_lock (&worker.parked_threads_lock);
if (!mono_runtime_is_shutting_down ()) {
static gpointer rand_handle = NULL;
MonoInternalThread *thread;
- gboolean interrupted = FALSE;
ThreadPoolWorkerCounter counter;
if (!rand_handle)
worker.parked_threads_count += 1;
- mono_refcount_inc (&worker);
+ /* NOTE(review): parking no longer takes an extra pool reference;
+  * presumably the reference taken when this worker thread started
+  * keeps `worker` alive for the whole park — confirm against
+  * worker_thread. */
mono_thread_info_install_interrupt (worker_wait_interrupt, NULL, &interrupted);
- if (interrupted) {
- mono_refcount_dec (&worker);
+ if (interrupted)
goto done;
- }
- if (mono_coop_cond_timedwait (&worker.parked_threads_cond, &worker.threads_lock, rand_next (&rand_handle, 5 * 1000, 60 * 1000)) != 0)
+ /* randomized 5s-60s timeout bounds how long a park can outlive a missed wakeup */
+ if (mono_coop_cond_timedwait (&worker.parked_threads_cond, &worker.parked_threads_lock, rand_next (&rand_handle, 5 * 1000, 60 * 1000)) != 0)
timeout = TRUE;
mono_thread_info_uninstall_interrupt (&interrupted);
- if (!interrupted)
- mono_refcount_dec (&worker);
done:
worker.parked_threads_count -= 1;
});
}
- mono_coop_mutex_unlock (&worker.threads_lock);
+ mono_coop_mutex_unlock (&worker.parked_threads_lock);
- mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] worker unparking, timeout? %s", mono_native_thread_id_get (), timeout ? "yes" : "no");
+ mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] worker unparking, timeout? %s interrupted? %s",
+ mono_native_thread_id_get (), timeout ? "yes" : "no", interrupted ? "yes" : "no");
return timeout;
}
mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try unpark worker", mono_native_thread_id_get ());
- mono_coop_mutex_lock (&worker.threads_lock);
+ mono_coop_mutex_lock (&worker.parked_threads_lock);
if (worker.parked_threads_count > 0) {
mono_coop_cond_signal (&worker.parked_threads_cond);
res = TRUE;
}
- mono_coop_mutex_unlock (&worker.threads_lock);
+ mono_coop_mutex_unlock (&worker.parked_threads_lock);
mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try unpark worker, success? %s", mono_native_thread_id_get (), res ? "yes" : "no");
mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] worker starting", mono_native_thread_id_get ());
+ if (!mono_refcount_tryinc (&worker))
+ return 0;
+
COUNTER_ATOMIC (counter, {
counter._.starting --;
counter._.working ++;
thread = mono_thread_internal_current ();
g_assert (thread);
- mono_coop_mutex_lock (&worker.threads_lock);
- g_ptr_array_add (worker.threads, thread);
- mono_coop_mutex_unlock (&worker.threads_lock);
-
while (!mono_runtime_is_shutting_down ()) {
ThreadPoolWorkItem work_item;
work_item.callback (work_item.data);
}
- mono_coop_mutex_lock (&worker.threads_lock);
-
COUNTER_ATOMIC (counter, {
counter._.working --;
});
- g_ptr_array_remove (worker.threads, thread);
-
- mono_coop_cond_signal (&worker.threads_exit_cond);
-
- mono_coop_mutex_unlock (&worker.threads_lock);
-
mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] worker finishing", mono_native_thread_id_get ());
mono_refcount_dec (&worker);
counter._.starting ++;
});
- mono_refcount_inc (&worker);
thread = mono_thread_create_internal (mono_get_root_domain (), worker_thread, NULL, TRUE, 0, &error);
if (!thread) {
mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker, failed: could not create thread due to %s", mono_native_thread_id_get (), mono_error_get_message (&error));
mono_coop_mutex_unlock (&worker.worker_creation_lock);
- mono_refcount_dec (&worker);
-
return FALSE;
}
MonoInternalThread *internal;
guint i;
+ if (!mono_refcount_tryinc (&worker))
+ return 0;
+
internal = mono_thread_internal_current ();
g_assert (internal);
mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, finished", mono_native_thread_id_get ());
+ mono_refcount_dec (&worker);
return 0;
}
// printf ("monitor_thread: creating failed\n");
worker.monitor_status = MONITOR_STATUS_NOT_RUNNING;
mono_error_cleanup (&error);
+ mono_refcount_dec (&worker);
}
return;
}
hc = &worker.heuristic_hill_climbing;
- mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] hill climbing, change max number of threads %d", mono_native_thread_id_get (), new_thread_count);
+ mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] hill climbing, change max number of threads %d", mono_native_thread_id_get (), new_thread_count);
hc->last_thread_count = new_thread_count;
hc->current_sample_interval = rand_next (&hc->random_interval_generator, hc->sample_interval_low, hc->sample_interval_high);
gint32
mono_threadpool_worker_get_min (void)
{
- return worker.limit_worker_min;
+ gint32 ret;
+
+ if (!mono_refcount_tryinc (&worker))
+ return 0;
+
+ ret = worker.limit_worker_min;
+
+ mono_refcount_dec (&worker);
+ return ret;
}
gboolean
if (value <= 0 || value > worker.limit_worker_max)
return FALSE;
+ if (!mono_refcount_tryinc (&worker))
+ return FALSE;
+
worker.limit_worker_min = value;
+
+ mono_refcount_dec (&worker);
return TRUE;
}
gint32
mono_threadpool_worker_get_max (void)
{
- return worker.limit_worker_max;
+ gint32 ret;
+
+ /* Guard the read with a temporary reference; returns 0 when the pool
+  * has already been (or is being) destroyed. */
+ if (!mono_refcount_tryinc (&worker))
+ return 0;
+
+ ret = worker.limit_worker_max;
+
+ mono_refcount_dec (&worker);
+ return ret;
}
gboolean
if (value < worker.limit_worker_min || value < cpu_count)
return FALSE;
- if (value < worker.limit_worker_min || value < cpu_count)
+ if (!mono_refcount_tryinc (&worker))
return FALSE;
worker.limit_worker_max = value;
+
+ mono_refcount_dec (&worker);
return TRUE;
}
void
mono_threadpool_worker_set_suspended (gboolean suspended)
{
+ /* Silently a no-op once the pool's refcount is gone (shutdown): there
+  * is nothing left to suspend or resume. */
+ if (!mono_refcount_tryinc (&worker))
+ return;
+
worker.suspended = suspended;
if (!suspended)
worker_request ();
+
+ mono_refcount_dec (&worker);
}