#include <mono/metadata/threadpool-internals.h>
#include <mono/metadata/exception.h>
#include <mono/metadata/environment.h>
+#include <mono/metadata/mono-config.h>
#include <mono/metadata/mono-mlist.h>
#include <mono/metadata/mono-perfcounters.h>
#include <mono/metadata/socket-io.h>
#include <mono/utils/mono-time.h>
#include <mono/utils/mono-proclib.h>
#include <mono/utils/mono-semaphore.h>
+#include <mono/utils/atomic.h>
#include <errno.h>
#ifdef HAVE_SYS_TIME_H
#include <sys/time.h>
gint event_system;
gpointer event_data;
- void (*modify) (gpointer event_data, int fd, int operation, int events, gboolean is_new);
+ void (*modify) (gpointer p, int fd, int operation, int events, gboolean is_new);
void (*wait) (gpointer sock_data);
void (*shutdown) (gpointer event_data);
} SocketIOData;
static MonoObject *get_io_event (MonoMList **list, gint event);
static int get_events_from_list (MonoMList *list);
static int get_event_from_state (MonoSocketAsyncResult *state);
+static void check_for_interruption_critical (void);
static MonoClass *async_call_klass;
static MonoClass *socket_async_call_klass;
static GPtrArray *wsqs;
CRITICAL_SECTION wsqs_lock;
+static gboolean suspended;
/* Hooks */
static MonoThreadPoolFunc tp_start_func;
#include <mono/metadata/tpool-poll.c>
#ifdef HAVE_EPOLL
#include <mono/metadata/tpool-epoll.c>
-#elif defined(HAVE_KQUEUE)
+#elif defined(USE_KQUEUE_FOR_THREADPOOL)
#include <mono/metadata/tpool-kqueue.c>
#endif
/*
/*
* Note that we call it is_socket_type() where 'socket' refers to the image
* that contains the System.Net.Sockets.Socket type.
- * For moonlight there is a System.Net.Sockets.Socket class in both System.dll and System.Net.dll.
*/
static gboolean
is_socket_type (MonoDomain *domain, MonoClass *klass)
{
- static const char *version = NULL;
- static gboolean moonlight;
-
- if (is_system_type (domain, klass))
- return TRUE;
-
- /* If moonlight, check if the type is in System.Net.dll too */
- if (version == NULL) {
- version = mono_get_runtime_info ()->framework_version;
- moonlight = !strcmp (version, "2.1");
- }
-
- if (!moonlight)
- return FALSE;
-
- if (domain->system_net_dll == NULL)
- domain->system_net_dll = mono_image_loaded ("System.Net");
-
- return klass->image == domain->system_net_dll;
+ return is_system_type (domain, klass);
}
#define check_type_cached(domain, ASSEMBLY, _class, _namespace, _name, loc) do { \
data->event_system = POLL_BACKEND;
}
}
-#elif defined(HAVE_KQUEUE)
+#elif defined(USE_KQUEUE_FOR_THREADPOOL)
if (data->event_system == KQUEUE_BACKEND)
data->event_data = tp_kqueue_init (data);
#endif
data->sock_to_state = mono_g_hash_table_new_type (g_direct_hash, g_direct_equal, MONO_HASH_VALUE_GC);
#ifdef HAVE_EPOLL
data->event_system = EPOLL_BACKEND;
-#elif defined(HAVE_KQUEUE)
+#elif defined(USE_KQUEUE_FOR_THREADPOOL)
data->event_system = KQUEUE_BACKEND;
#else
data->event_system = POLL_BACKEND;
mono_g_hash_table_replace (data->sock_to_state, state->handle, list);
ievt = get_events_from_list (list);
- data->modify (data->event_data, fd, state->operation, ievt, is_new);
- LeaveCriticalSection (&data->io_lock);
+ /* The modify function leaves the io_lock critical section. */
+ data->modify (data, fd, state->operation, ievt, is_new);
}
#ifndef DISABLE_SOCKETS
MonoObject *res, *exc = NULL;
MonoArray *out_args = NULL;
HANDLE wait_event = NULL;
+ MonoInternalThread *thread = mono_thread_internal_current ();
if (ares->execution_context) {
/* use captured ExecutionContext (if available) */
if (ac == NULL) {
/* Fast path from ThreadPool.*QueueUserWorkItem */
void *pa = ares->async_state;
+ /* The debugger needs this */
+ thread->async_invoke_method = ((MonoDelegate*)ares->async_delegate)->method;
res = mono_runtime_delegate_invoke (ares->async_delegate, &pa, &exc);
+ thread->async_invoke_method = NULL;
} else {
MonoObject *cb_exc = NULL;
if (ac != NULL && ac->cb_method) {
void *pa = &ares;
cb_exc = NULL;
+ thread->async_invoke_method = ac->cb_method;
mono_runtime_invoke (ac->cb_method, ac->cb_target, pa, &cb_exc);
+ thread->async_invoke_method = NULL;
exc = cb_exc;
} else {
exc = NULL;
if (InterlockedCompareExchange (&tp->nthreads, n + 1, n) == n)
break;
}
+#ifndef DISABLE_PERFCOUNTERS
mono_perfcounter_update_value (tp->pc_nthreads, TRUE, 1);
+#endif
mono_thread_create_internal (mono_get_root_domain (), tp->async_invoke, tp, TRUE, stack_size);
SleepEx (100, TRUE);
} while (1);
MONO_SEM_INIT (&tp->new_job, 0);
}
+#ifndef DISABLE_PERFCOUNTERS
static void *
init_perf_counter (const char *category, const char *counter)
{
machine = mono_string_new (root, ".");
return mono_perfcounter_get_impl (category_str, counter_str, NULL, machine, &type, &custom);
}
+#endif
#ifdef DEBUG
static void
ves_icall_System_Threading_Thread_SetName_internal (thread, mono_string_new (mono_domain_get (), "Threadpool monitor"));
while (1) {
ms = 500;
+ i = 10; //number of spurious awakes we tolerate before doing a round of rebalancing.
do {
guint32 ts;
ts = mono_msec_ticks ();
break;
if (THREAD_WANTS_A_BREAK (thread))
mono_thread_interruption_checkpoint ();
- } while (ms > 0);
+ } while (ms > 0 && i--);
if (mono_runtime_is_shutting_down ())
break;
+ if (suspended)
+ continue;
+
for (i = 0; i < 2; i++) {
ThreadPool *tp;
tp = pools [i];
}
void
-mono_thread_pool_init ()
+mono_thread_pool_init_tls (void)
+{
+ mono_wsq_init ();
+}
+
+void
+mono_thread_pool_init (void)
{
gint threads_per_cpu = 1;
gint thread_count;
InitializeCriticalSection (&wsqs_lock);
wsqs = g_ptr_array_sized_new (MAX (100 * cpu_count, thread_count));
- mono_wsq_init ();
+#ifndef DISABLE_PERFCOUNTERS
async_tp.pc_nitems = init_perf_counter ("Mono Threadpool", "Work Items Added");
g_assert (async_tp.pc_nitems);
async_io_tp.pc_nthreads = init_perf_counter ("Mono Threadpool", "# of IO Threads");
g_assert (async_io_tp.pc_nthreads);
+#endif
tp_inited = 2;
#ifdef DEBUG
signal (SIGALRM, signal_handler);
stack_size = (!tp->is_io) ? 0 : SMALL_STACK;
while (!mono_runtime_is_shutting_down () && (n = tp->nthreads) < tp->max_threads) {
if (InterlockedCompareExchange (&tp->nthreads, n + 1, n) == n) {
+#ifndef DISABLE_PERFCOUNTERS
mono_perfcounter_update_value (tp->pc_nthreads, TRUE, 1);
+#endif
mono_thread_create_internal (mono_get_root_domain (), tp->async_invoke, tp, TRUE, stack_size);
return TRUE;
}
}
/* Create on demand up to min_threads to avoid startup penalty for apps that don't use
* the threadpool that much
- * mono_thread_create_internal (mono_get_root_domain (), threadpool_start_idle_threads, tp, TRUE, SMALL_STACK);
- */
+ */
+ if (mono_config_is_server_mode ()) {
+ mono_thread_create_internal (mono_get_root_domain (), threadpool_start_idle_threads, tp, TRUE, SMALL_STACK);
+ }
}
for (i = 0; i < njobs; i++) {
o->add_time = mono_100ns_ticks ();
}
threadpool_jobs_inc (ar);
+#ifndef DISABLE_PERFCOUNTERS
mono_perfcounter_update_value (tp->pc_nitems, TRUE, 1);
+#endif
if (!tp->is_io && mono_wsq_local_push (ar))
continue;
threadpool_clear_queue (ThreadPool *tp, MonoDomain *domain)
{
MonoObject *obj;
- MonoMList *other;
+ MonoMList *other = NULL;
+ MonoCQ *queue = tp->queue;
- other = NULL;
- while (mono_cq_dequeue (tp->queue, &obj)) {
+ if (!queue)
+ return;
+
+ while (mono_cq_dequeue (queue, &obj)) {
if (obj == NULL)
continue;
if (obj->vtable->domain != domain)
threadpool_jobs_dec (obj);
}
+ if (mono_runtime_is_shutting_down ())
+ return;
+
while (other) {
threadpool_append_job (tp, (MonoObject *) mono_mlist_get_data (other));
other = mono_mlist_next (other);
static gboolean
dequeue_or_steal (ThreadPool *tp, gpointer *data, MonoWSQ *local_wsq)
{
- if (mono_runtime_is_shutting_down ())
+ MonoCQ *queue = tp->queue;
+ if (mono_runtime_is_shutting_down () || !queue)
return FALSE;
- mono_cq_dequeue (tp->queue, (MonoObject **) data);
+ mono_cq_dequeue (queue, (MonoObject **) data);
if (!tp->is_io && !*data)
try_steal (local_wsq, data, FALSE);
return (*data != NULL);
return result;
}
+static void
+set_tp_thread_info (ThreadPool *tp)
+{
+ const gchar *name;
+ MonoInternalThread *thread = mono_thread_internal_current ();
+
+ mono_profiler_thread_start (thread->tid);
+ name = (tp->is_io) ? "IO Threadpool worker" : "Threadpool worker";
+ mono_thread_set_name_internal (thread, mono_string_new (mono_domain_get (), name), FALSE);
+}
+
+static void
+clear_thread_state (void)
+{
+ MonoInternalThread *thread = mono_thread_internal_current ();
+ /* If the callee changes the background status, set it back to TRUE */
+ mono_thread_clr_state (thread , ~ThreadState_Background);
+ if (!mono_thread_test_state (thread , ThreadState_Background))
+ ves_icall_System_Threading_Thread_SetState (thread, ThreadState_Background);
+}
+
+static void
+check_for_interruption_critical (void)
+{
+ MonoInternalThread *thread;
+ /*RULE NUMBER ONE OF SKIP_THREAD: NEVER POKE MANAGED STATE.*/
+ mono_gc_set_skip_thread (FALSE);
+
+ thread = mono_thread_internal_current ();
+ if (THREAD_WANTS_A_BREAK (thread))
+ mono_thread_interruption_checkpoint ();
+
+ /*RULE NUMBER TWO OF SKIP_THREAD: READ RULE NUMBER ONE.*/
+ mono_gc_set_skip_thread (TRUE);
+}
+
+static void
+fire_profiler_thread_end (void)
+{
+ MonoInternalThread *thread = mono_thread_internal_current ();
+ mono_profiler_thread_end (thread->tid);
+}
+
static void
async_invoke_thread (gpointer data)
{
MonoDomain *domain;
- MonoInternalThread *thread;
MonoWSQ *wsq;
ThreadPool *tp;
gboolean must_die;
- const gchar *name;
tp = data;
wsq = NULL;
if (!tp->is_io)
wsq = add_wsq ();
- thread = mono_thread_internal_current ();
-
- mono_profiler_thread_start (thread->tid);
- name = (tp->is_io) ? "IO Threadpool worker" : "Threadpool worker";
- mono_thread_set_name_internal (thread, mono_string_new (mono_domain_get (), name), FALSE);
+ set_tp_thread_info (tp);
if (tp_start_func)
tp_start_func (tp_hooks_user_data);
}
mono_thread_pop_appdomain_ref ();
InterlockedDecrement (&tp->busy_threads);
- /* If the callee changes the background status, set it back to TRUE */
- mono_thread_clr_state (thread , ~ThreadState_Background);
- if (!mono_thread_test_state (thread , ThreadState_Background))
- ves_icall_System_Threading_Thread_SetState (thread, ThreadState_Background);
+ clear_thread_state ();
}
}
#endif
if (mono_runtime_is_shutting_down ())
break;
- if (THREAD_WANTS_A_BREAK (thread))
- mono_thread_interruption_checkpoint ();
+ check_for_interruption_critical ();
}
InterlockedDecrement (&tp->waiting);
if (!down && nt <= tp->min_threads)
break;
if (down || InterlockedCompareExchange (&tp->nthreads, nt - 1, nt) == nt) {
+#ifndef DISABLE_PERFCOUNTERS
mono_perfcounter_update_value (tp->pc_nthreads, TRUE, -1);
+#endif
if (!tp->is_io) {
remove_wsq (wsq);
}
- mono_profiler_thread_end (thread->tid);
+ fire_profiler_thread_end ();
if (tp_finish_func)
tp_finish_func (tp_hooks_user_data);
mono_thread_internal_reset_abort (mono_thread_internal_current ());
}
}
+
+/*
+ * Suspend creation of new threads.
+ */
+void
+mono_thread_pool_suspend (void)
+{
+ suspended = TRUE;
+}
+
+/*
+ * Resume creation of new threads.
+ */
+void
+mono_thread_pool_resume (void)
+{
+ suspended = FALSE;
+}