Fix warnings reported by clang.
[mono.git] / mono / metadata / threadpool.c
index 3edd5982bc5a1f61fb7c346d8f004e2a595d42f7..db6243b2133d2ca20b41d809a4321f74b78b74f2 100644 (file)
@@ -7,6 +7,7 @@
  *
  * Copyright 2001-2003 Ximian, Inc (http://www.ximian.com)
  * Copyright 2004-2010 Novell, Inc (http://www.novell.com)
+ * Copyright 2001 Xamarin Inc (http://www.xamarin.com)
  */
 
 #include <config.h>
 #include <mono/metadata/threads-types.h>
 #include <mono/metadata/threadpool-internals.h>
 #include <mono/metadata/exception.h>
+#include <mono/metadata/environment.h>
 #include <mono/metadata/mono-mlist.h>
 #include <mono/metadata/mono-perfcounters.h>
 #include <mono/metadata/socket-io.h>
 #include <mono/metadata/mono-cq.h>
 #include <mono/metadata/mono-wsq.h>
+#include <mono/metadata/mono-ptr-array.h>
 #include <mono/io-layer/io-layer.h>
 #include <mono/utils/mono-time.h>
 #include <mono/utils/mono-proclib.h>
@@ -64,6 +67,7 @@
                        } while (1)
 
 #define SPIN_UNLOCK(i) i = 0
+#define SMALL_STACK (128 * (sizeof (gpointer) / 4) * 1024)
 
 /* DEBUG: prints tp data every 2s */
 #undef DEBUG 
@@ -394,8 +398,14 @@ threadpool_jobs_inc (MonoObject *obj)
 static gboolean
 threadpool_jobs_dec (MonoObject *obj)
 {
-       MonoDomain *domain = obj->vtable->domain;
-       int remaining_jobs = InterlockedDecrement (&domain->threadpool_jobs);
+       MonoDomain *domain;
+       int remaining_jobs;
+
+       if (obj == NULL)
+               return FALSE;
+
+       domain = obj->vtable->domain;
+       remaining_jobs = InterlockedDecrement (&domain->threadpool_jobs);
        if (remaining_jobs == 0 && domain->cleanup_semaphore) {
                ReleaseSemaphore (domain->cleanup_semaphore, 1, NULL);
                return TRUE;
@@ -478,8 +488,14 @@ static void
 init_event_system (SocketIOData *data)
 {
 #ifdef HAVE_EPOLL
-       if (data->event_system == EPOLL_BACKEND)
+       if (data->event_system == EPOLL_BACKEND) {
                data->event_data = tp_epoll_init (data);
+               if (data->event_data == NULL) {
+                       if (g_getenv ("MONO_DEBUG"))
+                               g_message ("Falling back to poll()");
+                       data->event_system = POLL_BACKEND;
+               }
+       }
 #elif defined(HAVE_KQUEUE)
        if (data->event_system == KQUEUE_BACKEND)
                data->event_data = tp_kqueue_init (data);
@@ -492,7 +508,6 @@ static void
 socket_io_init (SocketIOData *data)
 {
        int inited;
-       guint32 stack_size;
 
        if (data->inited >= 2) // 2 -> initialized, 3-> cleaned up
                return;
@@ -519,12 +534,10 @@ socket_io_init (SocketIOData *data)
                data->event_system = POLL_BACKEND;
 
        init_event_system (data);
-       stack_size = mono_threads_get_default_stacksize ();
-       mono_threads_set_default_stacksize (128 * (sizeof (gpointer) / 4) * 1024);
-       mono_thread_create_internal (mono_get_root_domain (), data->wait, data, TRUE);
-       mono_threads_set_default_stacksize (stack_size);
+       mono_thread_create_internal (mono_get_root_domain (), data->wait, data, TRUE, SMALL_STACK);
        LeaveCriticalSection (&data->io_lock);
        data->inited = 2;
+       threadpool_start_thread (&async_io_tp);
 }
 
 static void
@@ -660,9 +673,9 @@ static void
 threadpool_start_idle_threads (ThreadPool *tp)
 {
        int n;
+       guint32 stack_size;
 
-       if (tp->pool_status == 1 && !tp->is_io)
-               mono_thread_create_internal (mono_get_root_domain (), monitor_thread, tp, TRUE);
+       stack_size = (!tp->is_io) ? 0 : SMALL_STACK;
        do {
                while (1) {
                        n = tp->nthreads;
@@ -672,7 +685,7 @@ threadpool_start_idle_threads (ThreadPool *tp)
                                break;
                }
                mono_perfcounter_update_value (tp->pc_nthreads, TRUE, 1);
-               mono_thread_create_internal (mono_get_root_domain (), tp->async_invoke, tp, TRUE);
+               mono_thread_create_internal (mono_get_root_domain (), tp->async_invoke, tp, TRUE, stack_size);
                SleepEx (100, TRUE);
        } while (1);
 }
@@ -751,15 +764,16 @@ signal_handler (int signo)
 #endif
 
 static void
-monitor_thread (gpointer data)
+monitor_thread (gpointer unused)
 {
-       ThreadPool *tp;
+       ThreadPool *pools [2];
        MonoInternalThread *thread;
        guint32 ms;
        gboolean need_one;
        int i;
 
-       tp = data;
+       pools [0] = &async_tp;
+       pools [1] = &async_io_tp;
        thread = mono_thread_internal_current ();
        ves_icall_System_Threading_Thread_SetName_internal (thread, mono_string_new (mono_domain_get (), "Threadpool monitor"));
        while (1) {
@@ -778,23 +792,28 @@ monitor_thread (gpointer data)
 
                if (mono_runtime_is_shutting_down ())
                        break;
-               if (tp->waiting > 0)
-                       continue;
-               need_one = (mono_cq_count (tp->queue) > 0);
-               if (!need_one) {
-                       EnterCriticalSection (&wsqs_lock);
-                       for (i = 0; wsqs != NULL && i < wsqs->len; i++) {
-                               MonoWSQ *wsq;
-                               wsq = g_ptr_array_index (wsqs, i);
-                               if (mono_wsq_count (wsq) > 0) {
-                                       need_one = TRUE;
-                                       break;
+
+               for (i = 0; i < 2; i++) {
+                       ThreadPool *tp;
+                       tp = pools [i];
+                       if (tp->waiting > 0)
+                               continue;
+                       need_one = (mono_cq_count (tp->queue) > 0);
+                       if (!need_one && !tp->is_io) {
+                               EnterCriticalSection (&wsqs_lock);
+                               for (i = 0; wsqs != NULL && i < wsqs->len; i++) {
+                                       MonoWSQ *wsq;
+                                       wsq = g_ptr_array_index (wsqs, i);
+                                       if (mono_wsq_count (wsq) != 0) {
+                                               need_one = TRUE;
+                                               break;
+                                       }
                                }
+                               LeaveCriticalSection (&wsqs_lock);
                        }
-                       LeaveCriticalSection (&wsqs_lock);
+                       if (need_one)
+                               threadpool_start_thread (tp);
                }
-               if (need_one)
-                       threadpool_start_thread (tp);
        }
 }
 
@@ -965,37 +984,44 @@ threadpool_kill_idle_threads (ThreadPool *tp)
 void
 mono_thread_pool_cleanup (void)
 {
-       if (!(async_tp.pool_status == 0 || async_tp.pool_status == 2)) {
-               if (!(async_tp.pool_status == 1 && InterlockedCompareExchange (&async_tp.pool_status, 2, 1) == 2)) {
-                       InterlockedExchange (&async_io_tp.pool_status, 2);
-                       threadpool_free_queue (&async_tp);
-                       threadpool_kill_idle_threads (&async_tp);
-
-                       socket_io_cleanup (&socket_io_data); /* Empty when DISABLE_SOCKETS is defined */
-                       threadpool_free_queue (&async_io_tp);
-                       threadpool_kill_idle_threads (&async_io_tp);
-                       MONO_SEM_DESTROY (&async_io_tp.new_job);
-               }
+       if (InterlockedExchange (&async_io_tp.pool_status, 2) == 1) {
+               socket_io_cleanup (&socket_io_data); /* Empty when DISABLE_SOCKETS is defined */
+               threadpool_kill_idle_threads (&async_io_tp);
        }
 
-       EnterCriticalSection (&wsqs_lock);
-       mono_wsq_cleanup ();
-       if (wsqs)
-               g_ptr_array_free (wsqs, TRUE);
-       wsqs = NULL;
-       LeaveCriticalSection (&wsqs_lock);
-       MONO_SEM_DESTROY (&async_tp.new_job);
+       if (async_io_tp.queue != NULL) {
+               MONO_SEM_DESTROY (&async_io_tp.new_job);
+               threadpool_free_queue (&async_io_tp);
+       }
+
+
+       if (InterlockedExchange (&async_tp.pool_status, 2) == 1) {
+               threadpool_kill_idle_threads (&async_tp);
+               threadpool_free_queue (&async_tp);
+       }
+
+       if (wsqs) {
+               EnterCriticalSection (&wsqs_lock);
+               mono_wsq_cleanup ();
+               if (wsqs)
+                       g_ptr_array_free (wsqs, TRUE);
+               wsqs = NULL;
+               LeaveCriticalSection (&wsqs_lock);
+               MONO_SEM_DESTROY (&async_tp.new_job);
+       }
 }
 
 static gboolean
 threadpool_start_thread (ThreadPool *tp)
 {
        gint n;
+       guint32 stack_size;
 
+       stack_size = (!tp->is_io) ? 0 : SMALL_STACK;
        while (!mono_runtime_is_shutting_down () && (n = tp->nthreads) < tp->max_threads) {
                if (InterlockedCompareExchange (&tp->nthreads, n + 1, n) == n) {
                        mono_perfcounter_update_value (tp->pc_nthreads, TRUE, 1);
-                       mono_thread_create_internal (mono_get_root_domain (), tp->async_invoke, tp, TRUE);
+                       mono_thread_create_internal (mono_get_root_domain (), tp->async_invoke, tp, TRUE, stack_size);
                        return TRUE;
                }
        }
@@ -1013,7 +1039,7 @@ pulse_on_new_job (ThreadPool *tp)
 void
 icall_append_job (MonoObject *ar)
 {
-       threadpool_append_job (&async_tp, ar);
+       threadpool_append_jobs (&async_tp, &ar, 1);
 }
 
 static void
@@ -1032,8 +1058,16 @@ threadpool_append_jobs (ThreadPool *tp, MonoObject **jobs, gint njobs)
        if (mono_runtime_is_shutting_down ())
                return;
 
-       if (tp->pool_status == 0 && InterlockedCompareExchange (&tp->pool_status, 1, 0) == 0)
-               mono_thread_create_internal (mono_get_root_domain (), threadpool_start_idle_threads, tp, TRUE);
+       if (tp->pool_status == 0 && InterlockedCompareExchange (&tp->pool_status, 1, 0) == 0) {
+               if (!tp->is_io) {
+                       mono_thread_create_internal (mono_get_root_domain (), monitor_thread, NULL, TRUE, SMALL_STACK);
+                       threadpool_start_thread (tp);
+               }
+               /* Create on demand up to min_threads to avoid startup penalty for apps that don't use
+                * the threadpool that much
+               * mono_thread_create_internal (mono_get_root_domain (), threadpool_start_idle_threads, tp, TRUE, SMALL_STACK);
+               */
+       }
 
        for (i = 0; i < njobs; i++) {
                ar = jobs [i];
@@ -1051,7 +1085,7 @@ threadpool_append_jobs (ThreadPool *tp, MonoObject **jobs, gint njobs)
                mono_cq_enqueue (tp->queue, ar);
        }
 
-       for (i = 0; i < MIN(njobs, tp->max_threads); i++)
+       for (i = 0; tp->waiting > 0 && i < MIN(njobs, tp->max_threads); i++)
                pulse_on_new_job (tp);
 }
 
@@ -1060,17 +1094,14 @@ threadpool_clear_queue (ThreadPool *tp, MonoDomain *domain)
 {
        MonoObject *obj;
        MonoMList *other;
-       int domain_count;
 
        other = NULL;
-       domain_count = 0;
        while (mono_cq_dequeue (tp->queue, &obj)) {
-               if (obj != NULL && obj->vtable->domain == domain) {
-                       domain_count++;
-                       threadpool_jobs_dec (obj);
-               } else if (obj != NULL) {
+               if (obj == NULL)
+                       continue;
+               if (obj->vtable->domain != domain)
                        other = mono_mlist_prepend (other, obj);
-               }
+               threadpool_jobs_dec (obj);
        }
 
        while (other) {
@@ -1195,7 +1226,7 @@ remove_wsq (MonoWSQ *wsq)
 }
 
 static void
-try_steal (gpointer *data, gboolean retry)
+try_steal (MonoWSQ *local_wsq, gpointer *data, gboolean retry)
 {
        int i;
        int ms;
@@ -1207,27 +1238,33 @@ try_steal (gpointer *data, gboolean retry)
        do {
                if (mono_runtime_is_shutting_down ())
                        return;
+
+               EnterCriticalSection (&wsqs_lock);
                for (i = 0; wsqs != NULL && i < wsqs->len; i++) {
-                       if (mono_runtime_is_shutting_down ()) {
-                               return;
-                       }
+                       MonoWSQ *wsq;
+
+                       wsq = wsqs->pdata [i];
+                       if (wsq == local_wsq || mono_wsq_count (wsq) == 0)
+                               continue;
                        mono_wsq_try_steal (wsqs->pdata [i], data, ms);
                        if (*data != NULL) {
+                               LeaveCriticalSection (&wsqs_lock);
                                return;
                        }
                }
+               LeaveCriticalSection (&wsqs_lock);
                ms += 10;
        } while (retry && ms < 11);
 }
 
 static gboolean
-dequeue_or_steal (ThreadPool *tp, gpointer *data)
+dequeue_or_steal (ThreadPool *tp, gpointer *data, MonoWSQ *local_wsq)
 {
        if (mono_runtime_is_shutting_down ())
                return FALSE;
        mono_cq_dequeue (tp->queue, (MonoObject **) data);
        if (!tp->is_io && !*data)
-               try_steal (data, FALSE);
+               try_steal (local_wsq, data, FALSE);
        return (*data != NULL);
 }
 
@@ -1326,6 +1363,7 @@ async_invoke_thread (gpointer data)
        MonoWSQ *wsq;
        ThreadPool *tp;
        gboolean must_die;
+       const gchar *name;
   
        tp = data;
        wsq = NULL;
@@ -1335,7 +1373,8 @@ async_invoke_thread (gpointer data)
        thread = mono_thread_internal_current ();
 
        mono_profiler_thread_start (thread->tid);
-       ves_icall_System_Threading_Thread_SetName_internal (thread, mono_string_new (mono_domain_get (), "Threadpool worker"));
+       name = (tp->is_io) ? "IO Threadpool worker" : "Threadpool worker";
+       ves_icall_System_Threading_Thread_SetName_internal (thread, mono_string_new (mono_domain_get (), name));
 
        if (tp_start_func)
                tp_start_func (tp_hooks_user_data);
@@ -1409,7 +1448,8 @@ async_invoke_thread (gpointer data)
                                                unloaded = is_appdomainunloaded_exception (exc->vtable->domain, klass);
                                                if (!unloaded && klass != mono_defaults.threadabortexception_class) {
                                                        mono_unhandled_exception (exc);
-                                                       exit (255);
+                                                       if (mono_environment_exitcode_get () == 1)
+                                                               exit (255);
                                                }
                                                if (klass == mono_defaults.threadabortexception_class)
                                                        mono_thread_internal_reset_abort (thread);
@@ -1439,17 +1479,19 @@ async_invoke_thread (gpointer data)
                data = NULL;
                must_die = should_i_die (tp);
                if (!must_die && (tp->is_io || !mono_wsq_local_pop (&data)))
-                       dequeue_or_steal (tp, &data);
+                       dequeue_or_steal (tp, &data, wsq);
 
                n_naps = 0;
                while (!must_die && !data && n_naps < 4) {
                        gboolean res;
 
+                       mono_gc_set_skip_thread (TRUE);
+
                        InterlockedIncrement (&tp->waiting);
 #if defined(__OpenBSD__)
-                       while ((res = mono_sem_wait (&tp->new_job, TRUE)) == -1) {// && errno == EINTR) {
+                       while (mono_cq_count (tp->queue) == 0 && (res = mono_sem_wait (&tp->new_job, TRUE)) == -1) {// && errno == EINTR) {
 #else
-                       while ((res = mono_sem_timedwait (&tp->new_job, 2000, TRUE)) == -1) {// && errno == EINTR) {
+                       while (mono_cq_count (tp->queue) == 0 && (res = mono_sem_timedwait (&tp->new_job, 2000, TRUE)) == -1) {// && errno == EINTR) {
 #endif
                                if (mono_runtime_is_shutting_down ())
                                        break;
@@ -1457,10 +1499,13 @@ async_invoke_thread (gpointer data)
                                        mono_thread_interruption_checkpoint ();
                        }
                        InterlockedDecrement (&tp->waiting);
+
+                       mono_gc_set_skip_thread (FALSE);
+
                        if (mono_runtime_is_shutting_down ())
                                break;
                        must_die = should_i_die (tp);
-                       dequeue_or_steal (tp, &data);
+                       dequeue_or_steal (tp, &data, wsq);
                        n_naps++;
                }
 
@@ -1537,9 +1582,9 @@ ves_icall_System_Threading_ThreadPool_SetMinThreads (gint workerThreads, gint co
        InterlockedExchange (&async_tp.min_threads, workerThreads);
        InterlockedExchange (&async_io_tp.min_threads, completionPortThreads);
        if (workerThreads > async_tp.nthreads)
-               mono_thread_create_internal (mono_get_root_domain (), threadpool_start_idle_threads, &async_tp, TRUE);
+               mono_thread_create_internal (mono_get_root_domain (), threadpool_start_idle_threads, &async_tp, TRUE, SMALL_STACK);
        if (completionPortThreads > async_io_tp.nthreads)
-               mono_thread_create_internal (mono_get_root_domain (), threadpool_start_idle_threads, &async_io_tp, TRUE);
+               mono_thread_create_internal (mono_get_root_domain (), threadpool_start_idle_threads, &async_io_tp, TRUE, SMALL_STACK);
        return TRUE;
 }