*
* Authors:
* Dietmar Maurer (dietmar@ximian.com)
+ * Gonzalo Paniagua Javier (gonzalo@ximian.com)
*
- * (C) 2001 Ximian, Inc.
+ * (C) 2001-2003 Ximian, Inc.
+ * (c) 2004 Novell, Inc. (http://www.novell.com)
*/
#include <config.h>
#include <glib.h>
-#include <mono/metadata/appdomain.h>
+#ifdef PLATFORM_WIN32
+#define WINVER 0x0500
+#define _WIN32_WINNT 0x0500
+#endif
+
+#include <mono/metadata/domain-internals.h>
#include <mono/metadata/tabledefs.h>
#include <mono/metadata/threads.h>
+#include <mono/metadata/threads-types.h>
#include <mono/metadata/exception.h>
+#include <mono/metadata/file-io.h>
+#include <mono/metadata/monitor.h>
+#include <mono/metadata/marshal.h>
#include <mono/io-layer/io-layer.h>
+#include <mono/os/gc_wrapper.h>
#include "threadpool.h"
-/* FIXME:
- * - worker threads need to be initialized correctly.
- * - worker threads should be domain specific
- */
-
/* maximum number of worker threads */
-int mono_worker_threads = 1;
+int mono_max_worker_threads = 25; /* fixme: should be 25 per available CPU */
+static int mono_min_worker_threads = 0;
+
+/* current number of worker threads */
+static int mono_worker_threads = 0;
+
+/* current number of busy threads */
+static int busy_worker_threads = 0;
+
+/* we use this to store a reference to the AsyncResult to avoid GC */
+static MonoGHashTable *ares_htable = NULL;
-static int workers = 0;
+/* we append a job */
+static HANDLE job_added;
typedef struct {
MonoMethodMessage *msg;
- HANDLE wait_semaphore;
+ HANDLE wait_event;
MonoMethod *cb_method;
MonoDelegate *cb_target;
MonoObject *state;
MonoArray *out_args;
} ASyncCall;
-static void async_invoke_thread (void);
+static void async_invoke_thread (gpointer data);
+static void append_job (MonoAsyncResult *ar);
static GList *async_call_queue = NULL;
&ac->msg->exc, &ac->out_args);
ares->completed = 1;
-
- /* notify listeners */
- ReleaseSemaphore (ac->wait_semaphore, 0x7fffffff, NULL);
/* call async callback if cb_method != null*/
if (ac->cb_method) {
if (!ac->msg->exc)
ac->msg->exc = exc;
}
+
+ /* notify listeners */
+ if(!mono_monitor_enter ((MonoObject *) ares))
+ return;
+
+ if (ares->handle != NULL) {
+ ac->wait_event = ((MonoWaitHandle *) ares->handle)->handle;
+ SetEvent (ac->wait_event);
+ }
+ mono_monitor_exit ((MonoObject *) ares);
+
+ mono_g_hash_table_remove (ares_htable, ares);
}
MonoAsyncResult *
MonoDomain *domain = mono_domain_get ();
MonoAsyncResult *ares;
ASyncCall *ac;
+ int busy, worker;
+#ifdef HAVE_BOEHM_GC
+ ac = GC_MALLOC (sizeof (ASyncCall));
+#else
+ /* We'll leak the event if creaated... */
ac = g_new0 (ASyncCall, 1);
- ac->wait_semaphore = CreateSemaphore (NULL, 0, 0x7fffffff, NULL);
+#endif
+ ac->wait_event = NULL;
ac->msg = msg;
ac->state = state;
ac->cb_target = async_callback;
}
- ares = mono_async_result_new (domain, ac->wait_semaphore, ac->state, ac);
+ ares = mono_async_result_new (domain, NULL, ac->state, ac);
ares->async_delegate = target;
- EnterCriticalSection (&mono_delegate_section);
- async_call_queue = g_list_append (async_call_queue, ares);
- ReleaseSemaphore (mono_delegate_semaphore, 1, NULL);
+ if (!ares_htable) {
+ MONO_GC_REGISTER_ROOT (ares_htable);
+ ares_htable = mono_g_hash_table_new (NULL, NULL);
+ job_added = CreateSemaphore (NULL, 0, 0x7fffffff, NULL);
+ }
- if (workers == 0) {
- workers++;
- mono_thread_create (domain, async_invoke_thread, NULL);
+ mono_g_hash_table_insert (ares_htable, ares, ares);
+
+ busy = (int) InterlockedCompareExchange (&busy_worker_threads, 0, -1);
+ worker = (int) InterlockedCompareExchange (&mono_worker_threads, 0, -1);
+ if (worker <= ++busy &&
+ worker < mono_max_worker_threads) {
+ InterlockedIncrement (&mono_worker_threads);
+ InterlockedIncrement (&busy_worker_threads);
+ mono_thread_create (domain, async_invoke_thread, ares);
+ } else {
+ append_job (ares);
+ ReleaseSemaphore (job_added, 1, NULL);
}
- LeaveCriticalSection (&mono_delegate_section);
return ares;
}
mono_thread_pool_finish (MonoAsyncResult *ares, MonoArray **out_args, MonoObject **exc)
{
ASyncCall *ac;
- GList *l;
*exc = NULL;
*out_args = NULL;
- EnterCriticalSection (&mono_delegate_section);
/* check if already finished */
+ if (!mono_monitor_enter ((MonoObject *) ares)) {
+ return NULL;
+ }
+
if (ares->endinvoke_called) {
*exc = (MonoObject *)mono_exception_from_name (mono_defaults.corlib, "System",
"InvalidOperationException");
- LeaveCriticalSection (&mono_delegate_section);
+ mono_monitor_exit ((MonoObject *) ares);
return NULL;
}
g_assert (ac != NULL);
- if ((l = g_list_find (async_call_queue, ares))) {
- async_call_queue = g_list_remove_link (async_call_queue, l);
- mono_async_invoke (ares);
- }
- LeaveCriticalSection (&mono_delegate_section);
-
/* wait until we are really finished */
- WaitForSingleObject (ac->wait_semaphore, INFINITE);
+ if (!ares->completed) {
+ if (ares->handle == NULL) {
+ ac->wait_event = CreateEvent (NULL, TRUE, FALSE, NULL);
+ ares->handle = (MonoObject *) mono_wait_handle_new (mono_object_domain (ares), ac->wait_event);
+ }
+ mono_monitor_exit ((MonoObject *) ares);
+ WaitForSingleObjectEx (ac->wait_event, INFINITE, TRUE);
+ } else {
+ mono_monitor_exit ((MonoObject *) ares);
+ }
*exc = ac->msg->exc;
*out_args = ac->out_args;
return ac->res;
}
+void
+mono_thread_pool_cleanup (void)
+{
+ gint release;
+
+ EnterCriticalSection (&mono_delegate_section);
+ g_list_free (async_call_queue);
+ async_call_queue = NULL;
+ release = (gint) InterlockedCompareExchange (&busy_worker_threads, 0, -1);
+ LeaveCriticalSection (&mono_delegate_section);
+ if (job_added)
+ ReleaseSemaphore (job_added, release, NULL);
+}
+
+static void
+append_job (MonoAsyncResult *ar)
+{
+ EnterCriticalSection (&mono_delegate_section);
+ async_call_queue = g_list_append (async_call_queue, ar);
+ LeaveCriticalSection (&mono_delegate_section);
+}
+
+static MonoAsyncResult *
+dequeue_job (void)
+{
+ MonoAsyncResult *ar = NULL;
+ GList *tmp = NULL;
+
+ EnterCriticalSection (&mono_delegate_section);
+ if (async_call_queue) {
+ ar = (MonoAsyncResult *)async_call_queue->data;
+ tmp = async_call_queue;
+ async_call_queue = g_list_remove_link (tmp, tmp);
+ }
+ LeaveCriticalSection (&mono_delegate_section);
+ if (tmp)
+ g_list_free_1 (tmp);
+
+ return ar;
+}
+
static void
-async_invoke_thread ()
+async_invoke_thread (gpointer data)
{
MonoDomain *domain;
+ MonoThread *thread;
+ int workers, min;
+ thread = mono_thread_current ();
+ thread->threadpool_thread = TRUE;
+ thread->state |= ThreadState_Background;
for (;;) {
MonoAsyncResult *ar;
- gboolean new_worker = FALSE;
- if (WaitForSingleObject (mono_delegate_semaphore, 500) == WAIT_TIMEOUT) {
- EnterCriticalSection (&mono_delegate_section);
- workers--;
- LeaveCriticalSection (&mono_delegate_section);
- ExitThread (0);
+ ar = (MonoAsyncResult *) data;
+ if (ar) {
+ /* worker threads invokes methods in different domains,
+ * so we need to set the right domain here */
+ domain = ((MonoObject *)ar)->vtable->domain;
+ if (mono_domain_set (domain, FALSE))
+ mono_async_invoke (ar);
+ InterlockedDecrement (&busy_worker_threads);
+ }
+
+ data = dequeue_job ();
+
+ if (!data) {
+ guint32 wr;
+ int timeout = 500;
+ guint32 start_time = GetTickCount ();
+
+ do {
+ wr = WaitForSingleObjectEx (job_added, (guint32)timeout, TRUE);
+ mono_thread_interruption_checkpoint ();
+
+ timeout -= GetTickCount () - start_time;
+
+ if (wr != WAIT_TIMEOUT)
+ data = dequeue_job ();
+ }
+ while (!data && timeout > 0);
}
- ar = NULL;
- EnterCriticalSection (&mono_delegate_section);
-
- if (async_call_queue) {
- if ((g_list_length (async_call_queue) > 1) &&
- (workers < mono_worker_threads)) {
- new_worker = TRUE;
- workers++;
+ if (!data) {
+ workers = (int) InterlockedCompareExchange (&mono_worker_threads, 0, -1);
+ min = (int) InterlockedCompareExchange (&mono_min_worker_threads, 0, -1);
+
+ while (!data && workers <= min) {
+ WaitForSingleObjectEx (job_added, INFINITE, TRUE);
+ mono_thread_interruption_checkpoint ();
+
+ data = dequeue_job ();
+ workers = (int) InterlockedCompareExchange (&mono_worker_threads, 0, -1);
+ min = (int) InterlockedCompareExchange (&mono_min_worker_threads, 0, -1);
}
+ }
+
+ if (!data) {
+ InterlockedDecrement (&mono_worker_threads);
+ return;
+ }
+
+ InterlockedIncrement (&busy_worker_threads);
+ }
- ar = (MonoAsyncResult *)async_call_queue->data;
- async_call_queue = g_list_remove_link (async_call_queue, async_call_queue);
+ g_assert_not_reached ();
+}
- }
+void
+ves_icall_System_Threading_ThreadPool_GetAvailableThreads (gint *workerThreads, gint *completionPortThreads)
+{
+ gint busy;
- LeaveCriticalSection (&mono_delegate_section);
+ MONO_ARCH_SAVE_REGS;
- if (!ar)
- continue;
-
- /* worker threads invokes methods in different domains,
- * so we need to set the right domain here */
- domain = ((MonoObject *)ar)->vtable->domain;
- mono_domain_set (domain);
+ busy = (gint) InterlockedCompareExchange (&busy_worker_threads, 0, -1);
+ *workerThreads = mono_max_worker_threads - busy;
+ *completionPortThreads = 0;
+}
+
+void
+ves_icall_System_Threading_ThreadPool_GetMaxThreads (gint *workerThreads, gint *completionPortThreads)
+{
+ MONO_ARCH_SAVE_REGS;
- if (new_worker)
- mono_thread_create (domain, async_invoke_thread, NULL);
+ *workerThreads = mono_max_worker_threads;
+ *completionPortThreads = 0;
+}
+
+void
+ves_icall_System_Threading_ThreadPool_GetMinThreads (gint *workerThreads, gint *completionPortThreads)
+{
+ gint workers;
- mono_async_invoke (ar);
+ MONO_ARCH_SAVE_REGS;
+
+ workers = (gint) InterlockedCompareExchange (&mono_min_worker_threads, 0, -1);
+ *workerThreads = workers;
+ *completionPortThreads = 0;
+}
+
+MonoBoolean
+ves_icall_System_Threading_ThreadPool_SetMinThreads (gint workerThreads, gint completionPortThreads)
+{
+ MONO_ARCH_SAVE_REGS;
+
+ if (workerThreads < 0 || workerThreads > mono_max_worker_threads)
+ return FALSE;
+ InterlockedExchange (&mono_min_worker_threads, workerThreads);
+ /* FIXME: should actually start the idle threads if needed */
+ return TRUE;
+}
+
+static void
+overlapped_callback (guint32 error, guint32 numbytes, WapiOverlapped *overlapped)
+{
+ MonoFSAsyncResult *ares;
+ MonoThread *thread;
+
+ MONO_ARCH_SAVE_REGS;
+
+ ares = (MonoFSAsyncResult *) overlapped->handle1;
+ ares->completed = TRUE;
+ if (ares->bytes_read != -1)
+ ares->bytes_read = numbytes;
+ else
+ ares->count = numbytes;
+
+ thread = mono_thread_attach (mono_object_domain (ares));
+ if (ares->async_callback != NULL) {
+ gpointer p [1];
+
+ *p = ares;
+ mono_runtime_invoke (ares->async_callback->method_info->method, NULL, p, NULL);
}
- g_assert_not_reached ();
+ SetEvent (ares->wait_handle->handle);
+ mono_thread_detach (thread);
+ g_free (overlapped);
+}
+
+MonoBoolean
+ves_icall_System_Threading_ThreadPool_BindHandle (gpointer handle)
+{
+ MONO_ARCH_SAVE_REGS;
+
+#ifdef PLATFORM_WIN32
+ return FALSE;
+#else
+ if (!BindIoCompletionCallback (handle, overlapped_callback, 0)) {
+ gint error = GetLastError ();
+ MonoException *exc;
+ gchar *msg;
+
+ if (error == ERROR_INVALID_PARAMETER) {
+ exc = mono_get_exception_argument (NULL, "Invalid parameter.");
+ } else {
+ msg = g_strdup_printf ("Win32 error %d.", error);
+ exc = mono_exception_from_name_msg (mono_defaults.corlib,
+ "System",
+ "ApplicationException", msg);
+ g_free (msg);
+ }
+
+ mono_raise_exception (exc);
+ }
+
+ return TRUE;
+#endif
}
+