2010-03-02 Rodrigo Kumpera <rkumpera@novell.com>
[mono.git] / mono / metadata / threadpool.c
index 9a8a90094c05681c979f48c70a39e24f768f9cd8..ac51b03d974c7b494814afe71e1565120d90af49 100644 (file)
@@ -104,6 +104,7 @@ typedef struct {
        MonoObject        *res;
        MonoArray         *out_args;
        /* This is a HANDLE, we use guint64 so the managed object layout remains constant */
+       /* THIS FIELD IS NOT USED ANY MORE. Remove it when we feel like breaking corlib compatibility with 2.6 */
        guint64           wait_event;
 } ASyncCall;
 
@@ -153,6 +154,14 @@ static MonoClass *async_call_klass;
 static MonoClass *socket_async_call_klass;
 static MonoClass *process_async_call_klass;
 
+/* Hooks */
+static MonoThreadPoolFunc tp_start_func;
+static MonoThreadPoolFunc tp_finish_func;
+static gpointer tp_hooks_user_data;
+static MonoThreadPoolItemFunc tp_item_begin_func;
+static MonoThreadPoolItemFunc tp_item_end_func;
+static gpointer tp_item_user_data;
+
 #define INIT_POLLFD(a, b, c) {(a)->fd = b; (a)->events = c; (a)->revents = 0;}
 enum {
        AIO_OP_FIRST,
@@ -179,7 +188,7 @@ socket_io_cleanup (SocketIOData *data)
 
        EnterCriticalSection (&data->io_lock);
        data->inited = 0;
-#ifdef PLATFORM_WIN32
+#ifdef HOST_WIN32
        closesocket (data->pipe [0]);
        closesocket (data->pipe [1]);
 #else
@@ -283,6 +292,8 @@ async_invoke_io_thread (gpointer data)
        idle_data.wait_handle = CreateEvent (NULL, FALSE, FALSE, NULL);
 
        thread = mono_thread_internal_current ();
+       if (tp_start_func)
+               tp_start_func (tp_hooks_user_data);
 
        version = mono_get_runtime_info ()->framework_version;
        for (;;) {
@@ -319,11 +330,15 @@ async_invoke_io_thread (gpointer data)
                                        continue;
                                }
                                if (mono_domain_set (domain, FALSE)) {
-                                       ASyncCall *ac;
+                                       /* ASyncCall *ac; */
 
+                                       if (tp_item_begin_func)
+                                               tp_item_begin_func (tp_item_user_data);
                                        mono_async_invoke (ar);
-                                       ac = (ASyncCall *) ar->object_data;
+                                       if (tp_item_end_func)
+                                               tp_item_end_func (tp_item_user_data);
                                        /*
+                                       ac = (ASyncCall *) ar->object_data;
                                        if (ac->msg->exc != NULL)
                                                mono_unhandled_exception (ac->msg->exc);
                                        */
@@ -355,6 +370,8 @@ async_invoke_io_thread (gpointer data)
                        InterlockedDecrement (&async_io_tp.nthreads);
                        CloseHandle (idle_data.wait_handle);
                        idle_data.wait_handle = NULL;
+                       if (tp_finish_func)
+                               tp_finish_func (tp_hooks_user_data);
                        return;
                }
                
@@ -498,7 +515,7 @@ socket_io_poll_main (gpointer p)
                                for (; i < allocated; i++)
                                        INIT_POLLFD (&pfds [i], -1, 0);
                        }
-#ifndef PLATFORM_WIN32
+#ifndef HOST_WIN32
                        nread = read (data->pipe [0], one, 1);
 #else
                        nread = recv ((SOCKET) data->pipe [0], one, 1, 0);
@@ -692,7 +709,7 @@ mono_thread_pool_remove_socket (int sock)
        }
 }
 
-#ifdef PLATFORM_WIN32
+#ifdef HOST_WIN32
 static void
 connect_hack (gpointer x)
 {
@@ -712,7 +729,7 @@ connect_hack (gpointer x)
 static void
 socket_io_init (SocketIOData *data)
 {
-#ifdef PLATFORM_WIN32
+#ifdef HOST_WIN32
        struct sockaddr_in server;
        struct sockaddr_in client;
        SOCKET srv;
@@ -745,7 +762,7 @@ socket_io_init (SocketIOData *data)
        data->epoll_disabled = TRUE;
 #endif
 
-#ifndef PLATFORM_WIN32
+#ifndef HOST_WIN32
        if (data->epoll_disabled) {
                if (pipe (data->pipe) != 0) {
                        int err = errno;
@@ -807,7 +824,7 @@ socket_io_add_poll (MonoSocketAsyncResult *state)
        SocketIOData *data = &socket_io_data;
        int w;
 
-#if defined(PLATFORM_MACOSX) || defined(PLATFORM_BSD) || defined(PLATFORM_WIN32) || defined(PLATFORM_SOLARIS)
+#if defined(PLATFORM_MACOSX) || defined(PLATFORM_BSD) || defined(HOST_WIN32) || defined(PLATFORM_SOLARIS)
        /* select() for connect() does not work well on the Mac. Bug #75436. */
        /* Bug #77637 for the BSD 6 case */
        /* Bug #78888 for the Windows case */
@@ -834,7 +851,7 @@ socket_io_add_poll (MonoSocketAsyncResult *state)
        mono_g_hash_table_replace (data->sock_to_state, GINT_TO_POINTER (state->handle), list);
        LeaveCriticalSection (&data->io_lock);
        *msg = (char) state->operation;
-#ifndef PLATFORM_WIN32
+#ifndef HOST_WIN32
        w = write (data->pipe [1], msg, 1);
        w = w;
 #else
@@ -957,8 +974,6 @@ mono_async_invoke (MonoAsyncResult *ares)
        MonoArray *out_args = NULL;
        HANDLE wait_event = NULL;
 
-       ares->completed = 1;
-
        if (ares->execution_context) {
                /* use captured ExecutionContext (if available) */
                MONO_OBJECT_SETREF (ares, original_context, mono_thread_get_execution_context ());
@@ -973,6 +988,15 @@ mono_async_invoke (MonoAsyncResult *ares)
        MONO_OBJECT_SETREF (ac, msg->exc, exc);
        MONO_OBJECT_SETREF (ac, out_args, out_args);
 
+       mono_monitor_enter ((MonoObject *) ares);
+       ares->completed = 1;
+       if (ares->handle != NULL)
+               wait_event = mono_wait_handle_get_handle ((MonoWaitHandle *) ares->handle);
+       mono_monitor_exit ((MonoObject *) ares);
+       /* notify listeners */
+       if (wait_event != NULL)
+               SetEvent (wait_event);
+
        /* call async callback if cb_method != null*/
        if (ac->cb_method) {
                MonoObject *exc = NULL;
@@ -990,15 +1014,6 @@ mono_async_invoke (MonoAsyncResult *ares)
                ares->original_context = NULL;
        }
 
-       /* notify listeners */
-       mono_monitor_enter ((MonoObject *) ares);
-       if (ares->handle != NULL) {
-               ac->wait_event = (gsize) mono_wait_handle_get_handle ((MonoWaitHandle *) ares->handle);
-               wait_event = (HANDLE)(gsize) ac->wait_event;
-       }
-       mono_monitor_exit ((MonoObject *) ares);
-       if (wait_event != NULL)
-               SetEvent (wait_event);
 }
 
 static void
@@ -1069,7 +1084,7 @@ mono_thread_pool_init ()
        cpu_count = mono_cpu_count ();
        n = 8 + 2 * cpu_count; /* 8 is minFreeThreads for ASP.NET */
        threadpool_init (&async_tp, n, n + threads_per_cpu * cpu_count, async_invoke_thread);
-#ifndef DISABLE_SOCKET
+#ifndef DISABLE_SOCKETS
        threadpool_init (&async_io_tp, 2 * cpu_count, 8 * cpu_count, async_invoke_io_thread);
 #endif
 
@@ -1122,6 +1137,7 @@ MonoObject *
 mono_thread_pool_finish (MonoAsyncResult *ares, MonoArray **out_args, MonoObject **exc)
 {
        ASyncCall *ac;
+       HANDLE wait_event;
 
        *exc = NULL;
        *out_args = NULL;
@@ -1137,23 +1153,23 @@ mono_thread_pool_finish (MonoAsyncResult *ares, MonoArray **out_args, MonoObject
        }
 
        ares->endinvoke_called = 1;
-       ac = (ASyncCall *)ares->object_data;
-
-       g_assert (ac != NULL);
-
        /* wait until we are really finished */
        if (!ares->completed) {
                if (ares->handle == NULL) {
-                       ac->wait_event = (gsize)CreateEvent (NULL, TRUE, FALSE, NULL);
-                       g_assert(ac->wait_event != 0);
-                       MONO_OBJECT_SETREF (ares, handle, (MonoObject *) mono_wait_handle_new (mono_object_domain (ares), (gpointer)(gsize)ac->wait_event));
+                       wait_event = CreateEvent (NULL, TRUE, FALSE, NULL);
+                       g_assert(wait_event != 0);
+                       MONO_OBJECT_SETREF (ares, handle, (MonoObject *) mono_wait_handle_new (mono_object_domain (ares), wait_event));
+               } else {
+                       wait_event = mono_wait_handle_get_handle ((MonoWaitHandle *) ares->handle);
                }
                mono_monitor_exit ((MonoObject *) ares);
-               WaitForSingleObjectEx ((gpointer)(gsize)ac->wait_event, INFINITE, TRUE);
+               WaitForSingleObjectEx (wait_event, INFINITE, TRUE);
        } else {
                mono_monitor_exit ((MonoObject *) ares);
        }
 
+       ac = (ASyncCall *) ares->object_data;
+       g_assert (ac != NULL);
        *exc = ac->msg->exc; /* FIXME: GC add write barrier */
        *out_args = ac->out_args;
 
@@ -1287,6 +1303,11 @@ threadpool_queue_idle_thread (ThreadPool *tp, IdleThreadData *it)
        CRITICAL_SECTION *cs = &tp->lock;
 
        EnterCriticalSection (cs);
+       if (tp->idle_threads == NULL) {
+               it->die = TRUE;
+               LeaveCriticalSection (cs);
+               return NULL; /* We are shutting down */
+       }
        /*
        if (mono_100ns_ticks () - tp->last_sample.timeStamp > 10000 * 1000) {
                float elapsed_ticks;
@@ -1328,6 +1349,10 @@ threadpool_append_job (ThreadPool *tp, MonoObject *ar)
        cs = &tp->lock;
        threadpool_jobs_inc (ar); 
        EnterCriticalSection (cs);
+       if (tp->idle_threads == NULL) { 
+               LeaveCriticalSection (cs);
+               return; /* We are shutting down */
+       }
        if (ar->vtable->domain->state == MONO_APPDOMAIN_UNLOADING ||
                        ar->vtable->domain->state == MONO_APPDOMAIN_UNLOADED) {
                LeaveCriticalSection (cs);
@@ -1457,6 +1482,12 @@ threadpool_free_queue (ThreadPool *tp)
        tp->first_elem = tp->next_elem = 0;
 }
 
+gboolean
+mono_thread_pool_is_queue_array (MonoArray *o)
+{
+       return o == async_tp.array || o == async_io_tp.array;
+}
+
 static void
 async_invoke_thread (gpointer data)
 {
@@ -1469,6 +1500,8 @@ async_invoke_thread (gpointer data)
        idle_data.wait_handle = CreateEvent (NULL, FALSE, FALSE, NULL);
  
        thread = mono_thread_internal_current ();
+       if (tp_start_func)
+               tp_start_func (tp_hooks_user_data);
        version = mono_get_runtime_info ()->framework_version;
        for (;;) {
                MonoAsyncResult *ar;
@@ -1493,11 +1526,15 @@ async_invoke_thread (gpointer data)
                                }
 
                                if (mono_domain_set (domain, FALSE)) {
-                                       ASyncCall *ac;
+                                       /* ASyncCall *ac; */
 
+                                       if (tp_item_begin_func)
+                                               tp_item_begin_func (tp_item_user_data);
                                        mono_async_invoke (ar);
-                                       ac = (ASyncCall *) ar->object_data;
+                                       if (tp_item_end_func)
+                                               tp_item_end_func (tp_item_user_data);
                                        /*
+                                       ac = (ASyncCall *) ar->object_data;
                                        if (ac->msg->exc != NULL)
                                                mono_unhandled_exception (ac->msg->exc);
                                        */
@@ -1528,6 +1565,8 @@ async_invoke_thread (gpointer data)
                        InterlockedDecrement (&async_tp.nthreads);
                        CloseHandle (idle_data.wait_handle);
                        idle_data.wait_handle = NULL;
+                       if (tp_finish_func)
+                               tp_finish_func (tp_hooks_user_data);
                        return;
                }
                
@@ -1582,12 +1621,17 @@ start_idle_threads (void)
 MonoBoolean
 ves_icall_System_Threading_ThreadPool_SetMinThreads (gint workerThreads, gint completionPortThreads)
 {
+       int max_threads;
+       int max_io_threads;
+
        MONO_ARCH_SAVE_REGS;
 
-       if (workerThreads < 0 || workerThreads > async_tp.max_threads)
+       max_threads = InterlockedCompareExchange (&async_tp.max_threads, -1, -1);
+       if (workerThreads <= 0 || workerThreads > max_threads)
                return FALSE;
 
-       if (completionPortThreads < 0 || completionPortThreads > async_io_tp.max_threads)
+       max_io_threads = InterlockedCompareExchange (&async_io_tp.max_threads, -1, -1);
+       if (completionPortThreads <= 0 || completionPortThreads > max_io_threads)
                return FALSE;
 
        InterlockedExchange (&async_tp.min_threads, workerThreads);
@@ -1599,13 +1643,20 @@ ves_icall_System_Threading_ThreadPool_SetMinThreads (gint workerThreads, gint co
 MonoBoolean
 ves_icall_System_Threading_ThreadPool_SetMaxThreads (gint workerThreads, gint completionPortThreads)
 {
+       int min_threads;
+       int min_io_threads;
+       int cpu_count;
+
        MONO_ARCH_SAVE_REGS;
 
-       if (workerThreads < async_tp.max_threads)
+       cpu_count = mono_cpu_count ();
+       min_threads = InterlockedCompareExchange (&async_tp.min_threads, -1, -1);
+       if (workerThreads < min_threads || workerThreads < cpu_count)
                return FALSE;
 
        /* We don't really have the concept of completion ports. Do we care here? */
-       if (completionPortThreads < async_io_tp.max_threads)
+       min_io_threads = InterlockedCompareExchange (&async_io_tp.min_threads, -1, -1);
+       if (completionPortThreads < min_io_threads || completionPortThreads < cpu_count)
                return FALSE;
 
        InterlockedExchange (&async_tp.max_threads, workerThreads);
@@ -1613,3 +1664,39 @@ ves_icall_System_Threading_ThreadPool_SetMaxThreads (gint workerThreads, gint co
        return TRUE;
 }
 
+/**
+ * mono_install_threadpool_thread_hooks
+ * @start_func: the function to be called right after a new threadpool thread is created. Can be NULL.
+ * @finish_func: the function to be called right before a thredpool thread is exiting. Can be NULL.
+ * @user_data: argument passed to @start_func and @finish_func.
+ *
+ * @start_fun will be called right after a threadpool thread is created and @finish_func right before a threadpool thread exits.
+ * The calls will be made from the thread itself.
+ */
+void
+mono_install_threadpool_thread_hooks (MonoThreadPoolFunc start_func, MonoThreadPoolFunc finish_func, gpointer user_data)
+{
+       tp_start_func = start_func;
+       tp_finish_func = finish_func;
+       tp_hooks_user_data = user_data;
+}
+
+/**
+ * mono_install_threadpool_item_hooks
+ * @begin_func: the function to be called before a threadpool work item processing starts.
+ * @end_func: the function to be called after a threadpool work item is finished.
+ * @user_data: argument passed to @begin_func and @end_func.
+ *
+ * The calls will be made from the thread itself and from the same AppDomain
+ * where the work item was executed.
+ *
+ */
+void
+mono_install_threadpool_item_hooks (MonoThreadPoolItemFunc begin_func, MonoThreadPoolItemFunc end_func, gpointer user_data)
+{
+       tp_item_begin_func = begin_func;
+       tp_item_end_func = end_func;
+       tp_item_user_data = user_data;
+}
+
+