Patch for bug #62532 implemention of a kqueue/kevent based FileSystemWatcher.
[mono.git] / mono / metadata / threadpool.c
index b2b89aca3c5109a0b709ae13efb431dcb361de38..0803f0f846299acfbcc14cc917797b6cb2dc269a 100644 (file)
@@ -3,35 +3,52 @@
  *
  * Authors:
  *   Dietmar Maurer (dietmar@ximian.com)
+ *   Gonzalo Paniagua Javier (gonzalo@ximian.com)
  *
- * (C) 2001 Ximian, Inc.
+ * (C) 2001-2003 Ximian, Inc.
+ * (c) 2004 Novell, Inc. (http://www.novell.com)
  */
 
 #include <config.h>
 #include <glib.h>
 
-#include <mono/metadata/appdomain.h>
+#ifdef PLATFORM_WIN32
+#define WINVER 0x0500
+#define _WIN32_WINNT 0x0500
+#endif
+
+#include <mono/metadata/domain-internals.h>
 #include <mono/metadata/tabledefs.h>
 #include <mono/metadata/threads.h>
+#include <mono/metadata/threads-types.h>
 #include <mono/metadata/exception.h>
+#include <mono/metadata/file-io.h>
+#include <mono/metadata/monitor.h>
+#include <mono/metadata/marshal.h>
 #include <mono/io-layer/io-layer.h>
 #include <mono/os/gc_wrapper.h>
 
 #include "threadpool.h"
 
-/* FIXME:
- * - worker threads need to be initialized correctly.
- * - worker threads should be domain specific
- */
-
 /* maximum number of worker threads */
-int mono_worker_threads = 1;
+int mono_max_worker_threads = 25; /* fixme: should be 25 per available CPU */
+static int mono_min_worker_threads = 0;
+
+/* current number of worker threads */
+static int mono_worker_threads = 0;
+
+/* current number of busy threads */
+static int busy_worker_threads = 0;
 
-static int workers = 0;
+/* we use this to store a reference to the AsyncResult to avoid GC */
+static MonoGHashTable *ares_htable = NULL;
+
+/* we append a job */
+static HANDLE job_added;
 
 typedef struct {
        MonoMethodMessage *msg;
-       HANDLE             wait_semaphore;
+       HANDLE             wait_event;
        MonoMethod        *cb_method;
        MonoDelegate      *cb_target;
        MonoObject        *state;
@@ -39,7 +56,8 @@ typedef struct {
        MonoArray         *out_args;
 } ASyncCall;
 
-static void async_invoke_thread (void);
+static void async_invoke_thread (gpointer data);
+static void append_job (MonoAsyncResult *ar);
 
 static GList *async_call_queue = NULL;
 
@@ -53,9 +71,6 @@ mono_async_invoke (MonoAsyncResult *ares)
                                       &ac->msg->exc, &ac->out_args);
 
        ares->completed = 1;
-               
-       /* notify listeners */
-       ReleaseSemaphore (ac->wait_semaphore, 0x7fffffff, NULL);
 
        /* call async callback if cb_method != null*/
        if (ac->cb_method) {
@@ -65,6 +80,18 @@ mono_async_invoke (MonoAsyncResult *ares)
                if (!ac->msg->exc)
                        ac->msg->exc = exc;
        }
+
+       /* notify listeners */
+       if(!mono_monitor_enter ((MonoObject *) ares))
+               return;
+       
+       if (ares->handle != NULL) {
+               ac->wait_event = ((MonoWaitHandle *) ares->handle)->handle;
+               SetEvent (ac->wait_event);
+       }
+       mono_monitor_exit ((MonoObject *) ares);
+
+       mono_g_hash_table_remove (ares_htable, ares);
 }
 
 MonoAsyncResult *
@@ -74,14 +101,15 @@ mono_thread_pool_add (MonoObject *target, MonoMethodMessage *msg, MonoDelegate *
        MonoDomain *domain = mono_domain_get ();
        MonoAsyncResult *ares;
        ASyncCall *ac;
+       int busy, worker;
 
 #ifdef HAVE_BOEHM_GC
        ac = GC_MALLOC (sizeof (ASyncCall));
 #else
-       /* We'll leak the semaphore... */
+       /* We'll leak the event if creaated... */
        ac = g_new0 (ASyncCall, 1);
 #endif
-       ac->wait_semaphore = CreateSemaphore (NULL, 0, 0x7fffffff, NULL);       
+       ac->wait_event = NULL;
        ac->msg = msg;
        ac->state = state;
 
@@ -90,18 +118,28 @@ mono_thread_pool_add (MonoObject *target, MonoMethodMessage *msg, MonoDelegate *
                ac->cb_target = async_callback;
        }
 
-       ares = mono_async_result_new (domain, ac->wait_semaphore, ac->state, ac);
+       ares = mono_async_result_new (domain, NULL, ac->state, ac);
        ares->async_delegate = target;
 
-       EnterCriticalSection (&mono_delegate_section);  
-       async_call_queue = g_list_append (async_call_queue, ares); 
-       ReleaseSemaphore (mono_delegate_semaphore, 1, NULL);
+       if (!ares_htable) {
+               MONO_GC_REGISTER_ROOT (ares_htable);
+               ares_htable = mono_g_hash_table_new (NULL, NULL);
+               job_added = CreateSemaphore (NULL, 0, 0x7fffffff, NULL);
+       }
 
-       if (workers == 0) {
-               workers++;
-               mono_thread_create (domain, async_invoke_thread, NULL);
+       mono_g_hash_table_insert (ares_htable, ares, ares);
+
+       busy = (int) InterlockedCompareExchange (&busy_worker_threads, 0, -1);
+       worker = (int) InterlockedCompareExchange (&mono_worker_threads, 0, -1); 
+       if (worker <= ++busy &&
+           worker < mono_max_worker_threads) {
+               InterlockedIncrement (&mono_worker_threads);
+               InterlockedIncrement (&busy_worker_threads);
+               mono_thread_create (domain, async_invoke_thread, ares);
+       } else {
+               append_job (ares);
+               ReleaseSemaphore (job_added, 1, NULL);
        }
-       LeaveCriticalSection (&mono_delegate_section);
 
        return ares;
 }
@@ -110,17 +148,19 @@ MonoObject *
 mono_thread_pool_finish (MonoAsyncResult *ares, MonoArray **out_args, MonoObject **exc)
 {
        ASyncCall *ac;
-       GList *l;
 
        *exc = NULL;
        *out_args = NULL;
 
-       EnterCriticalSection (&mono_delegate_section);  
        /* check if already finished */
+       if (!mono_monitor_enter ((MonoObject *) ares)) {
+               return NULL;
+       }
+       
        if (ares->endinvoke_called) {
                *exc = (MonoObject *)mono_exception_from_name (mono_defaults.corlib, "System", 
                                              "InvalidOperationException");
-               LeaveCriticalSection (&mono_delegate_section);
+               mono_monitor_exit ((MonoObject *) ares);
                return NULL;
        }
 
@@ -129,14 +169,17 @@ mono_thread_pool_finish (MonoAsyncResult *ares, MonoArray **out_args, MonoObject
 
        g_assert (ac != NULL);
 
-       if ((l = g_list_find (async_call_queue, ares))) {
-               async_call_queue = g_list_remove_link (async_call_queue, l);
-               mono_async_invoke (ares);
-       }               
-       LeaveCriticalSection (&mono_delegate_section);
-       
        /* wait until we are really finished */
-       WaitForSingleObject (ac->wait_semaphore, INFINITE);
+       if (!ares->completed) {
+               if (ares->handle == NULL) {
+                       ac->wait_event = CreateEvent (NULL, TRUE, FALSE, NULL);
+                       ares->handle = (MonoObject *) mono_wait_handle_new (mono_object_domain (ares), ac->wait_event);
+               }
+               mono_monitor_exit ((MonoObject *) ares);
+               WaitForSingleObjectEx (ac->wait_event, INFINITE, TRUE);
+       } else {
+               mono_monitor_exit ((MonoObject *) ares);
+       }
 
        *exc = ac->msg->exc;
        *out_args = ac->out_args;
@@ -144,52 +187,214 @@ mono_thread_pool_finish (MonoAsyncResult *ares, MonoArray **out_args, MonoObject
        return ac->res;
 }
 
+void
+mono_thread_pool_cleanup (void)
+{
+       gint release;
+
+       EnterCriticalSection (&mono_delegate_section);
+       g_list_free (async_call_queue);
+       async_call_queue = NULL;
+       release = (gint) InterlockedCompareExchange (&busy_worker_threads, 0, -1);
+       LeaveCriticalSection (&mono_delegate_section);
+       if (job_added)
+               ReleaseSemaphore (job_added, release, NULL);
+}
+
 static void
-async_invoke_thread ()
+append_job (MonoAsyncResult *ar)
+{
+       EnterCriticalSection (&mono_delegate_section);
+       async_call_queue = g_list_append (async_call_queue, ar); 
+       LeaveCriticalSection (&mono_delegate_section);
+}
+
+static MonoAsyncResult *
+dequeue_job (void)
+{
+       MonoAsyncResult *ar = NULL;
+       GList *tmp = NULL;
+
+       EnterCriticalSection (&mono_delegate_section);
+       if (async_call_queue) {
+               ar = (MonoAsyncResult *)async_call_queue->data;
+               tmp = async_call_queue;
+               async_call_queue = g_list_remove_link (tmp, tmp); 
+       }
+       LeaveCriticalSection (&mono_delegate_section);
+       if (tmp)
+               g_list_free_1 (tmp);
+
+       return ar;
+}
+
+static void
+async_invoke_thread (gpointer data)
 {
        MonoDomain *domain;
+       MonoThread *thread;
+       int workers, min;
  
+       thread = mono_thread_current ();
+       thread->threadpool_thread = TRUE;
+       thread->state |= ThreadState_Background;
        for (;;) {
                MonoAsyncResult *ar;
-               gboolean new_worker = FALSE;
 
-               if (WaitForSingleObject (mono_delegate_semaphore, 500) == WAIT_TIMEOUT) {
-                       EnterCriticalSection (&mono_delegate_section);
-                       workers--;
-                       LeaveCriticalSection (&mono_delegate_section);
-                       ExitThread (0);
+               ar = (MonoAsyncResult *) data;
+               if (ar) {
+                       /* worker threads invokes methods in different domains,
+                        * so we need to set the right domain here */
+                       domain = ((MonoObject *)ar)->vtable->domain;
+                       if (mono_domain_set (domain, FALSE))
+                               mono_async_invoke (ar);
+                       InterlockedDecrement (&busy_worker_threads);
+               }
+
+               data = dequeue_job ();
+       
+               if (!data) {
+                       guint32 wr;
+                       int timeout = 500;
+                       guint32 start_time = GetTickCount ();
+                       
+                       do {
+                               wr = WaitForSingleObjectEx (job_added, (guint32)timeout, TRUE);
+                               mono_thread_interruption_checkpoint ();
+                       
+                               timeout -= GetTickCount () - start_time;
+                       
+                               if (wr != WAIT_TIMEOUT)
+                                       data = dequeue_job ();
+                       }
+                       while (!data && timeout > 0);
                }
                
-               ar = NULL;
-               EnterCriticalSection (&mono_delegate_section);
-               
-               if (async_call_queue) {
-                       if ((g_list_length (async_call_queue) > 1) && 
-                           (workers < mono_worker_threads)) {
-                               new_worker = TRUE;
-                               workers++;
+               if (!data) {
+                       workers = (int) InterlockedCompareExchange (&mono_worker_threads, 0, -1); 
+                       min = (int) InterlockedCompareExchange (&mono_min_worker_threads, 0, -1); 
+       
+                       while (!data && workers <= min) {
+                               WaitForSingleObjectEx (job_added, INFINITE, TRUE);
+                               mono_thread_interruption_checkpoint ();
+                       
+                               data = dequeue_job ();
+                               workers = (int) InterlockedCompareExchange (&mono_worker_threads, 0, -1); 
+                               min = (int) InterlockedCompareExchange (&mono_min_worker_threads, 0, -1); 
                        }
+               }
+       
+               if (!data) {
+                       InterlockedDecrement (&mono_worker_threads);
+                       return;
+               }
+               
+               InterlockedIncrement (&busy_worker_threads);
+       }
 
-                       ar = (MonoAsyncResult *)async_call_queue->data;
-                       async_call_queue = g_list_remove_link (async_call_queue, async_call_queue); 
+       g_assert_not_reached ();
+}
 
-               }
+void
+ves_icall_System_Threading_ThreadPool_GetAvailableThreads (gint *workerThreads, gint *completionPortThreads)
+{
+       gint busy;
 
-               LeaveCriticalSection (&mono_delegate_section);
+       MONO_ARCH_SAVE_REGS;
 
-               if (!ar)
-                       continue;
-               
-               /* worker threads invokes methods in different domains,
-                * so we need to set the right domain here */
-               domain = ((MonoObject *)ar)->vtable->domain;
-               mono_domain_set (domain);
+       busy = (gint) InterlockedCompareExchange (&busy_worker_threads, 0, -1);
+       *workerThreads = mono_max_worker_threads - busy;
+       *completionPortThreads = 0;
+}
+
+void
+ves_icall_System_Threading_ThreadPool_GetMaxThreads (gint *workerThreads, gint *completionPortThreads)
+{
+       MONO_ARCH_SAVE_REGS;
 
-               if (new_worker)
-                       mono_thread_create (domain, async_invoke_thread, NULL);
+       *workerThreads = mono_max_worker_threads;
+       *completionPortThreads = 0;
+}
 
-               mono_async_invoke (ar);
+void
+ves_icall_System_Threading_ThreadPool_GetMinThreads (gint *workerThreads, gint *completionPortThreads)
+{
+       gint workers;
+
+       MONO_ARCH_SAVE_REGS;
+
+       workers = (gint) InterlockedCompareExchange (&mono_min_worker_threads, 0, -1);
+       *workerThreads = workers;
+       *completionPortThreads = 0;
+}
+
+MonoBoolean
+ves_icall_System_Threading_ThreadPool_SetMinThreads (gint workerThreads, gint completionPortThreads)
+{
+       MONO_ARCH_SAVE_REGS;
+
+       if (workerThreads < 0 || workerThreads > mono_max_worker_threads)
+               return FALSE;
+       InterlockedExchange (&mono_min_worker_threads, workerThreads);
+       /* FIXME: should actually start the idle threads if needed */
+       return TRUE;
+}
+
+static void
+overlapped_callback (guint32 error, guint32 numbytes, WapiOverlapped *overlapped)
+{
+       MonoFSAsyncResult *ares;
+       MonoThread *thread;
+       MONO_ARCH_SAVE_REGS;
+
+       ares = (MonoFSAsyncResult *) overlapped->handle1;
+       ares->completed = TRUE;
+       if (ares->bytes_read != -1)
+               ares->bytes_read = numbytes;
+       else
+               ares->count = numbytes;
+
+       thread = mono_thread_attach (mono_object_domain (ares));
+       if (ares->async_callback != NULL) {
+               gpointer p [1];
+
+               *p = ares;
+               mono_runtime_invoke (ares->async_callback->method_info->method, NULL, p, NULL);
        }
 
-       g_assert_not_reached ();
+       SetEvent (ares->wait_handle->handle);
+       mono_thread_detach (thread);
+       g_free (overlapped);
 }
+
+MonoBoolean
+ves_icall_System_Threading_ThreadPool_BindHandle (gpointer handle)
+{
+       MONO_ARCH_SAVE_REGS;
+
+#ifdef PLATFORM_WIN32
+       return FALSE;
+#else
+       if (!BindIoCompletionCallback (handle, overlapped_callback, 0)) {
+               gint error = GetLastError ();
+               MonoException *exc;
+               gchar *msg;
+
+               if (error == ERROR_INVALID_PARAMETER) {
+                       exc = mono_get_exception_argument (NULL, "Invalid parameter.");
+               } else {
+                       msg = g_strdup_printf ("Win32 error %d.", error);
+                       exc = mono_exception_from_name_msg (mono_defaults.corlib,
+                                                           "System",
+                                                           "ApplicationException", msg);
+                       g_free (msg);
+               }
+
+               mono_raise_exception (exc);
+       }
+
+       return TRUE;
+#endif
+}
+