Apply patch from Steven Boswell (ulatekh@yahoo.com) to fix #5591
[mono.git] / mono / metadata / threadpool.c
index 9fa42580ef227c6e72ce30928348659d03518238..272bd1bcdb9afa586e94c672635d9b05b0ff53c7 100644 (file)
@@ -7,6 +7,7 @@
  *
  * Copyright 2001-2003 Ximian, Inc (http://www.ximian.com)
  * Copyright 2004-2010 Novell, Inc (http://www.novell.com)
+ * Copyright 2001 Xamarin Inc (http://www.xamarin.com)
  */
 
 #include <config.h>
 #include <mono/metadata/threads-types.h>
 #include <mono/metadata/threadpool-internals.h>
 #include <mono/metadata/exception.h>
+#include <mono/metadata/environment.h>
 #include <mono/metadata/mono-mlist.h>
 #include <mono/metadata/mono-perfcounters.h>
 #include <mono/metadata/socket-io.h>
 #include <mono/metadata/mono-cq.h>
 #include <mono/metadata/mono-wsq.h>
+#include <mono/metadata/mono-ptr-array.h>
 #include <mono/io-layer/io-layer.h>
 #include <mono/utils/mono-time.h>
 #include <mono/utils/mono-proclib.h>
@@ -395,8 +398,14 @@ threadpool_jobs_inc (MonoObject *obj)
 static gboolean
 threadpool_jobs_dec (MonoObject *obj)
 {
-       MonoDomain *domain = obj->vtable->domain;
-       int remaining_jobs = InterlockedDecrement (&domain->threadpool_jobs);
+       MonoDomain *domain;
+       int remaining_jobs;
+
+       if (obj == NULL)
+               return FALSE;
+
+       domain = obj->vtable->domain;
+       remaining_jobs = InterlockedDecrement (&domain->threadpool_jobs);
        if (remaining_jobs == 0 && domain->cleanup_semaphore) {
                ReleaseSemaphore (domain->cleanup_semaphore, 1, NULL);
                return TRUE;
@@ -479,8 +488,14 @@ static void
 init_event_system (SocketIOData *data)
 {
 #ifdef HAVE_EPOLL
-       if (data->event_system == EPOLL_BACKEND)
+       if (data->event_system == EPOLL_BACKEND) {
                data->event_data = tp_epoll_init (data);
+               if (data->event_data == NULL) {
+                       if (g_getenv ("MONO_DEBUG"))
+                               g_message ("Falling back to poll()");
+                       data->event_system = POLL_BACKEND;
+               }
+       }
 #elif defined(HAVE_KQUEUE)
        if (data->event_system == KQUEUE_BACKEND)
                data->event_data = tp_kqueue_init (data);
@@ -522,7 +537,7 @@ socket_io_init (SocketIOData *data)
        mono_thread_create_internal (mono_get_root_domain (), data->wait, data, TRUE, SMALL_STACK);
        LeaveCriticalSection (&data->io_lock);
        data->inited = 2;
-       mono_thread_create_internal (mono_get_root_domain (), threadpool_start_idle_threads, &async_io_tp, TRUE, SMALL_STACK);
+       threadpool_start_thread (&async_io_tp);
 }
 
 static void
@@ -615,7 +630,7 @@ mono_async_invoke (ThreadPool *tp, MonoAsyncResult *ares)
        if (ac == NULL) {
                /* Fast path from ThreadPool.*QueueUserWorkItem */
                void *pa = ares->async_state;
-               mono_runtime_delegate_invoke (ares->async_delegate, &pa, &exc);
+               res = mono_runtime_delegate_invoke (ares->async_delegate, &pa, &exc);
        } else {
                MonoObject *cb_exc = NULL;
 
@@ -639,7 +654,6 @@ mono_async_invoke (ThreadPool *tp, MonoAsyncResult *ares)
                        void *pa = &ares;
                        cb_exc = NULL;
                        mono_runtime_invoke (ac->cb_method, ac->cb_target, pa, &cb_exc);
-                       MONO_OBJECT_SETREF (ac->msg, exc, cb_exc);
                        exc = cb_exc;
                } else {
                        exc = NULL;
@@ -749,15 +763,16 @@ signal_handler (int signo)
 #endif
 
 static void
-monitor_thread (gpointer data)
+monitor_thread (gpointer unused)
 {
-       ThreadPool *tp;
+       ThreadPool *pools [2];
        MonoInternalThread *thread;
        guint32 ms;
        gboolean need_one;
        int i;
 
-       tp = data;
+       pools [0] = &async_tp;
+       pools [1] = &async_io_tp;
        thread = mono_thread_internal_current ();
        ves_icall_System_Threading_Thread_SetName_internal (thread, mono_string_new (mono_domain_get (), "Threadpool monitor"));
        while (1) {
@@ -776,23 +791,28 @@ monitor_thread (gpointer data)
 
                if (mono_runtime_is_shutting_down ())
                        break;
-               if (tp->waiting > 0)
-                       continue;
-               need_one = (mono_cq_count (tp->queue) > 0);
-               if (!need_one) {
-                       EnterCriticalSection (&wsqs_lock);
-                       for (i = 0; wsqs != NULL && i < wsqs->len; i++) {
-                               MonoWSQ *wsq;
-                               wsq = g_ptr_array_index (wsqs, i);
-                               if (mono_wsq_count (wsq) > 0) {
-                                       need_one = TRUE;
-                                       break;
+
+               for (i = 0; i < 2; i++) {
+                       ThreadPool *tp;
+                       tp = pools [i];
+                       if (tp->waiting > 0)
+                               continue;
+                       need_one = (mono_cq_count (tp->queue) > 0);
+                       if (!need_one && !tp->is_io) {
+                               EnterCriticalSection (&wsqs_lock);
+                               for (i = 0; wsqs != NULL && i < wsqs->len; i++) {
+                                       MonoWSQ *wsq;
+                                       wsq = g_ptr_array_index (wsqs, i);
+                                       if (mono_wsq_count (wsq) != 0) {
+                                               need_one = TRUE;
+                                               break;
+                                       }
                                }
+                               LeaveCriticalSection (&wsqs_lock);
                        }
-                       LeaveCriticalSection (&wsqs_lock);
+                       if (need_one)
+                               threadpool_start_thread (tp);
                }
-               if (need_one)
-                       threadpool_start_thread (tp);
        }
 }
 
@@ -963,26 +983,31 @@ threadpool_kill_idle_threads (ThreadPool *tp)
 void
 mono_thread_pool_cleanup (void)
 {
-       if (!(async_tp.pool_status == 0 || async_tp.pool_status == 2)) {
-               if (!(async_tp.pool_status == 1 && InterlockedCompareExchange (&async_tp.pool_status, 2, 1) == 2)) {
-                       InterlockedExchange (&async_io_tp.pool_status, 2);
-                       threadpool_free_queue (&async_tp);
-                       threadpool_kill_idle_threads (&async_tp);
-
-                       socket_io_cleanup (&socket_io_data); /* Empty when DISABLE_SOCKETS is defined */
-                       threadpool_free_queue (&async_io_tp);
-                       threadpool_kill_idle_threads (&async_io_tp);
-                       MONO_SEM_DESTROY (&async_io_tp.new_job);
-               }
+       if (InterlockedExchange (&async_io_tp.pool_status, 2) == 1) {
+               socket_io_cleanup (&socket_io_data); /* Empty when DISABLE_SOCKETS is defined */
+               threadpool_kill_idle_threads (&async_io_tp);
        }
 
-       EnterCriticalSection (&wsqs_lock);
-       mono_wsq_cleanup ();
-       if (wsqs)
-               g_ptr_array_free (wsqs, TRUE);
-       wsqs = NULL;
-       LeaveCriticalSection (&wsqs_lock);
-       MONO_SEM_DESTROY (&async_tp.new_job);
+       if (async_io_tp.queue != NULL) {
+               MONO_SEM_DESTROY (&async_io_tp.new_job);
+               threadpool_free_queue (&async_io_tp);
+       }
+
+
+       if (InterlockedExchange (&async_tp.pool_status, 2) == 1) {
+               threadpool_kill_idle_threads (&async_tp);
+               threadpool_free_queue (&async_tp);
+       }
+
+       if (wsqs) {
+               EnterCriticalSection (&wsqs_lock);
+               mono_wsq_cleanup ();
+               if (wsqs)
+                       g_ptr_array_free (wsqs, TRUE);
+               wsqs = NULL;
+               LeaveCriticalSection (&wsqs_lock);
+               MONO_SEM_DESTROY (&async_tp.new_job);
+       }
 }
 
 static gboolean
@@ -1033,8 +1058,10 @@ threadpool_append_jobs (ThreadPool *tp, MonoObject **jobs, gint njobs)
                return;
 
        if (tp->pool_status == 0 && InterlockedCompareExchange (&tp->pool_status, 1, 0) == 0) {
-               if (!tp->is_io)
-                       mono_thread_create_internal (mono_get_root_domain (), monitor_thread, tp, TRUE, SMALL_STACK);
+               if (!tp->is_io) {
+                       mono_thread_create_internal (mono_get_root_domain (), monitor_thread, NULL, TRUE, SMALL_STACK);
+                       threadpool_start_thread (tp);
+               }
                /* Create on demand up to min_threads to avoid startup penalty for apps that don't use
                 * the threadpool that much
                * mono_thread_create_internal (mono_get_root_domain (), threadpool_start_idle_threads, tp, TRUE, SMALL_STACK);
@@ -1066,17 +1093,14 @@ threadpool_clear_queue (ThreadPool *tp, MonoDomain *domain)
 {
        MonoObject *obj;
        MonoMList *other;
-       int domain_count;
 
        other = NULL;
-       domain_count = 0;
        while (mono_cq_dequeue (tp->queue, &obj)) {
-               if (obj != NULL && obj->vtable->domain == domain) {
-                       domain_count++;
-                       threadpool_jobs_dec (obj);
-               } else if (obj != NULL) {
+               if (obj == NULL)
+                       continue;
+               if (obj->vtable->domain != domain)
                        other = mono_mlist_prepend (other, obj);
-               }
+               threadpool_jobs_dec (obj);
        }
 
        while (other) {
@@ -1201,7 +1225,7 @@ remove_wsq (MonoWSQ *wsq)
 }
 
 static void
-try_steal (gpointer *data, gboolean retry)
+try_steal (MonoWSQ *local_wsq, gpointer *data, gboolean retry)
 {
        int i;
        int ms;
@@ -1213,27 +1237,33 @@ try_steal (gpointer *data, gboolean retry)
        do {
                if (mono_runtime_is_shutting_down ())
                        return;
+
+               EnterCriticalSection (&wsqs_lock);
                for (i = 0; wsqs != NULL && i < wsqs->len; i++) {
-                       if (mono_runtime_is_shutting_down ()) {
-                               return;
-                       }
+                       MonoWSQ *wsq;
+
+                       wsq = wsqs->pdata [i];
+                       if (wsq == local_wsq || mono_wsq_count (wsq) == 0)
+                               continue;
                        mono_wsq_try_steal (wsqs->pdata [i], data, ms);
                        if (*data != NULL) {
+                               LeaveCriticalSection (&wsqs_lock);
                                return;
                        }
                }
+               LeaveCriticalSection (&wsqs_lock);
                ms += 10;
        } while (retry && ms < 11);
 }
 
 static gboolean
-dequeue_or_steal (ThreadPool *tp, gpointer *data)
+dequeue_or_steal (ThreadPool *tp, gpointer *data, MonoWSQ *local_wsq)
 {
        if (mono_runtime_is_shutting_down ())
                return FALSE;
        mono_cq_dequeue (tp->queue, (MonoObject **) data);
        if (!tp->is_io && !*data)
-               try_steal (data, FALSE);
+               try_steal (local_wsq, data, FALSE);
        return (*data != NULL);
 }
 
@@ -1343,7 +1373,7 @@ async_invoke_thread (gpointer data)
 
        mono_profiler_thread_start (thread->tid);
        name = (tp->is_io) ? "IO Threadpool worker" : "Threadpool worker";
-       ves_icall_System_Threading_Thread_SetName_internal (thread, mono_string_new (mono_domain_get (), name));
+       mono_thread_set_name_internal (thread, mono_string_new (mono_domain_get (), name), FALSE);
 
        if (tp_start_func)
                tp_start_func (tp_hooks_user_data);
@@ -1409,19 +1439,8 @@ async_invoke_thread (gpointer data)
                                        exc = mono_async_invoke (tp, ar);
                                        if (tp_item_end_func)
                                                tp_item_end_func (tp_item_user_data);
-                                       if (exc && mono_runtime_unhandled_exception_policy_get () == MONO_UNHANDLED_POLICY_CURRENT) {
-                                               gboolean unloaded;
-                                               MonoClass *klass;
-
-                                               klass = exc->vtable->klass;
-                                               unloaded = is_appdomainunloaded_exception (exc->vtable->domain, klass);
-                                               if (!unloaded && klass != mono_defaults.threadabortexception_class) {
-                                                       mono_unhandled_exception (exc);
-                                                       exit (255);
-                                               }
-                                               if (klass == mono_defaults.threadabortexception_class)
-                                                       mono_thread_internal_reset_abort (thread);
-                                       }
+                                       if (exc)
+                                               mono_internal_thread_unhandled_exception (exc);
                                        if (is_socket && tp->is_io) {
                                                MonoSocketAsyncResult *state = (MonoSocketAsyncResult *) data;
 
@@ -1447,17 +1466,28 @@ async_invoke_thread (gpointer data)
                data = NULL;
                must_die = should_i_die (tp);
                if (!must_die && (tp->is_io || !mono_wsq_local_pop (&data)))
-                       dequeue_or_steal (tp, &data);
+                       dequeue_or_steal (tp, &data, wsq);
 
                n_naps = 0;
                while (!must_die && !data && n_naps < 4) {
                        gboolean res;
 
                        InterlockedIncrement (&tp->waiting);
+
+                       // Another thread may have added a job into its wsq since the last call to dequeue_or_steal
+                       // Check all the queues again before entering the wait loop
+                       dequeue_or_steal (tp, &data, wsq);
+                       if (data) {
+                               InterlockedDecrement (&tp->waiting);
+                               break;
+                       }
+
+                       mono_gc_set_skip_thread (TRUE);
+
 #if defined(__OpenBSD__)
-                       while ((res = mono_sem_wait (&tp->new_job, TRUE)) == -1) {// && errno == EINTR) {
+                       while (mono_cq_count (tp->queue) == 0 && (res = mono_sem_wait (&tp->new_job, TRUE)) == -1) {// && errno == EINTR) {
 #else
-                       while ((res = mono_sem_timedwait (&tp->new_job, 2000, TRUE)) == -1) {// && errno == EINTR) {
+                       while (mono_cq_count (tp->queue) == 0 && (res = mono_sem_timedwait (&tp->new_job, 2000, TRUE)) == -1) {// && errno == EINTR) {
 #endif
                                if (mono_runtime_is_shutting_down ())
                                        break;
@@ -1465,10 +1495,13 @@ async_invoke_thread (gpointer data)
                                        mono_thread_interruption_checkpoint ();
                        }
                        InterlockedDecrement (&tp->waiting);
+
+                       mono_gc_set_skip_thread (FALSE);
+
                        if (mono_runtime_is_shutting_down ())
                                break;
                        must_die = should_i_die (tp);
-                       dequeue_or_steal (tp, &data);
+                       dequeue_or_steal (tp, &data, wsq);
                        n_naps++;
                }
 
@@ -1608,3 +1641,21 @@ mono_install_threadpool_item_hooks (MonoThreadPoolItemFunc begin_func, MonoThrea
        tp_item_user_data = user_data;
 }
 
+void
+mono_internal_thread_unhandled_exception (MonoObject* exc)
+{
+       if (mono_runtime_unhandled_exception_policy_get () == MONO_UNHANDLED_POLICY_CURRENT) {
+               gboolean unloaded;
+               MonoClass *klass;
+
+               klass = exc->vtable->klass;
+               unloaded = is_appdomainunloaded_exception (exc->vtable->domain, klass);
+               if (!unloaded && klass != mono_defaults.threadabortexception_class) {
+                       mono_unhandled_exception (exc);
+                       if (mono_environment_exitcode_get () == 1)
+                               exit (255);
+               }
+               if (klass == mono_defaults.threadabortexception_class)
+                mono_thread_internal_reset_abort (mono_thread_internal_current ());
+       }
+}