Merge pull request #900 from Blewzman/FixAggregateExceptionGetBaseException
[mono.git] / mono / metadata / threadpool.c
index 1eb527e00fc72013b136c9e4660051d0580fb778..f0bcf61114aabd68706c6e4e4d28edac5f39ea0c 100644 (file)
@@ -19,6 +19,7 @@
 #include <mono/metadata/threadpool-internals.h>
 #include <mono/metadata/exception.h>
 #include <mono/metadata/environment.h>
+#include <mono/metadata/mono-config.h>
 #include <mono/metadata/mono-mlist.h>
 #include <mono/metadata/mono-perfcounters.h>
 #include <mono/metadata/socket-io.h>
@@ -29,6 +30,7 @@
 #include <mono/utils/mono-time.h>
 #include <mono/utils/mono-proclib.h>
 #include <mono/utils/mono-semaphore.h>
+#include <mono/utils/atomic.h>
 #include <errno.h>
 #ifdef HAVE_SYS_TIME_H
 #include <sys/time.h>
@@ -88,7 +90,7 @@ typedef struct {
 
        gint event_system;
        gpointer event_data;
-       void (*modify) (gpointer event_data, int fd, int operation, int events, gboolean is_new);
+       void (*modify) (gpointer p, int fd, int operation, int events, gboolean is_new);
        void (*wait) (gpointer sock_data);
        void (*shutdown) (gpointer event_data);
 } SocketIOData;
@@ -151,6 +153,7 @@ static void socket_io_cleanup (SocketIOData *data);
 static MonoObject *get_io_event (MonoMList **list, gint event);
 static int get_events_from_list (MonoMList *list);
 static int get_event_from_state (MonoSocketAsyncResult *state);
+static void check_for_interruption_critical (void);
 
 static MonoClass *async_call_klass;
 static MonoClass *socket_async_call_klass;
@@ -158,6 +161,7 @@ static MonoClass *process_async_call_klass;
 
 static GPtrArray *wsqs;
 CRITICAL_SECTION wsqs_lock;
+static gboolean suspended;
 
 /* Hooks */
 static MonoThreadPoolFunc tp_start_func;
@@ -189,7 +193,7 @@ enum {
 #include <mono/metadata/tpool-poll.c>
 #ifdef HAVE_EPOLL
 #include <mono/metadata/tpool-epoll.c>
-#elif defined(HAVE_KQUEUE)
+#elif defined(USE_KQUEUE_FOR_THREADPOOL)
 #include <mono/metadata/tpool-kqueue.c>
 #endif
 /*
@@ -215,30 +219,11 @@ is_corlib_type (MonoDomain *domain, MonoClass *klass)
 /*
  * Note that we call it is_socket_type() where 'socket' refers to the image
  * that contains the System.Net.Sockets.Socket type.
- * For moonlight there is a System.Net.Sockets.Socket class in both System.dll and System.Net.dll.
 */
 static gboolean
 is_socket_type (MonoDomain *domain, MonoClass *klass)
 {
-       static const char *version = NULL;
-       static gboolean moonlight;
-
-       if (is_system_type (domain, klass))
-               return TRUE;
-
-       /* If moonlight, check if the type is in System.Net.dll too */
-       if (version == NULL) {
-               version = mono_get_runtime_info ()->framework_version;
-               moonlight = !strcmp (version, "2.1");
-       }
-
-       if (!moonlight)
-               return FALSE;
-
-       if (domain->system_net_dll == NULL)
-               domain->system_net_dll = mono_image_loaded ("System.Net");
-       
-       return klass->image == domain->system_net_dll;
+       return is_system_type (domain, klass);
 }
 
 #define check_type_cached(domain, ASSEMBLY, _class, _namespace, _name, loc) do { \
@@ -496,7 +481,7 @@ init_event_system (SocketIOData *data)
                        data->event_system = POLL_BACKEND;
                }
        }
-#elif defined(HAVE_KQUEUE)
+#elif defined(USE_KQUEUE_FOR_THREADPOOL)
        if (data->event_system == KQUEUE_BACKEND)
                data->event_data = tp_kqueue_init (data);
 #endif
@@ -525,7 +510,7 @@ socket_io_init (SocketIOData *data)
        data->sock_to_state = mono_g_hash_table_new_type (g_direct_hash, g_direct_equal, MONO_HASH_VALUE_GC);
 #ifdef HAVE_EPOLL
        data->event_system = EPOLL_BACKEND;
-#elif defined(HAVE_KQUEUE)
+#elif defined(USE_KQUEUE_FOR_THREADPOOL)
        data->event_system = KQUEUE_BACKEND;
 #else
        data->event_system = POLL_BACKEND;
@@ -574,8 +559,8 @@ socket_io_add (MonoAsyncResult *ares, MonoSocketAsyncResult *state)
 
        mono_g_hash_table_replace (data->sock_to_state, state->handle, list);
        ievt = get_events_from_list (list);
-       data->modify (data->event_data, fd, state->operation, ievt, is_new);
-        LeaveCriticalSection (&data->io_lock);
+       /* The modify function leaves the io_lock critical section. */
+       data->modify (data, fd, state->operation, ievt, is_new);
 }
 
 #ifndef DISABLE_SOCKETS
@@ -618,6 +603,7 @@ mono_async_invoke (ThreadPool *tp, MonoAsyncResult *ares)
        MonoObject *res, *exc = NULL;
        MonoArray *out_args = NULL;
        HANDLE wait_event = NULL;
+       MonoInternalThread *thread = mono_thread_internal_current ();
 
        if (ares->execution_context) {
                /* use captured ExecutionContext (if available) */
@@ -630,7 +616,10 @@ mono_async_invoke (ThreadPool *tp, MonoAsyncResult *ares)
        if (ac == NULL) {
                /* Fast path from ThreadPool.*QueueUserWorkItem */
                void *pa = ares->async_state;
+               /* The debugger needs this */
+               thread->async_invoke_method = ((MonoDelegate*)ares->async_delegate)->method;
                res = mono_runtime_delegate_invoke (ares->async_delegate, &pa, &exc);
+               thread->async_invoke_method = NULL;
        } else {
                MonoObject *cb_exc = NULL;
 
@@ -653,7 +642,9 @@ mono_async_invoke (ThreadPool *tp, MonoAsyncResult *ares)
                if (ac != NULL && ac->cb_method) {
                        void *pa = &ares;
                        cb_exc = NULL;
+                       thread->async_invoke_method = ac->cb_method;
                        mono_runtime_invoke (ac->cb_method, ac->cb_target, pa, &cb_exc);
+                       thread->async_invoke_method = NULL;
                        exc = cb_exc;
                } else {
                        exc = NULL;
@@ -683,7 +674,9 @@ threadpool_start_idle_threads (ThreadPool *tp)
                        if (InterlockedCompareExchange (&tp->nthreads, n + 1, n) == n)
                                break;
                }
+#ifndef DISABLE_PERFCOUNTERS
                mono_perfcounter_update_value (tp->pc_nthreads, TRUE, 1);
+#endif
                mono_thread_create_internal (mono_get_root_domain (), tp->async_invoke, tp, TRUE, stack_size);
                SleepEx (100, TRUE);
        } while (1);
@@ -700,6 +693,7 @@ threadpool_init (ThreadPool *tp, int min_threads, int max_threads, void (*async_
        MONO_SEM_INIT (&tp->new_job, 0);
 }
 
+#ifndef DISABLE_PERFCOUNTERS
 static void *
 init_perf_counter (const char *category, const char *counter)
 {
@@ -718,6 +712,7 @@ init_perf_counter (const char *category, const char *counter)
        machine = mono_string_new (root, ".");
        return mono_perfcounter_get_impl (category_str, counter_str, NULL, machine, &type, &custom);
 }
+#endif
 
 #ifdef DEBUG
 static void
@@ -777,6 +772,7 @@ monitor_thread (gpointer unused)
        ves_icall_System_Threading_Thread_SetName_internal (thread, mono_string_new (mono_domain_get (), "Threadpool monitor"));
        while (1) {
                ms = 500;
+               i = 10; //number of spurious awakes we tolerate before doing a round of rebalancing.
                do {
                        guint32 ts;
                        ts = mono_msec_ticks ();
@@ -787,11 +783,14 @@ monitor_thread (gpointer unused)
                                break;
                        if (THREAD_WANTS_A_BREAK (thread))
                                mono_thread_interruption_checkpoint ();
-               } while (ms > 0);
+               } while (ms > 0 && i--);
 
                if (mono_runtime_is_shutting_down ())
                        break;
 
+               if (suspended)
+                       continue;
+
                for (i = 0; i < 2; i++) {
                        ThreadPool *tp;
                        tp = pools [i];
@@ -817,7 +816,13 @@ monitor_thread (gpointer unused)
 }
 
 void
-mono_thread_pool_init ()
+mono_thread_pool_init_tls (void)
+{
+       mono_wsq_init ();
+}
+
+void
+mono_thread_pool_init (void)
 {
        gint threads_per_cpu = 1;
        gint thread_count;
@@ -854,8 +859,8 @@ mono_thread_pool_init ()
 
        InitializeCriticalSection (&wsqs_lock);
        wsqs = g_ptr_array_sized_new (MAX (100 * cpu_count, thread_count));
-       mono_wsq_init ();
 
+#ifndef DISABLE_PERFCOUNTERS
        async_tp.pc_nitems = init_perf_counter ("Mono Threadpool", "Work Items Added");
        g_assert (async_tp.pc_nitems);
 
@@ -867,6 +872,7 @@ mono_thread_pool_init ()
 
        async_io_tp.pc_nthreads = init_perf_counter ("Mono Threadpool", "# of IO Threads");
        g_assert (async_io_tp.pc_nthreads);
+#endif
        tp_inited = 2;
 #ifdef DEBUG
        signal (SIGALRM, signal_handler);
@@ -1019,7 +1025,9 @@ threadpool_start_thread (ThreadPool *tp)
        stack_size = (!tp->is_io) ? 0 : SMALL_STACK;
        while (!mono_runtime_is_shutting_down () && (n = tp->nthreads) < tp->max_threads) {
                if (InterlockedCompareExchange (&tp->nthreads, n + 1, n) == n) {
+#ifndef DISABLE_PERFCOUNTERS
                        mono_perfcounter_update_value (tp->pc_nthreads, TRUE, 1);
+#endif
                        mono_thread_create_internal (mono_get_root_domain (), tp->async_invoke, tp, TRUE, stack_size);
                        return TRUE;
                }
@@ -1064,8 +1072,10 @@ threadpool_append_jobs (ThreadPool *tp, MonoObject **jobs, gint njobs)
                }
                /* Create on demand up to min_threads to avoid startup penalty for apps that don't use
                 * the threadpool that much
-               * mono_thread_create_internal (mono_get_root_domain (), threadpool_start_idle_threads, tp, TRUE, SMALL_STACK);
-               */
+                */
+               if (mono_config_is_server_mode ()) {
+                       mono_thread_create_internal (mono_get_root_domain (), threadpool_start_idle_threads, tp, TRUE, SMALL_STACK);
+               }
        }
 
        for (i = 0; i < njobs; i++) {
@@ -1077,7 +1087,9 @@ threadpool_append_jobs (ThreadPool *tp, MonoObject **jobs, gint njobs)
                        o->add_time = mono_100ns_ticks ();
                }
                threadpool_jobs_inc (ar); 
+#ifndef DISABLE_PERFCOUNTERS
                mono_perfcounter_update_value (tp->pc_nitems, TRUE, 1);
+#endif
                if (!tp->is_io && mono_wsq_local_push (ar))
                        continue;
 
@@ -1092,10 +1104,13 @@ static void
 threadpool_clear_queue (ThreadPool *tp, MonoDomain *domain)
 {
        MonoObject *obj;
-       MonoMList *other;
+       MonoMList *other = NULL;
+       MonoCQ *queue = tp->queue;
 
-       other = NULL;
-       while (mono_cq_dequeue (tp->queue, &obj)) {
+       if (!queue)
+               return;
+
+       while (mono_cq_dequeue (queue, &obj)) {
                if (obj == NULL)
                        continue;
                if (obj->vtable->domain != domain)
@@ -1103,6 +1118,9 @@ threadpool_clear_queue (ThreadPool *tp, MonoDomain *domain)
                threadpool_jobs_dec (obj);
        }
 
+       if (mono_runtime_is_shutting_down ())
+               return;
+
        while (other) {
                threadpool_append_job (tp, (MonoObject *) mono_mlist_get_data (other));
                other = mono_mlist_next (other);
@@ -1282,9 +1300,10 @@ try_steal (MonoWSQ *local_wsq, gpointer *data, gboolean retry)
 static gboolean
 dequeue_or_steal (ThreadPool *tp, gpointer *data, MonoWSQ *local_wsq)
 {
-       if (mono_runtime_is_shutting_down ())
+       MonoCQ *queue = tp->queue;
+       if (mono_runtime_is_shutting_down () || !queue)
                return FALSE;
-       mono_cq_dequeue (tp->queue, (MonoObject **) data);
+       mono_cq_dequeue (queue, (MonoObject **) data);
        if (!tp->is_io && !*data)
                try_steal (local_wsq, data, FALSE);
        return (*data != NULL);
@@ -1377,26 +1396,63 @@ should_i_die (ThreadPool *tp)
        return result;
 }
 
+static void
+set_tp_thread_info (ThreadPool *tp)
+{
+       const gchar *name;
+       MonoInternalThread *thread = mono_thread_internal_current ();
+
+       mono_profiler_thread_start (thread->tid);
+       name = (tp->is_io) ? "IO Threadpool worker" : "Threadpool worker";
+       mono_thread_set_name_internal (thread, mono_string_new (mono_domain_get (), name), FALSE);
+}
+
+static void
+clear_thread_state (void)
+{
+       MonoInternalThread *thread = mono_thread_internal_current ();
+       /* If the callee changes the background status, set it back to TRUE */
+       mono_thread_clr_state (thread , ~ThreadState_Background);
+       if (!mono_thread_test_state (thread , ThreadState_Background))
+               ves_icall_System_Threading_Thread_SetState (thread, ThreadState_Background);
+}
+
+static void
+check_for_interruption_critical (void)
+{
+       MonoInternalThread *thread;
+       /*RULE NUMBER ONE OF SKIP_THREAD: NEVER POKE MANAGED STATE.*/
+       mono_gc_set_skip_thread (FALSE);
+
+       thread = mono_thread_internal_current ();
+       if (THREAD_WANTS_A_BREAK (thread))
+               mono_thread_interruption_checkpoint ();
+
+       /*RULE NUMBER TWO OF SKIP_THREAD: READ RULE NUMBER ONE.*/
+       mono_gc_set_skip_thread (TRUE);
+}
+
+static void
+fire_profiler_thread_end (void)
+{
+       MonoInternalThread *thread = mono_thread_internal_current ();
+       mono_profiler_thread_end (thread->tid);
+}
+
 static void
 async_invoke_thread (gpointer data)
 {
        MonoDomain *domain;
-       MonoInternalThread *thread;
        MonoWSQ *wsq;
        ThreadPool *tp;
        gboolean must_die;
-       const gchar *name;
   
        tp = data;
        wsq = NULL;
        if (!tp->is_io)
                wsq = add_wsq ();
 
-       thread = mono_thread_internal_current ();
-
-       mono_profiler_thread_start (thread->tid);
-       name = (tp->is_io) ? "IO Threadpool worker" : "Threadpool worker";
-       mono_thread_set_name_internal (thread, mono_string_new (mono_domain_get (), name), FALSE);
+       set_tp_thread_info (tp);
 
        if (tp_start_func)
                tp_start_func (tp_hooks_user_data);
@@ -1478,10 +1534,7 @@ async_invoke_thread (gpointer data)
                                }
                                mono_thread_pop_appdomain_ref ();
                                InterlockedDecrement (&tp->busy_threads);
-                               /* If the callee changes the background status, set it back to TRUE */
-                               mono_thread_clr_state (thread , ~ThreadState_Background);
-                               if (!mono_thread_test_state (thread , ThreadState_Background))
-                                       ves_icall_System_Threading_Thread_SetState (thread, ThreadState_Background);
+                               clear_thread_state ();
                        }
                }
 
@@ -1514,8 +1567,7 @@ async_invoke_thread (gpointer data)
 #endif
                                if (mono_runtime_is_shutting_down ())
                                        break;
-                               if (THREAD_WANTS_A_BREAK (thread))
-                                       mono_thread_interruption_checkpoint ();
+                               check_for_interruption_critical ();
                        }
                        InterlockedDecrement (&tp->waiting);
 
@@ -1545,12 +1597,14 @@ async_invoke_thread (gpointer data)
                                if (!down && nt <= tp->min_threads)
                                        break;
                                if (down || InterlockedCompareExchange (&tp->nthreads, nt - 1, nt) == nt) {
+#ifndef DISABLE_PERFCOUNTERS
                                        mono_perfcounter_update_value (tp->pc_nthreads, TRUE, -1);
+#endif
                                        if (!tp->is_io) {
                                                remove_wsq (wsq);
                                        }
 
-                                       mono_profiler_thread_end (thread->tid);
+                                       fire_profiler_thread_end ();
 
                                        if (tp_finish_func)
                                                tp_finish_func (tp_hooks_user_data);
@@ -1682,3 +1736,21 @@ mono_internal_thread_unhandled_exception (MonoObject* exc)
                 mono_thread_internal_reset_abort (mono_thread_internal_current ());
        }
 }
+
+/*
+ * Suspend creation of new threads.
+ */
+void
+mono_thread_pool_suspend (void)
+{
+       suspended = TRUE;
+}
+
+/*
+ * Resume creation of new threads.
+ */
+void
+mono_thread_pool_resume (void)
+{
+       suspended = FALSE;
+}