Fix typo.
[mono.git] / mono / metadata / threadpool.c
index 47ab1f3cec6014610cae370a2f51f31177b5cdaa..11f86ad1fa4472a44f43458fe98cb4afc30bf308 100644 (file)
 #include <config.h>
 #include <glib.h>
 
-#include <mono/metadata/appdomain.h>
+#ifdef PLATFORM_WIN32
+#define WINVER 0x0500
+#define _WIN32_WINNT 0x0500
+#define THREADS_PER_CPU        25
+#else
+#define THREADS_PER_CPU        50
+#endif
+
+#include <mono/metadata/domain-internals.h>
 #include <mono/metadata/tabledefs.h>
 #include <mono/metadata/threads.h>
+#include <mono/metadata/threads-types.h>
 #include <mono/metadata/exception.h>
 #include <mono/metadata/file-io.h>
 #include <mono/metadata/monitor.h>
 #include "threadpool.h"
 
 /* maximum number of worker threads */
-int mono_max_worker_threads = 25; /* fixme: should be 25 per available CPU */
+int mono_max_worker_threads = THREADS_PER_CPU;
+static int mono_min_worker_threads = 0;
+
 /* current number of worker threads */
 static int mono_worker_threads = 0;
 
 /* current number of busy threads */
-int busy_worker_threads = 0;
+static int busy_worker_threads = 0;
+
+/* mono_thread_pool_init called */
+static int tp_inited;
 
 /* we use this to store a reference to the AsyncResult to avoid GC */
 static MonoGHashTable *ares_htable = NULL;
 
+static CRITICAL_SECTION ares_lock;
+
 /* we append a job */
 static HANDLE job_added;
 
@@ -74,14 +90,40 @@ mono_async_invoke (MonoAsyncResult *ares)
        }
 
        /* notify listeners */
-       mono_monitor_try_enter ((MonoObject *) ares, INFINITE);
+       mono_monitor_enter ((MonoObject *) ares);
+       
        if (ares->handle != NULL) {
                ac->wait_event = ((MonoWaitHandle *) ares->handle)->handle;
                SetEvent (ac->wait_event);
        }
        mono_monitor_exit ((MonoObject *) ares);
 
+       EnterCriticalSection (&ares_lock);
        mono_g_hash_table_remove (ares_htable, ares);
+       LeaveCriticalSection (&ares_lock);
+}
+
+void
+mono_thread_pool_init ()
+{
+       SYSTEM_INFO info;
+       int threads_per_cpu = THREADS_PER_CPU;
+
+       if ((int) InterlockedCompareExchange (&tp_inited, 1, 0) == 1)
+               return;
+
+       MONO_GC_REGISTER_ROOT (ares_htable);
+       InitializeCriticalSection (&ares_lock);
+       ares_htable = mono_g_hash_table_new (NULL, NULL);
+       job_added = CreateSemaphore (NULL, 0, 0x7fffffff, NULL);
+       GetSystemInfo (&info);
+       if (getenv ("MONO_THREADS_PER_CPU") != NULL) {
+               threads_per_cpu = atoi (getenv ("MONO_THREADS_PER_CPU"));
+               if (threads_per_cpu <= 0)
+                       threads_per_cpu = THREADS_PER_CPU;
+       }
+
+       mono_max_worker_threads = threads_per_cpu * info.dwNumberOfProcessors;
 }
 
 MonoAsyncResult *
@@ -111,13 +153,10 @@ mono_thread_pool_add (MonoObject *target, MonoMethodMessage *msg, MonoDelegate *
        ares = mono_async_result_new (domain, NULL, ac->state, ac);
        ares->async_delegate = target;
 
-       if (!ares_htable) {
-               ares_htable = mono_g_hash_table_new (NULL, NULL);
-               job_added = CreateSemaphore (NULL, 0, 0x7fffffff, NULL);
-       }
-
+       EnterCriticalSection (&ares_lock);
        mono_g_hash_table_insert (ares_htable, ares, ares);
-
+       LeaveCriticalSection (&ares_lock);
+       
        busy = (int) InterlockedCompareExchange (&busy_worker_threads, 0, -1);
        worker = (int) InterlockedCompareExchange (&mono_worker_threads, 0, -1); 
        if (worker <= ++busy &&
@@ -142,7 +181,8 @@ mono_thread_pool_finish (MonoAsyncResult *ares, MonoArray **out_args, MonoObject
        *out_args = NULL;
 
        /* check if already finished */
-       mono_monitor_try_enter ((MonoObject *) ares, INFINITE);
+       mono_monitor_enter ((MonoObject *) ares);
+       
        if (ares->endinvoke_called) {
                *exc = (MonoObject *)mono_exception_from_name (mono_defaults.corlib, "System", 
                                              "InvalidOperationException");
@@ -162,7 +202,7 @@ mono_thread_pool_finish (MonoAsyncResult *ares, MonoArray **out_args, MonoObject
                        ares->handle = (MonoObject *) mono_wait_handle_new (mono_object_domain (ares), ac->wait_event);
                }
                mono_monitor_exit ((MonoObject *) ares);
-               WaitForSingleObject (ac->wait_event, INFINITE);
+               WaitForSingleObjectEx (ac->wait_event, INFINITE, TRUE);
        } else {
                mono_monitor_exit ((MonoObject *) ares);
        }
@@ -173,11 +213,36 @@ mono_thread_pool_finish (MonoAsyncResult *ares, MonoArray **out_args, MonoObject
        return ac->res;
 }
 
+void
+mono_thread_pool_cleanup (void)
+{
+       gint release;
+
+       EnterCriticalSection (&mono_delegate_section);
+       g_list_free (async_call_queue);
+       async_call_queue = NULL;
+       release = (gint) InterlockedCompareExchange (&busy_worker_threads, 0, -1);
+       LeaveCriticalSection (&mono_delegate_section);
+       if (job_added)
+               ReleaseSemaphore (job_added, release, NULL);
+}
+
 static void
 append_job (MonoAsyncResult *ar)
 {
+       GList *tmp;
+
        EnterCriticalSection (&mono_delegate_section);
-       async_call_queue = g_list_append (async_call_queue, ar); 
+       if (async_call_queue == NULL) {
+               async_call_queue = g_list_append (async_call_queue, ar); 
+       } else {
+               for (tmp = async_call_queue; tmp && tmp->data != NULL; tmp = tmp->next);
+               if (tmp == NULL) {
+                       async_call_queue = g_list_append (async_call_queue, ar); 
+               } else {
+                       tmp->data = ar;
+               }
+       }
        LeaveCriticalSection (&mono_delegate_section);
 }
 
@@ -185,17 +250,23 @@ static MonoAsyncResult *
 dequeue_job (void)
 {
        MonoAsyncResult *ar = NULL;
-       GList *tmp = NULL;
+       GList *tmp, *tmp2;
 
        EnterCriticalSection (&mono_delegate_section);
-       if (async_call_queue) {
-               ar = (MonoAsyncResult *)async_call_queue->data;
-               tmp = async_call_queue;
-               async_call_queue = g_list_remove_link (tmp, tmp); 
+       tmp = async_call_queue;
+       if (tmp) {
+               ar = (MonoAsyncResult *) tmp->data;
+               tmp->data = NULL;
+               tmp2 = tmp;
+               for (tmp2 = tmp; tmp2->next != NULL; tmp2 = tmp2->next);
+               if (tmp2 != tmp) {
+                       async_call_queue = tmp->next;
+                       tmp->next = NULL;
+                       tmp2->next = tmp;
+                       tmp->prev = tmp2;
+               }
        }
        LeaveCriticalSection (&mono_delegate_section);
-       if (tmp)
-               g_list_free_1 (tmp);
 
        return ar;
 }
@@ -205,10 +276,12 @@ async_invoke_thread (gpointer data)
 {
        MonoDomain *domain;
        MonoThread *thread;
+       int workers, min;
  
        thread = mono_thread_current ();
        thread->threadpool_thread = TRUE;
        thread->state |= ThreadState_Background;
+
        for (;;) {
                MonoAsyncResult *ar;
 
@@ -217,19 +290,54 @@ async_invoke_thread (gpointer data)
                        /* worker threads invokes methods in different domains,
                         * so we need to set the right domain here */
                        domain = ((MonoObject *)ar)->vtable->domain;
-                       if (mono_domain_set (domain, FALSE))
+                       if (mono_domain_set (domain, FALSE)) {
+                               mono_thread_push_appdomain_ref (domain);
                                mono_async_invoke (ar);
+                               mono_thread_pop_appdomain_ref ();
+                       }
                        InterlockedDecrement (&busy_worker_threads);
                }
 
                data = dequeue_job ();
-               if (!data && WaitForSingleObject (job_added, 500) != WAIT_TIMEOUT)
-                       data = dequeue_job ();
+       
+               if (!data) {
+                       guint32 wr;
+                       int timeout = 500;
+                       guint32 start_time = GetTickCount ();
+                       
+                       do {
+                               wr = WaitForSingleObjectEx (job_added, (guint32)timeout, TRUE);
+                               if ((thread->state & ThreadState_StopRequested)!=0)
+                                       mono_thread_interruption_checkpoint ();
+                       
+                               timeout -= GetTickCount () - start_time;
+                       
+                               if (wr != WAIT_TIMEOUT)
+                                       data = dequeue_job ();
+                       }
+                       while (!data && timeout > 0);
+               }
 
+               if (!data) {
+                       workers = (int) InterlockedCompareExchange (&mono_worker_threads, 0, -1); 
+                       min = (int) InterlockedCompareExchange (&mono_min_worker_threads, 0, -1); 
+       
+                       while (!data && workers <= min) {
+                               WaitForSingleObjectEx (job_added, INFINITE, TRUE);
+                               if ((thread->state & ThreadState_StopRequested)!=0)
+                                       mono_thread_interruption_checkpoint ();
+                       
+                               data = dequeue_job ();
+                               workers = (int) InterlockedCompareExchange (&mono_worker_threads, 0, -1); 
+                               min = (int) InterlockedCompareExchange (&mono_min_worker_threads, 0, -1); 
+                       }
+               }
+       
                if (!data) {
                        InterlockedDecrement (&mono_worker_threads);
                        return;
                }
+               
                InterlockedIncrement (&busy_worker_threads);
        }
 
@@ -257,12 +365,35 @@ ves_icall_System_Threading_ThreadPool_GetMaxThreads (gint *workerThreads, gint *
        *completionPortThreads = 0;
 }
 
+void
+ves_icall_System_Threading_ThreadPool_GetMinThreads (gint *workerThreads, gint *completionPortThreads)
+{
+       gint workers;
+
+       MONO_ARCH_SAVE_REGS;
+
+       workers = (gint) InterlockedCompareExchange (&mono_min_worker_threads, 0, -1);
+       *workerThreads = workers;
+       *completionPortThreads = 0;
+}
+
+MonoBoolean
+ves_icall_System_Threading_ThreadPool_SetMinThreads (gint workerThreads, gint completionPortThreads)
+{
+       MONO_ARCH_SAVE_REGS;
+
+       if (workerThreads < 0 || workerThreads > mono_max_worker_threads)
+               return FALSE;
+       InterlockedExchange (&mono_min_worker_threads, workerThreads);
+       /* FIXME: should actually start the idle threads if needed */
+       return TRUE;
+}
+
 static void
 overlapped_callback (guint32 error, guint32 numbytes, WapiOverlapped *overlapped)
 {
        MonoFSAsyncResult *ares;
        MonoThread *thread;
-       gpointer ftn;
  
        MONO_ARCH_SAVE_REGS;
 
@@ -274,7 +405,6 @@ overlapped_callback (guint32 error, guint32 numbytes, WapiOverlapped *overlapped
                ares->count = numbytes;
 
        thread = mono_thread_attach (mono_object_domain (ares));
-       SetEvent (ares->wait_handle->handle);
        if (ares->async_callback != NULL) {
                gpointer p [1];
 
@@ -282,8 +412,8 @@ overlapped_callback (guint32 error, guint32 numbytes, WapiOverlapped *overlapped
                mono_runtime_invoke (ares->async_callback->method_info->method, NULL, p, NULL);
        }
 
+       SetEvent (ares->wait_handle->handle);
        mono_thread_detach (thread);
-       CloseHandle (thread->handle);
        g_free (overlapped);
 }
 
@@ -292,6 +422,9 @@ ves_icall_System_Threading_ThreadPool_BindHandle (gpointer handle)
 {
        MONO_ARCH_SAVE_REGS;
 
+#ifdef PLATFORM_WIN32
+       return FALSE;
+#else
        if (!BindIoCompletionCallback (handle, overlapped_callback, 0)) {
                gint error = GetLastError ();
                MonoException *exc;
@@ -311,5 +444,6 @@ ves_icall_System_Threading_ThreadPool_BindHandle (gpointer handle)
        }
 
        return TRUE;
+#endif
 }