X-Git-Url: http://wien.tomnetworks.com/gitweb/?a=blobdiff_plain;f=mono%2Fmetadata%2Fthreadpool.c;h=ac51b03d974c7b494814afe71e1565120d90af49;hb=a5cce3bf094e4f231bab2b7aa4db95c46efbc5eb;hp=33506f5bbb86f9e531a0ca122275349a9d6aa505;hpb=610eb25e126918f4361e4eaa805eaac7b625cde1;p=mono.git diff --git a/mono/metadata/threadpool.c b/mono/metadata/threadpool.c index 33506f5bbb8..ac51b03d974 100644 --- a/mono/metadata/threadpool.c +++ b/mono/metadata/threadpool.c @@ -5,15 +5,16 @@ * Dietmar Maurer (dietmar@ximian.com) * Gonzalo Paniagua Javier (gonzalo@ximian.com) * - * (C) 2001-2003 Ximian, Inc. - * (c) 2004,2005 Novell, Inc. (http://www.novell.com) + * Copyright 2001-2003 Ximian, Inc (http://www.ximian.com) + * Copyright 2004-2009 Novell, Inc (http://www.novell.com) */ #include #include -#define THREADS_PER_CPU 10 /* 20 + THREADS_PER_CPU * number of CPUs */ +#define THREADS_PER_CPU 10 /* 8 + THREADS_PER_CPU * number of CPUs = max threads */ #define THREAD_EXIT_TIMEOUT 1000 +#define INITIAL_QUEUE_LENGTH 128 #include #include @@ -25,6 +26,7 @@ #include #include #include +#include #include #include #include @@ -48,7 +50,9 @@ #include #endif +#ifndef DISABLE_SOCKETS #include "mono/io-layer/socket-wrappers.h" +#endif #include "threadpool.h" @@ -56,32 +60,22 @@ ThreadState_SuspendRequested)) != 0) #undef EPOLL_DEBUG - -/* maximum number of worker threads */ -static int mono_max_worker_threads; -static int mono_min_worker_threads; -static int mono_io_max_worker_threads; -static int mono_io_min_worker_threads; - -/* current number of worker threads */ -static int mono_worker_threads = 0; -static int io_worker_threads = 0; - -/* current number of busy threads */ -static int busy_worker_threads = 0; -static int busy_io_worker_threads; +// +/* map of CounterSample.cs */ +struct _MonoCounterSample { + gint64 rawValue; + gint64 baseValue; + gint64 counterFrequency; + gint64 systemFrequency; + gint64 timeStamp; + gint64 timeStamp100nSec; + gint64 counterTimeStamp; + int counterType; +}; /* mono_thread_pool_init called */ static int tp_inited; -/* started idle threads */ -static int tp_idle_started; - - -/* we use this to store a reference to the AsyncResult to avoid GC */ -static MonoGHashTable *ares_htable = NULL; -static CRITICAL_SECTION ares_lock; -static CRITICAL_SECTION io_queue_lock; static int pending_io_items; typedef struct { @@ -100,10 +94,6 @@ typedef struct { static SocketIOData socket_io_data; -/* we append a job */ -static HANDLE job_added; -static HANDLE io_job_added; - /* Keep in sync with the System.MonoAsyncCall class which provides GC tracking */ typedef struct { MonoObject object; @@ -114,30 +104,64 @@ typedef struct { MonoObject *res; MonoArray *out_args; /* This is a HANDLE, we use guint64 so the managed object layout remains constant */ + /* THIS FIELD IS NOT USED ANY MORE. Remove it when we feel like breaking corlib compatibility with 2.6 */ guint64 wait_event; } ASyncCall; typedef struct { + CRITICAL_SECTION lock; MonoArray *array; int first_elem; int next_elem; -} TPQueue; + /**/ + GQueue *idle_threads; + int idle_started; /* Have we started the idle threads? 
Interlocked */ + /* min, max, n and busy -> Interlocked */ + int min_threads; + int max_threads; + int nthreads; + int busy_threads; + + void (*async_invoke) (gpointer data); + void *pc_nitems; /* Performance counter for total number of items in added */ + /* We don't need the rate here since we can compute the different ourselves */ + /* void *perfc_rate; */ + MonoCounterSample last_sample; + +} ThreadPool; + +static ThreadPool async_tp; +static ThreadPool async_io_tp; + +typedef struct { + HANDLE wait_handle; + gpointer data; + gint timeout; + gboolean die; +} IdleThreadData; + static void async_invoke_thread (gpointer data); -static void append_job (CRITICAL_SECTION *cs, TPQueue *list, MonoObject *ar); -static void start_thread_or_queue (MonoAsyncResult *ares); -static void start_tpthread (MonoAsyncResult *ares); static void mono_async_invoke (MonoAsyncResult *ares); -static MonoObject* dequeue_job (CRITICAL_SECTION *cs, TPQueue *list); -static void free_queue (TPQueue *list); - -static TPQueue async_call_queue = {NULL, 0, 0}; -static TPQueue async_io_queue = {NULL, 0, 0}; +static void threadpool_free_queue (ThreadPool *tp); +static void threadpool_append_job (ThreadPool *tp, MonoObject *ar); +static void *threadpool_queue_idle_thread (ThreadPool *tp, IdleThreadData *it); +static void threadpool_init (ThreadPool *tp, int min_threads, int max_threads, void (*async_invoke) (gpointer)); +static void threadpool_start_idle_threads (ThreadPool *tp); +static void threadpool_kill_idle_threads (ThreadPool *tp); static MonoClass *async_call_klass; static MonoClass *socket_async_call_klass; static MonoClass *process_async_call_klass; +/* Hooks */ +static MonoThreadPoolFunc tp_start_func; +static MonoThreadPoolFunc tp_finish_func; +static gpointer tp_hooks_user_data; +static MonoThreadPoolItemFunc tp_item_begin_func; +static MonoThreadPoolItemFunc tp_item_end_func; +static gpointer tp_item_user_data; + #define INIT_POLLFD(a, b, c) {(a)->fd = b; (a)->events = c; (a)->revents = 0;} enum { AIO_OP_FIRST, @@ -153,17 +177,18 @@ enum { AIO_OP_LAST }; +#ifdef DISABLE_SOCKETS +#define socket_io_cleanup(x) +#else static void socket_io_cleanup (SocketIOData *data) { - gint release; - if (data->inited == 0) return; EnterCriticalSection (&data->io_lock); data->inited = 0; -#ifdef PLATFORM_WIN32 +#ifdef HOST_WIN32 closesocket (data->pipe [0]); closesocket (data->pipe [1]); #else @@ -177,10 +202,10 @@ socket_io_cleanup (SocketIOData *data) data->new_sem = NULL; mono_g_hash_table_destroy (data->sock_to_state); data->sock_to_state = NULL; - free_queue (&async_io_queue); - release = (gint) InterlockedCompareExchange (&io_worker_threads, 0, -1); - if (io_job_added) - ReleaseSemaphore (io_job_added, release, NULL); + EnterCriticalSection (&async_io_tp.lock); + threadpool_free_queue (&async_io_tp); + threadpool_kill_idle_threads (&async_io_tp); + LeaveCriticalSection (&async_io_tp.lock); g_free (data->newpfd); data->newpfd = NULL; #ifdef HAVE_EPOLL @@ -233,14 +258,7 @@ get_events_from_list (MonoMList *list) (SOCKET)(gssize)x->handle, x->buffer, x->offset, x->size,\ x->socket_flags, &x->error); - -static void -unregister_job (MonoAsyncResult *obj) -{ - EnterCriticalSection (&ares_lock); - mono_g_hash_table_remove (ares_htable, obj); - LeaveCriticalSection (&ares_lock); -} +#endif /* !DISABLE_SOCKETS */ static void threadpool_jobs_inc (MonoObject *obj) @@ -261,14 +279,21 @@ threadpool_jobs_dec (MonoObject *obj) return FALSE; } +#ifndef DISABLE_SOCKETS static void async_invoke_io_thread (gpointer data) { MonoDomain 
*domain; - MonoThread *thread; + MonoInternalThread *thread; const gchar *version; + IdleThreadData idle_data = {0}; + + idle_data.timeout = INFINITE; + idle_data.wait_handle = CreateEvent (NULL, FALSE, FALSE, NULL); - thread = mono_thread_current (); + thread = mono_thread_internal_current (); + if (tp_start_func) + tp_start_func (tp_hooks_user_data); version = mono_get_runtime_info ()->framework_version; for (;;) { @@ -296,91 +321,66 @@ async_invoke_io_thread (gpointer data) if (domain->state == MONO_APPDOMAIN_UNLOADED || domain->state == MONO_APPDOMAIN_UNLOADING) { threadpool_jobs_dec ((MonoObject *)ar); - unregister_job (ar); data = NULL; } else { mono_thread_push_appdomain_ref (domain); if (threadpool_jobs_dec ((MonoObject *)ar)) { - unregister_job (ar); data = NULL; mono_thread_pop_appdomain_ref (); continue; } if (mono_domain_set (domain, FALSE)) { - ASyncCall *ac; + /* ASyncCall *ac; */ + if (tp_item_begin_func) + tp_item_begin_func (tp_item_user_data); mono_async_invoke (ar); - ac = (ASyncCall *) ar->object_data; + if (tp_item_end_func) + tp_item_end_func (tp_item_user_data); /* + ac = (ASyncCall *) ar->object_data; if (ac->msg->exc != NULL) mono_unhandled_exception (ac->msg->exc); */ mono_domain_set (mono_get_root_domain (), TRUE); } mono_thread_pop_appdomain_ref (); - InterlockedDecrement (&busy_io_worker_threads); + InterlockedDecrement (&async_io_tp.busy_threads); /* If the callee changes the background status, set it back to TRUE */ if (*version != '1' && !mono_thread_test_state (thread , ThreadState_Background)) ves_icall_System_Threading_Thread_SetState (thread, ThreadState_Background); } } - data = dequeue_job (&io_queue_lock, &async_io_queue); - - if (!data) { + data = threadpool_queue_idle_thread (&async_io_tp, &idle_data); + while (!idle_data.die && !data) { guint32 wr; - int timeout = THREAD_EXIT_TIMEOUT; - guint32 start_time = mono_msec_ticks (); - - do { - wr = WaitForSingleObjectEx (io_job_added, (guint32)timeout, TRUE); - if (THREAD_WANTS_A_BREAK (thread)) - mono_thread_interruption_checkpoint (); - - timeout -= mono_msec_ticks () - start_time; - - if (wr != WAIT_TIMEOUT && wr != WAIT_IO_COMPLETION) - data = dequeue_job (&io_queue_lock, &async_io_queue); + wr = WaitForSingleObjectEx (idle_data.wait_handle, idle_data.timeout, TRUE); + if (THREAD_WANTS_A_BREAK (thread)) + mono_thread_interruption_checkpoint (); + + if (wr != WAIT_TIMEOUT && wr != WAIT_IO_COMPLETION) { + data = idle_data.data; + idle_data.data = NULL; + break; /* We have to exit */ } - while (!data && timeout > 0); } if (!data) { - if (InterlockedDecrement (&io_worker_threads) < 2) { - /* If we have pending items, keep the thread alive */ - if (InterlockedCompareExchange (&pending_io_items, 0, 0) != 0) { - InterlockedIncrement (&io_worker_threads); - continue; - } - } + InterlockedDecrement (&async_io_tp.nthreads); + CloseHandle (idle_data.wait_handle); + idle_data.wait_handle = NULL; + if (tp_finish_func) + tp_finish_func (tp_hooks_user_data); return; } - InterlockedIncrement (&busy_io_worker_threads); + InterlockedIncrement (&async_io_tp.busy_threads); } g_assert_not_reached (); } -static void -start_io_thread_or_queue (MonoSocketAsyncResult *ares) -{ - int busy, worker; - - busy = (int) InterlockedCompareExchange (&busy_io_worker_threads, 0, -1); - worker = (int) InterlockedCompareExchange (&io_worker_threads, 0, -1); - if (worker <= ++busy && - worker < mono_io_max_worker_threads) { - InterlockedIncrement (&busy_io_worker_threads); - InterlockedIncrement (&io_worker_threads); - threadpool_jobs_inc 
((MonoObject *)ares); - mono_thread_create_internal (mono_get_root_domain (), async_invoke_io_thread, ares, TRUE); - } else { - append_job (&io_queue_lock, &async_io_queue, (MonoObject*)ares); - ReleaseSemaphore (io_job_added, 1, NULL); - } -} - static MonoMList * process_io_event (MonoMList *list, int event) { @@ -400,10 +400,10 @@ process_io_event (MonoMList *list, int event) if (list != NULL) { oldlist = mono_mlist_remove_item (oldlist, list); #ifdef EPOLL_DEBUG - g_print ("Dispatching event %d on socket %d\n", event, state->handle); + g_print ("Dispatching event %d on socket %p\n", event, state->handle); #endif InterlockedIncrement (&pending_io_items); - start_io_thread_or_queue (state); + threadpool_append_job (&async_io_tp, (MonoObject *) state); } return oldlist; @@ -443,9 +443,9 @@ socket_io_poll_main (gpointer p) gint maxfd = 1; gint allocated; gint i; - MonoThread *thread; + MonoInternalThread *thread; - thread = mono_thread_current (); + thread = mono_thread_internal_current (); allocated = INITIAL_POLLFD_SIZE; pfds = g_new0 (mono_pollfd, allocated); @@ -515,7 +515,7 @@ socket_io_poll_main (gpointer p) for (; i < allocated; i++) INIT_POLLFD (&pfds [i], -1, 0); } -#ifndef PLATFORM_WIN32 +#ifndef HOST_WIN32 nread = read (data->pipe [0], one, 1); #else nread = recv ((SOCKET) data->pipe [0], one, 1, 0); @@ -578,14 +578,14 @@ socket_io_epoll_main (gpointer p) { SocketIOData *data; int epollfd; - MonoThread *thread; + MonoInternalThread *thread; struct epoll_event *events, *evt; const int nevents = 512; int ready = 0, i; data = p; epollfd = data->epollfd; - thread = mono_thread_current (); + thread = mono_thread_internal_current (); events = g_new0 (struct epoll_event, nevents); while (1) { @@ -709,7 +709,7 @@ mono_thread_pool_remove_socket (int sock) } } -#ifdef PLATFORM_WIN32 +#ifdef HOST_WIN32 static void connect_hack (gpointer x) { @@ -729,7 +729,7 @@ connect_hack (gpointer x) static void socket_io_init (SocketIOData *data) { -#ifdef PLATFORM_WIN32 +#ifdef HOST_WIN32 struct sockaddr_in server; struct sockaddr_in client; SOCKET srv; @@ -762,7 +762,7 @@ socket_io_init (SocketIOData *data) data->epoll_disabled = TRUE; #endif -#ifndef PLATFORM_WIN32 +#ifndef HOST_WIN32 if (data->epoll_disabled) { if (pipe (data->pipe) != 0) { int err = errno; @@ -796,18 +796,13 @@ socket_io_init (SocketIOData *data) g_assert (data->pipe [0] != INVALID_SOCKET); closesocket (srv); #endif - mono_io_max_worker_threads = mono_max_worker_threads / 2; - if (mono_io_max_worker_threads < 10) - mono_io_max_worker_threads = 10; - data->sock_to_state = mono_g_hash_table_new_type (g_direct_hash, g_direct_equal, MONO_HASH_VALUE_GC); + mono_thread_create_internal (mono_get_root_domain (), threadpool_start_idle_threads, &async_io_tp, TRUE); if (data->epoll_disabled) { data->new_sem = CreateSemaphore (NULL, 1, 1, NULL); g_assert (data->new_sem != NULL); } - io_job_added = CreateSemaphore (NULL, 0, 0x7fffffff, NULL); - g_assert (io_job_added != NULL); if (data->epoll_disabled) { mono_thread_create_internal (mono_get_root_domain (), socket_io_poll_main, data, TRUE); } @@ -827,13 +822,14 @@ socket_io_add_poll (MonoSocketAsyncResult *state) char msg [1]; MonoMList *list; SocketIOData *data = &socket_io_data; + int w; -#if defined(PLATFORM_MACOSX) || defined(PLATFORM_BSD6) || defined(PLATFORM_WIN32) || defined(PLATFORM_SOLARIS) +#if defined(PLATFORM_MACOSX) || defined(PLATFORM_BSD) || defined(HOST_WIN32) || defined(PLATFORM_SOLARIS) /* select() for connect() does not work well on the Mac. Bug #75436. 
*/ /* Bug #77637 for the BSD 6 case */ /* Bug #78888 for the Windows case */ if (state->operation == AIO_OP_CONNECT && state->blocking == TRUE) { - start_io_thread_or_queue (state); + threadpool_append_job (&async_io_tp, (MonoObject *) state); return; } #endif @@ -855,8 +851,9 @@ socket_io_add_poll (MonoSocketAsyncResult *state) mono_g_hash_table_replace (data->sock_to_state, GINT_TO_POINTER (state->handle), list); LeaveCriticalSection (&data->io_lock); *msg = (char) state->operation; -#ifndef PLATFORM_WIN32 - write (data->pipe [1], msg, 1); +#ifndef HOST_WIN32 + w = write (data->pipe [1], msg, 1); + w = w; #else send ((SOCKET) data->pipe [1], msg, 1, 0); #endif @@ -967,20 +964,20 @@ socket_io_filter (MonoObject *target, MonoObject *state) return TRUE; } +#endif /* !DISABLE_SOCKETS */ static void mono_async_invoke (MonoAsyncResult *ares) { ASyncCall *ac = (ASyncCall *)ares->object_data; - MonoThread *thread = NULL; MonoObject *res, *exc = NULL; MonoArray *out_args = NULL; + HANDLE wait_event = NULL; if (ares->execution_context) { /* use captured ExecutionContext (if available) */ - thread = mono_thread_current (); - MONO_OBJECT_SETREF (ares, original_context, thread->execution_context); - MONO_OBJECT_SETREF (thread, execution_context, ares->execution_context); + MONO_OBJECT_SETREF (ares, original_context, mono_thread_get_execution_context ()); + mono_thread_set_execution_context (ares->execution_context); } else { ares->original_context = NULL; } @@ -991,7 +988,14 @@ mono_async_invoke (MonoAsyncResult *ares) MONO_OBJECT_SETREF (ac, msg->exc, exc); MONO_OBJECT_SETREF (ac, out_args, out_args); + mono_monitor_enter ((MonoObject *) ares); ares->completed = 1; + if (ares->handle != NULL) + wait_event = mono_wait_handle_get_handle ((MonoWaitHandle *) ares->handle); + mono_monitor_exit ((MonoObject *) ares); + /* notify listeners */ + if (wait_event != NULL) + SetEvent (wait_event); /* call async callback if cb_method != null*/ if (ac->cb_method) { @@ -1006,55 +1010,57 @@ mono_async_invoke (MonoAsyncResult *ares) /* restore original thread execution context if flow isn't suppressed, i.e. non null */ if (ares->original_context) { - MONO_OBJECT_SETREF (thread, execution_context, ares->original_context); + mono_thread_set_execution_context (ares->original_context); ares->original_context = NULL; } - /* notify listeners */ - mono_monitor_enter ((MonoObject *) ares); - if (ares->handle != NULL) { - ac->wait_event = (gsize) mono_wait_handle_get_handle ((MonoWaitHandle *) ares->handle); - SetEvent ((gpointer)(gsize)ac->wait_event); - } - mono_monitor_exit ((MonoObject *) ares); - - EnterCriticalSection (&ares_lock); - mono_g_hash_table_remove (ares_htable, ares); - LeaveCriticalSection (&ares_lock); } static void -start_idle_threads (MonoAsyncResult *data) +threadpool_start_idle_threads (ThreadPool *tp) { int needed; int existing; - needed = (int) InterlockedCompareExchange (&mono_min_worker_threads, 0, -1); + needed = (int) InterlockedCompareExchange (&tp->min_threads, 0, -1); do { - existing = (int) InterlockedCompareExchange (&mono_worker_threads, 0, -1); - if ((needed - existing) > 0) { - start_tpthread (data); - if (data) - threadpool_jobs_dec ((MonoObject*)data); - data = NULL; - Sleep (500); - } - } while ((needed - existing) > 0); - - /* If we don't start any thread here, make sure 'data' is processed. 
*/ - if (data != NULL) { - start_thread_or_queue (data); - threadpool_jobs_dec ((MonoObject*)data); - } + existing = (int) InterlockedCompareExchange (&tp->nthreads, 0, -1); + if (existing >= needed) + break; + InterlockedIncrement (&tp->nthreads); + mono_thread_create_internal (mono_get_root_domain (), tp->async_invoke, NULL, TRUE); + SleepEx (250, TRUE); + } while (1); } static void -start_tpthread (MonoAsyncResult *data) +threadpool_init (ThreadPool *tp, int min_threads, int max_threads, void (*async_invoke) (gpointer)) { - InterlockedIncrement (&mono_worker_threads); - InterlockedIncrement (&busy_worker_threads); - threadpool_jobs_inc ((MonoObject *)data); - mono_thread_create_internal (mono_get_root_domain (), async_invoke_thread, data, TRUE); + memset (tp, 0, sizeof (ThreadPool)); + InitializeCriticalSection (&tp->lock); + tp->min_threads = min_threads; + tp->max_threads = max_threads; + tp->async_invoke = async_invoke; + tp->idle_threads = g_queue_new (); +} + +static void * +init_perf_counter (const char *category, const char *counter) +{ + MonoString *category_str; + MonoString *counter_str; + MonoString *machine; + MonoDomain *root; + MonoBoolean custom; + int type; + + if (category == NULL || counter == NULL) + return NULL; + root = mono_get_root_domain (); + category_str = mono_string_new (root, category); + counter_str = mono_string_new (root, counter); + machine = mono_string_new (root, "."); + return mono_perfcounter_get_impl (category_str, counter_str, NULL, machine, &type, &custom); } void @@ -1062,30 +1068,36 @@ mono_thread_pool_init () { int threads_per_cpu = THREADS_PER_CPU; int cpu_count; + int n; if ((int) InterlockedCompareExchange (&tp_inited, 1, 0) == 1) return; - MONO_GC_REGISTER_ROOT (ares_htable); MONO_GC_REGISTER_ROOT (socket_io_data.sock_to_state); InitializeCriticalSection (&socket_io_data.io_lock); - InitializeCriticalSection (&ares_lock); - InitializeCriticalSection (&io_queue_lock); - ares_htable = mono_g_hash_table_new_type (NULL, NULL, MONO_HASH_KEY_VALUE_GC); - job_added = CreateSemaphore (NULL, 0, 0x7fffffff, NULL); - g_assert (job_added != NULL); if (g_getenv ("MONO_THREADS_PER_CPU") != NULL) { threads_per_cpu = atoi (g_getenv ("MONO_THREADS_PER_CPU")); - if (threads_per_cpu <= 0) + if (threads_per_cpu < THREADS_PER_CPU) threads_per_cpu = THREADS_PER_CPU; } cpu_count = mono_cpu_count (); - mono_max_worker_threads = 20 + threads_per_cpu * cpu_count; - mono_min_worker_threads = cpu_count; /* 1 idle thread per cpu */ + n = 8 + 2 * cpu_count; /* 8 is minFreeThreads for ASP.NET */ + threadpool_init (&async_tp, n, n + threads_per_cpu * cpu_count, async_invoke_thread); +#ifndef DISABLE_SOCKETS + threadpool_init (&async_io_tp, 2 * cpu_count, 8 * cpu_count, async_invoke_io_thread); +#endif async_call_klass = mono_class_from_name (mono_defaults.corlib, "System", "MonoAsyncCall"); g_assert (async_call_klass); + + async_tp.pc_nitems = init_perf_counter ("Mono Threadpool", "Work Items Added"); + g_assert (async_tp.pc_nitems); + mono_perfcounter_get_sample (async_tp.pc_nitems, FALSE, &async_tp.last_sample); + + async_io_tp.pc_nitems = init_perf_counter ("Mono Threadpool", "IO Work Items Added"); + g_assert (async_io_tp.pc_nitems); + mono_perfcounter_get_sample (async_io_tp.pc_nitems, FALSE, &async_io_tp.last_sample); } MonoAsyncResult * @@ -1108,45 +1120,24 @@ mono_thread_pool_add (MonoObject *target, MonoMethodMessage *msg, MonoDelegate * ares = mono_async_result_new (domain, NULL, ac->state, NULL, (MonoObject*)ac); MONO_OBJECT_SETREF (ares, async_delegate, 
target); - EnterCriticalSection (&ares_lock); - mono_g_hash_table_insert (ares_htable, ares, ares); - LeaveCriticalSection (&ares_lock); - +#ifndef DISABLE_SOCKETS if (socket_io_filter (target, state)) { socket_io_add (ares, (MonoSocketAsyncResult *) state); return ares; } - - start_thread_or_queue (ares); +#endif + if (InterlockedCompareExchange (&async_tp.idle_started, 1, 0) == 0) + mono_thread_create_internal (mono_get_root_domain (), threadpool_start_idle_threads, &async_tp, TRUE); + + threadpool_append_job (&async_tp, (MonoObject *) ares); return ares; } -static void -start_thread_or_queue (MonoAsyncResult *ares) -{ - int busy, worker; - - if ((int) InterlockedCompareExchange (&tp_idle_started, 1, 0) == 0) { - threadpool_jobs_inc ((MonoObject*)ares); - mono_thread_create_internal (mono_get_root_domain (), start_idle_threads, ares, TRUE); - return; - } - - busy = (int) InterlockedCompareExchange (&busy_worker_threads, 0, -1); - worker = (int) InterlockedCompareExchange (&mono_worker_threads, 0, -1); - if (worker <= ++busy && - worker < mono_max_worker_threads) { - start_tpthread (ares); - } else { - append_job (&mono_delegate_section, &async_call_queue, (MonoObject*)ares); - ReleaseSemaphore (job_added, 1, NULL); - } -} - MonoObject * mono_thread_pool_finish (MonoAsyncResult *ares, MonoArray **out_args, MonoObject **exc) { ASyncCall *ac; + HANDLE wait_event; *exc = NULL; *out_args = NULL; @@ -1162,90 +1153,262 @@ mono_thread_pool_finish (MonoAsyncResult *ares, MonoArray **out_args, MonoObject } ares->endinvoke_called = 1; - ac = (ASyncCall *)ares->object_data; - - g_assert (ac != NULL); - /* wait until we are really finished */ if (!ares->completed) { if (ares->handle == NULL) { - ac->wait_event = (gsize)CreateEvent (NULL, TRUE, FALSE, NULL); - g_assert(ac->wait_event != 0); - MONO_OBJECT_SETREF (ares, handle, (MonoObject *) mono_wait_handle_new (mono_object_domain (ares), (gpointer)(gsize)ac->wait_event)); + wait_event = CreateEvent (NULL, TRUE, FALSE, NULL); + g_assert(wait_event != 0); + MONO_OBJECT_SETREF (ares, handle, (MonoObject *) mono_wait_handle_new (mono_object_domain (ares), wait_event)); + } else { + wait_event = mono_wait_handle_get_handle ((MonoWaitHandle *) ares->handle); } mono_monitor_exit ((MonoObject *) ares); - WaitForSingleObjectEx ((gpointer)(gsize)ac->wait_event, INFINITE, TRUE); + WaitForSingleObjectEx (wait_event, INFINITE, TRUE); } else { mono_monitor_exit ((MonoObject *) ares); } + ac = (ASyncCall *) ares->object_data; + g_assert (ac != NULL); *exc = ac->msg->exc; /* FIXME: GC add write barrier */ *out_args = ac->out_args; return ac->res; } +static void +threadpool_kill_idle_threads (ThreadPool *tp) +{ + IdleThreadData *it; + + if (!tp || !tp->idle_threads) + return; + + while ((it = g_queue_pop_head (tp->idle_threads)) != NULL) { + it->data = NULL; + it->die = TRUE; + SetEvent (it->wait_handle); + } + g_queue_free (tp->idle_threads); + tp->idle_threads = NULL; +} + void mono_thread_pool_cleanup (void) { - gint release; + EnterCriticalSection (&async_tp.lock); + threadpool_free_queue (&async_tp); + threadpool_kill_idle_threads (&async_tp); + LeaveCriticalSection (&async_tp.lock); + socket_io_cleanup (&socket_io_data); /* Empty when DISABLE_SOCKETS is defined */ + /* Do we want/need these? 
+ DeleteCriticalSection (&async_tp.lock); + DeleteCriticalSection (&async_tp.table_lock); + DeleteCriticalSection (&socket_io_data.io_lock); + */ +} + +static void +null_array (MonoArray *a, int first, int last) +{ + /* We must null the old array because it might + contain cross-appdomain references, which + will crash the GC when the domains are + unloaded. */ + memset (mono_array_addr (a, MonoObject*, first), 0, sizeof (MonoObject*) * (last - first)); +} + +/* Caller must enter &tp->lock */ +static MonoObject* +dequeue_job_nolock (ThreadPool *tp) +{ + MonoObject *ar; + int count; + + if (!tp->array || tp->first_elem == tp->next_elem) + return NULL; + ar = mono_array_get (tp->array, MonoObject*, tp->first_elem); + mono_array_set (tp->array, MonoObject*, tp->first_elem, NULL); + tp->first_elem++; + count = tp->next_elem - tp->first_elem; + /* reduce the size of the array if it's mostly empty */ + if (mono_array_length (tp->array) > INITIAL_QUEUE_LENGTH && count < (mono_array_length (tp->array) / 3)) { + MonoArray *newa = mono_array_new_cached (mono_get_root_domain (), mono_defaults.object_class, mono_array_length (tp->array) / 2); + mono_array_memcpy_refs (newa, 0, tp->array, tp->first_elem, count); + null_array (tp->array, tp->first_elem, tp->next_elem); + tp->array = newa; + tp->first_elem = 0; + tp->next_elem = count; + } + return ar; +} + +/* Call after entering &tp->lock */ +static int +signal_idle_threads (ThreadPool *tp) +{ + IdleThreadData *it; + int result = 0; + int njobs; + + njobs = tp->next_elem - tp->first_elem; + while (njobs > 0 && (it = g_queue_pop_head (tp->idle_threads)) != NULL) { + it->data = dequeue_job_nolock (tp); + if (it->data == NULL) + break; /* Should never happen */ + result++; + njobs--; + it->timeout = INFINITE; + SetEvent (it->wait_handle); + } + return njobs; +} + +/* Call after entering &tp->lock */ +static gboolean +threadpool_start_thread (ThreadPool *tp, gpointer arg) +{ + gint max; + gint n; + + max = (gint) InterlockedCompareExchange (&tp->max_threads, 0, -1); + n = (gint) InterlockedCompareExchange (&tp->nthreads, 0, -1); + if (max <= n) + return FALSE; + InterlockedIncrement (&tp->nthreads); + mono_thread_create_internal (mono_get_root_domain (), tp->async_invoke, arg, TRUE); + return TRUE; +} - EnterCriticalSection (&mono_delegate_section); - free_queue (&async_call_queue); - release = (gint) InterlockedCompareExchange (&mono_worker_threads, 0, -1); - LeaveCriticalSection (&mono_delegate_section); - if (job_added) - ReleaseSemaphore (job_added, release, NULL); +/* +static const char * +get_queue_name (ThreadPool *tp) +{ + if (tp == &async_tp) + return "TP"; + if (tp == &async_io_tp) + return "IO"; + return "(Unknown)"; +} +*/ + +static gpointer +threadpool_queue_idle_thread (ThreadPool *tp, IdleThreadData *it) +{ + /* + MonoCounterSample sample; + float rate; + */ + gpointer result = NULL; + CRITICAL_SECTION *cs = &tp->lock; + + EnterCriticalSection (cs); + if (tp->idle_threads == NULL) { + it->die = TRUE; + LeaveCriticalSection (cs); + return NULL; /* We are shutting down */ + } + /* + if (mono_100ns_ticks () - tp->last_sample.timeStamp > 10000 * 1000) { + float elapsed_ticks; + mono_perfcounter_get_sample (tp->pc_nitems, FALSE, &sample); + + elapsed_ticks = (float) (sample.timeStamp - tp->last_sample.timeStamp); + rate = ((float) (sample.rawValue - tp->last_sample.rawValue)) / elapsed_ticks * 10000000; + printf ("Queue: %s NThreads: %d Rate: %.2f Total items: %lld Time(ms): %.2f\n", get_queue_name (tp), + InterlockedCompareExchange 
(&tp->nthreads, 0, -1), rate, + sample.rawValue - tp->last_sample.rawValue, elapsed_ticks / 10000); + memcpy (&tp->last_sample, &sample, sizeof (sample)); + } + */ - socket_io_cleanup (&socket_io_data); + it->data = result = dequeue_job_nolock (tp); + if (result != NULL) { + signal_idle_threads (tp); + } else { + int min, n; + min = (gint) InterlockedCompareExchange (&tp->min_threads, 0, -1); + n = (gint) InterlockedCompareExchange (&tp->nthreads, 0, -1); + if (n <= min) { + g_queue_push_tail (tp->idle_threads, it); + } else { + /* TODO: figure out when threads should be told to die */ + /* it->die = TRUE; */ + g_queue_push_tail (tp->idle_threads, it); + } + } + LeaveCriticalSection (cs); + return result; } static void -append_job (CRITICAL_SECTION *cs, TPQueue *list, MonoObject *ar) +threadpool_append_job (ThreadPool *tp, MonoObject *ar) { - threadpool_jobs_inc (ar); + CRITICAL_SECTION *cs; + cs = &tp->lock; + threadpool_jobs_inc (ar); EnterCriticalSection (cs); - if (list->array && (list->next_elem < mono_array_length (list->array))) { - mono_array_setref (list->array, list->next_elem, ar); - list->next_elem++; + if (tp->idle_threads == NULL) { + LeaveCriticalSection (cs); + return; /* We are shutting down */ + } + if (ar->vtable->domain->state == MONO_APPDOMAIN_UNLOADING || + ar->vtable->domain->state == MONO_APPDOMAIN_UNLOADED) { LeaveCriticalSection (cs); return; } - if (!list->array) { - MONO_GC_REGISTER_ROOT (list->array); - list->array = mono_array_new (mono_get_root_domain (), mono_defaults.object_class, 16); + + mono_perfcounter_update_value (tp->pc_nitems, TRUE, 1); + if (tp->array && (tp->next_elem < mono_array_length (tp->array))) { + mono_array_setref (tp->array, tp->next_elem, ar); + tp->next_elem++; + if (signal_idle_threads (tp) > 0 && threadpool_start_thread (tp, ar)) { + tp->next_elem--; + mono_array_setref (tp->array, tp->next_elem, NULL); + } + LeaveCriticalSection (cs); + return; + } + + if (!tp->array) { + MONO_GC_REGISTER_ROOT (tp->array); + tp->array = mono_array_new_cached (mono_get_root_domain (), mono_defaults.object_class, INITIAL_QUEUE_LENGTH); } else { - int count = list->next_elem - list->first_elem; + int count = tp->next_elem - tp->first_elem; /* slide the array or create a larger one if it's full */ - if (list->first_elem) { - mono_array_memcpy_refs (list->array, 0, list->array, list->first_elem, count); + if (tp->first_elem) { + mono_array_memcpy_refs (tp->array, 0, tp->array, tp->first_elem, count); + null_array (tp->array, count, tp->next_elem); } else { - MonoArray *newa = mono_array_new (mono_get_root_domain (), mono_defaults.object_class, mono_array_length (list->array) * 2); - mono_array_memcpy_refs (newa, 0, list->array, list->first_elem, count); - list->array = newa; + MonoArray *newa = mono_array_new_cached (mono_get_root_domain (), mono_defaults.object_class, mono_array_length (tp->array) * 2); + mono_array_memcpy_refs (newa, 0, tp->array, tp->first_elem, count); + null_array (tp->array, count, tp->next_elem); + tp->array = newa; } - list->first_elem = 0; - list->next_elem = count; + tp->first_elem = 0; + tp->next_elem = count; + } + mono_array_setref (tp->array, tp->next_elem, ar); + tp->next_elem++; + if (signal_idle_threads (tp) > 0 && threadpool_start_thread (tp, ar)) { + tp->next_elem--; + mono_array_setref (tp->array, tp->next_elem, NULL); } - mono_array_setref (list->array, list->next_elem, ar); - list->next_elem++; LeaveCriticalSection (cs); } static void -clear_queue (CRITICAL_SECTION *cs, TPQueue *list, MonoDomain *domain) 
+threadpool_clear_queue (ThreadPool *tp, MonoDomain *domain) { int i, count = 0; - EnterCriticalSection (cs); + EnterCriticalSection (&tp->lock); /*remove*/ - for (i = list->first_elem; i < list->next_elem; ++i) { - MonoObject *obj = mono_array_get (list->array, MonoObject*, i); + for (i = tp->first_elem; i < tp->next_elem; ++i) { + MonoObject *obj = mono_array_get (tp->array, MonoObject*, i); if (obj->vtable->domain == domain) { - unregister_job ((MonoAsyncResult*)obj); - - mono_array_set (list->array, MonoObject*, i, NULL); + mono_array_set (tp->array, MonoObject*, i, NULL); InterlockedDecrement (&domain->threadpool_jobs); ++count; } @@ -1253,15 +1416,15 @@ clear_queue (CRITICAL_SECTION *cs, TPQueue *list, MonoDomain *domain) /*compact*/ if (count) { int idx = 0; - for (i = list->first_elem; i < list->next_elem; ++i) { - MonoObject *obj = mono_array_get (list->array, MonoObject*, i); + for (i = tp->first_elem; i < tp->next_elem; ++i) { + MonoObject *obj = mono_array_get (tp->array, MonoObject*, i); if (obj) - mono_array_set (list->array, MonoObject*, idx++, obj); + mono_array_set (tp->array, MonoObject*, idx++, obj); } - list->first_elem = 0; - list->next_elem = count; + tp->first_elem = 0; + tp->next_elem = count; } - LeaveCriticalSection (cs); + LeaveCriticalSection (&tp->lock); } /* @@ -1276,8 +1439,10 @@ mono_thread_pool_remove_domain_jobs (MonoDomain *domain, int timeout) int result = TRUE; guint32 start_time = 0; - clear_queue (&mono_delegate_section, &async_call_queue, domain); - clear_queue (&io_queue_lock, &async_io_queue, domain); + g_assert (domain->state == MONO_APPDOMAIN_UNLOADING); + + threadpool_clear_queue (&async_tp, domain); + threadpool_clear_queue (&async_io_tp, domain); /* * There might be some threads out that could be about to execute stuff from the given domain. 
@@ -1308,50 +1473,35 @@ mono_thread_pool_remove_domain_jobs (MonoDomain *domain, int timeout) return result; } - -static MonoObject* -dequeue_job (CRITICAL_SECTION *cs, TPQueue *list) +static void +threadpool_free_queue (ThreadPool *tp) { - MonoObject *ar; - int count; - - EnterCriticalSection (cs); - if (!list->array || list->first_elem == list->next_elem) { - LeaveCriticalSection (cs); - return NULL; - } - ar = mono_array_get (list->array, MonoObject*, list->first_elem); - list->first_elem++; - count = list->next_elem - list->first_elem; - /* reduce the size of the array if it's mostly empty */ - if (mono_array_length (list->array) > 16 && count < (mono_array_length (list->array) / 3)) { - MonoArray *newa = mono_array_new (mono_get_root_domain (), mono_defaults.object_class, mono_array_length (list->array) / 2); - mono_array_memcpy_refs (newa, 0, list->array, list->first_elem, count); - list->array = newa; - list->first_elem = 0; - list->next_elem = count; - } - LeaveCriticalSection (cs); - - return ar; + if (tp->array) + null_array (tp->array, tp->first_elem, tp->next_elem); + tp->array = NULL; + tp->first_elem = tp->next_elem = 0; } -static void -free_queue (TPQueue *list) +gboolean +mono_thread_pool_is_queue_array (MonoArray *o) { - list->array = NULL; - list->first_elem = list->next_elem = 0; + return o == async_tp.array || o == async_io_tp.array; } static void async_invoke_thread (gpointer data) { MonoDomain *domain; - MonoThread *thread; - int workers, min; + MonoInternalThread *thread; const gchar *version; + IdleThreadData idle_data = {0}; + + idle_data.timeout = INFINITE; + idle_data.wait_handle = CreateEvent (NULL, FALSE, FALSE, NULL); - thread = mono_thread_current (); + thread = mono_thread_internal_current (); + if (tp_start_func) + tp_start_func (tp_hooks_user_data); version = mono_get_runtime_info ()->framework_version; for (;;) { MonoAsyncResult *ar; @@ -1366,77 +1516,61 @@ async_invoke_thread (gpointer data) if (domain->state == MONO_APPDOMAIN_UNLOADED || domain->state == MONO_APPDOMAIN_UNLOADING) { threadpool_jobs_dec ((MonoObject *)ar); - unregister_job (ar); data = NULL; } else { mono_thread_push_appdomain_ref (domain); if (threadpool_jobs_dec ((MonoObject *)ar)) { - unregister_job (ar); data = NULL; mono_thread_pop_appdomain_ref (); continue; } if (mono_domain_set (domain, FALSE)) { - ASyncCall *ac; + /* ASyncCall *ac; */ + if (tp_item_begin_func) + tp_item_begin_func (tp_item_user_data); mono_async_invoke (ar); - ac = (ASyncCall *) ar->object_data; + if (tp_item_end_func) + tp_item_end_func (tp_item_user_data); /* + ac = (ASyncCall *) ar->object_data; if (ac->msg->exc != NULL) mono_unhandled_exception (ac->msg->exc); */ mono_domain_set (mono_get_root_domain (), TRUE); } mono_thread_pop_appdomain_ref (); - InterlockedDecrement (&busy_worker_threads); + InterlockedDecrement (&async_tp.busy_threads); /* If the callee changes the background status, set it back to TRUE */ if (*version != '1' && !mono_thread_test_state (thread , ThreadState_Background)) ves_icall_System_Threading_Thread_SetState (thread, ThreadState_Background); } } - - data = dequeue_job (&mono_delegate_section, &async_call_queue); - - if (!data) { + data = threadpool_queue_idle_thread (&async_tp, &idle_data); + while (!idle_data.die && !data) { guint32 wr; - int timeout = THREAD_EXIT_TIMEOUT; - guint32 start_time = mono_msec_ticks (); - - do { - wr = WaitForSingleObjectEx (job_added, (guint32)timeout, TRUE); - if (THREAD_WANTS_A_BREAK (thread)) - mono_thread_interruption_checkpoint (); - - timeout -= 
mono_msec_ticks () - start_time; - - if (wr != WAIT_TIMEOUT && wr != WAIT_IO_COMPLETION) - data = dequeue_job (&mono_delegate_section, &async_call_queue); + wr = WaitForSingleObjectEx (idle_data.wait_handle, idle_data.timeout, TRUE); + if (THREAD_WANTS_A_BREAK (thread)) + mono_thread_interruption_checkpoint (); + + if (wr != WAIT_TIMEOUT && wr != WAIT_IO_COMPLETION) { + data = idle_data.data; + break; /* We have to exit */ } - while (!data && timeout > 0); } + idle_data.data = NULL; if (!data) { - workers = (int) InterlockedCompareExchange (&mono_worker_threads, 0, -1); - min = (int) InterlockedCompareExchange (&mono_min_worker_threads, 0, -1); - - while (!data && workers <= min) { - WaitForSingleObjectEx (job_added, INFINITE, TRUE); - if (THREAD_WANTS_A_BREAK (thread)) - mono_thread_interruption_checkpoint (); - - data = dequeue_job (&mono_delegate_section, &async_call_queue); - workers = (int) InterlockedCompareExchange (&mono_worker_threads, 0, -1); - min = (int) InterlockedCompareExchange (&mono_min_worker_threads, 0, -1); - } - } - - if (!data) { - InterlockedDecrement (&mono_worker_threads); + InterlockedDecrement (&async_tp.nthreads); + CloseHandle (idle_data.wait_handle); + idle_data.wait_handle = NULL; + if (tp_finish_func) + tp_finish_func (tp_hooks_user_data); return; } - InterlockedIncrement (&busy_worker_threads); + InterlockedIncrement (&async_tp.busy_threads); } g_assert_not_reached (); @@ -1449,10 +1583,10 @@ ves_icall_System_Threading_ThreadPool_GetAvailableThreads (gint *workerThreads, MONO_ARCH_SAVE_REGS; - busy = (gint) InterlockedCompareExchange (&busy_worker_threads, 0, -1); - busy_io = (gint) InterlockedCompareExchange (&busy_io_worker_threads, 0, -1); - *workerThreads = mono_max_worker_threads - busy; - *completionPortThreads = mono_io_max_worker_threads - busy_io; + busy = (gint) InterlockedCompareExchange (&async_tp.busy_threads, 0, -1); + busy_io = (gint) InterlockedCompareExchange (&async_io_tp.busy_threads, 0, -1); + *workerThreads = async_tp.max_threads - busy; + *completionPortThreads = async_io_tp.max_threads - busy_io; } void @@ -1460,8 +1594,8 @@ ves_icall_System_Threading_ThreadPool_GetMaxThreads (gint *workerThreads, gint * { MONO_ARCH_SAVE_REGS; - *workerThreads = mono_max_worker_threads; - *completionPortThreads = mono_io_max_worker_threads; + *workerThreads = (gint) InterlockedCompareExchange (&async_tp.max_threads, 0, -1); + *completionPortThreads = (gint) InterlockedCompareExchange (&async_io_tp.max_threads, 0, -1); } void @@ -1471,26 +1605,37 @@ ves_icall_System_Threading_ThreadPool_GetMinThreads (gint *workerThreads, gint * MONO_ARCH_SAVE_REGS; - workers = (gint) InterlockedCompareExchange (&mono_min_worker_threads, 0, -1); - workers_io = (gint) InterlockedCompareExchange (&mono_io_min_worker_threads, 0, -1); + workers = (gint) InterlockedCompareExchange (&async_tp.min_threads, 0, -1); + workers_io = (gint) InterlockedCompareExchange (&async_io_tp.min_threads, 0, -1); *workerThreads = workers; *completionPortThreads = workers_io; } +static void +start_idle_threads (void) +{ + threadpool_start_idle_threads (&async_tp); +} + MonoBoolean ves_icall_System_Threading_ThreadPool_SetMinThreads (gint workerThreads, gint completionPortThreads) { + int max_threads; + int max_io_threads; + MONO_ARCH_SAVE_REGS; - if (workerThreads < 0 || workerThreads > mono_max_worker_threads) + max_threads = InterlockedCompareExchange (&async_tp.max_threads, -1, -1); + if (workerThreads <= 0 || workerThreads > max_threads) return FALSE; - if (completionPortThreads < 0 || 
completionPortThreads > mono_io_max_worker_threads)
+	max_io_threads = InterlockedCompareExchange (&async_io_tp.max_threads, -1, -1);
+	if (completionPortThreads <= 0 || completionPortThreads > max_io_threads)
 		return FALSE;
 
-	InterlockedExchange (&mono_min_worker_threads, workerThreads);
-	InterlockedExchange (&mono_io_min_worker_threads, completionPortThreads);
+	InterlockedExchange (&async_tp.min_threads, workerThreads);
+	InterlockedExchange (&async_io_tp.min_threads, completionPortThreads);
 	mono_thread_create_internal (mono_get_root_domain (), start_idle_threads, NULL, TRUE);
 	return TRUE;
 }
@@ -1498,16 +1643,60 @@ ves_icall_System_Threading_ThreadPool_SetMinThreads (gint workerThreads, gint co
 MonoBoolean
 ves_icall_System_Threading_ThreadPool_SetMaxThreads (gint workerThreads, gint completionPortThreads)
 {
+	int min_threads;
+	int min_io_threads;
+	int cpu_count;
+
 	MONO_ARCH_SAVE_REGS;
 
-	if (workerThreads < mono_max_worker_threads)
+	cpu_count = mono_cpu_count ();
+	min_threads = InterlockedCompareExchange (&async_tp.min_threads, -1, -1);
+	if (workerThreads < min_threads || workerThreads < cpu_count)
 		return FALSE;
 
-	if (completionPortThreads < mono_io_max_worker_threads)
+	/* We don't really have the concept of completion ports. Do we care here? */
+	min_io_threads = InterlockedCompareExchange (&async_io_tp.min_threads, -1, -1);
+	if (completionPortThreads < min_io_threads || completionPortThreads < cpu_count)
 		return FALSE;
 
-	InterlockedExchange (&mono_max_worker_threads, workerThreads);
-	InterlockedExchange (&mono_io_max_worker_threads, completionPortThreads);
+	InterlockedExchange (&async_tp.max_threads, workerThreads);
+	InterlockedExchange (&async_io_tp.max_threads, completionPortThreads);
 	return TRUE;
 }
 
+/**
+ * mono_install_threadpool_thread_hooks
+ * @start_func: the function to be called right after a new threadpool thread is created. Can be NULL.
+ * @finish_func: the function to be called right before a threadpool thread exits. Can be NULL.
+ * @user_data: argument passed to @start_func and @finish_func.
+ *
+ * @start_func will be called right after a threadpool thread is created and @finish_func right before a threadpool thread exits.
+ * The calls will be made from the thread itself.
+ */
+void
+mono_install_threadpool_thread_hooks (MonoThreadPoolFunc start_func, MonoThreadPoolFunc finish_func, gpointer user_data)
+{
+	tp_start_func = start_func;
+	tp_finish_func = finish_func;
+	tp_hooks_user_data = user_data;
+}
+
+/**
+ * mono_install_threadpool_item_hooks
+ * @begin_func: the function to be called before processing of a threadpool work item starts.
+ * @end_func: the function to be called after a threadpool work item is finished.
+ * @user_data: argument passed to @begin_func and @end_func.
+ *
+ * The calls will be made from the thread itself and from the same AppDomain
+ * where the work item was executed.
+ *
+ */
+void
+mono_install_threadpool_item_hooks (MonoThreadPoolItemFunc begin_func, MonoThreadPoolItemFunc end_func, gpointer user_data)
+{
+	tp_item_begin_func = begin_func;
+	tp_item_end_func = end_func;
+	tp_item_user_data = user_data;
+}
+
+
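Usage sketch (not part of the patch): the last hunk adds two public installers, mono_install_threadpool_thread_hooks and mono_install_threadpool_item_hooks, which store the supplied callbacks in tp_start_func/tp_finish_func and tp_item_begin_func/tp_item_end_func; the worker threads invoke them as seen in async_invoke_thread and async_invoke_io_thread. The single gpointer user_data argument of the callbacks is inferred from those call sites, the "threadpool.h" include mirrors the one used by threadpool.c itself, and the on_tp_* and install_sample_threadpool_hooks names below are illustrative only.

/*
 * Minimal sketch of registering the new hooks, under the assumptions above:
 * MonoThreadPoolFunc and MonoThreadPoolItemFunc are taken to be
 * "void (*) (gpointer)" callbacks, as implied by the calls
 * tp_start_func (tp_hooks_user_data) and tp_item_begin_func (tp_item_user_data).
 */
#include <glib.h>
#include "threadpool.h"	/* assumed to declare the installers and callback typedefs */

static void
on_tp_thread_start (gpointer user_data)
{
	/* Runs on the freshly created worker thread, before it picks up work. */
	g_print ("[%s] threadpool thread started\n", (const char *) user_data);
}

static void
on_tp_thread_finish (gpointer user_data)
{
	/* Runs on the worker thread right before it exits. */
	g_print ("[%s] threadpool thread exiting\n", (const char *) user_data);
}

static void
on_tp_item_begin (gpointer user_data)
{
	/* Runs on the worker thread, in the work item's AppDomain, before the item is invoked. */
	g_print ("[%s] work item begin\n", (const char *) user_data);
}

static void
on_tp_item_end (gpointer user_data)
{
	/* Runs after the work item has finished, still in its AppDomain. */
	g_print ("[%s] work item end\n", (const char *) user_data);
}

void
install_sample_threadpool_hooks (void)
{
	static char tag [] = "sample";

	/* Per the doc comments, either hook of the thread pair may be NULL. */
	mono_install_threadpool_thread_hooks (on_tp_thread_start, on_tp_thread_finish, (gpointer) tag);
	mono_install_threadpool_item_hooks (on_tp_item_begin, on_tp_item_end, (gpointer) tag);
}

Calling install_sample_threadpool_hooks once before the threadpool is used would then trace thread creation/exit and per-item begin/end from the worker threads themselves.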