#include <mono/metadata/mono-cq.h>
#include <mono/metadata/mono-wsq.h>
#include <mono/metadata/mono-ptr-array.h>
+#include <mono/metadata/object-internals.h>
#include <mono/io-layer/io-layer.h>
#include <mono/utils/mono-time.h>
#include <mono/utils/mono-proclib.h>
#endif
#include "threadpool.h"
+#include "threadpool-ms.h"
+#include "threadpool-ms-io.h"
+
+static gboolean
+use_ms_threadpool (void)
+{
+ static gboolean use_ms_tp = -1;
+ const gchar *mono_threadpool_env;
+ if (use_ms_tp != -1)
+ return use_ms_tp;
+ else if (!(mono_threadpool_env = g_getenv ("MONO_THREADPOOL")))
+ return use_ms_tp = FALSE;
+ else if (strcmp (mono_threadpool_env, "microsoft") == 0)
+ return use_ms_tp = TRUE;
+ else
+ return use_ms_tp = FALSE;
+}
#define THREAD_WANTS_A_BREAK(t) ((t->state & (ThreadState_StopRequested | \
ThreadState_SuspendRequested)) != 0)
-#define SMALL_STACK (128 * (sizeof (gpointer) / 4) * 1024)
-
/* DEBUG: prints tp data every 2s */
#undef DEBUG
MONITOR_STATE_SLEEPING
};
-typedef struct {
- mono_mutex_t io_lock; /* access to sock_to_state */
- int inited; // 0 -> not initialized , 1->initializing, 2->initialized, 3->cleaned up
- MonoGHashTable *sock_to_state;
-
- gint event_system;
- gpointer event_data;
- void (*modify) (gpointer p, int fd, int operation, int events, gboolean is_new);
- void (*wait) (gpointer sock_data);
- void (*shutdown) (gpointer event_data);
-} SocketIOData;
-
static SocketIOData socket_io_data;
-/* Keep in sync with the System.MonoAsyncCall class which provides GC tracking */
-typedef struct {
- MonoObject object;
- MonoMethodMessage *msg;
- MonoMethod *cb_method;
- MonoDelegate *cb_target;
- MonoObject *state;
- MonoObject *res;
- MonoArray *out_args;
-} ASyncCall;
-
typedef struct {
MonoSemType lock;
MonoCQ *queue; /* GC root */
static gboolean threadpool_start_thread (ThreadPool *tp);
static void threadpool_kill_thread (ThreadPool *tp);
static void monitor_thread (gpointer data);
-static void socket_io_cleanup (SocketIOData *data);
-static MonoObject *get_io_event (MonoMList **list, gint event);
-static int get_events_from_list (MonoMList *list);
static int get_event_from_state (MonoSocketAsyncResult *state);
-static void check_for_interruption_critical (void);
static MonoClass *async_call_klass;
static MonoClass *socket_async_call_klass;
AIO_OP_LAST
};
-#include <mono/metadata/tpool-poll.c>
+// #include <mono/metadata/tpool-poll.c>
+gpointer tp_poll_init (SocketIOData *data);
+
#ifdef HAVE_EPOLL
#include <mono/metadata/tpool-epoll.c>
#elif defined(USE_KQUEUE_FOR_THREADPOOL)
return klass->image == mono_defaults.corlib;
}
-/*
- * Note that we call it is_socket_type() where 'socket' refers to the image
- * that contains the System.Net.Sockets.Socket type.
-*/
-static gboolean
-is_socket_type (MonoDomain *domain, MonoClass *klass)
-{
- return is_system_type (domain, klass);
-}
-
#define check_type_cached(domain, ASSEMBLY, _class, _namespace, _name, loc) do { \
if (*loc) \
return *loc == _class; \
#define check_corlib_type_cached(domain, _class, _namespace, _name, loc) check_type_cached (domain, corlib, _class, _namespace, _name, loc)
-#define check_socket_type_cached(domain, _class, _namespace, _name, loc) check_type_cached (domain, socket, _class, _namespace, _name, loc)
-
#define check_system_type_cached(domain, _class, _namespace, _name, loc) check_type_cached (domain, system, _class, _namespace, _name, loc)
static gboolean
check_corlib_type_cached (domain, klass, "System.Runtime.Remoting.Messaging", "AsyncResult", &domain->corlib_asyncresult_class);
}
-static gboolean
-is_socket (MonoDomain *domain, MonoClass *klass)
-{
- check_socket_type_cached (domain, klass, "System.Net.Sockets", "Socket", &domain->socket_class);
-}
-
static gboolean
is_socketasyncresult (MonoDomain *domain, MonoClass *klass)
{
- return (klass->nested_in &&
- is_socket (domain, klass->nested_in) &&
- !strcmp (klass->name, "SocketAsyncResult"));
+ static MonoClass *socket_async_result_klass = NULL;
+ check_system_type_cached (domain, klass, "System.Net.Sockets", "SocketAsyncResult", &socket_async_result_klass);
}
static gboolean
is_socketasynccall (MonoDomain *domain, MonoClass *klass)
{
- return (klass->nested_in &&
- is_socket (domain, klass->nested_in) &&
- !strcmp (klass->name, "SocketAsyncCall"));
+ static MonoClass *socket_async_callback_klass = NULL;
+ check_system_type_cached (domain, klass, "System.Net.Sockets", "SocketAsyncCallback", &socket_async_callback_klass);
}
static gboolean
#ifdef DISABLE_SOCKETS
-#define socket_io_cleanup(x)
+void
+socket_io_cleanup (SocketIOData *data)
+{
+}
static int
get_event_from_state (MonoSocketAsyncResult *state)
return -1;
}
-static int
+int
get_events_from_list (MonoMList *list)
{
return 0;
#else
-static void
+void
socket_io_cleanup (SocketIOData *data)
{
mono_mutex_lock (&data->io_lock);
}
}
-static int
+int
get_events_from_list (MonoMList *list)
{
MonoSocketAsyncResult *state;
return events;
}
-#define ICALL_RECV(x) ves_icall_System_Net_Sockets_Socket_Receive_internal (\
- (SOCKET)(gssize)x->handle, x->buffer, x->offset, x->size,\
- x->socket_flags, &x->error);
-
-#define ICALL_SEND(x) ves_icall_System_Net_Sockets_Socket_Send_internal (\
- (SOCKET)(gssize)x->handle, x->buffer, x->offset, x->size,\
- x->socket_flags, &x->error);
-
#endif /* !DISABLE_SOCKETS */
static void
return FALSE;
}
-static MonoObject *
+MonoObject *
get_io_event (MonoMList **list, gint event)
{
MonoObject *state;
MonoSocketAsyncResult *state;
MonoObject *ares;
+ if (use_ms_threadpool ()) {
+#ifndef DISABLE_SOCKETS
+ mono_threadpool_ms_io_remove_socket (sock);
+#endif
+ return;
+ }
+
if (socket_io_data.inited == 0)
return;
static MonoObject *
mono_async_invoke (ThreadPool *tp, MonoAsyncResult *ares)
{
- ASyncCall *ac = (ASyncCall *)ares->object_data;
- MonoObject *res, *exc = NULL;
- MonoArray *out_args = NULL;
- HANDLE wait_event = NULL;
- MonoInternalThread *thread = mono_thread_internal_current ();
-
- if (ares->execution_context) {
- /* use captured ExecutionContext (if available) */
- MONO_OBJECT_SETREF (ares, original_context, mono_thread_get_execution_context ());
- mono_thread_set_execution_context (ares->execution_context);
- } else {
- ares->original_context = NULL;
- }
-
- if (ac == NULL) {
- /* Fast path from ThreadPool.*QueueUserWorkItem */
- void *pa = ares->async_state;
- /* The debugger needs this */
- thread->async_invoke_method = ((MonoDelegate*)ares->async_delegate)->method;
- res = mono_runtime_delegate_invoke (ares->async_delegate, &pa, &exc);
- thread->async_invoke_method = NULL;
- } else {
- MonoObject *cb_exc = NULL;
+ MonoObject *exc = NULL;
- ac->msg->exc = NULL;
- res = mono_message_invoke (ares->async_delegate, ac->msg, &exc, &out_args);
- MONO_OBJECT_SETREF (ac, res, res);
- MONO_OBJECT_SETREF (ac, msg->exc, exc);
- MONO_OBJECT_SETREF (ac, out_args, out_args);
-
- mono_monitor_enter ((MonoObject *) ares);
- ares->completed = 1;
- if (ares->handle != NULL)
- wait_event = mono_wait_handle_get_handle ((MonoWaitHandle *) ares->handle);
- mono_monitor_exit ((MonoObject *) ares);
- /* notify listeners */
- if (wait_event != NULL)
- SetEvent (wait_event);
-
- /* call async callback if cb_method != null*/
- if (ac != NULL && ac->cb_method) {
- void *pa = &ares;
- cb_exc = NULL;
- thread->async_invoke_method = ac->cb_method;
- mono_runtime_invoke (ac->cb_method, ac->cb_target, pa, &cb_exc);
- thread->async_invoke_method = NULL;
- exc = cb_exc;
- } else {
- exc = NULL;
- }
- }
-
- /* restore original thread execution context if flow isn't suppressed, i.e. non null */
- if (ares->original_context) {
- mono_thread_set_execution_context (ares->original_context);
- ares->original_context = NULL;
- }
+ mono_async_result_invoke (ares, &exc);
#if DEBUG
InterlockedDecrement (&tp->njobs);
monitor_heuristic (gint16 *current, gint16 *history_size, SamplesHistory *history, ThreadPool *tp)
{
int i;
- gint8 decision;
+ gint8 decision G_GNUC_UNUSED;
gint16 cur, max = 0;
gboolean all_waitsleepjoin;
MonoInternalThread *thread;
while (1) {
ms = SAMPLES_PERIOD;
i = 10; //number of spurious awakes we tolerate before doing a round of rebalancing.
+ mono_gc_set_skip_thread (TRUE);
+ MONO_PREPARE_BLOCKING
do {
guint32 ts;
ts = mono_msec_ticks ();
ms -= (mono_msec_ticks () - ts);
if (mono_runtime_is_shutting_down ())
break;
- if (THREAD_WANTS_A_BREAK (thread))
- mono_thread_interruption_checkpoint ();
+ check_for_interruption_critical ();
} while (ms > 0 && i--);
+ MONO_FINISH_BLOCKING
+ mono_gc_set_skip_thread (FALSE);
if (mono_runtime_is_shutting_down ())
break;
if (async_tp.pool_status == 2 || async_io_tp.pool_status == 2)
break;
+ MONO_PREPARE_BLOCKING
switch (monitor_state) {
case MONITOR_STATE_AWAKE:
num_waiting_iterations = 0;
case MONITOR_STATE_SLEEPING:
g_assert_not_reached ();
}
+ MONO_FINISH_BLOCKING
for (i = 0; i < 2; i++) {
ThreadPool *tp;
void
mono_thread_pool_init_tls (void)
{
+ if (use_ms_threadpool ()) {
+ mono_threadpool_ms_init_tls ();
+ return;
+ }
+
mono_wsq_init ();
}
{
gint threads_per_cpu = 1;
gint thread_count;
- gint cpu_count = mono_cpu_count ();
+ gint cpu_count;
int result;
+
+ if (use_ms_threadpool ()) {
+ mono_threadpool_ms_init ();
+ return;
+ }
+
+ cpu_count = mono_cpu_count ();
if (tp_inited == 2)
return;
MonoAsyncResult *ares;
ares = create_simple_asyncresult (target, (MonoObject *) state);
+
+ if (use_ms_threadpool ()) {
+#ifndef DISABLE_SOCKETS
+ mono_threadpool_ms_io_add (ares, state);
+#endif
+ return;
+ }
+
socket_io_add (ares, state);
}
MonoAsyncResult *
-mono_thread_pool_add (MonoObject *target, MonoMethodMessage *msg, MonoDelegate *async_callback,
- MonoObject *state)
+mono_thread_pool_begin_invoke (MonoDomain *domain, MonoObject *target, MonoMethod *method, gpointer *params)
{
- MonoDomain *domain = mono_domain_get ();
- MonoAsyncResult *ares;
- ASyncCall *ac;
+ MonoMethodMessage *message;
+ MonoAsyncResult *async_result;
+ MonoAsyncCall *async_call;
+ MonoDelegate *async_callback = NULL;
+ MonoObject *state = NULL;
+
+ if (use_ms_threadpool ())
+ return mono_threadpool_ms_begin_invoke (domain, target, method, params);
+
+ message = mono_method_call_message_new (method, params, mono_get_delegate_invoke (method->klass), (params != NULL) ? (&async_callback) : NULL, (params != NULL) ? (&state) : NULL);
- ac = (ASyncCall*)mono_object_new (domain, async_call_klass);
- MONO_OBJECT_SETREF (ac, msg, msg);
- MONO_OBJECT_SETREF (ac, state, state);
+ async_call = (MonoAsyncCall*)mono_object_new (domain, async_call_klass);
+ MONO_OBJECT_SETREF (async_call, msg, message);
+ MONO_OBJECT_SETREF (async_call, state, state);
if (async_callback) {
- ac->cb_method = mono_get_delegate_invoke (((MonoObject *)async_callback)->vtable->klass);
- MONO_OBJECT_SETREF (ac, cb_target, async_callback);
+ async_call->cb_method = mono_get_delegate_invoke (((MonoObject*) async_callback)->vtable->klass);
+ MONO_OBJECT_SETREF (async_call, cb_target, async_callback);
}
- ares = mono_async_result_new (domain, NULL, ac->state, NULL, (MonoObject*)ac);
- MONO_OBJECT_SETREF (ares, async_delegate, target);
+ async_result = mono_async_result_new (domain, NULL, async_call->state, NULL, (MonoObject*) async_call);
+ MONO_OBJECT_SETREF (async_result, async_delegate, target);
#ifndef DISABLE_SOCKETS
if (socket_io_filter (target, state)) {
- socket_io_add (ares, (MonoSocketAsyncResult *) state);
- return ares;
+ socket_io_add (async_result, (MonoSocketAsyncResult *) state);
+ return async_result;
}
#endif
- threadpool_append_job (&async_tp, (MonoObject *) ares);
- return ares;
+ threadpool_append_job (&async_tp, (MonoObject *) async_result);
+ return async_result;
}
MonoObject *
-mono_thread_pool_finish (MonoAsyncResult *ares, MonoArray **out_args, MonoObject **exc)
+mono_thread_pool_end_invoke (MonoAsyncResult *ares, MonoArray **out_args, MonoObject **exc)
{
- ASyncCall *ac;
+ MonoAsyncCall *ac;
HANDLE wait_event;
+ if (use_ms_threadpool ()) {
+ return mono_threadpool_ms_end_invoke (ares, out_args, exc);
+ }
+
*exc = NULL;
*out_args = NULL;
wait_event = mono_wait_handle_get_handle ((MonoWaitHandle *) ares->handle);
}
mono_monitor_exit ((MonoObject *) ares);
+ MONO_PREPARE_BLOCKING
WaitForSingleObjectEx (wait_event, INFINITE, TRUE);
+ MONO_FINISH_BLOCKING
} else {
mono_monitor_exit ((MonoObject *) ares);
}
- ac = (ASyncCall *) ares->object_data;
+ ac = (MonoAsyncCall *) ares->object_data;
g_assert (ac != NULL);
*exc = ac->msg->exc; /* FIXME: GC add write barrier */
*out_args = ac->out_args;
void
mono_thread_pool_cleanup (void)
{
+ if (use_ms_threadpool ()) {
+ mono_threadpool_ms_cleanup ();
+ return;
+ }
+
if (InterlockedExchange (&async_io_tp.pool_status, 2) == 1) {
socket_io_cleanup (&socket_io_data); /* Empty when DISABLE_SOCKETS is defined */
threadpool_kill_idle_threads (&async_io_tp);
#ifndef DISABLE_PERFCOUNTERS
mono_perfcounter_update_value (tp->pc_nthreads, TRUE, 1);
#endif
- thread = mono_thread_create_internal (mono_get_root_domain (), tp->async_invoke, tp, TRUE, stack_size);
- if (!tp->is_io) {
+ if (tp->is_io) {
+ thread = mono_thread_create_internal (mono_get_root_domain (), tp->async_invoke, tp, TRUE, stack_size);
+ } else {
mono_mutex_lock (&threads_lock);
+ thread = mono_thread_create_internal (mono_get_root_domain (), tp->async_invoke, tp, TRUE, stack_size);
g_assert (threads != NULL);
g_ptr_array_add (threads, thread);
mono_mutex_unlock (&threads_lock);
threadpool_append_jobs (tp, &ar, 1);
}
+void
+threadpool_append_async_io_jobs (MonoObject **jobs, gint njobs)
+{
+ threadpool_append_jobs (&async_io_tp, jobs, njobs);
+}
+
static void
threadpool_append_jobs (ThreadPool *tp, MonoObject **jobs, gint njobs)
{
mono_thread_pool_remove_domain_jobs (MonoDomain *domain, int timeout)
{
HANDLE sem_handle;
- int result = TRUE;
- guint32 start_time = 0;
+ int result;
+ guint32 start_time;
+
+ if (use_ms_threadpool ()) {
+ return mono_threadpool_ms_remove_domain_jobs (domain, timeout);
+ }
+
+ result = TRUE;
+ start_time = 0;
g_assert (domain->state == MONO_APPDOMAIN_UNLOADING);
if (domain->threadpool_jobs && timeout != -1)
start_time = mono_msec_ticks ();
while (domain->threadpool_jobs) {
+ MONO_PREPARE_BLOCKING
WaitForSingleObject (sem_handle, timeout);
+ MONO_FINISH_BLOCKING
if (timeout != -1 && (mono_msec_ticks () - start_time) > timeout) {
result = FALSE;
break;
gboolean
mono_thread_pool_is_queue_array (MonoArray *o)
{
+ if (use_ms_threadpool ()) {
+ return mono_threadpool_ms_is_queue_array (o);
+ }
+
// gpointer obj = o;
// FIXME: need some fix in sgen code.
if (mono_runtime_is_shutting_down ())
return;
+ MONO_PREPARE_BLOCKING
mono_mutex_lock (&wsqs_lock);
+ MONO_FINISH_BLOCKING
for (i = 0; wsqs != NULL && i < wsqs->len; i++) {
MonoWSQ *wsq;
ves_icall_System_Threading_Thread_SetState (thread, ThreadState_Background);
}
-static void
+void
check_for_interruption_critical (void)
{
MonoInternalThread *thread;
MonoSocketAsyncResult *state = (MonoSocketAsyncResult *) data;
is_socket = is_socketasyncresult (domain, klass);
ar = state->ares;
- switch (state->operation) {
- case AIO_OP_RECEIVE:
- state->total = ICALL_RECV (state);
- break;
- case AIO_OP_SEND:
- state->total = ICALL_SEND (state);
- break;
- }
}
#endif
/* worker threads invokes methods in different domains,
}
mono_gc_set_skip_thread (TRUE);
+ MONO_PREPARE_BLOCKING
#if defined(__OpenBSD__)
while (mono_cq_count (tp->queue) == 0 && (res = mono_sem_wait (&tp->new_job, TRUE)) == -1) {// && errno == EINTR) {
}
InterlockedDecrement (&tp->waiting);
+ MONO_FINISH_BLOCKING
mono_gc_set_skip_thread (FALSE);
if (mono_runtime_is_shutting_down ())
void
mono_thread_pool_suspend (void)
{
+ if (use_ms_threadpool ()) {
+ mono_threadpool_ms_suspend ();
+ return;
+ }
suspended = TRUE;
}
void
mono_thread_pool_resume (void)
{
+ if (use_ms_threadpool ()) {
+ mono_threadpool_ms_resume ();
+ return;
+ }
suspended = FALSE;
}