[system] Don't throw ObjectDisposedException on cancelled async WebConnection. Fixes...
[mono.git] / mono / metadata / threadpool.c
index 36514cc6ea8a06c0f26c553ecb68e59de2d846a7..c901d401acad6bd05f00ea1c750fab9114803b57 100644 (file)
@@ -26,6 +26,7 @@
 #include <mono/metadata/mono-cq.h>
 #include <mono/metadata/mono-wsq.h>
 #include <mono/metadata/mono-ptr-array.h>
+#include <mono/metadata/object-internals.h>
 #include <mono/io-layer/io-layer.h>
 #include <mono/utils/mono-time.h>
 #include <mono/utils/mono-proclib.h>
 #endif
 
 #include "threadpool.h"
+#include "threadpool-ms.h"
+#include "threadpool-ms-io.h"
+
+static gboolean
+use_ms_threadpool (void)
+{
+       static gboolean use_ms_tp = -1;
+       const gchar *mono_threadpool_env;
+       if (use_ms_tp != -1)
+               return use_ms_tp;
+       else if (!(mono_threadpool_env = g_getenv ("MONO_THREADPOOL")))
+               return use_ms_tp = FALSE;
+       else if (strcmp (mono_threadpool_env, "microsoft") == 0)
+               return use_ms_tp = TRUE;
+       else
+               return use_ms_tp = FALSE;
+}
 
 #define THREAD_WANTS_A_BREAK(t) ((t->state & (ThreadState_StopRequested | \
                                                ThreadState_SuspendRequested)) != 0)
 
-#define SMALL_STACK (128 * (sizeof (gpointer) / 4) * 1024)
-
 /* DEBUG: prints tp data every 2s */
 #undef DEBUG 
 
@@ -83,31 +99,8 @@ enum {
        MONITOR_STATE_SLEEPING
 };
 
-typedef struct {
-       mono_mutex_t io_lock; /* access to sock_to_state */
-       int inited; // 0 -> not initialized , 1->initializing, 2->initialized, 3->cleaned up
-       MonoGHashTable *sock_to_state;
-
-       gint event_system;
-       gpointer event_data;
-       void (*modify) (gpointer p, int fd, int operation, int events, gboolean is_new);
-       void (*wait) (gpointer sock_data);
-       void (*shutdown) (gpointer event_data);
-} SocketIOData;
-
 static SocketIOData socket_io_data;
 
-/* Keep in sync with the System.MonoAsyncCall class which provides GC tracking */
-typedef struct {
-       MonoObject         object;
-       MonoMethodMessage *msg;
-       MonoMethod        *cb_method;
-       MonoDelegate      *cb_target;
-       MonoObject        *state;
-       MonoObject        *res;
-       MonoArray         *out_args;
-} ASyncCall;
-
 typedef struct {
        MonoSemType lock;
        MonoCQ *queue; /* GC root */
@@ -148,11 +141,7 @@ static void threadpool_kill_idle_threads (ThreadPool *tp);
 static gboolean threadpool_start_thread (ThreadPool *tp);
 static void threadpool_kill_thread (ThreadPool *tp);
 static void monitor_thread (gpointer data);
-static void socket_io_cleanup (SocketIOData *data);
-static MonoObject *get_io_event (MonoMList **list, gint event);
-static int get_events_from_list (MonoMList *list);
 static int get_event_from_state (MonoSocketAsyncResult *state);
-static void check_for_interruption_critical (void);
 
 static MonoClass *async_call_klass;
 static MonoClass *socket_async_call_klass;
@@ -196,7 +185,9 @@ enum {
        AIO_OP_LAST
 };
 
-#include <mono/metadata/tpool-poll.c>
+// #include <mono/metadata/tpool-poll.c>
+gpointer tp_poll_init (SocketIOData *data);
+
 #ifdef HAVE_EPOLL
 #include <mono/metadata/tpool-epoll.c>
 #elif defined(USE_KQUEUE_FOR_THREADPOOL)
@@ -222,16 +213,6 @@ is_corlib_type (MonoDomain *domain, MonoClass *klass)
        return klass->image == mono_defaults.corlib;
 }
 
-/*
- * Note that we call it is_socket_type() where 'socket' refers to the image
- * that contains the System.Net.Sockets.Socket type.
-*/
-static gboolean
-is_socket_type (MonoDomain *domain, MonoClass *klass)
-{
-       return is_system_type (domain, klass);
-}
-
 #define check_type_cached(domain, ASSEMBLY, _class, _namespace, _name, loc) do { \
        if (*loc) \
                return *loc == _class; \
@@ -244,8 +225,6 @@ is_socket_type (MonoDomain *domain, MonoClass *klass)
 
 #define check_corlib_type_cached(domain, _class, _namespace, _name, loc) check_type_cached (domain, corlib, _class, _namespace, _name, loc)
 
-#define check_socket_type_cached(domain, _class, _namespace, _name, loc) check_type_cached (domain, socket, _class, _namespace, _name, loc)
-
 #define check_system_type_cached(domain, _class, _namespace, _name, loc) check_type_cached (domain, system, _class, _namespace, _name, loc)
 
 static gboolean
@@ -254,26 +233,18 @@ is_corlib_asyncresult (MonoDomain *domain, MonoClass *klass)
        check_corlib_type_cached (domain, klass, "System.Runtime.Remoting.Messaging", "AsyncResult", &domain->corlib_asyncresult_class);
 }
 
-static gboolean
-is_socket (MonoDomain *domain, MonoClass *klass)
-{
-       check_socket_type_cached (domain, klass, "System.Net.Sockets", "Socket", &domain->socket_class);
-}
-
 static gboolean
 is_socketasyncresult (MonoDomain *domain, MonoClass *klass)
 {
-       return (klass->nested_in &&
-                       is_socket (domain, klass->nested_in) &&
-                       !strcmp (klass->name, "SocketAsyncResult"));
+       static MonoClass *socket_async_result_klass = NULL;
+       check_system_type_cached (domain, klass, "System.Net.Sockets", "SocketAsyncResult", &socket_async_result_klass);
 }
 
 static gboolean
 is_socketasynccall (MonoDomain *domain, MonoClass *klass)
 {
-       return (klass->nested_in &&
-                       is_socket (domain, klass->nested_in) &&
-                       !strcmp (klass->name, "SocketAsyncCall"));
+       static MonoClass *socket_async_callback_klass = NULL;
+       check_system_type_cached (domain, klass, "System.Net.Sockets", "SocketAsyncCallback", &socket_async_callback_klass);
 }
 
 static gboolean
@@ -300,7 +271,10 @@ is_sdp_asyncreadhandler (MonoDomain *domain, MonoClass *klass)
 
 #ifdef DISABLE_SOCKETS
 
-#define socket_io_cleanup(x)
+void
+socket_io_cleanup (SocketIOData *data)
+{
+}
 
 static int
 get_event_from_state (MonoSocketAsyncResult *state)
@@ -309,7 +283,7 @@ get_event_from_state (MonoSocketAsyncResult *state)
        return -1;
 }
 
-static int
+int
 get_events_from_list (MonoMList *list)
 {
        return 0;
@@ -317,7 +291,7 @@ get_events_from_list (MonoMList *list)
 
 #else
 
-static void
+void
 socket_io_cleanup (SocketIOData *data)
 {
        mono_mutex_lock (&data->io_lock);
@@ -355,7 +329,7 @@ get_event_from_state (MonoSocketAsyncResult *state)
        }
 }
 
-static int
+int
 get_events_from_list (MonoMList *list)
 {
        MonoSocketAsyncResult *state;
@@ -369,14 +343,6 @@ get_events_from_list (MonoMList *list)
        return events;
 }
 
-#define ICALL_RECV(x)  ves_icall_System_Net_Sockets_Socket_Receive_internal (\
-                               (SOCKET)(gssize)x->handle, x->buffer, x->offset, x->size,\
-                                x->socket_flags, &x->error);
-
-#define ICALL_SEND(x)  ves_icall_System_Net_Sockets_Socket_Send_internal (\
-                               (SOCKET)(gssize)x->handle, x->buffer, x->offset, x->size,\
-                                x->socket_flags, &x->error);
-
 #endif /* !DISABLE_SOCKETS */
 
 static void
@@ -404,7 +370,7 @@ threadpool_jobs_dec (MonoObject *obj)
        return FALSE;
 }
 
-static MonoObject *
+MonoObject *
 get_io_event (MonoMList **list, gint event)
 {
        MonoObject *state;
@@ -446,6 +412,13 @@ mono_thread_pool_remove_socket (int sock)
        MonoSocketAsyncResult *state;
        MonoObject *ares;
 
+       if (use_ms_threadpool ()) {
+#ifndef DISABLE_SOCKETS
+               mono_threadpool_ms_io_remove_socket (sock);
+#endif
+               return;
+       }
+
        if (socket_io_data.inited == 0)
                return;
 
@@ -605,63 +578,9 @@ socket_io_filter (MonoObject *target, MonoObject *state)
 static MonoObject *
 mono_async_invoke (ThreadPool *tp, MonoAsyncResult *ares)
 {
-       ASyncCall *ac = (ASyncCall *)ares->object_data;
-       MonoObject *res, *exc = NULL;
-       MonoArray *out_args = NULL;
-       HANDLE wait_event = NULL;
-       MonoInternalThread *thread = mono_thread_internal_current ();
-
-       if (ares->execution_context) {
-               /* use captured ExecutionContext (if available) */
-               MONO_OBJECT_SETREF (ares, original_context, mono_thread_get_execution_context ());
-               mono_thread_set_execution_context (ares->execution_context);
-       } else {
-               ares->original_context = NULL;
-       }
-
-       if (ac == NULL) {
-               /* Fast path from ThreadPool.*QueueUserWorkItem */
-               void *pa = ares->async_state;
-               /* The debugger needs this */
-               thread->async_invoke_method = ((MonoDelegate*)ares->async_delegate)->method;
-               res = mono_runtime_delegate_invoke (ares->async_delegate, &pa, &exc);
-               thread->async_invoke_method = NULL;
-       } else {
-               MonoObject *cb_exc = NULL;
+       MonoObject *exc = NULL;
 
-               ac->msg->exc = NULL;
-               res = mono_message_invoke (ares->async_delegate, ac->msg, &exc, &out_args);
-               MONO_OBJECT_SETREF (ac, res, res);
-               MONO_OBJECT_SETREF (ac, msg->exc, exc);
-               MONO_OBJECT_SETREF (ac, out_args, out_args);
-
-               mono_monitor_enter ((MonoObject *) ares);
-               ares->completed = 1;
-               if (ares->handle != NULL)
-                       wait_event = mono_wait_handle_get_handle ((MonoWaitHandle *) ares->handle);
-               mono_monitor_exit ((MonoObject *) ares);
-               /* notify listeners */
-               if (wait_event != NULL)
-                       SetEvent (wait_event);
-
-               /* call async callback if cb_method != null*/
-               if (ac != NULL && ac->cb_method) {
-                       void *pa = &ares;
-                       cb_exc = NULL;
-                       thread->async_invoke_method = ac->cb_method;
-                       mono_runtime_invoke (ac->cb_method, ac->cb_target, pa, &cb_exc);
-                       thread->async_invoke_method = NULL;
-                       exc = cb_exc;
-               } else {
-                       exc = NULL;
-               }
-       }
-
-       /* restore original thread execution context if flow isn't suppressed, i.e. non null */
-       if (ares->original_context) {
-               mono_thread_set_execution_context (ares->original_context);
-               ares->original_context = NULL;
-       }
+       mono_async_result_invoke (ares, &exc);
 
 #if DEBUG
        InterlockedDecrement (&tp->njobs);
@@ -796,7 +715,7 @@ static gint8
 monitor_heuristic (gint16 *current, gint16 *history_size, SamplesHistory *history, ThreadPool *tp)
 {
        int i;
-       gint8 decision;
+       gint8 decision G_GNUC_UNUSED;
        gint16 cur, max = 0;
        gboolean all_waitsleepjoin;
        MonoInternalThread *thread;
@@ -903,6 +822,8 @@ monitor_thread (gpointer unused)
        while (1) {
                ms = SAMPLES_PERIOD;
                i = 10; //number of spurious awakes we tolerate before doing a round of rebalancing.
+               mono_gc_set_skip_thread (TRUE);
+               MONO_PREPARE_BLOCKING
                do {
                        guint32 ts;
                        ts = mono_msec_ticks ();
@@ -911,9 +832,10 @@ monitor_thread (gpointer unused)
                        ms -= (mono_msec_ticks () - ts);
                        if (mono_runtime_is_shutting_down ())
                                break;
-                       if (THREAD_WANTS_A_BREAK (thread))
-                               mono_thread_interruption_checkpoint ();
+                       check_for_interruption_critical ();
                } while (ms > 0 && i--);
+               MONO_FINISH_BLOCKING
+               mono_gc_set_skip_thread (FALSE);
 
                if (mono_runtime_is_shutting_down ())
                        break;
@@ -925,6 +847,7 @@ monitor_thread (gpointer unused)
                if (async_tp.pool_status == 2 || async_io_tp.pool_status == 2)
                        break;
 
+               MONO_PREPARE_BLOCKING
                switch (monitor_state) {
                case MONITOR_STATE_AWAKE:
                        num_waiting_iterations = 0;
@@ -943,6 +866,7 @@ monitor_thread (gpointer unused)
                case MONITOR_STATE_SLEEPING:
                        g_assert_not_reached ();
                }
+               MONO_FINISH_BLOCKING
 
                for (i = 0; i < 2; i++) {
                        ThreadPool *tp;
@@ -966,6 +890,11 @@ monitor_thread (gpointer unused)
 void
 mono_thread_pool_init_tls (void)
 {
+       if (use_ms_threadpool ()) {
+               mono_threadpool_ms_init_tls ();
+               return;
+       }
+
        mono_wsq_init ();
 }
 
@@ -974,8 +903,15 @@ mono_thread_pool_init (void)
 {
        gint threads_per_cpu = 1;
        gint thread_count;
-       gint cpu_count = mono_cpu_count ();
+       gint cpu_count;
        int result;
+       
+       if (use_ms_threadpool ()) {
+               mono_threadpool_ms_init ();
+               return;
+       }
+
+       cpu_count = mono_cpu_count ();
 
        if (tp_inited == 2)
                return;
@@ -1055,45 +991,63 @@ icall_append_io_job (MonoObject *target, MonoSocketAsyncResult *state)
        MonoAsyncResult *ares;
 
        ares = create_simple_asyncresult (target, (MonoObject *) state);
+
+       if (use_ms_threadpool ()) {
+#ifndef DISABLE_SOCKETS
+               mono_threadpool_ms_io_add (ares, state);
+#endif
+               return;
+       }
+
        socket_io_add (ares, state);
 }
 
 MonoAsyncResult *
-mono_thread_pool_add (MonoObject *target, MonoMethodMessage *msg, MonoDelegate *async_callback,
-                     MonoObject *state)
+mono_thread_pool_begin_invoke (MonoDomain *domain, MonoObject *target, MonoMethod *method, gpointer *params)
 {
-       MonoDomain *domain = mono_domain_get ();
-       MonoAsyncResult *ares;
-       ASyncCall *ac;
+       MonoMethodMessage *message;
+       MonoAsyncResult *async_result;
+       MonoAsyncCall *async_call;
+       MonoDelegate *async_callback = NULL;
+       MonoObject *state = NULL;
+
+       if (use_ms_threadpool ())
+               return mono_threadpool_ms_begin_invoke (domain, target, method, params);
+
+       message = mono_method_call_message_new (method, params, mono_get_delegate_invoke (method->klass), (params != NULL) ? (&async_callback) : NULL, (params != NULL) ? (&state) : NULL);
 
-       ac = (ASyncCall*)mono_object_new (domain, async_call_klass);
-       MONO_OBJECT_SETREF (ac, msg, msg);
-       MONO_OBJECT_SETREF (ac, state, state);
+       async_call = (MonoAsyncCall*)mono_object_new (domain, async_call_klass);
+       MONO_OBJECT_SETREF (async_call, msg, message);
+       MONO_OBJECT_SETREF (async_call, state, state);
 
        if (async_callback) {
-               ac->cb_method = mono_get_delegate_invoke (((MonoObject *)async_callback)->vtable->klass);
-               MONO_OBJECT_SETREF (ac, cb_target, async_callback);
+               async_call->cb_method = mono_get_delegate_invoke (((MonoObject*) async_callback)->vtable->klass);
+               MONO_OBJECT_SETREF (async_call, cb_target, async_callback);
        }
 
-       ares = mono_async_result_new (domain, NULL, ac->state, NULL, (MonoObject*)ac);
-       MONO_OBJECT_SETREF (ares, async_delegate, target);
+       async_result = mono_async_result_new (domain, NULL, async_call->state, NULL, (MonoObject*) async_call);
+       MONO_OBJECT_SETREF (async_result, async_delegate, target);
 
 #ifndef DISABLE_SOCKETS
        if (socket_io_filter (target, state)) {
-               socket_io_add (ares, (MonoSocketAsyncResult *) state);
-               return ares;
+               socket_io_add (async_result, (MonoSocketAsyncResult *) state);
+               return async_result;
        }
 #endif
-       threadpool_append_job (&async_tp, (MonoObject *) ares);
-       return ares;
+       threadpool_append_job (&async_tp, (MonoObject *) async_result);
+       return async_result;
 }
 
 MonoObject *
-mono_thread_pool_finish (MonoAsyncResult *ares, MonoArray **out_args, MonoObject **exc)
+mono_thread_pool_end_invoke (MonoAsyncResult *ares, MonoArray **out_args, MonoObject **exc)
 {
-       ASyncCall *ac;
+       MonoAsyncCall *ac;
        HANDLE wait_event;
 
+       if (use_ms_threadpool ()) {
+               return mono_threadpool_ms_end_invoke (ares, out_args, exc);
+       }
+
        *exc = NULL;
        *out_args = NULL;
 
@@ -1117,12 +1071,14 @@ mono_thread_pool_finish (MonoAsyncResult *ares, MonoArray **out_args, MonoObject
                        wait_event = mono_wait_handle_get_handle ((MonoWaitHandle *) ares->handle);
                }
                mono_monitor_exit ((MonoObject *) ares);
+               MONO_PREPARE_BLOCKING
                WaitForSingleObjectEx (wait_event, INFINITE, TRUE);
+               MONO_FINISH_BLOCKING
        } else {
                mono_monitor_exit ((MonoObject *) ares);
        }
 
-       ac = (ASyncCall *) ares->object_data;
+       ac = (MonoAsyncCall *) ares->object_data;
        g_assert (ac != NULL);
        *exc = ac->msg->exc; /* FIXME: GC add write barrier */
        *out_args = ac->out_args;
@@ -1145,6 +1101,11 @@ threadpool_kill_idle_threads (ThreadPool *tp)
 void
 mono_thread_pool_cleanup (void)
 {
+       if (use_ms_threadpool ()) {
+               mono_threadpool_ms_cleanup ();
+               return;
+       }
+
        if (InterlockedExchange (&async_io_tp.pool_status, 2) == 1) {
                socket_io_cleanup (&socket_io_data); /* Empty when DISABLE_SOCKETS is defined */
                threadpool_kill_idle_threads (&async_io_tp);
@@ -1195,9 +1156,11 @@ threadpool_start_thread (ThreadPool *tp)
 #ifndef DISABLE_PERFCOUNTERS
                        mono_perfcounter_update_value (tp->pc_nthreads, TRUE, 1);
 #endif
-                       thread = mono_thread_create_internal (mono_get_root_domain (), tp->async_invoke, tp, TRUE, stack_size);
-                       if (!tp->is_io) {
+                       if (tp->is_io) {
+                               thread = mono_thread_create_internal (mono_get_root_domain (), tp->async_invoke, tp, TRUE, stack_size);
+                       } else {
                                mono_mutex_lock (&threads_lock);
+                               thread = mono_thread_create_internal (mono_get_root_domain (), tp->async_invoke, tp, TRUE, stack_size);
                                g_assert (threads != NULL);
                                g_ptr_array_add (threads, thread);
                                mono_mutex_unlock (&threads_lock);
@@ -1235,6 +1198,12 @@ threadpool_append_job (ThreadPool *tp, MonoObject *ar)
        threadpool_append_jobs (tp, &ar, 1);
 }
 
+void
+threadpool_append_async_io_jobs (MonoObject **jobs, gint njobs)
+{
+       threadpool_append_jobs (&async_io_tp, jobs, njobs);
+}
+
 static void
 threadpool_append_jobs (ThreadPool *tp, MonoObject **jobs, gint njobs)
 {
@@ -1341,8 +1310,15 @@ gboolean
 mono_thread_pool_remove_domain_jobs (MonoDomain *domain, int timeout)
 {
        HANDLE sem_handle;
-       int result = TRUE;
-       guint32 start_time = 0;
+       int result;
+       guint32 start_time;
+
+       if (use_ms_threadpool ()) {
+               return mono_threadpool_ms_remove_domain_jobs (domain, timeout);
+       }
+
+       result = TRUE;
+       start_time = 0;
 
        g_assert (domain->state == MONO_APPDOMAIN_UNLOADING);
 
@@ -1372,7 +1348,9 @@ mono_thread_pool_remove_domain_jobs (MonoDomain *domain, int timeout)
        if (domain->threadpool_jobs && timeout != -1)
                start_time = mono_msec_ticks ();
        while (domain->threadpool_jobs) {
+               MONO_PREPARE_BLOCKING
                WaitForSingleObject (sem_handle, timeout);
+               MONO_FINISH_BLOCKING
                if (timeout != -1 && (mono_msec_ticks () - start_time) > timeout) {
                        result = FALSE;
                        break;
@@ -1394,6 +1372,10 @@ threadpool_free_queue (ThreadPool *tp)
 gboolean
 mono_thread_pool_is_queue_array (MonoArray *o)
 {
+       if (use_ms_threadpool ()) {
+               return mono_threadpool_ms_is_queue_array (o);
+       }
+
        // gpointer obj = o;
 
        // FIXME: need some fix in sgen code.
@@ -1467,7 +1449,9 @@ try_steal (MonoWSQ *local_wsq, gpointer *data, gboolean retry)
                if (mono_runtime_is_shutting_down ())
                        return;
 
+               MONO_PREPARE_BLOCKING
                mono_mutex_lock (&wsqs_lock);
+               MONO_FINISH_BLOCKING
                for (i = 0; wsqs != NULL && i < wsqs->len; i++) {
                        MonoWSQ *wsq;
 
@@ -1527,7 +1511,7 @@ clear_thread_state (void)
                ves_icall_System_Threading_Thread_SetState (thread, ThreadState_Background);
 }
 
-static void
+void
 check_for_interruption_critical (void)
 {
        MonoInternalThread *thread;
@@ -1588,14 +1572,6 @@ async_invoke_thread (gpointer data)
                                MonoSocketAsyncResult *state = (MonoSocketAsyncResult *) data;
                                is_socket = is_socketasyncresult (domain, klass);
                                ar = state->ares;
-                               switch (state->operation) {
-                               case AIO_OP_RECEIVE:
-                                       state->total = ICALL_RECV (state);
-                                       break;
-                               case AIO_OP_SEND:
-                                       state->total = ICALL_SEND (state);
-                                       break;
-                               }
                        }
 #endif
                        /* worker threads invokes methods in different domains,
@@ -1671,6 +1647,7 @@ async_invoke_thread (gpointer data)
                        }
 
                        mono_gc_set_skip_thread (TRUE);
+                       MONO_PREPARE_BLOCKING
 
 #if defined(__OpenBSD__)
                        while (mono_cq_count (tp->queue) == 0 && (res = mono_sem_wait (&tp->new_job, TRUE)) == -1) {// && errno == EINTR) {
@@ -1683,6 +1660,7 @@ async_invoke_thread (gpointer data)
                        }
                        InterlockedDecrement (&tp->waiting);
 
+                       MONO_FINISH_BLOCKING
                        mono_gc_set_skip_thread (FALSE);
 
                        if (mono_runtime_is_shutting_down ())
@@ -1865,6 +1843,10 @@ mono_internal_thread_unhandled_exception (MonoObject* exc)
 void
 mono_thread_pool_suspend (void)
 {
+       if (use_ms_threadpool ()) {
+               mono_threadpool_ms_suspend ();
+               return;
+       }
        suspended = TRUE;
 }
 
@@ -1874,5 +1856,9 @@ mono_thread_pool_suspend (void)
 void
 mono_thread_pool_resume (void)
 {
+       if (use_ms_threadpool ()) {
+               mono_threadpool_ms_resume ();
+               return;
+       }
        suspended = FALSE;
 }