MonoObject *res;
MonoArray *out_args;
/* This is a HANDLE, we use guint64 so the managed object layout remains constant */
+ /* THIS FIELD IS NOT USED ANY MORE. Remove it when we feel like breaking corlib compatibility with 2.6 */
guint64 wait_event;
} ASyncCall;
static MonoClass *socket_async_call_klass;
static MonoClass *process_async_call_klass;
+/* Hooks */
+static MonoThreadPoolFunc tp_start_func;
+static MonoThreadPoolFunc tp_finish_func;
+static gpointer tp_hooks_user_data;
+static MonoThreadPoolItemFunc tp_item_begin_func;
+static MonoThreadPoolItemFunc tp_item_end_func;
+static gpointer tp_item_user_data;
+
#define INIT_POLLFD(a, b, c) {(a)->fd = b; (a)->events = c; (a)->revents = 0;}
enum {
AIO_OP_FIRST,
EnterCriticalSection (&data->io_lock);
data->inited = 0;
-#ifdef PLATFORM_WIN32
+#ifdef HOST_WIN32
closesocket (data->pipe [0]);
closesocket (data->pipe [1]);
#else
idle_data.wait_handle = CreateEvent (NULL, FALSE, FALSE, NULL);
thread = mono_thread_internal_current ();
+ if (tp_start_func)
+ tp_start_func (tp_hooks_user_data);
version = mono_get_runtime_info ()->framework_version;
for (;;) {
continue;
}
if (mono_domain_set (domain, FALSE)) {
- ASyncCall *ac;
+ /* ASyncCall *ac; */
+ if (tp_item_begin_func)
+ tp_item_begin_func (tp_item_user_data);
mono_async_invoke (ar);
- ac = (ASyncCall *) ar->object_data;
+ if (tp_item_end_func)
+ tp_item_end_func (tp_item_user_data);
/*
+ ac = (ASyncCall *) ar->object_data;
if (ac->msg->exc != NULL)
mono_unhandled_exception (ac->msg->exc);
*/
InterlockedDecrement (&async_io_tp.nthreads);
CloseHandle (idle_data.wait_handle);
idle_data.wait_handle = NULL;
+ if (tp_finish_func)
+ tp_finish_func (tp_hooks_user_data);
return;
}
for (; i < allocated; i++)
INIT_POLLFD (&pfds [i], -1, 0);
}
-#ifndef PLATFORM_WIN32
+#ifndef HOST_WIN32
nread = read (data->pipe [0], one, 1);
#else
nread = recv ((SOCKET) data->pipe [0], one, 1, 0);
}
}
-#ifdef PLATFORM_WIN32
+#ifdef HOST_WIN32
static void
connect_hack (gpointer x)
{
static void
socket_io_init (SocketIOData *data)
{
-#ifdef PLATFORM_WIN32
+#ifdef HOST_WIN32
struct sockaddr_in server;
struct sockaddr_in client;
SOCKET srv;
data->epoll_disabled = TRUE;
#endif
-#ifndef PLATFORM_WIN32
+#ifndef HOST_WIN32
if (data->epoll_disabled) {
if (pipe (data->pipe) != 0) {
int err = errno;
SocketIOData *data = &socket_io_data;
int w;
-#if defined(PLATFORM_MACOSX) || defined(PLATFORM_BSD) || defined(PLATFORM_WIN32) || defined(PLATFORM_SOLARIS)
+#if defined(PLATFORM_MACOSX) || defined(PLATFORM_BSD) || defined(HOST_WIN32) || defined(PLATFORM_SOLARIS)
/* select() for connect() does not work well on the Mac. Bug #75436. */
/* Bug #77637 for the BSD 6 case */
/* Bug #78888 for the Windows case */
mono_g_hash_table_replace (data->sock_to_state, GINT_TO_POINTER (state->handle), list);
LeaveCriticalSection (&data->io_lock);
*msg = (char) state->operation;
-#ifndef PLATFORM_WIN32
+#ifndef HOST_WIN32
w = write (data->pipe [1], msg, 1);
w = w;
#else
MonoArray *out_args = NULL;
HANDLE wait_event = NULL;
- ares->completed = 1;
-
if (ares->execution_context) {
/* use captured ExecutionContext (if available) */
MONO_OBJECT_SETREF (ares, original_context, mono_thread_get_execution_context ());
MONO_OBJECT_SETREF (ac, msg->exc, exc);
MONO_OBJECT_SETREF (ac, out_args, out_args);
+ mono_monitor_enter ((MonoObject *) ares);
+ ares->completed = 1;
+ if (ares->handle != NULL)
+ wait_event = mono_wait_handle_get_handle ((MonoWaitHandle *) ares->handle);
+ mono_monitor_exit ((MonoObject *) ares);
+ /* notify listeners */
+ if (wait_event != NULL)
+ SetEvent (wait_event);
+
/* call async callback if cb_method != null*/
if (ac->cb_method) {
MonoObject *exc = NULL;
ares->original_context = NULL;
}
- /* notify listeners */
- mono_monitor_enter ((MonoObject *) ares);
- if (ares->handle != NULL) {
- ac->wait_event = (gsize) mono_wait_handle_get_handle ((MonoWaitHandle *) ares->handle);
- wait_event = (HANDLE)(gsize) ac->wait_event;
- }
- mono_monitor_exit ((MonoObject *) ares);
- if (wait_event != NULL)
- SetEvent (wait_event);
}
static void
cpu_count = mono_cpu_count ();
n = 8 + 2 * cpu_count; /* 8 is minFreeThreads for ASP.NET */
threadpool_init (&async_tp, n, n + threads_per_cpu * cpu_count, async_invoke_thread);
-#ifndef DISABLE_SOCKET
+#ifndef DISABLE_SOCKETS
threadpool_init (&async_io_tp, 2 * cpu_count, 8 * cpu_count, async_invoke_io_thread);
#endif
mono_thread_pool_finish (MonoAsyncResult *ares, MonoArray **out_args, MonoObject **exc)
{
ASyncCall *ac;
+ HANDLE wait_event;
*exc = NULL;
*out_args = NULL;
}
ares->endinvoke_called = 1;
- ac = (ASyncCall *)ares->object_data;
-
- g_assert (ac != NULL);
-
/* wait until we are really finished */
if (!ares->completed) {
if (ares->handle == NULL) {
- ac->wait_event = (gsize)CreateEvent (NULL, TRUE, FALSE, NULL);
- g_assert(ac->wait_event != 0);
- MONO_OBJECT_SETREF (ares, handle, (MonoObject *) mono_wait_handle_new (mono_object_domain (ares), (gpointer)(gsize)ac->wait_event));
+ wait_event = CreateEvent (NULL, TRUE, FALSE, NULL);
+ g_assert(wait_event != 0);
+ MONO_OBJECT_SETREF (ares, handle, (MonoObject *) mono_wait_handle_new (mono_object_domain (ares), wait_event));
+ } else {
+ wait_event = mono_wait_handle_get_handle ((MonoWaitHandle *) ares->handle);
}
mono_monitor_exit ((MonoObject *) ares);
- WaitForSingleObjectEx ((gpointer)(gsize)ac->wait_event, INFINITE, TRUE);
+ WaitForSingleObjectEx (wait_event, INFINITE, TRUE);
} else {
mono_monitor_exit ((MonoObject *) ares);
}
+ ac = (ASyncCall *) ares->object_data;
+ g_assert (ac != NULL);
*exc = ac->msg->exc; /* FIXME: GC add write barrier */
*out_args = ac->out_args;
CRITICAL_SECTION *cs = &tp->lock;
EnterCriticalSection (cs);
+ if (tp->idle_threads == NULL) {
+ it->die = TRUE;
+ LeaveCriticalSection (cs);
+ return NULL; /* We are shutting down */
+ }
/*
if (mono_100ns_ticks () - tp->last_sample.timeStamp > 10000 * 1000) {
float elapsed_ticks;
cs = &tp->lock;
threadpool_jobs_inc (ar);
EnterCriticalSection (cs);
+ if (tp->idle_threads == NULL) {
+ LeaveCriticalSection (cs);
+ return; /* We are shutting down */
+ }
if (ar->vtable->domain->state == MONO_APPDOMAIN_UNLOADING ||
ar->vtable->domain->state == MONO_APPDOMAIN_UNLOADED) {
LeaveCriticalSection (cs);
tp->first_elem = tp->next_elem = 0;
}
+gboolean
+mono_thread_pool_is_queue_array (MonoArray *o)
+{
+ return o == async_tp.array || o == async_io_tp.array;
+}
+
static void
async_invoke_thread (gpointer data)
{
idle_data.wait_handle = CreateEvent (NULL, FALSE, FALSE, NULL);
thread = mono_thread_internal_current ();
+ if (tp_start_func)
+ tp_start_func (tp_hooks_user_data);
version = mono_get_runtime_info ()->framework_version;
for (;;) {
MonoAsyncResult *ar;
}
if (mono_domain_set (domain, FALSE)) {
- ASyncCall *ac;
+ /* ASyncCall *ac; */
+ if (tp_item_begin_func)
+ tp_item_begin_func (tp_item_user_data);
mono_async_invoke (ar);
- ac = (ASyncCall *) ar->object_data;
+ if (tp_item_end_func)
+ tp_item_end_func (tp_item_user_data);
/*
+ ac = (ASyncCall *) ar->object_data;
if (ac->msg->exc != NULL)
mono_unhandled_exception (ac->msg->exc);
*/
InterlockedDecrement (&async_tp.nthreads);
CloseHandle (idle_data.wait_handle);
idle_data.wait_handle = NULL;
+ if (tp_finish_func)
+ tp_finish_func (tp_hooks_user_data);
return;
}
MonoBoolean
ves_icall_System_Threading_ThreadPool_SetMinThreads (gint workerThreads, gint completionPortThreads)
{
+ int max_threads;
+ int max_io_threads;
+
MONO_ARCH_SAVE_REGS;
- if (workerThreads < 0 || workerThreads > async_tp.max_threads)
+ max_threads = InterlockedCompareExchange (&async_tp.max_threads, -1, -1);
+ if (workerThreads <= 0 || workerThreads > max_threads)
return FALSE;
- if (completionPortThreads < 0 || completionPortThreads > async_io_tp.max_threads)
+ max_io_threads = InterlockedCompareExchange (&async_io_tp.max_threads, -1, -1);
+ if (completionPortThreads <= 0 || completionPortThreads > max_io_threads)
return FALSE;
InterlockedExchange (&async_tp.min_threads, workerThreads);
MonoBoolean
ves_icall_System_Threading_ThreadPool_SetMaxThreads (gint workerThreads, gint completionPortThreads)
{
+ int min_threads;
+ int min_io_threads;
+ int cpu_count;
+
MONO_ARCH_SAVE_REGS;
- if (workerThreads < async_tp.max_threads)
+ cpu_count = mono_cpu_count ();
+ min_threads = InterlockedCompareExchange (&async_tp.min_threads, -1, -1);
+ if (workerThreads < min_threads || workerThreads < cpu_count)
return FALSE;
/* We don't really have the concept of completion ports. Do we care here? */
- if (completionPortThreads < async_io_tp.max_threads)
+ min_io_threads = InterlockedCompareExchange (&async_io_tp.min_threads, -1, -1);
+ if (completionPortThreads < min_io_threads || completionPortThreads < cpu_count)
return FALSE;
InterlockedExchange (&async_tp.max_threads, workerThreads);
return TRUE;
}
+/**
+ * mono_install_threadpool_thread_hooks
+ * @start_func: the function to be called right after a new threadpool thread is created. Can be NULL.
+ * @finish_func: the function to be called right before a thredpool thread is exiting. Can be NULL.
+ * @user_data: argument passed to @start_func and @finish_func.
+ *
+ * @start_fun will be called right after a threadpool thread is created and @finish_func right before a threadpool thread exits.
+ * The calls will be made from the thread itself.
+ */
+void
+mono_install_threadpool_thread_hooks (MonoThreadPoolFunc start_func, MonoThreadPoolFunc finish_func, gpointer user_data)
+{
+ tp_start_func = start_func;
+ tp_finish_func = finish_func;
+ tp_hooks_user_data = user_data;
+}
+
+/**
+ * mono_install_threadpool_item_hooks
+ * @begin_func: the function to be called before a threadpool work item processing starts.
+ * @end_func: the function to be called after a threadpool work item is finished.
+ * @user_data: argument passed to @begin_func and @end_func.
+ *
+ * The calls will be made from the thread itself and from the same AppDomain
+ * where the work item was executed.
+ *
+ */
+void
+mono_install_threadpool_item_hooks (MonoThreadPoolItemFunc begin_func, MonoThreadPoolItemFunc end_func, gpointer user_data)
+{
+ tp_item_begin_func = begin_func;
+ tp_item_end_func = end_func;
+ tp_item_user_data = user_data;
+}
+
+