Merge pull request #615 from nealef/master
[mono.git] / mono / metadata / threadpool.c
index e74f498a247b164f1aee09807d8d95be45949a20..9a9e7a4cb345ac73cd88bce8d3ed4a273fd8d461 100644 (file)
@@ -7,6 +7,7 @@
  *
  * Copyright 2001-2003 Ximian, Inc (http://www.ximian.com)
  * Copyright 2004-2010 Novell, Inc (http://www.novell.com)
+ * Copyright 2001 Xamarin Inc (http://www.xamarin.com)
  */
 
 #include <config.h>
@@ -17,6 +18,7 @@
 #include <mono/metadata/threads-types.h>
 #include <mono/metadata/threadpool-internals.h>
 #include <mono/metadata/exception.h>
+#include <mono/metadata/environment.h>
 #include <mono/metadata/mono-mlist.h>
 #include <mono/metadata/mono-perfcounters.h>
 #include <mono/metadata/socket-io.h>
@@ -27,6 +29,7 @@
 #include <mono/utils/mono-time.h>
 #include <mono/utils/mono-proclib.h>
 #include <mono/utils/mono-semaphore.h>
+#include <mono/utils/atomic.h>
 #include <errno.h>
 #ifdef HAVE_SYS_TIME_H
 #include <sys/time.h>
@@ -156,6 +159,7 @@ static MonoClass *process_async_call_klass;
 
 static GPtrArray *wsqs;
 CRITICAL_SECTION wsqs_lock;
+static gboolean suspended;
 
 /* Hooks */
 static MonoThreadPoolFunc tp_start_func;
@@ -187,7 +191,7 @@ enum {
 #include <mono/metadata/tpool-poll.c>
 #ifdef HAVE_EPOLL
 #include <mono/metadata/tpool-epoll.c>
-#elif defined(HAVE_KQUEUE)
+#elif defined(USE_KQUEUE_FOR_THREADPOOL)
 #include <mono/metadata/tpool-kqueue.c>
 #endif
 /*
@@ -213,30 +217,11 @@ is_corlib_type (MonoDomain *domain, MonoClass *klass)
 /*
  * Note that we call it is_socket_type() where 'socket' refers to the image
  * that contains the System.Net.Sockets.Socket type.
- * For moonlight there is a System.Net.Sockets.Socket class in both System.dll and System.Net.dll.
 */
 static gboolean
 is_socket_type (MonoDomain *domain, MonoClass *klass)
 {
-       static const char *version = NULL;
-       static gboolean moonlight;
-
-       if (is_system_type (domain, klass))
-               return TRUE;
-
-       /* If moonlight, check if the type is in System.Net.dll too */
-       if (version == NULL) {
-               version = mono_get_runtime_info ()->framework_version;
-               moonlight = !strcmp (version, "2.1");
-       }
-
-       if (!moonlight)
-               return FALSE;
-
-       if (domain->system_net_dll == NULL)
-               domain->system_net_dll = mono_image_loaded ("System.Net");
-       
-       return klass->image == domain->system_net_dll;
+       return is_system_type (domain, klass);
 }
 
 #define check_type_cached(domain, ASSEMBLY, _class, _namespace, _name, loc) do { \
@@ -396,8 +381,14 @@ threadpool_jobs_inc (MonoObject *obj)
 static gboolean
 threadpool_jobs_dec (MonoObject *obj)
 {
-       MonoDomain *domain = obj->vtable->domain;
-       int remaining_jobs = InterlockedDecrement (&domain->threadpool_jobs);
+       MonoDomain *domain;
+       int remaining_jobs;
+
+       if (obj == NULL)
+               return FALSE;
+
+       domain = obj->vtable->domain;
+       remaining_jobs = InterlockedDecrement (&domain->threadpool_jobs);
        if (remaining_jobs == 0 && domain->cleanup_semaphore) {
                ReleaseSemaphore (domain->cleanup_semaphore, 1, NULL);
                return TRUE;
@@ -480,9 +471,15 @@ static void
 init_event_system (SocketIOData *data)
 {
 #ifdef HAVE_EPOLL
-       if (data->event_system == EPOLL_BACKEND)
+       if (data->event_system == EPOLL_BACKEND) {
                data->event_data = tp_epoll_init (data);
-#elif defined(HAVE_KQUEUE)
+               if (data->event_data == NULL) {
+                       if (g_getenv ("MONO_DEBUG"))
+                               g_message ("Falling back to poll()");
+                       data->event_system = POLL_BACKEND;
+               }
+       }
+#elif defined(USE_KQUEUE_FOR_THREADPOOL)
        if (data->event_system == KQUEUE_BACKEND)
                data->event_data = tp_kqueue_init (data);
 #endif
@@ -511,7 +508,7 @@ socket_io_init (SocketIOData *data)
        data->sock_to_state = mono_g_hash_table_new_type (g_direct_hash, g_direct_equal, MONO_HASH_VALUE_GC);
 #ifdef HAVE_EPOLL
        data->event_system = EPOLL_BACKEND;
-#elif defined(HAVE_KQUEUE)
+#elif defined(USE_KQUEUE_FOR_THREADPOOL)
        data->event_system = KQUEUE_BACKEND;
 #else
        data->event_system = POLL_BACKEND;
@@ -520,7 +517,7 @@ socket_io_init (SocketIOData *data)
                data->event_system = POLL_BACKEND;
 
        init_event_system (data);
-       mono_thread_create_internal (mono_get_root_domain (), data->wait, data, TRUE, SMALL_STACK);
+       mono_thread_create_internal (mono_get_root_domain (), data->wait, data, TRUE, FALSE, SMALL_STACK);
        LeaveCriticalSection (&data->io_lock);
        data->inited = 2;
        threadpool_start_thread (&async_io_tp);
@@ -616,7 +613,7 @@ mono_async_invoke (ThreadPool *tp, MonoAsyncResult *ares)
        if (ac == NULL) {
                /* Fast path from ThreadPool.*QueueUserWorkItem */
                void *pa = ares->async_state;
-               mono_runtime_delegate_invoke (ares->async_delegate, &pa, &exc);
+               res = mono_runtime_delegate_invoke (ares->async_delegate, &pa, &exc);
        } else {
                MonoObject *cb_exc = NULL;
 
@@ -640,7 +637,6 @@ mono_async_invoke (ThreadPool *tp, MonoAsyncResult *ares)
                        void *pa = &ares;
                        cb_exc = NULL;
                        mono_runtime_invoke (ac->cb_method, ac->cb_target, pa, &cb_exc);
-                       MONO_OBJECT_SETREF (ac->msg, exc, cb_exc);
                        exc = cb_exc;
                } else {
                        exc = NULL;
@@ -670,8 +666,10 @@ threadpool_start_idle_threads (ThreadPool *tp)
                        if (InterlockedCompareExchange (&tp->nthreads, n + 1, n) == n)
                                break;
                }
+#ifndef DISABLE_PERFCOUNTERS
                mono_perfcounter_update_value (tp->pc_nthreads, TRUE, 1);
-               mono_thread_create_internal (mono_get_root_domain (), tp->async_invoke, tp, TRUE, stack_size);
+#endif
+               mono_thread_create_internal (mono_get_root_domain (), tp->async_invoke, tp, TRUE, FALSE, stack_size);
                SleepEx (100, TRUE);
        } while (1);
 }
@@ -687,6 +685,7 @@ threadpool_init (ThreadPool *tp, int min_threads, int max_threads, void (*async_
        MONO_SEM_INIT (&tp->new_job, 0);
 }
 
+#ifndef DISABLE_PERFCOUNTERS
 static void *
 init_perf_counter (const char *category, const char *counter)
 {
@@ -705,6 +704,7 @@ init_perf_counter (const char *category, const char *counter)
        machine = mono_string_new (root, ".");
        return mono_perfcounter_get_impl (category_str, counter_str, NULL, machine, &type, &custom);
 }
+#endif
 
 #ifdef DEBUG
 static void
@@ -779,6 +779,9 @@ monitor_thread (gpointer unused)
                if (mono_runtime_is_shutting_down ())
                        break;
 
+               if (suspended)
+                       continue;
+
                for (i = 0; i < 2; i++) {
                        ThreadPool *tp;
                        tp = pools [i];
@@ -843,6 +846,7 @@ mono_thread_pool_init ()
        wsqs = g_ptr_array_sized_new (MAX (100 * cpu_count, thread_count));
        mono_wsq_init ();
 
+#ifndef DISABLE_PERFCOUNTERS
        async_tp.pc_nitems = init_perf_counter ("Mono Threadpool", "Work Items Added");
        g_assert (async_tp.pc_nitems);
 
@@ -854,6 +858,7 @@ mono_thread_pool_init ()
 
        async_io_tp.pc_nthreads = init_perf_counter ("Mono Threadpool", "# of IO Threads");
        g_assert (async_io_tp.pc_nthreads);
+#endif
        tp_inited = 2;
 #ifdef DEBUG
        signal (SIGALRM, signal_handler);
@@ -1006,8 +1011,10 @@ threadpool_start_thread (ThreadPool *tp)
        stack_size = (!tp->is_io) ? 0 : SMALL_STACK;
        while (!mono_runtime_is_shutting_down () && (n = tp->nthreads) < tp->max_threads) {
                if (InterlockedCompareExchange (&tp->nthreads, n + 1, n) == n) {
+#ifndef DISABLE_PERFCOUNTERS
                        mono_perfcounter_update_value (tp->pc_nthreads, TRUE, 1);
-                       mono_thread_create_internal (mono_get_root_domain (), tp->async_invoke, tp, TRUE, stack_size);
+#endif
+                       mono_thread_create_internal (mono_get_root_domain (), tp->async_invoke, tp, TRUE, FALSE, stack_size);
                        return TRUE;
                }
        }
@@ -1045,11 +1052,13 @@ threadpool_append_jobs (ThreadPool *tp, MonoObject **jobs, gint njobs)
                return;
 
        if (tp->pool_status == 0 && InterlockedCompareExchange (&tp->pool_status, 1, 0) == 0) {
-               if (!tp->is_io)
-                       mono_thread_create_internal (mono_get_root_domain (), monitor_thread, NULL, TRUE, SMALL_STACK);
+               if (!tp->is_io) {
+                       mono_thread_create_internal (mono_get_root_domain (), monitor_thread, NULL, TRUE, FALSE, SMALL_STACK);
+                       threadpool_start_thread (tp);
+               }
                /* Create on demand up to min_threads to avoid startup penalty for apps that don't use
                 * the threadpool that much
-               * mono_thread_create_internal (mono_get_root_domain (), threadpool_start_idle_threads, tp, TRUE, SMALL_STACK);
+               * mono_thread_create_internal (mono_get_root_domain (), threadpool_start_idle_threads, tp, TRUE, FALSE, SMALL_STACK);
                */
        }
 
@@ -1062,7 +1071,9 @@ threadpool_append_jobs (ThreadPool *tp, MonoObject **jobs, gint njobs)
                        o->add_time = mono_100ns_ticks ();
                }
                threadpool_jobs_inc (ar); 
+#ifndef DISABLE_PERFCOUNTERS
                mono_perfcounter_update_value (tp->pc_nitems, TRUE, 1);
+#endif
                if (!tp->is_io && mono_wsq_local_push (ar))
                        continue;
 
@@ -1094,6 +1105,23 @@ threadpool_clear_queue (ThreadPool *tp, MonoDomain *domain)
        }
 }
 
+static gboolean
+remove_sockstate_for_domain (gpointer key, gpointer value, gpointer user_data)
+{
+       MonoMList *list = value;
+       gboolean remove = FALSE;
+       while (list) {
+               MonoObject *data = mono_mlist_get_data (list);
+               if (mono_object_domain (data) == user_data) {
+                       remove = TRUE;
+                       mono_mlist_set_data (list, NULL);
+               }
+               list = mono_mlist_next (list);
+       }
+       //FIXME is there some sort of additional unregistration we need to perform here?
+       return remove;
+}
+
 /*
  * Clean up the threadpool of all domain jobs.
  * Can only be called as part of the domain unloading process as
@@ -1111,6 +1139,12 @@ mono_thread_pool_remove_domain_jobs (MonoDomain *domain, int timeout)
        threadpool_clear_queue (&async_tp, domain);
        threadpool_clear_queue (&async_io_tp, domain);
 
+       EnterCriticalSection (&socket_io_data.io_lock);
+       if (socket_io_data.sock_to_state)
+               mono_g_hash_table_foreach_remove (socket_io_data.sock_to_state, remove_sockstate_for_domain, domain);
+
+       LeaveCriticalSection (&socket_io_data.io_lock);
+       
        /*
         * There might be some threads out that could be about to execute stuff from the given domain.
         * We avoid that by setting up a semaphore to be pulsed by the thread that reaches zero.
@@ -1358,7 +1392,7 @@ async_invoke_thread (gpointer data)
 
        mono_profiler_thread_start (thread->tid);
        name = (tp->is_io) ? "IO Threadpool worker" : "Threadpool worker";
-       ves_icall_System_Threading_Thread_SetName_internal (thread, mono_string_new (mono_domain_get (), name));
+       mono_thread_set_name_internal (thread, mono_string_new (mono_domain_get (), name), FALSE);
 
        if (tp_start_func)
                tp_start_func (tp_hooks_user_data);
@@ -1424,19 +1458,8 @@ async_invoke_thread (gpointer data)
                                        exc = mono_async_invoke (tp, ar);
                                        if (tp_item_end_func)
                                                tp_item_end_func (tp_item_user_data);
-                                       if (exc && mono_runtime_unhandled_exception_policy_get () == MONO_UNHANDLED_POLICY_CURRENT) {
-                                               gboolean unloaded;
-                                               MonoClass *klass;
-
-                                               klass = exc->vtable->klass;
-                                               unloaded = is_appdomainunloaded_exception (exc->vtable->domain, klass);
-                                               if (!unloaded && klass != mono_defaults.threadabortexception_class) {
-                                                       mono_unhandled_exception (exc);
-                                                       exit (255);
-                                               }
-                                               if (klass == mono_defaults.threadabortexception_class)
-                                                       mono_thread_internal_reset_abort (thread);
-                                       }
+                                       if (exc)
+                                               mono_internal_thread_unhandled_exception (exc);
                                        if (is_socket && tp->is_io) {
                                                MonoSocketAsyncResult *state = (MonoSocketAsyncResult *) data;
 
@@ -1469,10 +1492,21 @@ async_invoke_thread (gpointer data)
                        gboolean res;
 
                        InterlockedIncrement (&tp->waiting);
+
+                       // Another thread may have added a job into its wsq since the last call to dequeue_or_steal
+                       // Check all the queues again before entering the wait loop
+                       dequeue_or_steal (tp, &data, wsq);
+                       if (data) {
+                               InterlockedDecrement (&tp->waiting);
+                               break;
+                       }
+
+                       mono_gc_set_skip_thread (TRUE);
+
 #if defined(__OpenBSD__)
-                       while ((res = mono_sem_wait (&tp->new_job, TRUE)) == -1) {// && errno == EINTR) {
+                       while (mono_cq_count (tp->queue) == 0 && (res = mono_sem_wait (&tp->new_job, TRUE)) == -1) {// && errno == EINTR) {
 #else
-                       while ((res = mono_sem_timedwait (&tp->new_job, 2000, TRUE)) == -1) {// && errno == EINTR) {
+                       while (mono_cq_count (tp->queue) == 0 && (res = mono_sem_timedwait (&tp->new_job, 2000, TRUE)) == -1) {// && errno == EINTR) {
 #endif
                                if (mono_runtime_is_shutting_down ())
                                        break;
@@ -1480,6 +1514,9 @@ async_invoke_thread (gpointer data)
                                        mono_thread_interruption_checkpoint ();
                        }
                        InterlockedDecrement (&tp->waiting);
+
+                       mono_gc_set_skip_thread (FALSE);
+
                        if (mono_runtime_is_shutting_down ())
                                break;
                        must_die = should_i_die (tp);
@@ -1504,7 +1541,9 @@ async_invoke_thread (gpointer data)
                                if (!down && nt <= tp->min_threads)
                                        break;
                                if (down || InterlockedCompareExchange (&tp->nthreads, nt - 1, nt) == nt) {
+#ifndef DISABLE_PERFCOUNTERS
                                        mono_perfcounter_update_value (tp->pc_nthreads, TRUE, -1);
+#endif
                                        if (!tp->is_io) {
                                                remove_wsq (wsq);
                                        }
@@ -1560,9 +1599,9 @@ ves_icall_System_Threading_ThreadPool_SetMinThreads (gint workerThreads, gint co
        InterlockedExchange (&async_tp.min_threads, workerThreads);
        InterlockedExchange (&async_io_tp.min_threads, completionPortThreads);
        if (workerThreads > async_tp.nthreads)
-               mono_thread_create_internal (mono_get_root_domain (), threadpool_start_idle_threads, &async_tp, TRUE, SMALL_STACK);
+               mono_thread_create_internal (mono_get_root_domain (), threadpool_start_idle_threads, &async_tp, TRUE, FALSE, SMALL_STACK);
        if (completionPortThreads > async_io_tp.nthreads)
-               mono_thread_create_internal (mono_get_root_domain (), threadpool_start_idle_threads, &async_io_tp, TRUE, SMALL_STACK);
+               mono_thread_create_internal (mono_get_root_domain (), threadpool_start_idle_threads, &async_io_tp, TRUE, FALSE, SMALL_STACK);
        return TRUE;
 }
 
@@ -1623,3 +1662,39 @@ mono_install_threadpool_item_hooks (MonoThreadPoolItemFunc begin_func, MonoThrea
        tp_item_user_data = user_data;
 }
 
+void
+mono_internal_thread_unhandled_exception (MonoObject* exc)
+{
+       if (mono_runtime_unhandled_exception_policy_get () == MONO_UNHANDLED_POLICY_CURRENT) {
+               gboolean unloaded;
+               MonoClass *klass;
+
+               klass = exc->vtable->klass;
+               unloaded = is_appdomainunloaded_exception (exc->vtable->domain, klass);
+               if (!unloaded && klass != mono_defaults.threadabortexception_class) {
+                       mono_unhandled_exception (exc);
+                       if (mono_environment_exitcode_get () == 1)
+                               exit (255);
+               }
+               if (klass == mono_defaults.threadabortexception_class)
+                mono_thread_internal_reset_abort (mono_thread_internal_current ());
+       }
+}
+
+/*
+ * Suspend creation of new threads.
+ */
+void
+mono_thread_pool_suspend (void)
+{
+       suspended = TRUE;
+}
+
+/*
+ * Resume creation of new threads.
+ */
+void
+mono_thread_pool_resume (void)
+{
+       suspended = FALSE;
+}