*
* Copyright 2001-2003 Ximian, Inc (http://www.ximian.com)
* Copyright 2004-2010 Novell, Inc (http://www.novell.com)
+ * Copyright 2001 Xamarin Inc (http://www.xamarin.com)
*/
#include <config.h>
#include <mono/metadata/threads-types.h>
#include <mono/metadata/threadpool-internals.h>
#include <mono/metadata/exception.h>
+#include <mono/metadata/environment.h>
#include <mono/metadata/mono-mlist.h>
#include <mono/metadata/mono-perfcounters.h>
#include <mono/metadata/socket-io.h>
#include <mono/metadata/mono-cq.h>
#include <mono/metadata/mono-wsq.h>
+#include <mono/metadata/mono-ptr-array.h>
#include <mono/io-layer/io-layer.h>
#include <mono/utils/mono-time.h>
#include <mono/utils/mono-proclib.h>
} while (1)
#define SPIN_UNLOCK(i) i = 0
+#define SMALL_STACK (128 * (sizeof (gpointer) / 4) * 1024)
/* DEBUG: prints tp data every 2s */
#undef DEBUG
static gboolean
threadpool_jobs_dec (MonoObject *obj)
{
- MonoDomain *domain = obj->vtable->domain;
- int remaining_jobs = InterlockedDecrement (&domain->threadpool_jobs);
+ MonoDomain *domain;
+ int remaining_jobs;
+
+ if (obj == NULL)
+ return FALSE;
+
+ domain = obj->vtable->domain;
+ remaining_jobs = InterlockedDecrement (&domain->threadpool_jobs);
if (remaining_jobs == 0 && domain->cleanup_semaphore) {
ReleaseSemaphore (domain->cleanup_semaphore, 1, NULL);
return TRUE;
init_event_system (SocketIOData *data)
{
#ifdef HAVE_EPOLL
- if (data->event_system == EPOLL_BACKEND)
+ if (data->event_system == EPOLL_BACKEND) {
data->event_data = tp_epoll_init (data);
+ if (data->event_data == NULL) {
+ if (g_getenv ("MONO_DEBUG"))
+ g_message ("Falling back to poll()");
+ data->event_system = POLL_BACKEND;
+ }
+ }
#elif defined(HAVE_KQUEUE)
if (data->event_system == KQUEUE_BACKEND)
data->event_data = tp_kqueue_init (data);
socket_io_init (SocketIOData *data)
{
int inited;
- guint32 stack_size;
if (data->inited >= 2) // 2 -> initialized, 3-> cleaned up
return;
data->event_system = POLL_BACKEND;
init_event_system (data);
- stack_size = mono_threads_get_default_stacksize ();
- mono_threads_set_default_stacksize (128 * (sizeof (gpointer) / 4) * 1024);
- mono_thread_create_internal (mono_get_root_domain (), data->wait, data, TRUE);
- mono_threads_set_default_stacksize (stack_size);
+ mono_thread_create_internal (mono_get_root_domain (), data->wait, data, TRUE, SMALL_STACK);
LeaveCriticalSection (&data->io_lock);
data->inited = 2;
+ threadpool_start_thread (&async_io_tp);
}
static void
threadpool_start_idle_threads (ThreadPool *tp)
{
int n;
+ guint32 stack_size;
- if (tp->pool_status == 1 && !tp->is_io)
- mono_thread_create_internal (mono_get_root_domain (), monitor_thread, tp, TRUE);
+ stack_size = (!tp->is_io) ? 0 : SMALL_STACK;
do {
while (1) {
n = tp->nthreads;
break;
}
mono_perfcounter_update_value (tp->pc_nthreads, TRUE, 1);
- mono_thread_create_internal (mono_get_root_domain (), tp->async_invoke, tp, TRUE);
+ mono_thread_create_internal (mono_get_root_domain (), tp->async_invoke, tp, TRUE, stack_size);
SleepEx (100, TRUE);
} while (1);
}
#endif
static void
-monitor_thread (gpointer data)
+monitor_thread (gpointer unused)
{
- ThreadPool *tp;
+ ThreadPool *pools [2];
MonoInternalThread *thread;
guint32 ms;
gboolean need_one;
int i;
- tp = data;
+ pools [0] = &async_tp;
+ pools [1] = &async_io_tp;
thread = mono_thread_internal_current ();
ves_icall_System_Threading_Thread_SetName_internal (thread, mono_string_new (mono_domain_get (), "Threadpool monitor"));
while (1) {
if (mono_runtime_is_shutting_down ())
break;
- if (tp->waiting > 0)
- continue;
- need_one = (mono_cq_count (tp->queue) > 0);
- if (!need_one) {
- EnterCriticalSection (&wsqs_lock);
- for (i = 0; wsqs != NULL && i < wsqs->len; i++) {
- MonoWSQ *wsq;
- wsq = g_ptr_array_index (wsqs, i);
- if (mono_wsq_count (wsq) > 0) {
- need_one = TRUE;
- break;
+
+ for (i = 0; i < 2; i++) {
+ ThreadPool *tp;
+ tp = pools [i];
+ if (tp->waiting > 0)
+ continue;
+ need_one = (mono_cq_count (tp->queue) > 0);
+ if (!need_one && !tp->is_io) {
+ EnterCriticalSection (&wsqs_lock);
+ for (i = 0; wsqs != NULL && i < wsqs->len; i++) {
+ MonoWSQ *wsq;
+ wsq = g_ptr_array_index (wsqs, i);
+ if (mono_wsq_count (wsq) != 0) {
+ need_one = TRUE;
+ break;
+ }
}
+ LeaveCriticalSection (&wsqs_lock);
}
- LeaveCriticalSection (&wsqs_lock);
+ if (need_one)
+ threadpool_start_thread (tp);
}
- if (need_one)
- threadpool_start_thread (tp);
}
}
void
mono_thread_pool_cleanup (void)
{
- if (!(async_tp.pool_status == 0 || async_tp.pool_status == 2)) {
- if (!(async_tp.pool_status == 1 && InterlockedCompareExchange (&async_tp.pool_status, 2, 1) == 2)) {
- InterlockedExchange (&async_io_tp.pool_status, 2);
- threadpool_free_queue (&async_tp);
- threadpool_kill_idle_threads (&async_tp);
-
- socket_io_cleanup (&socket_io_data); /* Empty when DISABLE_SOCKETS is defined */
- threadpool_free_queue (&async_io_tp);
- threadpool_kill_idle_threads (&async_io_tp);
- MONO_SEM_DESTROY (&async_io_tp.new_job);
- }
+ if (InterlockedExchange (&async_io_tp.pool_status, 2) == 1) {
+ socket_io_cleanup (&socket_io_data); /* Empty when DISABLE_SOCKETS is defined */
+ threadpool_kill_idle_threads (&async_io_tp);
}
- EnterCriticalSection (&wsqs_lock);
- mono_wsq_cleanup ();
- if (wsqs)
- g_ptr_array_free (wsqs, TRUE);
- wsqs = NULL;
- LeaveCriticalSection (&wsqs_lock);
- MONO_SEM_DESTROY (&async_tp.new_job);
+ if (async_io_tp.queue != NULL) {
+ MONO_SEM_DESTROY (&async_io_tp.new_job);
+ threadpool_free_queue (&async_io_tp);
+ }
+
+
+ if (InterlockedExchange (&async_tp.pool_status, 2) == 1) {
+ threadpool_kill_idle_threads (&async_tp);
+ threadpool_free_queue (&async_tp);
+ }
+
+ if (wsqs) {
+ EnterCriticalSection (&wsqs_lock);
+ mono_wsq_cleanup ();
+ if (wsqs)
+ g_ptr_array_free (wsqs, TRUE);
+ wsqs = NULL;
+ LeaveCriticalSection (&wsqs_lock);
+ MONO_SEM_DESTROY (&async_tp.new_job);
+ }
}
static gboolean
threadpool_start_thread (ThreadPool *tp)
{
gint n;
+ guint32 stack_size;
+ stack_size = (!tp->is_io) ? 0 : SMALL_STACK;
while (!mono_runtime_is_shutting_down () && (n = tp->nthreads) < tp->max_threads) {
if (InterlockedCompareExchange (&tp->nthreads, n + 1, n) == n) {
mono_perfcounter_update_value (tp->pc_nthreads, TRUE, 1);
- mono_thread_create_internal (mono_get_root_domain (), tp->async_invoke, tp, TRUE);
+ mono_thread_create_internal (mono_get_root_domain (), tp->async_invoke, tp, TRUE, stack_size);
return TRUE;
}
}
void
icall_append_job (MonoObject *ar)
{
- threadpool_append_job (&async_tp, ar);
+ threadpool_append_jobs (&async_tp, &ar, 1);
}
static void
if (mono_runtime_is_shutting_down ())
return;
- if (tp->pool_status == 0 && InterlockedCompareExchange (&tp->pool_status, 1, 0) == 0)
- mono_thread_create_internal (mono_get_root_domain (), threadpool_start_idle_threads, tp, TRUE);
+ if (tp->pool_status == 0 && InterlockedCompareExchange (&tp->pool_status, 1, 0) == 0) {
+ if (!tp->is_io) {
+ mono_thread_create_internal (mono_get_root_domain (), monitor_thread, NULL, TRUE, SMALL_STACK);
+ threadpool_start_thread (tp);
+ }
+ /* Create on demand up to min_threads to avoid startup penalty for apps that don't use
+ * the threadpool that much
+ * mono_thread_create_internal (mono_get_root_domain (), threadpool_start_idle_threads, tp, TRUE, SMALL_STACK);
+ */
+ }
for (i = 0; i < njobs; i++) {
ar = jobs [i];
mono_cq_enqueue (tp->queue, ar);
}
- for (i = 0; i < MIN(njobs, tp->max_threads); i++)
+ for (i = 0; tp->waiting > 0 && i < MIN(njobs, tp->max_threads); i++)
pulse_on_new_job (tp);
}
{
MonoObject *obj;
MonoMList *other;
- int domain_count;
other = NULL;
- domain_count = 0;
while (mono_cq_dequeue (tp->queue, &obj)) {
- if (obj != NULL && obj->vtable->domain == domain) {
- domain_count++;
- threadpool_jobs_dec (obj);
- } else if (obj != NULL) {
+ if (obj == NULL)
+ continue;
+ if (obj->vtable->domain != domain)
other = mono_mlist_prepend (other, obj);
- }
+ threadpool_jobs_dec (obj);
}
while (other) {
}
static void
-try_steal (gpointer *data, gboolean retry)
+try_steal (MonoWSQ *local_wsq, gpointer *data, gboolean retry)
{
int i;
int ms;
do {
if (mono_runtime_is_shutting_down ())
return;
+
+ EnterCriticalSection (&wsqs_lock);
for (i = 0; wsqs != NULL && i < wsqs->len; i++) {
- if (mono_runtime_is_shutting_down ()) {
- return;
- }
+ MonoWSQ *wsq;
+
+ wsq = wsqs->pdata [i];
+ if (wsq == local_wsq || mono_wsq_count (wsq) == 0)
+ continue;
mono_wsq_try_steal (wsqs->pdata [i], data, ms);
if (*data != NULL) {
+ LeaveCriticalSection (&wsqs_lock);
return;
}
}
+ LeaveCriticalSection (&wsqs_lock);
ms += 10;
} while (retry && ms < 11);
}
static gboolean
-dequeue_or_steal (ThreadPool *tp, gpointer *data)
+dequeue_or_steal (ThreadPool *tp, gpointer *data, MonoWSQ *local_wsq)
{
if (mono_runtime_is_shutting_down ())
return FALSE;
mono_cq_dequeue (tp->queue, (MonoObject **) data);
if (!tp->is_io && !*data)
- try_steal (data, FALSE);
+ try_steal (local_wsq, data, FALSE);
return (*data != NULL);
}
MonoWSQ *wsq;
ThreadPool *tp;
gboolean must_die;
+ const gchar *name;
tp = data;
wsq = NULL;
thread = mono_thread_internal_current ();
mono_profiler_thread_start (thread->tid);
- ves_icall_System_Threading_Thread_SetName_internal (thread, mono_string_new (mono_domain_get (), "Threadpool worker"));
+ name = (tp->is_io) ? "IO Threadpool worker" : "Threadpool worker";
+ ves_icall_System_Threading_Thread_SetName_internal (thread, mono_string_new (mono_domain_get (), name));
if (tp_start_func)
tp_start_func (tp_hooks_user_data);
unloaded = is_appdomainunloaded_exception (exc->vtable->domain, klass);
if (!unloaded && klass != mono_defaults.threadabortexception_class) {
mono_unhandled_exception (exc);
- exit (255);
+ if (mono_environment_exitcode_get () == 1)
+ exit (255);
}
if (klass == mono_defaults.threadabortexception_class)
mono_thread_internal_reset_abort (thread);
data = NULL;
must_die = should_i_die (tp);
if (!must_die && (tp->is_io || !mono_wsq_local_pop (&data)))
- dequeue_or_steal (tp, &data);
+ dequeue_or_steal (tp, &data, wsq);
n_naps = 0;
while (!must_die && !data && n_naps < 4) {
gboolean res;
+ mono_gc_set_skip_thread (TRUE);
+
InterlockedIncrement (&tp->waiting);
#if defined(__OpenBSD__)
- while ((res = mono_sem_wait (&tp->new_job, TRUE)) == -1) {// && errno == EINTR) {
+ while (mono_cq_count (tp->queue) == 0 && (res = mono_sem_wait (&tp->new_job, TRUE)) == -1) {// && errno == EINTR) {
#else
- while ((res = mono_sem_timedwait (&tp->new_job, 2000, TRUE)) == -1) {// && errno == EINTR) {
+ while (mono_cq_count (tp->queue) == 0 && (res = mono_sem_timedwait (&tp->new_job, 2000, TRUE)) == -1) {// && errno == EINTR) {
#endif
if (mono_runtime_is_shutting_down ())
break;
mono_thread_interruption_checkpoint ();
}
InterlockedDecrement (&tp->waiting);
+
+ mono_gc_set_skip_thread (FALSE);
+
if (mono_runtime_is_shutting_down ())
break;
must_die = should_i_die (tp);
- dequeue_or_steal (tp, &data);
+ dequeue_or_steal (tp, &data, wsq);
n_naps++;
}
InterlockedExchange (&async_tp.min_threads, workerThreads);
InterlockedExchange (&async_io_tp.min_threads, completionPortThreads);
if (workerThreads > async_tp.nthreads)
- mono_thread_create_internal (mono_get_root_domain (), threadpool_start_idle_threads, &async_tp, TRUE);
+ mono_thread_create_internal (mono_get_root_domain (), threadpool_start_idle_threads, &async_tp, TRUE, SMALL_STACK);
if (completionPortThreads > async_io_tp.nthreads)
- mono_thread_create_internal (mono_get_root_domain (), threadpool_start_idle_threads, &async_io_tp, TRUE);
+ mono_thread_create_internal (mono_get_root_domain (), threadpool_start_idle_threads, &async_io_tp, TRUE, SMALL_STACK);
return TRUE;
}