[threadpool] Make sure we check if the runtime is shutting down in case of interruption
[mono.git] / mono / metadata / threadpool.c
index ab88e67f4b960c6de4e6d2e52e3f38edd3cd7b29..7f7c31f0631ab5d5bc279212b98d939f369ce43b 100644 (file)
@@ -53,7 +53,7 @@ typedef struct {
        /* Number of currently executing jobs */
        gint32 threadpool_jobs;
        /* Signalled when threadpool_jobs + outstanding_request is 0 */
-       /* Protected by threadpool->domains_lock */
+       /* Protected by threadpool.domains_lock */
        MonoCoopCond cleanup_cond;
 } ThreadPoolDomain;
 
@@ -79,86 +79,76 @@ typedef struct {
 
        gint32 limit_io_min;
        gint32 limit_io_max;
-
-       MonoThreadPoolWorker *worker;
 } ThreadPool;
 
 static mono_lazy_init_t status = MONO_LAZY_INIT_STATUS_NOT_INITIALIZED;
 
-static ThreadPool* threadpool;
+static ThreadPool threadpool;
 
-#define COUNTER_CHECK(counter) \
-       do { \
-               g_assert (sizeof (ThreadPoolCounter) == sizeof (gint32)); \
-               g_assert (counter._.starting >= 0); \
-               g_assert (counter._.working >= 0); \
-       } while (0)
-
-#define COUNTER_ATOMIC(threadpool,var,block) \
+#define COUNTER_ATOMIC(var,block) \
        do { \
                ThreadPoolCounter __old; \
                do { \
-                       g_assert (threadpool); \
-                       __old = COUNTER_READ (threadpool); \
-                       (var) = __old; \
+                       (var) = __old = COUNTER_READ (); \
                        { block; } \
-                       COUNTER_CHECK (var); \
-               } while (InterlockedCompareExchange (&threadpool->counters.as_gint32, (var).as_gint32, __old.as_gint32) != __old.as_gint32); \
+                       if (!(counter._.starting >= 0)) \
+                               g_error ("%s: counter._.starting = %d, but should be >= 0", __func__, counter._.starting); \
+                       if (!(counter._.working >= 0)) \
+                               g_error ("%s: counter._.working = %d, but should be >= 0", __func__, counter._.working); \
+               } while (InterlockedCompareExchange (&threadpool.counters.as_gint32, (var).as_gint32, __old.as_gint32) != __old.as_gint32); \
        } while (0)
 
 static inline ThreadPoolCounter
-COUNTER_READ (ThreadPool *threadpool)
+COUNTER_READ (void)
 {
        ThreadPoolCounter counter;
-       counter.as_gint32 = InterlockedRead (&threadpool->counters.as_gint32);
+       counter.as_gint32 = InterlockedRead (&threadpool.counters.as_gint32);
        return counter;
 }
 
 static inline void
 domains_lock (void)
 {
-       mono_coop_mutex_lock (&threadpool->domains_lock);
+       mono_coop_mutex_lock (&threadpool.domains_lock);
 }
 
 static inline void
 domains_unlock (void)
 {
-       mono_coop_mutex_unlock (&threadpool->domains_lock);
+       mono_coop_mutex_unlock (&threadpool.domains_lock);
 }
 
 static void
 destroy (gpointer unused)
 {
-       g_ptr_array_free (threadpool->domains, TRUE);
-       mono_coop_mutex_destroy (&threadpool->domains_lock);
-
-       g_ptr_array_free (threadpool->threads, TRUE);
-       mono_coop_mutex_destroy (&threadpool->threads_lock);
-       mono_coop_cond_destroy (&threadpool->threads_exit_cond);
+#if 0
+       g_ptr_array_free (threadpool.domains, TRUE);
+       mono_coop_mutex_destroy (&threadpool.domains_lock);
 
-       g_free (threadpool);
+       g_ptr_array_free (threadpool.threads, TRUE);
+       mono_coop_mutex_destroy (&threadpool.threads_lock);
+       mono_coop_cond_destroy (&threadpool.threads_exit_cond);
+#endif
 }
 
 static void
 initialize (void)
 {
-       g_assert (!threadpool);
-       threadpool = g_new0 (ThreadPool, 1);
-       g_assert (threadpool);
+       g_assert (sizeof (ThreadPoolCounter) == sizeof (gint32));
 
-       mono_refcount_init (threadpool, destroy);
+       mono_refcount_init (&threadpool, destroy);
 
-       threadpool->domains = g_ptr_array_new ();
-       mono_coop_mutex_init (&threadpool->domains_lock);
+       threadpool.domains = g_ptr_array_new ();
+       mono_coop_mutex_init (&threadpool.domains_lock);
 
-       threadpool->threads = g_ptr_array_new ();
-       mono_coop_mutex_init (&threadpool->threads_lock);
-       mono_coop_cond_init (&threadpool->threads_exit_cond);
+       threadpool.threads = g_ptr_array_new ();
+       mono_coop_mutex_init (&threadpool.threads_lock);
+       mono_coop_cond_init (&threadpool.threads_exit_cond);
 
-       threadpool->limit_io_min = mono_cpu_count ();
-       threadpool->limit_io_max = CLAMP (threadpool->limit_io_min * 100, MIN (threadpool->limit_io_min, 200), MAX (threadpool->limit_io_min, 200));
+       threadpool.limit_io_min = mono_cpu_count ();
+       threadpool.limit_io_max = CLAMP (threadpool.limit_io_min * 100, MIN (threadpool.limit_io_min, 200), MAX (threadpool.limit_io_min, 200));
 
-       mono_threadpool_worker_init (&threadpool->worker);
+       mono_threadpool_worker_init ();
 }
 
 static void
@@ -173,44 +163,41 @@ cleanup (void)
 
        current = mono_thread_internal_current ();
 
-       mono_coop_mutex_lock (&threadpool->threads_lock);
+       mono_coop_mutex_lock (&threadpool.threads_lock);
 
-       /* stop all threadpool->threads */
-       for (i = 0; i < threadpool->threads->len; ++i) {
-               MonoInternalThread *thread = (MonoInternalThread*) g_ptr_array_index (threadpool->threads, i);
+       /* stop all threadpool.threads */
+       for (i = 0; i < threadpool.threads->len; ++i) {
+               MonoInternalThread *thread = (MonoInternalThread*) g_ptr_array_index (threadpool.threads, i);
                if (thread != current)
                        mono_thread_internal_abort (thread);
        }
 
-       mono_coop_mutex_unlock (&threadpool->threads_lock);
+       mono_coop_mutex_unlock (&threadpool.threads_lock);
 
+#if 0
        /* give a chance to the other threads to exit */
        mono_thread_info_yield ();
 
-       mono_coop_mutex_lock (&threadpool->threads_lock);
+       mono_coop_mutex_lock (&threadpool.threads_lock);
 
        for (;;) {
-               ThreadPoolCounter counter;
-
-               counter = COUNTER_READ (threadpool);
-               if (counter._.working == 0)
+               if (threadpool.threads->len == 0)
                        break;
 
-               if (counter._.working == 1) {
-                       if (threadpool->threads->len == 1 && g_ptr_array_index (threadpool->threads, 0) == current) {
-                               /* We are waiting on ourselves */
-                               break;
-                       }
+               if (threadpool.threads->len == 1 && g_ptr_array_index (threadpool.threads, 0) == current) {
+                       /* We are waiting on ourselves */
+                       break;
                }
 
-               mono_coop_cond_wait (&threadpool->threads_exit_cond, &threadpool->threads_lock);
+               mono_coop_cond_wait (&threadpool.threads_exit_cond, &threadpool.threads_lock);
        }
 
-       mono_coop_mutex_unlock (&threadpool->threads_lock);
+       mono_coop_mutex_unlock (&threadpool.threads_lock);
+#endif
 
-       mono_threadpool_worker_cleanup (threadpool->worker);
+       mono_threadpool_worker_cleanup ();
 
-       mono_refcount_dec (threadpool);
+       mono_refcount_dec (&threadpool);
 }
 
 gboolean
@@ -264,14 +251,14 @@ tpdomain_add (ThreadPoolDomain *tpdomain)
 
        g_assert (tpdomain);
 
-       len = threadpool->domains->len;
+       len = threadpool.domains->len;
        for (i = 0; i < len; ++i) {
-               if (g_ptr_array_index (threadpool->domains, i) == tpdomain)
+               if (g_ptr_array_index (threadpool.domains, i) == tpdomain)
                        break;
        }
 
        if (i == len)
-               g_ptr_array_add (threadpool->domains, tpdomain);
+               g_ptr_array_add (threadpool.domains, tpdomain);
 }
 
 /* LOCKING: domains_lock must be held. */
@@ -279,7 +266,7 @@ static gboolean
 tpdomain_remove (ThreadPoolDomain *tpdomain)
 {
        g_assert (tpdomain);
-       return g_ptr_array_remove (threadpool->domains, tpdomain);
+       return g_ptr_array_remove (threadpool.domains, tpdomain);
 }
 
 /* LOCKING: domains_lock must be held */
@@ -291,10 +278,10 @@ tpdomain_get (MonoDomain *domain, gboolean create)
 
        g_assert (domain);
 
-       for (i = 0; i < threadpool->domains->len; ++i) {
+       for (i = 0; i < threadpool.domains->len; ++i) {
                ThreadPoolDomain *tpdomain;
 
-               tpdomain = (ThreadPoolDomain *)g_ptr_array_index (threadpool->domains, i);
+               tpdomain = (ThreadPoolDomain *)g_ptr_array_index (threadpool.domains, i);
                if (tpdomain->domain == domain)
                        return tpdomain;
        }
@@ -322,22 +309,21 @@ static ThreadPoolDomain *
 tpdomain_get_next (ThreadPoolDomain *current)
 {
        ThreadPoolDomain *tpdomain = NULL;
-       guint len;
+       gint len;
 
-       len = threadpool->domains->len;
+       len = threadpool.domains->len;
        if (len > 0) {
-               guint i, current_idx = -1;
+               gint i, current_idx = -1;
                if (current) {
                        for (i = 0; i < len; ++i) {
-                               if (current == g_ptr_array_index (threadpool->domains, i)) {
+                               if (current == g_ptr_array_index (threadpool.domains, i)) {
                                        current_idx = i;
                                        break;
                                }
                        }
-                       g_assert (current_idx != (guint)-1);
                }
                for (i = current_idx + 1; i < len + current_idx + 1; ++i) {
-                       ThreadPoolDomain *tmp = (ThreadPoolDomain *)g_ptr_array_index (threadpool->domains, i % len);
+                       ThreadPoolDomain *tmp = (ThreadPoolDomain *)g_ptr_array_index (threadpool.domains, i % len);
                        if (tmp->outstanding_request > 0) {
                                tpdomain = tmp;
                                break;
@@ -348,6 +334,15 @@ tpdomain_get_next (ThreadPoolDomain *current)
        return tpdomain;
 }
 
+static MonoObject*
+try_invoke_perform_wait_callback (MonoObject** exc, MonoError *error)
+{
+       HANDLE_FUNCTION_ENTER ();
+       mono_error_init (error);
+       MonoObject *res = mono_runtime_try_invoke (mono_defaults.threadpool_perform_wait_callback_method, NULL, NULL, exc, error);
+       HANDLE_FUNCTION_RETURN_VAL (res);
+}
+
 static void
 worker_callback (gpointer unused)
 {
@@ -358,23 +353,26 @@ worker_callback (gpointer unused)
 
        thread = mono_thread_internal_current ();
 
-       COUNTER_ATOMIC (threadpool, counter, {
+       COUNTER_ATOMIC (counter, {
+               if (!(counter._.working < 32767 /* G_MAXINT16 */))
+                       g_error ("%s: counter._.working = %d, but should be < 32767", __func__, counter._.working);
+
                counter._.starting --;
                counter._.working ++;
        });
 
        if (mono_runtime_is_shutting_down ()) {
-               COUNTER_ATOMIC (threadpool, counter, {
+               COUNTER_ATOMIC (counter, {
                        counter._.working --;
                });
 
-               mono_refcount_dec (threadpool);
+               mono_refcount_dec (&threadpool);
                return;
        }
 
-       mono_coop_mutex_lock (&threadpool->threads_lock);
-       g_ptr_array_add (threadpool->threads, thread);
-       mono_coop_mutex_unlock (&threadpool->threads_lock);
+       mono_coop_mutex_lock (&threadpool.threads_lock);
+       g_ptr_array_add (threadpool.threads, thread);
+       mono_coop_mutex_unlock (&threadpool.threads_lock);
 
        /*
         * This is needed so there is always an lmf frame in the runtime invoke call below,
@@ -389,9 +387,12 @@ worker_callback (gpointer unused)
        while (!mono_runtime_is_shutting_down ()) {
                gboolean retire = FALSE;
 
-               if ((thread->state & (ThreadState_AbortRequested | ThreadState_SuspendRequested)) != 0) {
+               if (thread->state & (ThreadState_AbortRequested | ThreadState_SuspendRequested)) {
                        domains_unlock ();
-                       mono_thread_interruption_checkpoint ();
+                       if (mono_thread_interruption_checkpoint ()) {
+                               domains_lock ();
+                               continue;
+                       }
                        domains_lock ();
                }
 
@@ -410,6 +411,9 @@ worker_callback (gpointer unused)
 
                domains_unlock ();
 
+               mono_thread_set_name_internal (thread, mono_string_new (mono_get_root_domain (), "Threadpool worker"), FALSE, TRUE, &error);
+               mono_error_assert_ok (&error);
+
                mono_thread_clr_state (thread, (MonoThreadState)~ThreadState_Background);
                if (!mono_thread_test_state (thread , ThreadState_Background))
                        ves_icall_System_Threading_Thread_SetState (thread, ThreadState_Background);
@@ -418,7 +422,7 @@ worker_callback (gpointer unused)
                if (mono_domain_set (tpdomain->domain, FALSE)) {
                        MonoObject *exc = NULL, *res;
 
-                       res = mono_runtime_try_invoke (mono_defaults.threadpool_perform_wait_callback_method, NULL, NULL, &exc, &error);
+                       res = try_invoke_perform_wait_callback (&exc, &error);
                        if (exc || !mono_error_ok(&error)) {
                                if (exc == NULL)
                                        exc = (MonoObject *) mono_error_convert_to_exception (&error);
@@ -456,19 +460,19 @@ worker_callback (gpointer unused)
 
        domains_unlock ();
 
-       mono_coop_mutex_lock (&threadpool->threads_lock);
+       mono_coop_mutex_lock (&threadpool.threads_lock);
 
-       COUNTER_ATOMIC (threadpool, counter, {
-               counter._.working --;
-       });
+       g_ptr_array_remove_fast (threadpool.threads, thread);
 
-       g_ptr_array_remove_fast (threadpool->threads, thread);
+       mono_coop_cond_signal (&threadpool.threads_exit_cond);
 
-       mono_coop_cond_signal (&threadpool->threads_exit_cond);
+       mono_coop_mutex_unlock (&threadpool.threads_lock);
 
-       mono_coop_mutex_unlock (&threadpool->threads_lock);
+       COUNTER_ATOMIC (counter, {
+               counter._.working --;
+       });
 
-       mono_refcount_dec (threadpool);
+       mono_refcount_dec (&threadpool);
 }
 
 void
@@ -556,7 +560,7 @@ mono_threadpool_end_invoke (MonoAsyncResult *ares, MonoArray **out_args, MonoObj
                        g_assert(wait_event);
                        MonoWaitHandle *wait_handle = mono_wait_handle_new (mono_object_domain (ares), wait_event, error);
                        if (!is_ok (error)) {
-                               CloseHandle (wait_event);
+                               mono_w32event_close (wait_event);
                                return NULL;
                        }
                        MONO_OBJECT_SETREF (ares, handle, (MonoObject*) wait_handle);
@@ -607,12 +611,18 @@ mono_threadpool_remove_domain_jobs (MonoDomain *domain, int timeout)
         * The is_unloading () check in worker_request () ensures that
         * no new jobs are added after we enter the lock below.
         */
-       mono_lazy_initialize (&status, initialize);
+
+       if (!mono_lazy_is_initialized (&status))
+               return TRUE;
+
+       mono_refcount_inc (&threadpool);
+
        domains_lock ();
 
        tpdomain = tpdomain_get (domain, FALSE);
        if (!tpdomain) {
                domains_unlock ();
+               mono_refcount_dec (&threadpool);
                return TRUE;
        }
 
@@ -620,7 +630,7 @@ mono_threadpool_remove_domain_jobs (MonoDomain *domain, int timeout)
 
        while (tpdomain->outstanding_request + tpdomain->threadpool_jobs > 0) {
                if (timeout == -1) {
-                       mono_coop_cond_wait (&tpdomain->cleanup_cond, &threadpool->domains_lock);
+                       mono_coop_cond_wait (&tpdomain->cleanup_cond, &threadpool.domains_lock);
                } else {
                        gint64 now;
                        gint res;
@@ -631,7 +641,7 @@ mono_threadpool_remove_domain_jobs (MonoDomain *domain, int timeout)
                                break;
                        }
 
-                       res = mono_coop_cond_timedwait (&tpdomain->cleanup_cond, &threadpool->domains_lock, end - now);
+                       res = mono_coop_cond_timedwait (&tpdomain->cleanup_cond, &threadpool.domains_lock, end - now);
                        if (res != 0) {
                                ret = FALSE;
                                break;
@@ -647,21 +657,23 @@ mono_threadpool_remove_domain_jobs (MonoDomain *domain, int timeout)
        mono_coop_cond_destroy (&tpdomain->cleanup_cond);
        tpdomain_free (tpdomain);
 
+       mono_refcount_dec (&threadpool);
+
        return ret;
 }
 
 void
 mono_threadpool_suspend (void)
 {
-       if (threadpool)
-               mono_threadpool_worker_set_suspended (threadpool->worker, TRUE);
+       if (mono_lazy_is_initialized (&status))
+               mono_threadpool_worker_set_suspended (TRUE);
 }
 
 void
 mono_threadpool_resume (void)
 {
-       if (threadpool)
-               mono_threadpool_worker_set_suspended (threadpool->worker, FALSE);
+       if (mono_lazy_is_initialized (&status))
+               mono_threadpool_worker_set_suspended (FALSE);
 }
 
 void
@@ -674,10 +686,10 @@ ves_icall_System_Threading_ThreadPool_GetAvailableThreadsNative (gint32 *worker_
 
        mono_lazy_initialize (&status, initialize);
 
-       counter = COUNTER_READ (threadpool);
+       counter = COUNTER_READ ();
 
-       *worker_threads = MAX (0, mono_threadpool_worker_get_max (threadpool->worker) - counter._.working);
-       *completion_port_threads = threadpool->limit_io_max;
+       *worker_threads = MAX (0, mono_threadpool_worker_get_max () - counter._.working);
+       *completion_port_threads = threadpool.limit_io_max;
 }
 
 void
@@ -688,8 +700,8 @@ ves_icall_System_Threading_ThreadPool_GetMinThreadsNative (gint32 *worker_thread
 
        mono_lazy_initialize (&status, initialize);
 
-       *worker_threads = mono_threadpool_worker_get_min (threadpool->worker);
-       *completion_port_threads = threadpool->limit_io_min;
+       *worker_threads = mono_threadpool_worker_get_min ();
+       *completion_port_threads = threadpool.limit_io_min;
 }
 
 void
@@ -700,8 +712,8 @@ ves_icall_System_Threading_ThreadPool_GetMaxThreadsNative (gint32 *worker_thread
 
        mono_lazy_initialize (&status, initialize);
 
-       *worker_threads = mono_threadpool_worker_get_max (threadpool->worker);
-       *completion_port_threads = threadpool->limit_io_max;
+       *worker_threads = mono_threadpool_worker_get_max ();
+       *completion_port_threads = threadpool.limit_io_max;
 }
 
 MonoBoolean
@@ -709,13 +721,13 @@ ves_icall_System_Threading_ThreadPool_SetMinThreadsNative (gint32 worker_threads
 {
        mono_lazy_initialize (&status, initialize);
 
-       if (completion_port_threads <= 0 || completion_port_threads > threadpool->limit_io_max)
+       if (completion_port_threads <= 0 || completion_port_threads > threadpool.limit_io_max)
                return FALSE;
 
-       if (!mono_threadpool_worker_set_min (threadpool->worker, worker_threads))
+       if (!mono_threadpool_worker_set_min (worker_threads))
                return FALSE;
 
-       threadpool->limit_io_min = completion_port_threads;
+       threadpool.limit_io_min = completion_port_threads;
 
        return TRUE;
 }
@@ -727,13 +739,13 @@ ves_icall_System_Threading_ThreadPool_SetMaxThreadsNative (gint32 worker_threads
 
        mono_lazy_initialize (&status, initialize);
 
-       if (completion_port_threads < threadpool->limit_io_min || completion_port_threads < cpu_count)
+       if (completion_port_threads < threadpool.limit_io_min || completion_port_threads < cpu_count)
                return FALSE;
 
-       if (!mono_threadpool_worker_set_max (threadpool->worker, worker_threads))
+       if (!mono_threadpool_worker_set_max (worker_threads))
                return FALSE;
 
-       threadpool->limit_io_max = completion_port_threads;
+       threadpool.limit_io_max = completion_port_threads;
 
        return TRUE;
 }
@@ -755,13 +767,13 @@ ves_icall_System_Threading_ThreadPool_NotifyWorkItemComplete (void)
        if (mono_domain_is_unloading (mono_domain_get ()) || mono_runtime_is_shutting_down ())
                return FALSE;
 
-       return mono_threadpool_worker_notify_completed (threadpool->worker);
+       return mono_threadpool_worker_notify_completed ();
 }
 
 void
 ves_icall_System_Threading_ThreadPool_NotifyWorkItemProgressNative (void)
 {
-       mono_threadpool_worker_notify_completed (threadpool->worker);
+       mono_threadpool_worker_notify_completed ();
 }
 
 void
@@ -784,11 +796,17 @@ ves_icall_System_Threading_ThreadPool_RequestWorkerThread (void)
        if (mono_domain_is_unloading (domain))
                return FALSE;
 
+       if (!mono_refcount_tryinc (&threadpool)) {
+               /* threadpool has been destroyed, we are shutting down */
+               return FALSE;
+       }
+
        domains_lock ();
 
        /* synchronize with mono_threadpool_remove_domain_jobs */
        if (mono_domain_is_unloading (domain)) {
                domains_unlock ();
+               mono_refcount_dec (&threadpool);
                return FALSE;
        }
 
@@ -798,15 +816,21 @@ ves_icall_System_Threading_ThreadPool_RequestWorkerThread (void)
        tpdomain->outstanding_request ++;
        g_assert (tpdomain->outstanding_request >= 1);
 
-       mono_refcount_inc (threadpool);
+       domains_unlock ();
+
+       COUNTER_ATOMIC (counter, {
+               if (counter._.starting == 16) {
+                       mono_refcount_dec (&threadpool);
+                       return TRUE;
+               }
 
-       COUNTER_ATOMIC (threadpool, counter, {
                counter._.starting ++;
        });
 
-       mono_threadpool_worker_enqueue (threadpool->worker, worker_callback, NULL);
+       mono_threadpool_worker_enqueue (worker_callback, NULL);
 
-       domains_unlock ();
+       /* we do not decrement the threadpool refcount,
+        * as it's going to be done in the worker_callback */
 
        return TRUE;
 }