*
* Copyright 2001-2003 Ximian, Inc (http://www.ximian.com)
* Copyright 2004-2010 Novell, Inc (http://www.novell.com)
+ * Copyright 2001 Xamarin Inc (http://www.xamarin.com)
*/
#include <config.h>
#include <mono/metadata/threads-types.h>
#include <mono/metadata/threadpool-internals.h>
#include <mono/metadata/exception.h>
+#include <mono/metadata/environment.h>
#include <mono/metadata/mono-mlist.h>
#include <mono/metadata/mono-perfcounters.h>
#include <mono/metadata/socket-io.h>
#include <mono/metadata/mono-cq.h>
#include <mono/metadata/mono-wsq.h>
+#include <mono/metadata/mono-ptr-array.h>
#include <mono/io-layer/io-layer.h>
#include <mono/utils/mono-time.h>
#include <mono/utils/mono-proclib.h>
static gboolean
threadpool_jobs_dec (MonoObject *obj)
{
- MonoDomain *domain = obj->vtable->domain;
- int remaining_jobs = InterlockedDecrement (&domain->threadpool_jobs);
+ MonoDomain *domain;
+ int remaining_jobs;
+
+ if (obj == NULL)
+ return FALSE;
+
+ domain = obj->vtable->domain;
+ remaining_jobs = InterlockedDecrement (&domain->threadpool_jobs);
if (remaining_jobs == 0 && domain->cleanup_semaphore) {
ReleaseSemaphore (domain->cleanup_semaphore, 1, NULL);
return TRUE;
init_event_system (SocketIOData *data)
{
#ifdef HAVE_EPOLL
- if (data->event_system == EPOLL_BACKEND)
+ if (data->event_system == EPOLL_BACKEND) {
data->event_data = tp_epoll_init (data);
+ if (data->event_data == NULL) {
+ if (g_getenv ("MONO_DEBUG"))
+ g_message ("Falling back to poll()");
+ data->event_system = POLL_BACKEND;
+ }
+ }
#elif defined(HAVE_KQUEUE)
if (data->event_system == KQUEUE_BACKEND)
data->event_data = tp_kqueue_init (data);
mono_thread_create_internal (mono_get_root_domain (), data->wait, data, TRUE, SMALL_STACK);
LeaveCriticalSection (&data->io_lock);
data->inited = 2;
- mono_thread_create_internal (mono_get_root_domain (), threadpool_start_idle_threads, &async_io_tp, TRUE, SMALL_STACK);
+ threadpool_start_thread (&async_io_tp);
}
static void
if (ac == NULL) {
/* Fast path from ThreadPool.*QueueUserWorkItem */
void *pa = ares->async_state;
- mono_runtime_delegate_invoke (ares->async_delegate, &pa, &exc);
+ res = mono_runtime_delegate_invoke (ares->async_delegate, &pa, &exc);
} else {
MonoObject *cb_exc = NULL;
void *pa = &ares;
cb_exc = NULL;
mono_runtime_invoke (ac->cb_method, ac->cb_target, pa, &cb_exc);
- MONO_OBJECT_SETREF (ac->msg, exc, cb_exc);
exc = cb_exc;
} else {
exc = NULL;
#endif
static void
-monitor_thread (gpointer data)
+monitor_thread (gpointer unused)
{
- ThreadPool *tp;
+ ThreadPool *pools [2];
MonoInternalThread *thread;
guint32 ms;
gboolean need_one;
int i;
- tp = data;
+ pools [0] = &async_tp;
+ pools [1] = &async_io_tp;
thread = mono_thread_internal_current ();
ves_icall_System_Threading_Thread_SetName_internal (thread, mono_string_new (mono_domain_get (), "Threadpool monitor"));
while (1) {
if (mono_runtime_is_shutting_down ())
break;
- if (tp->waiting > 0)
- continue;
- need_one = (mono_cq_count (tp->queue) > 0);
- if (!need_one) {
- EnterCriticalSection (&wsqs_lock);
- for (i = 0; wsqs != NULL && i < wsqs->len; i++) {
- MonoWSQ *wsq;
- wsq = g_ptr_array_index (wsqs, i);
- if (mono_wsq_count (wsq) > 0) {
- need_one = TRUE;
- break;
+
+ for (i = 0; i < 2; i++) {
+ ThreadPool *tp;
+ tp = pools [i];
+ if (tp->waiting > 0)
+ continue;
+ need_one = (mono_cq_count (tp->queue) > 0);
+ if (!need_one && !tp->is_io) {
+ EnterCriticalSection (&wsqs_lock);
+ for (i = 0; wsqs != NULL && i < wsqs->len; i++) {
+ MonoWSQ *wsq;
+ wsq = g_ptr_array_index (wsqs, i);
+ if (mono_wsq_count (wsq) != 0) {
+ need_one = TRUE;
+ break;
+ }
}
+ LeaveCriticalSection (&wsqs_lock);
}
- LeaveCriticalSection (&wsqs_lock);
+ if (need_one)
+ threadpool_start_thread (tp);
}
- if (need_one)
- threadpool_start_thread (tp);
}
}
void
mono_thread_pool_cleanup (void)
{
- if (!(async_tp.pool_status == 0 || async_tp.pool_status == 2)) {
- if (!(async_tp.pool_status == 1 && InterlockedCompareExchange (&async_tp.pool_status, 2, 1) == 2)) {
- InterlockedExchange (&async_io_tp.pool_status, 2);
- threadpool_free_queue (&async_tp);
- threadpool_kill_idle_threads (&async_tp);
-
- socket_io_cleanup (&socket_io_data); /* Empty when DISABLE_SOCKETS is defined */
- threadpool_free_queue (&async_io_tp);
- threadpool_kill_idle_threads (&async_io_tp);
- MONO_SEM_DESTROY (&async_io_tp.new_job);
- }
+ if (InterlockedExchange (&async_io_tp.pool_status, 2) == 1) {
+ socket_io_cleanup (&socket_io_data); /* Empty when DISABLE_SOCKETS is defined */
+ threadpool_kill_idle_threads (&async_io_tp);
}
- EnterCriticalSection (&wsqs_lock);
- mono_wsq_cleanup ();
- if (wsqs)
- g_ptr_array_free (wsqs, TRUE);
- wsqs = NULL;
- LeaveCriticalSection (&wsqs_lock);
- MONO_SEM_DESTROY (&async_tp.new_job);
+ if (async_io_tp.queue != NULL) {
+ MONO_SEM_DESTROY (&async_io_tp.new_job);
+ threadpool_free_queue (&async_io_tp);
+ }
+
+
+ if (InterlockedExchange (&async_tp.pool_status, 2) == 1) {
+ threadpool_kill_idle_threads (&async_tp);
+ threadpool_free_queue (&async_tp);
+ }
+
+ if (wsqs) {
+ EnterCriticalSection (&wsqs_lock);
+ mono_wsq_cleanup ();
+ if (wsqs)
+ g_ptr_array_free (wsqs, TRUE);
+ wsqs = NULL;
+ LeaveCriticalSection (&wsqs_lock);
+ MONO_SEM_DESTROY (&async_tp.new_job);
+ }
}
static gboolean
return;
if (tp->pool_status == 0 && InterlockedCompareExchange (&tp->pool_status, 1, 0) == 0) {
- if (!tp->is_io)
- mono_thread_create_internal (mono_get_root_domain (), monitor_thread, tp, TRUE, SMALL_STACK);
+ if (!tp->is_io) {
+ mono_thread_create_internal (mono_get_root_domain (), monitor_thread, NULL, TRUE, SMALL_STACK);
+ threadpool_start_thread (tp);
+ }
/* Create on demand up to min_threads to avoid startup penalty for apps that don't use
* the threadpool that much
* mono_thread_create_internal (mono_get_root_domain (), threadpool_start_idle_threads, tp, TRUE, SMALL_STACK);
{
MonoObject *obj;
MonoMList *other;
- int domain_count;
other = NULL;
- domain_count = 0;
while (mono_cq_dequeue (tp->queue, &obj)) {
- if (obj != NULL && obj->vtable->domain == domain) {
- domain_count++;
- threadpool_jobs_dec (obj);
- } else if (obj != NULL) {
+ if (obj == NULL)
+ continue;
+ if (obj->vtable->domain != domain)
other = mono_mlist_prepend (other, obj);
- }
+ threadpool_jobs_dec (obj);
}
while (other) {
}
static void
-try_steal (gpointer *data, gboolean retry)
+try_steal (MonoWSQ *local_wsq, gpointer *data, gboolean retry)
{
int i;
int ms;
do {
if (mono_runtime_is_shutting_down ())
return;
+
+ EnterCriticalSection (&wsqs_lock);
for (i = 0; wsqs != NULL && i < wsqs->len; i++) {
- if (mono_runtime_is_shutting_down ()) {
- return;
- }
+ MonoWSQ *wsq;
+
+ wsq = wsqs->pdata [i];
+ if (wsq == local_wsq || mono_wsq_count (wsq) == 0)
+ continue;
mono_wsq_try_steal (wsqs->pdata [i], data, ms);
if (*data != NULL) {
+ LeaveCriticalSection (&wsqs_lock);
return;
}
}
+ LeaveCriticalSection (&wsqs_lock);
ms += 10;
} while (retry && ms < 11);
}
static gboolean
-dequeue_or_steal (ThreadPool *tp, gpointer *data)
+dequeue_or_steal (ThreadPool *tp, gpointer *data, MonoWSQ *local_wsq)
{
if (mono_runtime_is_shutting_down ())
return FALSE;
mono_cq_dequeue (tp->queue, (MonoObject **) data);
if (!tp->is_io && !*data)
- try_steal (data, FALSE);
+ try_steal (local_wsq, data, FALSE);
return (*data != NULL);
}
mono_profiler_thread_start (thread->tid);
name = (tp->is_io) ? "IO Threadpool worker" : "Threadpool worker";
- ves_icall_System_Threading_Thread_SetName_internal (thread, mono_string_new (mono_domain_get (), name));
+ mono_thread_set_name_internal (thread, mono_string_new (mono_domain_get (), name), FALSE);
if (tp_start_func)
tp_start_func (tp_hooks_user_data);
exc = mono_async_invoke (tp, ar);
if (tp_item_end_func)
tp_item_end_func (tp_item_user_data);
- if (exc && mono_runtime_unhandled_exception_policy_get () == MONO_UNHANDLED_POLICY_CURRENT) {
- gboolean unloaded;
- MonoClass *klass;
-
- klass = exc->vtable->klass;
- unloaded = is_appdomainunloaded_exception (exc->vtable->domain, klass);
- if (!unloaded && klass != mono_defaults.threadabortexception_class) {
- mono_unhandled_exception (exc);
- exit (255);
- }
- if (klass == mono_defaults.threadabortexception_class)
- mono_thread_internal_reset_abort (thread);
- }
+ if (exc)
+ mono_internal_thread_unhandled_exception (exc);
if (is_socket && tp->is_io) {
MonoSocketAsyncResult *state = (MonoSocketAsyncResult *) data;
data = NULL;
must_die = should_i_die (tp);
if (!must_die && (tp->is_io || !mono_wsq_local_pop (&data)))
- dequeue_or_steal (tp, &data);
+ dequeue_or_steal (tp, &data, wsq);
n_naps = 0;
while (!must_die && !data && n_naps < 4) {
gboolean res;
InterlockedIncrement (&tp->waiting);
+
+ // Another thread may have added a job into its wsq since the last call to dequeue_or_steal
+ // Check all the queues again before entering the wait loop
+ dequeue_or_steal (tp, &data, wsq);
+ if (data) {
+ InterlockedDecrement (&tp->waiting);
+ break;
+ }
+
+ mono_gc_set_skip_thread (TRUE);
+
#if defined(__OpenBSD__)
- while ((res = mono_sem_wait (&tp->new_job, TRUE)) == -1) {// && errno == EINTR) {
+ while (mono_cq_count (tp->queue) == 0 && (res = mono_sem_wait (&tp->new_job, TRUE)) == -1) {// && errno == EINTR) {
#else
- while ((res = mono_sem_timedwait (&tp->new_job, 2000, TRUE)) == -1) {// && errno == EINTR) {
+ while (mono_cq_count (tp->queue) == 0 && (res = mono_sem_timedwait (&tp->new_job, 2000, TRUE)) == -1) {// && errno == EINTR) {
#endif
if (mono_runtime_is_shutting_down ())
break;
mono_thread_interruption_checkpoint ();
}
InterlockedDecrement (&tp->waiting);
+
+ mono_gc_set_skip_thread (FALSE);
+
if (mono_runtime_is_shutting_down ())
break;
must_die = should_i_die (tp);
- dequeue_or_steal (tp, &data);
+ dequeue_or_steal (tp, &data, wsq);
n_naps++;
}
tp_item_user_data = user_data;
}
+void
+mono_internal_thread_unhandled_exception (MonoObject* exc)
+{
+ if (mono_runtime_unhandled_exception_policy_get () == MONO_UNHANDLED_POLICY_CURRENT) {
+ gboolean unloaded;
+ MonoClass *klass;
+
+ klass = exc->vtable->klass;
+ unloaded = is_appdomainunloaded_exception (exc->vtable->domain, klass);
+ if (!unloaded && klass != mono_defaults.threadabortexception_class) {
+ mono_unhandled_exception (exc);
+ if (mono_environment_exitcode_get () == 1)
+ exit (255);
+ }
+ if (klass == mono_defaults.threadabortexception_class)
+ mono_thread_internal_reset_abort (mono_thread_internal_current ());
+ }
+}