[threadpool] Let the runtime abort and wait for threads on shutdown (#4348)
[mono.git] / mono / metadata / threadpool-worker-default.c
index a0cdd9a2170229d1d83e5777dd0187d36073e6f0..5fc716a74e73e87999bc0e124fdb9a19b64f1c9f 100644 (file)
@@ -124,18 +124,14 @@ typedef union {
        gint64 as_gint64;
 } ThreadPoolWorkerCounter;
 
-typedef MonoInternalThread ThreadPoolWorkerThread;
-
 typedef struct {
        MonoRefCount ref;
 
        ThreadPoolWorkerCounter counters;
 
-       GPtrArray *threads; // ThreadPoolWorkerThread* []
-       MonoCoopMutex threads_lock; /* protect access to working_threads and parked_threads */
+       MonoCoopMutex parked_threads_lock;
        gint32 parked_threads_count;
        MonoCoopCond parked_threads_cond;
-       MonoCoopCond threads_exit_cond;
 
        ThreadPoolWorkItem *work_items; // ThreadPoolWorkItem []
        gint32 work_items_count;
@@ -221,8 +217,7 @@ rand_next (gpointer *handle, guint32 min, guint32 max)
 static void
 destroy (gpointer data)
 {
-#if 0
-       mono_coop_mutex_destroy (&worker.threads_lock);
+       mono_coop_mutex_destroy (&worker.parked_threads_lock);
        mono_coop_cond_destroy (&worker.parked_threads_cond);
 
        mono_coop_mutex_destroy (&worker.work_items_lock);
@@ -232,7 +227,6 @@ destroy (gpointer data)
        mono_coop_mutex_destroy (&worker.heuristic_lock);
 
        g_free (worker.cpu_usage_state);
-#endif
 }
 
 void
@@ -245,11 +239,9 @@ mono_threadpool_worker_init (void)
 
        mono_refcount_init (&worker, destroy);
 
-       worker.threads = g_ptr_array_new ();
-       mono_coop_mutex_init (&worker.threads_lock);
+       mono_coop_mutex_init (&worker.parked_threads_lock);
        worker.parked_threads_count = 0;
        mono_coop_cond_init (&worker.parked_threads_cond);
-       mono_coop_cond_init (&worker.threads_exit_cond);
 
        /* worker.work_items_size is inited to 0 */
        mono_coop_mutex_init (&worker.work_items_lock);
@@ -316,43 +308,6 @@ mono_threadpool_worker_init (void)
 void
 mono_threadpool_worker_cleanup (void)
 {
-       MonoInternalThread *current;
-
-       /* we make the assumption along the code that we are
-        * cleaning up only if the runtime is shutting down */
-       g_assert (mono_runtime_is_shutting_down ());
-
-       current = mono_thread_internal_current ();
-
-       while (worker.monitor_status != MONITOR_STATUS_NOT_RUNNING)
-               mono_thread_info_sleep (1, NULL);
-
-       mono_coop_mutex_lock (&worker.threads_lock);
-
-       /* unpark all worker.parked_threads */
-       mono_coop_cond_broadcast (&worker.parked_threads_cond);
-
-#if 0
-       for (;;) {
-               ThreadPoolWorkerCounter counter;
-
-               counter = COUNTER_READ ();
-               if (counter._.starting + counter._.working + counter._.parked == 0)
-                       break;
-
-               if (counter._.starting + counter._.working + counter._.parked == 1) {
-                       if (worker.threads->len == 1 && g_ptr_array_index (worker.threads, 0) == current) {
-                               /* We are waiting on ourselves */
-                               break;
-                       }
-               }
-
-               mono_coop_cond_wait (&worker.threads_exit_cond, &worker.threads_lock);
-       }
-#endif
-
-       mono_coop_mutex_unlock (&worker.threads_lock);
-
        mono_refcount_dec (&worker);
 }
 
@@ -441,17 +396,33 @@ static void worker_request (void);
 void
 mono_threadpool_worker_enqueue (MonoThreadPoolWorkerCallback callback, gpointer data)
 {
+       if (!mono_refcount_tryinc (&worker))
+               return;
+
        work_item_push (callback, data);
 
        worker_request ();
+
+       mono_refcount_dec (&worker);
 }
 
 static void
 worker_wait_interrupt (gpointer unused)
 {
-       mono_coop_mutex_lock (&worker.threads_lock);
-       mono_coop_cond_signal (&worker.parked_threads_cond);
-       mono_coop_mutex_unlock (&worker.threads_lock);
+       /* If the runtime is not shutting down, we are not using this mechanism to wake up a unparked thread, and if the
+        * runtime is shutting down, then we need to wake up ALL the threads.
+        * It might be a bit wasteful, but I witnessed shutdown hang where the main thread would abort and then wait for all
+        * background threads to exit (see mono_thread_manage). This would go wrong because not all threadpool threads would
+        * be unparked. It would end up getting unstucked because of the timeout, but that would delay shutdown by 5-60s. */
+       if (!mono_runtime_is_shutting_down ())
+               return;
+
+       if (!mono_refcount_tryinc (&worker))
+               return;
+
+       mono_coop_mutex_lock (&worker.parked_threads_lock);
+       mono_coop_cond_broadcast (&worker.parked_threads_cond);
+       mono_coop_mutex_unlock (&worker.parked_threads_lock);
 
        mono_refcount_dec (&worker);
 }
@@ -461,15 +432,15 @@ static gboolean
 worker_park (void)
 {
        gboolean timeout = FALSE;
+       gboolean interrupted = FALSE;
 
-       mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] worker parking", mono_native_thread_id_get ());
+       mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] worker parking", mono_native_thread_id_get ());
 
-       mono_coop_mutex_lock (&worker.threads_lock);
+       mono_coop_mutex_lock (&worker.parked_threads_lock);
 
        if (!mono_runtime_is_shutting_down ()) {
                static gpointer rand_handle = NULL;
                MonoInternalThread *thread;
-               gboolean interrupted = FALSE;
                ThreadPoolWorkerCounter counter;
 
                if (!rand_handle)
@@ -486,19 +457,14 @@ worker_park (void)
 
                worker.parked_threads_count += 1;
 
-               mono_refcount_inc (&worker);
                mono_thread_info_install_interrupt (worker_wait_interrupt, NULL, &interrupted);
-               if (interrupted) {
-                       mono_refcount_dec (&worker);
+               if (interrupted)
                        goto done;
-               }
 
-               if (mono_coop_cond_timedwait (&worker.parked_threads_cond, &worker.threads_lock, rand_next (&rand_handle, 5 * 1000, 60 * 1000)) != 0)
+               if (mono_coop_cond_timedwait (&worker.parked_threads_cond, &worker.parked_threads_lock, rand_next (&rand_handle, 5 * 1000, 60 * 1000)) != 0)
                        timeout = TRUE;
 
                mono_thread_info_uninstall_interrupt (&interrupted);
-               if (!interrupted)
-                       mono_refcount_dec (&worker);
 
 done:
                worker.parked_threads_count -= 1;
@@ -509,9 +475,10 @@ done:
                });
        }
 
-       mono_coop_mutex_unlock (&worker.threads_lock);
+       mono_coop_mutex_unlock (&worker.parked_threads_lock);
 
-       mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] worker unparking, timeout? %s", mono_native_thread_id_get (), timeout ? "yes" : "no");
+       mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] worker unparking, timeout? %s interrupted? %s",
+               mono_native_thread_id_get (), timeout ? "yes" : "no", interrupted ? "yes" : "no");
 
        return timeout;
 }
@@ -523,12 +490,12 @@ worker_try_unpark (void)
 
        mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try unpark worker", mono_native_thread_id_get ());
 
-       mono_coop_mutex_lock (&worker.threads_lock);
+       mono_coop_mutex_lock (&worker.parked_threads_lock);
        if (worker.parked_threads_count > 0) {
                mono_coop_cond_signal (&worker.parked_threads_cond);
                res = TRUE;
        }
-       mono_coop_mutex_unlock (&worker.threads_lock);
+       mono_coop_mutex_unlock (&worker.parked_threads_lock);
 
        mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try unpark worker, success? %s", mono_native_thread_id_get (), res ? "yes" : "no");
 
@@ -543,6 +510,9 @@ worker_thread (gpointer unused)
 
        mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] worker starting", mono_native_thread_id_get ());
 
+       if (!mono_refcount_tryinc (&worker))
+               return 0;
+
        COUNTER_ATOMIC (counter, {
                counter._.starting --;
                counter._.working ++;
@@ -551,10 +521,6 @@ worker_thread (gpointer unused)
        thread = mono_thread_internal_current ();
        g_assert (thread);
 
-       mono_coop_mutex_lock (&worker.threads_lock);
-       g_ptr_array_add (worker.threads, thread);
-       mono_coop_mutex_unlock (&worker.threads_lock);
-
        while (!mono_runtime_is_shutting_down ()) {
                ThreadPoolWorkItem work_item;
 
@@ -577,18 +543,10 @@ worker_thread (gpointer unused)
                work_item.callback (work_item.data);
        }
 
-       mono_coop_mutex_lock (&worker.threads_lock);
-
        COUNTER_ATOMIC (counter, {
                counter._.working --;
        });
 
-       g_ptr_array_remove (worker.threads, thread);
-
-       mono_coop_cond_signal (&worker.threads_exit_cond);
-
-       mono_coop_mutex_unlock (&worker.threads_lock);
-
        mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] worker finishing", mono_native_thread_id_get ());
 
        mono_refcount_dec (&worker);
@@ -641,7 +599,6 @@ worker_try_create (void)
                counter._.starting ++;
        });
 
-       mono_refcount_inc (&worker);
        thread = mono_thread_create_internal (mono_get_root_domain (), worker_thread, NULL, TRUE, 0, &error);
        if (!thread) {
                mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker, failed: could not create thread due to %s", mono_native_thread_id_get (), mono_error_get_message (&error));
@@ -653,8 +610,6 @@ worker_try_create (void)
 
                mono_coop_mutex_unlock (&worker.worker_creation_lock);
 
-               mono_refcount_dec (&worker);
-
                return FALSE;
        }
 
@@ -752,6 +707,9 @@ monitor_thread (gpointer unused)
        MonoInternalThread *internal;
        guint i;
 
+       if (!mono_refcount_tryinc (&worker))
+               return 0;
+
        internal = mono_thread_internal_current ();
        g_assert (internal);
 
@@ -838,6 +796,7 @@ monitor_thread (gpointer unused)
 
        mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, finished", mono_native_thread_id_get ());
 
+       mono_refcount_dec (&worker);
        return 0;
 }
 
@@ -864,6 +823,7 @@ monitor_ensure_running (void)
                                        // printf ("monitor_thread: creating failed\n");
                                        worker.monitor_status = MONITOR_STATUS_NOT_RUNNING;
                                        mono_error_cleanup (&error);
+                                       mono_refcount_dec (&worker);
                                }
                                return;
                        }
@@ -880,7 +840,7 @@ hill_climbing_change_thread_count (gint16 new_thread_count, ThreadPoolHeuristicS
 
        hc = &worker.heuristic_hill_climbing;
 
-       mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] hill climbing, change max number of threads %d", mono_native_thread_id_get (), new_thread_count);
+       mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] hill climbing, change max number of threads %d", mono_native_thread_id_get (), new_thread_count);
 
        hc->last_thread_count = new_thread_count;
        hc->current_sample_interval = rand_next (&hc->random_interval_generator, hc->sample_interval_low, hc->sample_interval_high);
@@ -1204,7 +1164,15 @@ mono_threadpool_worker_notify_completed (void)
 gint32
 mono_threadpool_worker_get_min (void)
 {
-       return worker.limit_worker_min;
+       gint32 ret;
+
+       if (!mono_refcount_tryinc (&worker))
+               return 0;
+
+       ret = worker.limit_worker_min;
+
+       mono_refcount_dec (&worker);
+       return ret;
 }
 
 gboolean
@@ -1213,14 +1181,27 @@ mono_threadpool_worker_set_min (gint32 value)
        if (value <= 0 || value > worker.limit_worker_max)
                return FALSE;
 
+       if (!mono_refcount_tryinc (&worker))
+               return FALSE;
+
        worker.limit_worker_min = value;
+
+       mono_refcount_dec (&worker);
        return TRUE;
 }
 
 gint32
 mono_threadpool_worker_get_max (void)
 {
-       return worker.limit_worker_max;
+       gint32 ret;
+
+       if (!mono_refcount_tryinc (&worker))
+               return 0;
+
+       ret = worker.limit_worker_max;
+
+       mono_refcount_dec (&worker);
+       return ret;
 }
 
 gboolean
@@ -1232,17 +1213,24 @@ mono_threadpool_worker_set_max (gint32 value)
        if (value < worker.limit_worker_min || value < cpu_count)
                return FALSE;
 
-       if (value < worker.limit_worker_min || value < cpu_count)
+       if (!mono_refcount_tryinc (&worker))
                return FALSE;
 
        worker.limit_worker_max = value;
+
+       mono_refcount_dec (&worker);
        return TRUE;
 }
 
 void
 mono_threadpool_worker_set_suspended (gboolean suspended)
 {
+       if (!mono_refcount_tryinc (&worker))
+               return;
+
        worker.suspended = suspended;
        if (!suspended)
                worker_request ();
+
+       mono_refcount_dec (&worker);
 }