#include <config.h>
#include <glib.h>
-#include <mono/metadata/appdomain.h>
+#ifdef PLATFORM_WIN32
+#define WINVER 0x0500
+#define _WIN32_WINNT 0x0500
+#define THREADS_PER_CPU 25
+#else
+#define THREADS_PER_CPU 50
+#endif
+
+#include <mono/metadata/domain-internals.h>
#include <mono/metadata/tabledefs.h>
#include <mono/metadata/threads.h>
+#include <mono/metadata/threads-types.h>
#include <mono/metadata/exception.h>
#include <mono/metadata/file-io.h>
#include <mono/metadata/monitor.h>
#include "threadpool.h"
/* maximum number of worker threads */
-int mono_max_worker_threads = 25; /* fixme: should be 25 per available CPU */
+int mono_max_worker_threads = THREADS_PER_CPU;
+static int mono_min_worker_threads = 0;
+
/* current number of worker threads */
static int mono_worker_threads = 0;
/* current number of busy threads */
-int busy_worker_threads = 0;
+static int busy_worker_threads = 0;
+
+/* mono_thread_pool_init called */
+static int tp_inited;
/* we use this to store a reference to the AsyncResult to avoid GC */
static MonoGHashTable *ares_htable = NULL;
+static CRITICAL_SECTION ares_lock;
+
/* we append a job */
static HANDLE job_added;
}
/* notify listeners */
- mono_monitor_try_enter ((MonoObject *) ares, INFINITE);
+ mono_monitor_enter ((MonoObject *) ares);
+
if (ares->handle != NULL) {
ac->wait_event = ((MonoWaitHandle *) ares->handle)->handle;
SetEvent (ac->wait_event);
}
mono_monitor_exit ((MonoObject *) ares);
+ EnterCriticalSection (&ares_lock);
mono_g_hash_table_remove (ares_htable, ares);
+ LeaveCriticalSection (&ares_lock);
+}
+
+void
+mono_thread_pool_init ()
+{
+ SYSTEM_INFO info;
+ int threads_per_cpu = THREADS_PER_CPU;
+
+ if ((int) InterlockedCompareExchange (&tp_inited, 1, 0) == 1)
+ return;
+
+ MONO_GC_REGISTER_ROOT (ares_htable);
+ InitializeCriticalSection (&ares_lock);
+ ares_htable = mono_g_hash_table_new (NULL, NULL);
+ job_added = CreateSemaphore (NULL, 0, 0x7fffffff, NULL);
+ GetSystemInfo (&info);
+ if (getenv ("MONO_THREADS_PER_CPU") != NULL) {
+ threads_per_cpu = atoi (getenv ("MONO_THREADS_PER_CPU"));
+ if (threads_per_cpu <= 0)
+ threads_per_cpu = THREADS_PER_CPU;
+ }
+
+ mono_max_worker_threads = threads_per_cpu * info.dwNumberOfProcessors;
}
MonoAsyncResult *
ares = mono_async_result_new (domain, NULL, ac->state, ac);
ares->async_delegate = target;
- if (!ares_htable) {
- ares_htable = mono_g_hash_table_new (NULL, NULL);
- job_added = CreateSemaphore (NULL, 0, 0x7fffffff, NULL);
- }
-
+ EnterCriticalSection (&ares_lock);
mono_g_hash_table_insert (ares_htable, ares, ares);
-
+ LeaveCriticalSection (&ares_lock);
+
busy = (int) InterlockedCompareExchange (&busy_worker_threads, 0, -1);
worker = (int) InterlockedCompareExchange (&mono_worker_threads, 0, -1);
if (worker <= ++busy &&
*out_args = NULL;
/* check if already finished */
- mono_monitor_try_enter ((MonoObject *) ares, INFINITE);
+ mono_monitor_enter ((MonoObject *) ares);
+
if (ares->endinvoke_called) {
*exc = (MonoObject *)mono_exception_from_name (mono_defaults.corlib, "System",
"InvalidOperationException");
ares->handle = (MonoObject *) mono_wait_handle_new (mono_object_domain (ares), ac->wait_event);
}
mono_monitor_exit ((MonoObject *) ares);
- WaitForSingleObject (ac->wait_event, INFINITE);
+ WaitForSingleObjectEx (ac->wait_event, INFINITE, TRUE);
} else {
mono_monitor_exit ((MonoObject *) ares);
}
return ac->res;
}
+void
+mono_thread_pool_cleanup (void)
+{
+ gint release;
+
+ EnterCriticalSection (&mono_delegate_section);
+ g_list_free (async_call_queue);
+ async_call_queue = NULL;
+ release = (gint) InterlockedCompareExchange (&busy_worker_threads, 0, -1);
+ LeaveCriticalSection (&mono_delegate_section);
+ if (job_added)
+ ReleaseSemaphore (job_added, release, NULL);
+}
+
static void
append_job (MonoAsyncResult *ar)
{
+ GList *tmp;
+
EnterCriticalSection (&mono_delegate_section);
- async_call_queue = g_list_append (async_call_queue, ar);
+ if (async_call_queue == NULL) {
+ async_call_queue = g_list_append (async_call_queue, ar);
+ } else {
+ for (tmp = async_call_queue; tmp && tmp->data != NULL; tmp = tmp->next);
+ if (tmp == NULL) {
+ async_call_queue = g_list_append (async_call_queue, ar);
+ } else {
+ tmp->data = ar;
+ }
+ }
LeaveCriticalSection (&mono_delegate_section);
}
dequeue_job (void)
{
MonoAsyncResult *ar = NULL;
- GList *tmp = NULL;
+ GList *tmp, *tmp2;
EnterCriticalSection (&mono_delegate_section);
- if (async_call_queue) {
- ar = (MonoAsyncResult *)async_call_queue->data;
- tmp = async_call_queue;
- async_call_queue = g_list_remove_link (tmp, tmp);
+ tmp = async_call_queue;
+ if (tmp) {
+ ar = (MonoAsyncResult *) tmp->data;
+ tmp->data = NULL;
+ tmp2 = tmp;
+ for (tmp2 = tmp; tmp2->next != NULL; tmp2 = tmp2->next);
+ if (tmp2 != tmp) {
+ async_call_queue = tmp->next;
+ tmp->next = NULL;
+ tmp2->next = tmp;
+ tmp->prev = tmp2;
+ }
}
LeaveCriticalSection (&mono_delegate_section);
- if (tmp)
- g_list_free_1 (tmp);
return ar;
}
{
MonoDomain *domain;
MonoThread *thread;
+ int workers, min;
thread = mono_thread_current ();
thread->threadpool_thread = TRUE;
thread->state |= ThreadState_Background;
+
for (;;) {
MonoAsyncResult *ar;
/* worker threads invokes methods in different domains,
* so we need to set the right domain here */
domain = ((MonoObject *)ar)->vtable->domain;
- if (mono_domain_set (domain, FALSE))
+ if (mono_domain_set (domain, FALSE)) {
+ mono_thread_push_appdomain_ref (domain);
mono_async_invoke (ar);
+ mono_thread_pop_appdomain_ref ();
+ }
InterlockedDecrement (&busy_worker_threads);
}
data = dequeue_job ();
- if (!data && WaitForSingleObject (job_added, 500) != WAIT_TIMEOUT)
- data = dequeue_job ();
+
+ if (!data) {
+ guint32 wr;
+ int timeout = 500;
+ guint32 start_time = GetTickCount ();
+
+ do {
+ wr = WaitForSingleObjectEx (job_added, (guint32)timeout, TRUE);
+ if ((thread->state & ThreadState_StopRequested)!=0)
+ mono_thread_interruption_checkpoint ();
+
+ timeout -= GetTickCount () - start_time;
+
+ if (wr != WAIT_TIMEOUT)
+ data = dequeue_job ();
+ }
+ while (!data && timeout > 0);
+ }
+ if (!data) {
+ workers = (int) InterlockedCompareExchange (&mono_worker_threads, 0, -1);
+ min = (int) InterlockedCompareExchange (&mono_min_worker_threads, 0, -1);
+
+ while (!data && workers <= min) {
+ WaitForSingleObjectEx (job_added, INFINITE, TRUE);
+ if ((thread->state & ThreadState_StopRequested)!=0)
+ mono_thread_interruption_checkpoint ();
+
+ data = dequeue_job ();
+ workers = (int) InterlockedCompareExchange (&mono_worker_threads, 0, -1);
+ min = (int) InterlockedCompareExchange (&mono_min_worker_threads, 0, -1);
+ }
+ }
+
if (!data) {
InterlockedDecrement (&mono_worker_threads);
return;
}
+
InterlockedIncrement (&busy_worker_threads);
}
*completionPortThreads = 0;
}
+void
+ves_icall_System_Threading_ThreadPool_GetMinThreads (gint *workerThreads, gint *completionPortThreads)
+{
+ gint workers;
+
+ MONO_ARCH_SAVE_REGS;
+
+ workers = (gint) InterlockedCompareExchange (&mono_min_worker_threads, 0, -1);
+ *workerThreads = workers;
+ *completionPortThreads = 0;
+}
+
+MonoBoolean
+ves_icall_System_Threading_ThreadPool_SetMinThreads (gint workerThreads, gint completionPortThreads)
+{
+ MONO_ARCH_SAVE_REGS;
+
+ if (workerThreads < 0 || workerThreads > mono_max_worker_threads)
+ return FALSE;
+ InterlockedExchange (&mono_min_worker_threads, workerThreads);
+ /* FIXME: should actually start the idle threads if needed */
+ return TRUE;
+}
+
static void
overlapped_callback (guint32 error, guint32 numbytes, WapiOverlapped *overlapped)
{
MonoFSAsyncResult *ares;
MonoThread *thread;
- gpointer ftn;
MONO_ARCH_SAVE_REGS;
ares->count = numbytes;
thread = mono_thread_attach (mono_object_domain (ares));
- SetEvent (ares->wait_handle->handle);
if (ares->async_callback != NULL) {
gpointer p [1];
mono_runtime_invoke (ares->async_callback->method_info->method, NULL, p, NULL);
}
+ SetEvent (ares->wait_handle->handle);
mono_thread_detach (thread);
- CloseHandle (thread->handle);
g_free (overlapped);
}
{
MONO_ARCH_SAVE_REGS;
+#ifdef PLATFORM_WIN32
+ return FALSE;
+#else
if (!BindIoCompletionCallback (handle, overlapped_callback, 0)) {
gint error = GetLastError ();
MonoException *exc;
}
return TRUE;
+#endif
}