*
* Copyright 2001-2003 Ximian, Inc (http://www.ximian.com)
* Copyright 2004-2010 Novell, Inc (http://www.novell.com)
 * Copyright 2011 Xamarin Inc (http://www.xamarin.com)
*/
#include <config.h>
#include <mono/metadata/threads-types.h>
#include <mono/metadata/threadpool-internals.h>
#include <mono/metadata/exception.h>
+#include <mono/metadata/environment.h>
#include <mono/metadata/mono-mlist.h>
#include <mono/metadata/mono-perfcounters.h>
#include <mono/metadata/socket-io.h>
#include <mono/utils/mono-time.h>
#include <mono/utils/mono-proclib.h>
#include <mono/utils/mono-semaphore.h>
+#include <mono/utils/atomic.h>
#include <errno.h>
#ifdef HAVE_SYS_TIME_H
#include <sys/time.h>
static GPtrArray *wsqs;
CRITICAL_SECTION wsqs_lock;
+static gboolean suspended;
/* Hooks */
static MonoThreadPoolFunc tp_start_func;
#include <mono/metadata/tpool-poll.c>
#ifdef HAVE_EPOLL
#include <mono/metadata/tpool-epoll.c>
-#elif defined(HAVE_KQUEUE)
+#elif defined(USE_KQUEUE_FOR_THREADPOOL)
#include <mono/metadata/tpool-kqueue.c>
#endif
/*
* Note that we call it is_socket_type() where 'socket' refers to the image
* that contains the System.Net.Sockets.Socket type.
- * For moonlight there is a System.Net.Sockets.Socket class in both System.dll and System.Net.dll.
*/
static gboolean
is_socket_type (MonoDomain *domain, MonoClass *klass)
{
- static const char *version = NULL;
- static gboolean moonlight;
-
- if (is_system_type (domain, klass))
- return TRUE;
-
- /* If moonlight, check if the type is in System.Net.dll too */
- if (version == NULL) {
- version = mono_get_runtime_info ()->framework_version;
- moonlight = !strcmp (version, "2.1");
- }
-
- if (!moonlight)
- return FALSE;
-
- if (domain->system_net_dll == NULL)
- domain->system_net_dll = mono_image_loaded ("System.Net");
-
- return klass->image == domain->system_net_dll;
+ return is_system_type (domain, klass);
}
#define check_type_cached(domain, ASSEMBLY, _class, _namespace, _name, loc) do { \
static gboolean
threadpool_jobs_dec (MonoObject *obj)
{
- MonoDomain *domain = obj->vtable->domain;
- int remaining_jobs = InterlockedDecrement (&domain->threadpool_jobs);
+ MonoDomain *domain;
+ int remaining_jobs;
+
+ if (obj == NULL)
+ return FALSE;
+
+ domain = obj->vtable->domain;
+ remaining_jobs = InterlockedDecrement (&domain->threadpool_jobs);
if (remaining_jobs == 0 && domain->cleanup_semaphore) {
ReleaseSemaphore (domain->cleanup_semaphore, 1, NULL);
return TRUE;
init_event_system (SocketIOData *data)
{
#ifdef HAVE_EPOLL
- if (data->event_system == EPOLL_BACKEND)
+ if (data->event_system == EPOLL_BACKEND) {
data->event_data = tp_epoll_init (data);
-#elif defined(HAVE_KQUEUE)
+ if (data->event_data == NULL) {
+ if (g_getenv ("MONO_DEBUG"))
+ g_message ("Falling back to poll()");
+ data->event_system = POLL_BACKEND;
+ }
+ }
+#elif defined(USE_KQUEUE_FOR_THREADPOOL)
if (data->event_system == KQUEUE_BACKEND)
data->event_data = tp_kqueue_init (data);
#endif
data->sock_to_state = mono_g_hash_table_new_type (g_direct_hash, g_direct_equal, MONO_HASH_VALUE_GC);
#ifdef HAVE_EPOLL
data->event_system = EPOLL_BACKEND;
-#elif defined(HAVE_KQUEUE)
+#elif defined(USE_KQUEUE_FOR_THREADPOOL)
data->event_system = KQUEUE_BACKEND;
#else
data->event_system = POLL_BACKEND;
data->event_system = POLL_BACKEND;
init_event_system (data);
- mono_thread_create_internal (mono_get_root_domain (), data->wait, data, TRUE, SMALL_STACK);
+ mono_thread_create_internal (mono_get_root_domain (), data->wait, data, TRUE, FALSE, SMALL_STACK);
LeaveCriticalSection (&data->io_lock);
data->inited = 2;
threadpool_start_thread (&async_io_tp);
if (ac == NULL) {
/* Fast path from ThreadPool.*QueueUserWorkItem */
void *pa = ares->async_state;
- mono_runtime_delegate_invoke (ares->async_delegate, &pa, &exc);
+ res = mono_runtime_delegate_invoke (ares->async_delegate, &pa, &exc);
} else {
MonoObject *cb_exc = NULL;
void *pa = &ares;
cb_exc = NULL;
mono_runtime_invoke (ac->cb_method, ac->cb_target, pa, &cb_exc);
- MONO_OBJECT_SETREF (ac->msg, exc, cb_exc);
exc = cb_exc;
} else {
exc = NULL;
if (InterlockedCompareExchange (&tp->nthreads, n + 1, n) == n)
break;
}
+#ifndef DISABLE_PERFCOUNTERS
mono_perfcounter_update_value (tp->pc_nthreads, TRUE, 1);
- mono_thread_create_internal (mono_get_root_domain (), tp->async_invoke, tp, TRUE, stack_size);
+#endif
+ mono_thread_create_internal (mono_get_root_domain (), tp->async_invoke, tp, TRUE, FALSE, stack_size);
SleepEx (100, TRUE);
} while (1);
}
MONO_SEM_INIT (&tp->new_job, 0);
}
+#ifndef DISABLE_PERFCOUNTERS
static void *
init_perf_counter (const char *category, const char *counter)
{
machine = mono_string_new (root, ".");
return mono_perfcounter_get_impl (category_str, counter_str, NULL, machine, &type, &custom);
}
+#endif
#ifdef DEBUG
static void
if (mono_runtime_is_shutting_down ())
break;
+ if (suspended)
+ continue;
+
for (i = 0; i < 2; i++) {
ThreadPool *tp;
tp = pools [i];
wsqs = g_ptr_array_sized_new (MAX (100 * cpu_count, thread_count));
mono_wsq_init ();
+#ifndef DISABLE_PERFCOUNTERS
async_tp.pc_nitems = init_perf_counter ("Mono Threadpool", "Work Items Added");
g_assert (async_tp.pc_nitems);
async_io_tp.pc_nthreads = init_perf_counter ("Mono Threadpool", "# of IO Threads");
g_assert (async_io_tp.pc_nthreads);
+#endif
tp_inited = 2;
#ifdef DEBUG
signal (SIGALRM, signal_handler);
stack_size = (!tp->is_io) ? 0 : SMALL_STACK;
while (!mono_runtime_is_shutting_down () && (n = tp->nthreads) < tp->max_threads) {
if (InterlockedCompareExchange (&tp->nthreads, n + 1, n) == n) {
+#ifndef DISABLE_PERFCOUNTERS
mono_perfcounter_update_value (tp->pc_nthreads, TRUE, 1);
- mono_thread_create_internal (mono_get_root_domain (), tp->async_invoke, tp, TRUE, stack_size);
+#endif
+ mono_thread_create_internal (mono_get_root_domain (), tp->async_invoke, tp, TRUE, FALSE, stack_size);
return TRUE;
}
}
return;
if (tp->pool_status == 0 && InterlockedCompareExchange (&tp->pool_status, 1, 0) == 0) {
- if (!tp->is_io)
- mono_thread_create_internal (mono_get_root_domain (), monitor_thread, NULL, TRUE, SMALL_STACK);
+ if (!tp->is_io) {
+ mono_thread_create_internal (mono_get_root_domain (), monitor_thread, NULL, TRUE, FALSE, SMALL_STACK);
+ threadpool_start_thread (tp);
+ }
/* Create on demand up to min_threads to avoid startup penalty for apps that don't use
* the threadpool that much
- * mono_thread_create_internal (mono_get_root_domain (), threadpool_start_idle_threads, tp, TRUE, SMALL_STACK);
+ * mono_thread_create_internal (mono_get_root_domain (), threadpool_start_idle_threads, tp, TRUE, FALSE, SMALL_STACK);
*/
}
o->add_time = mono_100ns_ticks ();
}
threadpool_jobs_inc (ar);
+#ifndef DISABLE_PERFCOUNTERS
mono_perfcounter_update_value (tp->pc_nitems, TRUE, 1);
+#endif
if (!tp->is_io && mono_wsq_local_push (ar))
continue;
}
}
+static gboolean
+remove_sockstate_for_domain (gpointer key, gpointer value, gpointer user_data)
+{
+ MonoMList *list = value;
+ gboolean remove = FALSE;
+ while (list) {
+ MonoObject *data = mono_mlist_get_data (list);
+ if (mono_object_domain (data) == user_data) {
+ remove = TRUE;
+ mono_mlist_set_data (list, NULL);
+ }
+ list = mono_mlist_next (list);
+ }
+ //FIXME is there some sort of additional unregistration we need to perform here?
+ return remove;
+}
+
/*
* Clean up the threadpool of all domain jobs.
* Can only be called as part of the domain unloading process as
threadpool_clear_queue (&async_tp, domain);
threadpool_clear_queue (&async_io_tp, domain);
+ EnterCriticalSection (&socket_io_data.io_lock);
+ if (socket_io_data.sock_to_state)
+ mono_g_hash_table_foreach_remove (socket_io_data.sock_to_state, remove_sockstate_for_domain, domain);
+
+ LeaveCriticalSection (&socket_io_data.io_lock);
+
/*
* There might be some threads out that could be about to execute stuff from the given domain.
* We avoid that by setting up a semaphore to be pulsed by the thread that reaches zero.
mono_profiler_thread_start (thread->tid);
name = (tp->is_io) ? "IO Threadpool worker" : "Threadpool worker";
- ves_icall_System_Threading_Thread_SetName_internal (thread, mono_string_new (mono_domain_get (), name));
+ mono_thread_set_name_internal (thread, mono_string_new (mono_domain_get (), name), FALSE);
if (tp_start_func)
tp_start_func (tp_hooks_user_data);
exc = mono_async_invoke (tp, ar);
if (tp_item_end_func)
tp_item_end_func (tp_item_user_data);
- if (exc && mono_runtime_unhandled_exception_policy_get () == MONO_UNHANDLED_POLICY_CURRENT) {
- gboolean unloaded;
- MonoClass *klass;
-
- klass = exc->vtable->klass;
- unloaded = is_appdomainunloaded_exception (exc->vtable->domain, klass);
- if (!unloaded && klass != mono_defaults.threadabortexception_class) {
- mono_unhandled_exception (exc);
- exit (255);
- }
- if (klass == mono_defaults.threadabortexception_class)
- mono_thread_internal_reset_abort (thread);
- }
+ if (exc)
+ mono_internal_thread_unhandled_exception (exc);
if (is_socket && tp->is_io) {
MonoSocketAsyncResult *state = (MonoSocketAsyncResult *) data;
gboolean res;
InterlockedIncrement (&tp->waiting);
+
+ // Another thread may have added a job into its wsq since the last call to dequeue_or_steal
+ // Check all the queues again before entering the wait loop
+ dequeue_or_steal (tp, &data, wsq);
+ if (data) {
+ InterlockedDecrement (&tp->waiting);
+ break;
+ }
+
+ mono_gc_set_skip_thread (TRUE);
+
#if defined(__OpenBSD__)
- while ((res = mono_sem_wait (&tp->new_job, TRUE)) == -1) {// && errno == EINTR) {
+ while (mono_cq_count (tp->queue) == 0 && (res = mono_sem_wait (&tp->new_job, TRUE)) == -1) {// && errno == EINTR) {
#else
- while ((res = mono_sem_timedwait (&tp->new_job, 2000, TRUE)) == -1) {// && errno == EINTR) {
+ while (mono_cq_count (tp->queue) == 0 && (res = mono_sem_timedwait (&tp->new_job, 2000, TRUE)) == -1) {// && errno == EINTR) {
#endif
if (mono_runtime_is_shutting_down ())
break;
mono_thread_interruption_checkpoint ();
}
InterlockedDecrement (&tp->waiting);
+
+ mono_gc_set_skip_thread (FALSE);
+
if (mono_runtime_is_shutting_down ())
break;
must_die = should_i_die (tp);
if (!down && nt <= tp->min_threads)
break;
if (down || InterlockedCompareExchange (&tp->nthreads, nt - 1, nt) == nt) {
+#ifndef DISABLE_PERFCOUNTERS
mono_perfcounter_update_value (tp->pc_nthreads, TRUE, -1);
+#endif
if (!tp->is_io) {
remove_wsq (wsq);
}
InterlockedExchange (&async_tp.min_threads, workerThreads);
InterlockedExchange (&async_io_tp.min_threads, completionPortThreads);
if (workerThreads > async_tp.nthreads)
- mono_thread_create_internal (mono_get_root_domain (), threadpool_start_idle_threads, &async_tp, TRUE, SMALL_STACK);
+ mono_thread_create_internal (mono_get_root_domain (), threadpool_start_idle_threads, &async_tp, TRUE, FALSE, SMALL_STACK);
if (completionPortThreads > async_io_tp.nthreads)
- mono_thread_create_internal (mono_get_root_domain (), threadpool_start_idle_threads, &async_io_tp, TRUE, SMALL_STACK);
+ mono_thread_create_internal (mono_get_root_domain (), threadpool_start_idle_threads, &async_io_tp, TRUE, FALSE, SMALL_STACK);
return TRUE;
}
tp_item_user_data = user_data;
}
+void
+mono_internal_thread_unhandled_exception (MonoObject* exc)
+{
+ if (mono_runtime_unhandled_exception_policy_get () == MONO_UNHANDLED_POLICY_CURRENT) {
+ gboolean unloaded;
+ MonoClass *klass;
+
+ klass = exc->vtable->klass;
+ unloaded = is_appdomainunloaded_exception (exc->vtable->domain, klass);
+ if (!unloaded && klass != mono_defaults.threadabortexception_class) {
+ mono_unhandled_exception (exc);
+ if (mono_environment_exitcode_get () == 1)
+ exit (255);
+ }
+ if (klass == mono_defaults.threadabortexception_class)
+ mono_thread_internal_reset_abort (mono_thread_internal_current ());
+ }
+}
+
+/*
+ * Suspend creation of new threads.
+ */
+void
+mono_thread_pool_suspend (void)
+{
+ suspended = TRUE;
+}
+
+/*
+ * Resume creation of new threads.
+ */
+void
+mono_thread_pool_resume (void)
+{
+ suspended = FALSE;
+}