X-Git-Url: http://wien.tomnetworks.com/gitweb/?a=blobdiff_plain;f=mono%2Fmetadata%2Fthreadpool.c;h=f22c80c4ed049fd03b708d0801edfb8f20e30e36;hb=e97a5fd1826b7068dbc27cb7439df1c37d1b50f0;hp=612090b0cececef2c8c028a705563c1d325f1cda;hpb=9975a45f43793e34318e593ddacd3f332f8d91c0;p=mono.git diff --git a/mono/metadata/threadpool.c b/mono/metadata/threadpool.c index 612090b0cec..f22c80c4ed0 100644 --- a/mono/metadata/threadpool.c +++ b/mono/metadata/threadpool.c @@ -27,15 +27,20 @@ #include #include #include +#include #include #include #include #include #include +#ifdef HAVE_SYS_TIME_H #include +#endif #include #include +#ifdef HAVE_UNISTD_H #include +#endif #include #include @@ -56,6 +61,7 @@ static int mono_max_worker_threads; static int mono_min_worker_threads; static int mono_io_max_worker_threads; +static int mono_io_min_worker_threads; /* current number of worker threads */ static int mono_worker_threads = 0; @@ -79,7 +85,7 @@ typedef struct { CRITICAL_SECTION io_lock; /* access to sock_to_state */ int inited; int pipe [2]; - GHashTable *sock_to_state; + MonoGHashTable *sock_to_state; HANDLE new_sem; /* access to newpfd and write side of the pipe */ mono_pollfd *newpfd; @@ -108,14 +114,21 @@ typedef struct { guint64 wait_event; } ASyncCall; +typedef struct { + MonoArray *array; + int first_elem; + int next_elem; +} TPQueue; + static void async_invoke_thread (gpointer data); -static void append_job (CRITICAL_SECTION *cs, GList **plist, gpointer ar); +static void append_job (CRITICAL_SECTION *cs, TPQueue *list, MonoObject *ar); static void start_thread_or_queue (MonoAsyncResult *ares); static void mono_async_invoke (MonoAsyncResult *ares); -static gpointer dequeue_job (CRITICAL_SECTION *cs, GList **plist); +static MonoObject* dequeue_job (CRITICAL_SECTION *cs, TPQueue *list); +static void free_queue (TPQueue *list); -static GList *async_call_queue = NULL; -static GList *async_io_queue = NULL; +static TPQueue async_call_queue = {NULL, 0, 0}; +static TPQueue async_io_queue = {NULL, 0, 0}; static MonoClass *async_call_klass; static MonoClass *socket_async_call_klass; @@ -158,10 +171,9 @@ socket_io_cleanup (SocketIOData *data) if (data->new_sem) CloseHandle (data->new_sem); data->new_sem = NULL; - g_hash_table_destroy (data->sock_to_state); + mono_g_hash_table_destroy (data->sock_to_state); data->sock_to_state = NULL; - g_list_free (async_io_queue); - async_io_queue = NULL; + free_queue (&async_io_queue); release = (gint) InterlockedCompareExchange (&io_worker_threads, 0, -1); if (io_job_added) ReleaseSemaphore (io_job_added, release, NULL); @@ -196,15 +208,14 @@ get_event_from_state (MonoSocketAsyncResult *state) } static int -get_events_from_list (GSList *list) +get_events_from_list (MonoMList *list) { MonoSocketAsyncResult *state; int events = 0; - while (list && list->data) { - state = (MonoSocketAsyncResult *) list->data; + while (list && (state = (MonoSocketAsyncResult *)mono_mlist_get_data (list))) { events |= get_event_from_state (state); - list = list->next; + list = mono_mlist_next (list); } return events; @@ -315,30 +326,29 @@ start_io_thread_or_queue (MonoSocketAsyncResult *ares) domain = ((ares) ? ((MonoObject *) ares)->vtable->domain : mono_domain_get ()); mono_thread_create (mono_get_root_domain (), async_invoke_io_thread, ares); } else { - append_job (&io_queue_lock, &async_io_queue, ares); + append_job (&io_queue_lock, &async_io_queue, (MonoObject*)ares); ReleaseSemaphore (io_job_added, 1, NULL); } } -static GSList * -process_io_event (GSList *list, int event) +static MonoMList * +process_io_event (MonoMList *list, int event) { MonoSocketAsyncResult *state; - GSList *oldlist; + MonoMList *oldlist; oldlist = list; state = NULL; while (list) { - state = (MonoSocketAsyncResult *) list->data; + state = (MonoSocketAsyncResult *) mono_mlist_get_data (list); if (get_event_from_state (state) == event) break; - list = list->next; + list = mono_mlist_next (list); } if (list != NULL) { - oldlist = g_slist_remove_link (oldlist, list); - g_slist_free_1 (list); + oldlist = mono_mlist_remove_item (oldlist, list); #ifdef EPOLL_DEBUG g_print ("Dispatching event %d on socket %d\n", event, state->handle); #endif @@ -399,7 +409,7 @@ socket_io_poll_main (gpointer p) int nsock = 0; mono_pollfd *pfd; char one [1]; - GSList *list; + MonoMList *list; do { if (nsock == -1) { @@ -480,6 +490,7 @@ socket_io_poll_main (gpointer p) EnterCriticalSection (&data->io_lock); if (data->inited == 0) { g_free (pfds); + LeaveCriticalSection (&data->io_lock); return; /* cleanup called */ } @@ -489,7 +500,7 @@ socket_io_poll_main (gpointer p) continue; nsock--; - list = g_hash_table_lookup (data->sock_to_state, GINT_TO_POINTER (pfd->fd)); + list = mono_g_hash_table_lookup (data->sock_to_state, GINT_TO_POINTER (pfd->fd)); if (list != NULL && (pfd->revents & (MONO_POLLIN | POLL_ERRORS)) != 0) { list = process_io_event (list, MONO_POLLIN); } @@ -499,10 +510,10 @@ socket_io_poll_main (gpointer p) } if (list != NULL) { - g_hash_table_replace (data->sock_to_state, GINT_TO_POINTER (pfd->fd), list); + mono_g_hash_table_replace (data->sock_to_state, GINT_TO_POINTER (pfd->fd), list); pfd->events = get_events_from_list (list); } else { - g_hash_table_remove (data->sock_to_state, GINT_TO_POINTER (pfd->fd)); + mono_g_hash_table_remove (data->sock_to_state, GINT_TO_POINTER (pfd->fd)); pfd->fd = -1; if (i == maxfd - 1) maxfd--; @@ -572,13 +583,13 @@ socket_io_epoll_main (gpointer p) for (i = 0; i < ready; i++) { int fd; - GSList *list; + MonoMList *list; evt = &events [i]; fd = evt->data.fd; - list = g_hash_table_lookup (data->sock_to_state, GINT_TO_POINTER (fd)); + list = mono_g_hash_table_lookup (data->sock_to_state, GINT_TO_POINTER (fd)); #ifdef EPOLL_DEBUG - g_print ("Event %d on %d list length: %d\n", evt->events, fd, g_slist_length (list)); + g_print ("Event %d on %d list length: %d\n", evt->events, fd, mono_mlist_length (list)); #endif if (list != NULL && (evt->events & (EPOLLIN | EPOLL_ERRORS)) != 0) { list = process_io_event (list, MONO_POLLIN); @@ -589,7 +600,7 @@ socket_io_epoll_main (gpointer p) } if (list != NULL) { - g_hash_table_replace (data->sock_to_state, GINT_TO_POINTER (fd), list); + mono_g_hash_table_replace (data->sock_to_state, GINT_TO_POINTER (fd), list); evt->events = get_events_from_list (list); #ifdef EPOLL_DEBUG g_print ("MOD %d to %d\n", fd, evt->events); @@ -604,7 +615,7 @@ socket_io_epoll_main (gpointer p) } } } else { - g_hash_table_remove (data->sock_to_state, GINT_TO_POINTER (fd)); + mono_g_hash_table_remove (data->sock_to_state, GINT_TO_POINTER (fd)); #ifdef EPOLL_DEBUG g_print ("DEL %d\n", fd); #endif @@ -624,27 +635,27 @@ void mono_thread_pool_remove_socket (int sock) { #ifdef HAVE_EPOLL - GSList *list, *next; + MonoMList *list, *next; MonoSocketAsyncResult *state; if (socket_io_data.epoll_disabled == TRUE || socket_io_data.inited == FALSE) return; EnterCriticalSection (&socket_io_data.io_lock); - list = g_hash_table_lookup (socket_io_data.sock_to_state, GINT_TO_POINTER (sock)); + list = mono_g_hash_table_lookup (socket_io_data.sock_to_state, GINT_TO_POINTER (sock)); if (list) { - g_hash_table_remove (socket_io_data.sock_to_state, GINT_TO_POINTER (sock)); + mono_g_hash_table_remove (socket_io_data.sock_to_state, GINT_TO_POINTER (sock)); } LeaveCriticalSection (&socket_io_data.io_lock); while (list) { - state = (MonoSocketAsyncResult *) list->data; + state = (MonoSocketAsyncResult *) mono_mlist_get_data (list); if (state->operation == AIO_OP_RECEIVE) state->operation = AIO_OP_RECV_JUST_CALLBACK; else if (state->operation == AIO_OP_SEND) state->operation = AIO_OP_SEND_JUST_CALLBACK; - next = g_slist_remove_link (list, list); + next = mono_mlist_remove_item (list, list); list = process_io_event (list, MONO_POLLIN); if (list) process_io_event (list, MONO_POLLOUT); @@ -745,11 +756,14 @@ socket_io_init (SocketIOData *data) if (mono_io_max_worker_threads < 10) mono_io_max_worker_threads = 10; - data->sock_to_state = g_hash_table_new (g_direct_hash, g_direct_equal); + data->sock_to_state = mono_g_hash_table_new_type (g_direct_hash, g_direct_equal, MONO_HASH_VALUE_GC); - if (data->epoll_disabled) + if (data->epoll_disabled) { data->new_sem = CreateSemaphore (NULL, 1, 1, NULL); + g_assert (data->new_sem != NULL); + } io_job_added = CreateSemaphore (NULL, 0, 0x7fffffff, NULL); + g_assert (io_job_added != NULL); InitializeCriticalSection (&io_queue_lock); if (data->epoll_disabled) { mono_thread_create (mono_get_root_domain (), socket_io_poll_main, data); @@ -768,7 +782,7 @@ socket_io_add_poll (MonoSocketAsyncResult *state) { int events; char msg [1]; - GSList *list; + MonoMList *list; SocketIOData *data = &socket_io_data; #if defined(PLATFORM_MACOSX) || defined(PLATFORM_BSD6) || defined(PLATFORM_WIN32) @@ -785,17 +799,17 @@ socket_io_add_poll (MonoSocketAsyncResult *state) data->newpfd = g_new0 (mono_pollfd, 1); EnterCriticalSection (&data->io_lock); - list = g_hash_table_lookup (data->sock_to_state, GINT_TO_POINTER (state->handle)); + /* FIXME: 64 bit issue: handle can be a pointer on windows? */ + list = mono_g_hash_table_lookup (data->sock_to_state, GINT_TO_POINTER (state->handle)); if (list == NULL) { - list = g_slist_alloc (); - list->data = state; + list = mono_mlist_alloc ((MonoObject*)state); } else { - list = g_slist_append (list, state); + list = mono_mlist_append (list, (MonoObject*)state); } events = get_events_from_list (list); INIT_POLLFD (data->newpfd, GPOINTER_TO_INT (state->handle), events); - g_hash_table_replace (data->sock_to_state, GINT_TO_POINTER (state->handle), list); + mono_g_hash_table_replace (data->sock_to_state, GINT_TO_POINTER (state->handle), list); LeaveCriticalSection (&data->io_lock); *msg = (char) state->operation; #ifndef PLATFORM_WIN32 @@ -809,7 +823,7 @@ socket_io_add_poll (MonoSocketAsyncResult *state) static gboolean socket_io_add_epoll (MonoSocketAsyncResult *state) { - GSList *list; + MonoMList *list; SocketIOData *data = &socket_io_data; struct epoll_event event; int epoll_op, ievt; @@ -818,13 +832,12 @@ socket_io_add_epoll (MonoSocketAsyncResult *state) memset (&event, 0, sizeof (struct epoll_event)); fd = GPOINTER_TO_INT (state->handle); EnterCriticalSection (&data->io_lock); - list = g_hash_table_lookup (data->sock_to_state, GINT_TO_POINTER (fd)); + list = mono_g_hash_table_lookup (data->sock_to_state, GINT_TO_POINTER (fd)); if (list == NULL) { - list = g_slist_alloc (); - list->data = state; + list = mono_mlist_alloc ((MonoObject*)state); epoll_op = EPOLL_CTL_ADD; } else { - list = g_slist_append (list, state); + list = mono_mlist_append (list, (MonoObject*)state); epoll_op = EPOLL_CTL_MOD; } @@ -834,7 +847,7 @@ socket_io_add_epoll (MonoSocketAsyncResult *state) if ((ievt & MONO_POLLOUT) != 0) event.events |= EPOLLOUT; - g_hash_table_replace (data->sock_to_state, state->handle, list); + mono_g_hash_table_replace (data->sock_to_state, state->handle, list); event.data.fd = fd; #ifdef EPOLL_DEBUG g_print ("%s %d with %d\n", epoll_op == EPOLL_CTL_ADD ? "ADD" : "MOD", fd, event.events); @@ -917,6 +930,8 @@ mono_async_invoke (MonoAsyncResult *ares) { ASyncCall *ac = (ASyncCall *)ares->object_data; MonoThread *thread = NULL; + MonoObject *res, *exc = NULL; + MonoArray *out_args = NULL; if (ares->execution_context) { /* use captured ExecutionContext (if available) */ @@ -928,8 +943,10 @@ mono_async_invoke (MonoAsyncResult *ares) } ac->msg->exc = NULL; - ac->res = mono_message_invoke (ares->async_delegate, ac->msg, - &ac->msg->exc, &ac->out_args); + res = mono_message_invoke (ares->async_delegate, ac->msg, &exc, &out_args); + MONO_OBJECT_SETREF (ac, res, res); + MONO_OBJECT_SETREF (ac, msg->exc, exc); + MONO_OBJECT_SETREF (ac, out_args, out_args); ares->completed = 1; @@ -953,7 +970,7 @@ mono_async_invoke (MonoAsyncResult *ares) /* notify listeners */ mono_monitor_enter ((MonoObject *) ares); if (ares->handle != NULL) { - ac->wait_event = (gsize)((MonoWaitHandle *) ares->handle)->handle; + ac->wait_event = (gsize) mono_wait_handle_get_handle ((MonoWaitHandle *) ares->handle); SetEvent ((gpointer)(gsize)ac->wait_event); } mono_monitor_exit ((MonoObject *) ares); @@ -973,10 +990,12 @@ mono_thread_pool_init () return; MONO_GC_REGISTER_ROOT (ares_htable); + MONO_GC_REGISTER_ROOT (socket_io_data.sock_to_state); InitializeCriticalSection (&socket_io_data.io_lock); InitializeCriticalSection (&ares_lock); ares_htable = mono_g_hash_table_new_type (NULL, NULL, MONO_HASH_KEY_VALUE_GC); job_added = CreateSemaphore (NULL, 0, 0x7fffffff, NULL); + g_assert (job_added != NULL); GetSystemInfo (&info); if (g_getenv ("MONO_THREADS_PER_CPU") != NULL) { threads_per_cpu = atoi (g_getenv ("MONO_THREADS_PER_CPU")); @@ -998,21 +1017,13 @@ mono_thread_pool_add (MonoObject *target, MonoMethodMessage *msg, MonoDelegate * MonoAsyncResult *ares; ASyncCall *ac; -#ifdef HAVE_BOEHM_GC - ac = GC_MALLOC (sizeof (ASyncCall)); -#elif defined(HAVE_SGEN_GC) - ac = mono_object_new (mono_domain_get (), async_call_klass); -#else - /* We'll leak the event if creaated... */ - ac = g_new0 (ASyncCall, 1); -#endif - ac->wait_event = 0; + ac = (ASyncCall*)mono_object_new (mono_domain_get (), async_call_klass); MONO_OBJECT_SETREF (ac, msg, msg); MONO_OBJECT_SETREF (ac, state, state); if (async_callback) { ac->cb_method = mono_get_delegate_invoke (((MonoObject *)async_callback)->vtable->klass); - ac->cb_target = async_callback; + MONO_OBJECT_SETREF (ac, cb_target, async_callback); } ares = mono_async_result_new (domain, NULL, ac->state, NULL, (MonoObject*)ac); @@ -1044,7 +1055,7 @@ start_thread_or_queue (MonoAsyncResult *ares) InterlockedIncrement (&busy_worker_threads); mono_thread_create (mono_get_root_domain (), async_invoke_thread, ares); } else { - append_job (&mono_delegate_section, &async_call_queue, ares); + append_job (&mono_delegate_section, &async_call_queue, (MonoObject*)ares); ReleaseSemaphore (job_added, 1, NULL); } } @@ -1076,6 +1087,7 @@ mono_thread_pool_finish (MonoAsyncResult *ares, MonoArray **out_args, MonoObject if (!ares->completed) { if (ares->handle == NULL) { ac->wait_event = (gsize)CreateEvent (NULL, TRUE, FALSE, NULL); + g_assert(ac->wait_event != 0); MONO_OBJECT_SETREF (ares, handle, (MonoObject *) mono_wait_handle_new (mono_object_domain (ares), (gpointer)(gsize)ac->wait_event)); } mono_monitor_exit ((MonoObject *) ares); @@ -1096,8 +1108,7 @@ mono_thread_pool_cleanup (void) gint release; EnterCriticalSection (&mono_delegate_section); - g_list_free (async_call_queue); - async_call_queue = NULL; + free_queue (&async_call_queue); release = (gint) InterlockedCompareExchange (&mono_worker_threads, 0, -1); LeaveCriticalSection (&mono_delegate_section); if (job_added) @@ -1107,53 +1118,70 @@ mono_thread_pool_cleanup (void) } static void -append_job (CRITICAL_SECTION *cs, GList **plist, gpointer ar) +append_job (CRITICAL_SECTION *cs, TPQueue *list, MonoObject *ar) { - GList *tmp, *list; - EnterCriticalSection (cs); - list = *plist; - if (list == NULL) { - list = g_list_append (list, ar); + if (list->array && (list->next_elem < mono_array_length (list->array))) { + mono_array_setref (list->array, list->next_elem, ar); + list->next_elem++; + LeaveCriticalSection (cs); + return; + } + if (!list->array) { + MONO_GC_REGISTER_ROOT (list->array); + list->array = mono_array_new (mono_get_root_domain (), mono_defaults.object_class, 16); } else { - for (tmp = list; tmp && tmp->data != NULL; tmp = tmp->next); - if (tmp == NULL) { - list = g_list_append (list, ar); + int count = list->next_elem - list->first_elem; + /* slide the array or create a larger one if it's full */ + if (list->first_elem) { + mono_array_memcpy_refs (list->array, 0, list->array, list->first_elem, count); } else { - tmp->data = ar; + MonoArray *newa = mono_array_new (mono_get_root_domain (), mono_defaults.object_class, mono_array_length (list->array) * 2); + mono_array_memcpy_refs (newa, 0, list->array, list->first_elem, count); + list->array = newa; } + list->first_elem = 0; + list->next_elem = count; } - *plist = list; + mono_array_setref (list->array, list->next_elem, ar); + list->next_elem++; LeaveCriticalSection (cs); } -static gpointer -dequeue_job (CRITICAL_SECTION *cs, GList **plist) +static MonoObject* +dequeue_job (CRITICAL_SECTION *cs, TPQueue *list) { - gpointer ar = NULL; - GList *tmp, *tmp2, *list; + MonoObject *ar; + int count; EnterCriticalSection (cs); - list = *plist; - tmp = list; - if (tmp) { - ar = tmp->data; - tmp->data = NULL; - tmp2 = tmp; - for (tmp2 = tmp; tmp2->next != NULL; tmp2 = tmp2->next); - if (tmp2 != tmp) { - list = tmp->next; - tmp->next = NULL; - tmp2->next = tmp; - tmp->prev = tmp2; - } + if (!list->array || list->first_elem == list->next_elem) { + LeaveCriticalSection (cs); + return NULL; + } + ar = mono_array_get (list->array, MonoObject*, list->first_elem); + list->first_elem++; + count = list->next_elem - list->first_elem; + /* reduce the size of the array if it's mostly empty */ + if (mono_array_length (list->array) > 16 && count < (mono_array_length (list->array) / 3)) { + MonoArray *newa = mono_array_new (mono_get_root_domain (), mono_defaults.object_class, mono_array_length (list->array) / 2); + mono_array_memcpy_refs (newa, 0, list->array, list->first_elem, count); + list->array = newa; + list->first_elem = 0; + list->next_elem = count; } - *plist = list; LeaveCriticalSection (cs); return ar; } +static void +free_queue (TPQueue *list) +{ + list->array = NULL; + list->first_elem = list->next_elem = 0; +} + static void async_invoke_thread (gpointer data) { @@ -1238,13 +1266,14 @@ async_invoke_thread (gpointer data) void ves_icall_System_Threading_ThreadPool_GetAvailableThreads (gint *workerThreads, gint *completionPortThreads) { - gint busy; + gint busy, busy_io; MONO_ARCH_SAVE_REGS; busy = (gint) InterlockedCompareExchange (&busy_worker_threads, 0, -1); + busy_io = (gint) InterlockedCompareExchange (&busy_io_worker_threads, 0, -1); *workerThreads = mono_max_worker_threads - busy; - *completionPortThreads = 0; + *completionPortThreads = mono_io_max_worker_threads - busy_io; } void @@ -1253,19 +1282,21 @@ ves_icall_System_Threading_ThreadPool_GetMaxThreads (gint *workerThreads, gint * MONO_ARCH_SAVE_REGS; *workerThreads = mono_max_worker_threads; - *completionPortThreads = 0; + *completionPortThreads = mono_io_max_worker_threads; } void ves_icall_System_Threading_ThreadPool_GetMinThreads (gint *workerThreads, gint *completionPortThreads) { - gint workers; + gint workers, workers_io; MONO_ARCH_SAVE_REGS; workers = (gint) InterlockedCompareExchange (&mono_min_worker_threads, 0, -1); + workers_io = (gint) InterlockedCompareExchange (&mono_io_min_worker_threads, 0, -1); + *workerThreads = workers; - *completionPortThreads = 0; + *completionPortThreads = workers_io; } MonoBoolean @@ -1275,8 +1306,28 @@ ves_icall_System_Threading_ThreadPool_SetMinThreads (gint workerThreads, gint co if (workerThreads < 0 || workerThreads > mono_max_worker_threads) return FALSE; + + if (completionPortThreads < 0 || completionPortThreads > mono_io_max_worker_threads) + return FALSE; + InterlockedExchange (&mono_min_worker_threads, workerThreads); + InterlockedExchange (&mono_io_min_worker_threads, completionPortThreads); /* FIXME: should actually start the idle threads if needed */ return TRUE; } +MonoBoolean +ves_icall_System_Threading_ThreadPool_SetMaxThreads (gint workerThreads, gint completionPortThreads) +{ + MONO_ARCH_SAVE_REGS; + + if (workerThreads < mono_max_worker_threads) + return FALSE; + + if (completionPortThreads < mono_io_max_worker_threads) + return FALSE; + + InterlockedExchange (&mono_max_worker_threads, workerThreads); + InterlockedExchange (&mono_io_max_worker_threads, completionPortThreads); + return TRUE; +}