Merge pull request #2102 from AdamBurgess/master
[mono.git] / mono / metadata / threadpool-ms.c
index 26ed2a1331958e33cbafd2a5e3cfc2e3cd654801..b9d9f941c0cb13b2d380bd5418c286df5b3dcdc3 100644 (file)
@@ -87,7 +87,6 @@ typedef struct {
 } ThreadPoolDomain;
 
 typedef MonoInternalThread ThreadPoolWorkingThread;
-typedef mono_cond_t ThreadPoolParkedThread;
 
 typedef struct {
        gint32 wave_period;
@@ -129,7 +128,8 @@ typedef struct {
        mono_mutex_t domains_lock;
 
        GPtrArray *working_threads; // ThreadPoolWorkingThread* []
-       GPtrArray *parked_threads; // ThreadPoolParkedThread* []
+       gint32 parked_threads_count;
+       mono_cond_t parked_threads_cond;
        mono_mutex_t active_threads_lock; /* protect access to working_threads and parked_threads */
 
        gint32 heuristic_completions;
@@ -250,9 +250,10 @@ initialize (void)
        threadpool->domains = g_ptr_array_new ();
        mono_mutex_init_recursive (&threadpool->domains_lock);
 
-       threadpool->parked_threads = g_ptr_array_new ();
+       threadpool->parked_threads_count = 0;
+       mono_cond_init (&threadpool->parked_threads_cond, NULL);
        threadpool->working_threads = g_ptr_array_new ();
-       mono_mutex_init (&threadpool->active_threads_lock);
+       mono_mutex_init_recursive (&threadpool->active_threads_lock);
 
        threadpool->heuristic_adjustment_interval = 10;
        mono_mutex_init (&threadpool->heuristic_lock);
@@ -303,7 +304,6 @@ initialize (void)
        threadpool->suspended = FALSE;
 }
 
-static void worker_unpark (ThreadPoolParkedThread *thread);
 static void worker_kill (ThreadPoolWorkingThread *thread);
 
 static void
@@ -315,18 +315,21 @@ cleanup (void)
         * cleaning up only if the runtime is shutting down */
        g_assert (mono_runtime_is_shutting_down ());
 
+       MONO_PREPARE_BLOCKING;
        while (monitor_status != MONITOR_STATUS_NOT_RUNNING)
                g_usleep (1000);
+       MONO_FINISH_BLOCKING;
 
+       MONO_PREPARE_BLOCKING;
        mono_mutex_lock (&threadpool->active_threads_lock);
+       MONO_FINISH_BLOCKING;
 
        /* stop all threadpool->working_threads */
        for (i = 0; i < threadpool->working_threads->len; ++i)
                worker_kill ((ThreadPoolWorkingThread*) g_ptr_array_index (threadpool->working_threads, i));
 
        /* unpark all threadpool->parked_threads */
-       for (i = 0; i < threadpool->parked_threads->len; ++i)
-               worker_unpark ((ThreadPoolParkedThread*) g_ptr_array_index (threadpool->parked_threads, i));
+       mono_cond_broadcast (&threadpool->parked_threads_cond);
 
        mono_mutex_unlock (&threadpool->active_threads_lock);
 }
@@ -481,64 +484,88 @@ domain_get_next (ThreadPoolDomain *current)
 }
 
 static void
-worker_park (void)
+worker_wait_interrupt (gpointer data)
 {
-       mono_cond_t cond;
-       MonoInternalThread *thread = mono_thread_internal_current ();
+       mono_mutex_lock (&threadpool->active_threads_lock);
+       mono_cond_signal (&threadpool->parked_threads_cond);
+       mono_mutex_unlock (&threadpool->active_threads_lock);
+}
 
-       mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] current worker parking", GetCurrentThreadId ());
+/* return TRUE if timeout, FALSE otherwise (worker unpark or interrupt) */
+static gboolean
+worker_park (void)
+{
+       gboolean timeout = FALSE;
 
-       mono_cond_init (&cond, NULL);
+       mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] current worker parking", mono_native_thread_id_get ());
 
        mono_gc_set_skip_thread (TRUE);
 
+       MONO_PREPARE_BLOCKING;
+
        mono_mutex_lock (&threadpool->active_threads_lock);
 
        if (!mono_runtime_is_shutting_down ()) {
-               g_ptr_array_add (threadpool->parked_threads, &cond);
-               g_ptr_array_remove_fast (threadpool->working_threads, thread);
+               static gpointer rand_handle = NULL;
+               MonoInternalThread *thread_internal;
+               gboolean interrupted = FALSE;
 
-               mono_cond_wait (&cond, &threadpool->active_threads_lock);
+               if (!rand_handle)
+                       rand_handle = rand_create ();
+               g_assert (rand_handle);
 
-               g_ptr_array_add (threadpool->working_threads, thread);
-               g_ptr_array_remove (threadpool->parked_threads, &cond);
+               thread_internal = mono_thread_internal_current ();
+               g_assert (thread_internal);
+
+               threadpool->parked_threads_count += 1;
+               g_ptr_array_remove_fast (threadpool->working_threads, thread_internal);
+
+               mono_thread_info_install_interrupt (worker_wait_interrupt, NULL, &interrupted);
+               if (interrupted)
+                       goto done;
+
+               if (mono_cond_timedwait_ms (&threadpool->parked_threads_cond, &threadpool->active_threads_lock, rand_next (rand_handle, 5 * 1000, 60 * 1000)) != 0)
+                       timeout = TRUE;
+
+               mono_thread_info_uninstall_interrupt (&interrupted);
+
+done:
+               g_ptr_array_add (threadpool->working_threads, thread_internal);
+               threadpool->parked_threads_count -= 1;
        }
 
        mono_mutex_unlock (&threadpool->active_threads_lock);
 
+       MONO_FINISH_BLOCKING;
+
        mono_gc_set_skip_thread (FALSE);
 
-       mono_cond_destroy (&cond);
+       mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] current worker unparking, timeout? %s", mono_native_thread_id_get (), timeout ? "yes" : "no");
 
-       mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] current worker unparking", GetCurrentThreadId ());
+       return timeout;
 }
 
 static gboolean
 worker_try_unpark (void)
 {
        gboolean res = FALSE;
-       guint len;
 
-       mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try unpark worker", GetCurrentThreadId ());
+       mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try unpark worker", mono_native_thread_id_get ());
+
+       MONO_PREPARE_BLOCKING;
 
        mono_mutex_lock (&threadpool->active_threads_lock);
-       len = threadpool->parked_threads->len;
-       if (len > 0) {
-               mono_cond_t *cond = (mono_cond_t*) g_ptr_array_index (threadpool->parked_threads, len - 1);
-               mono_cond_signal (cond);
+       if (threadpool->parked_threads_count > 0) {
+               mono_cond_signal (&threadpool->parked_threads_cond);
                res = TRUE;
        }
        mono_mutex_unlock (&threadpool->active_threads_lock);
 
-       mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try unpark worker, success? %s", GetCurrentThreadId (), res ? "yes" : "no");
+       MONO_FINISH_BLOCKING;
 
-       return res;
-}
+       mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try unpark worker, success? %s", mono_native_thread_id_get (), res ? "yes" : "no");
 
-static void
-worker_unpark (ThreadPoolParkedThread *thread)
-{
-       mono_cond_signal ((mono_cond_t*) thread);
+       return res;
 }
 
 static void
@@ -558,7 +585,7 @@ worker_thread (gpointer data)
        ThreadPoolCounter counter;
        gboolean retire = FALSE;
 
-       mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] worker starting", GetCurrentThreadId ());
+       mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] worker starting", mono_native_thread_id_get ());
 
        g_assert (threadpool);
 
@@ -567,9 +594,11 @@ worker_thread (gpointer data)
 
        mono_thread_set_name_internal (thread, mono_string_new (mono_domain_get (), "Threadpool worker"), FALSE);
 
+       MONO_PREPARE_BLOCKING;
        mono_mutex_lock (&threadpool->active_threads_lock);
        g_ptr_array_add (threadpool->working_threads, thread);
        mono_mutex_unlock (&threadpool->active_threads_lock);
+       MONO_FINISH_BLOCKING;
 
        previous_tpdomain = NULL;
 
@@ -585,13 +614,15 @@ worker_thread (gpointer data)
                }
 
                if (retire || !(tpdomain = domain_get_next (previous_tpdomain))) {
+                       gboolean timeout;
+
                        COUNTER_ATOMIC (counter, {
                                counter._.working --;
                                counter._.parked ++;
                        });
 
                        mono_mutex_unlock (&threadpool->domains_lock);
-                       worker_park ();
+                       timeout = worker_park ();
                        mono_mutex_lock (&threadpool->domains_lock);
 
                        COUNTER_ATOMIC (counter, {
@@ -599,6 +630,9 @@ worker_thread (gpointer data)
                                counter._.parked --;
                        });
 
+                       if (timeout)
+                               break;
+
                        if (retire)
                                retire = FALSE;
 
@@ -609,7 +643,7 @@ worker_thread (gpointer data)
                g_assert (tpdomain->outstanding_request >= 0);
 
                mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] worker running in domain %p",
-                       GetCurrentThreadId (), tpdomain->domain, tpdomain->outstanding_request);
+                       mono_native_thread_id_get (), tpdomain->domain, tpdomain->outstanding_request);
 
                g_assert (tpdomain->domain);
                g_assert (tpdomain->domain->threadpool_jobs >= 0);
@@ -653,16 +687,18 @@ worker_thread (gpointer data)
 
        mono_mutex_unlock (&threadpool->domains_lock);
 
+       MONO_PREPARE_BLOCKING;
        mono_mutex_lock (&threadpool->active_threads_lock);
        g_ptr_array_remove_fast (threadpool->working_threads, thread);
        mono_mutex_unlock (&threadpool->active_threads_lock);
+       MONO_FINISH_BLOCKING;
 
        COUNTER_ATOMIC (counter, {
                counter._.working--;
                counter._.active --;
        });
 
-       mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] worker finishing", GetCurrentThreadId ());
+       mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] worker finishing", mono_native_thread_id_get ());
 }
 
 static gboolean
@@ -671,7 +707,7 @@ worker_try_create (void)
        ThreadPoolCounter counter;
        MonoInternalThread *thread;
 
-       mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker", GetCurrentThreadId ());
+       mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker", mono_native_thread_id_get ());
 
        COUNTER_ATOMIC (counter, {
                if (counter._.working >= counter._.max_working)
@@ -682,11 +718,11 @@ worker_try_create (void)
 
        if ((thread = mono_thread_create_internal (mono_get_root_domain (), worker_thread, NULL, TRUE, 0)) != NULL) {
                mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker, created %p",
-                       GetCurrentThreadId (), thread->tid);
+                       mono_native_thread_id_get (), thread->tid);
                return TRUE;
        }
 
-       mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker, failed", GetCurrentThreadId ());
+       mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker, failed", mono_native_thread_id_get ());
 
        COUNTER_ATOMIC (counter, {
                counter._.working --;
@@ -722,7 +758,7 @@ worker_request (MonoDomain *domain)
        tpdomain->outstanding_request ++;
 
        mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, domain = %p, outstanding_request = %d",
-               GetCurrentThreadId (), tpdomain->domain, tpdomain->outstanding_request);
+               mono_native_thread_id_get (), tpdomain->domain, tpdomain->outstanding_request);
 
        mono_mutex_unlock (&threadpool->domains_lock);
 
@@ -732,16 +768,16 @@ worker_request (MonoDomain *domain)
        monitor_ensure_running ();
 
        if (worker_try_unpark ()) {
-               mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, unparked", GetCurrentThreadId ());
+               mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, unparked", mono_native_thread_id_get ());
                return TRUE;
        }
 
        if (worker_try_create ()) {
-               mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, created", GetCurrentThreadId ());
+               mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, created", mono_native_thread_id_get ());
                return TRUE;
        }
 
-       mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, failed", GetCurrentThreadId ());
+       mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, failed", mono_native_thread_id_get ());
        return FALSE;
 }
 
@@ -811,7 +847,7 @@ monitor_thread (void)
 
        mono_cpu_usage (threadpool->cpu_usage_state);
 
-       mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, started", GetCurrentThreadId ());
+       mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, started", mono_native_thread_id_get ());
 
        do {
                MonoInternalThread *thread;
@@ -825,12 +861,13 @@ monitor_thread (void)
 
                do {
                        guint32 ts;
+                       gboolean alerted = FALSE;
 
                        if (mono_runtime_is_shutting_down ())
                                break;
 
                        ts = mono_msec_ticks ();
-                       if (SleepEx (interval_left, TRUE) == 0)
+                       if (mono_thread_info_sleep (interval_left, &alerted) == 0)
                                break;
                        interval_left -= mono_msec_ticks () - ts;
 
@@ -848,6 +885,7 @@ monitor_thread (void)
                if (mono_runtime_is_shutting_down () || !domain_any_has_request ())
                        continue;
 
+               MONO_PREPARE_BLOCKING;
                mono_mutex_lock (&threadpool->active_threads_lock);
                for (i = 0; i < threadpool->working_threads->len; ++i) {
                        thread = g_ptr_array_index (threadpool->working_threads, i);
@@ -857,6 +895,7 @@ monitor_thread (void)
                        }
                }
                mono_mutex_unlock (&threadpool->active_threads_lock);
+               MONO_FINISH_BLOCKING;
 
                if (all_waitsleepjoin) {
                        ThreadPoolCounter counter;
@@ -872,19 +911,19 @@ monitor_thread (void)
                                        break;
 
                                if (worker_try_unpark ()) {
-                                       mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, unparked", GetCurrentThreadId ());
+                                       mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, unparked", mono_native_thread_id_get ());
                                        break;
                                }
 
                                if (worker_try_create ()) {
-                                       mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, created", GetCurrentThreadId ());
+                                       mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, created", mono_native_thread_id_get ());
                                        break;
                                }
                        }
                }
        } while (monitor_should_keep_running ());
 
-       mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, finished", GetCurrentThreadId ());
+       mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, finished", mono_native_thread_id_get ());
 }
 
 static void
@@ -920,7 +959,7 @@ hill_climbing_change_thread_count (gint16 new_thread_count, ThreadPoolHeuristicS
 
        hc = &threadpool->heuristic_hill_climbing;
 
-       mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] hill climbing, change max number of threads %d", GetCurrentThreadId (), new_thread_count);
+       mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] hill climbing, change max number of threads %d", mono_native_thread_id_get (), new_thread_count);
 
        hc->last_thread_count = new_thread_count;
        hc->current_sample_interval = rand_next (&hc->random_interval_generator, hc->sample_interval_low, hc->sample_interval_high);
@@ -1274,11 +1313,6 @@ mono_threadpool_ms_begin_invoke (MonoDomain *domain, MonoObject *target, MonoMet
        async_result = mono_async_result_new (domain, NULL, async_call->state, NULL, (MonoObject*) async_call);
        MONO_OBJECT_SETREF (async_result, async_delegate, target);
 
-#ifndef DISABLE_SOCKETS
-       if (mono_threadpool_ms_is_io (target, state))
-               return mono_threadpool_ms_io_add (async_result, (MonoSocketAsyncResult*) state);
-#endif
-
        mono_threadpool_ms_enqueue_work_item (domain, (MonoObject*) async_result);
 
        return async_result;
@@ -1304,7 +1338,7 @@ mono_threadpool_ms_end_invoke (MonoAsyncResult *ares, MonoArray **out_args, Mono
                return NULL;
        }
 
-       MONO_OBJECT_SETREF (ares, endinvoke_called, 1);
+       ares->endinvoke_called = 1;
 
        /* wait until we are really finished */
        if (ares->completed) {