X-Git-Url: http://wien.tomnetworks.com/gitweb/?a=blobdiff_plain;f=mono%2Fmetadata%2Fthreadpool.c;h=f8235f1ca6d972876a3c825ab6503f3e04f696bb;hb=3b5eb0cf5a2d48b362f64e0eaa351a56d7a0a065;hp=5c089bd7e5f47ca2e8cd896a3b97ef00dacff9f2;hpb=ac7288dc15d93db8a421991bb9458d3e5c641156;p=mono.git diff --git a/mono/metadata/threadpool.c b/mono/metadata/threadpool.c index 5c089bd7e5f..f8235f1ca6d 100644 --- a/mono/metadata/threadpool.c +++ b/mono/metadata/threadpool.c @@ -15,11 +15,10 @@ #ifdef PLATFORM_WIN32 #define WINVER 0x0500 #define _WIN32_WINNT 0x0500 -#define THREADS_PER_CPU 25 -#else -#define THREADS_PER_CPU 50 #endif +#define THREADS_PER_CPU 5 /* 20 + THREADS_PER_CPU * number of CPUs */ + #include #include #include @@ -54,9 +53,9 @@ #undef EPOLL_DEBUG /* maximum number of worker threads */ -static int mono_max_worker_threads = THREADS_PER_CPU; -static int mono_min_worker_threads = 0; -static int mono_io_max_worker_threads = THREADS_PER_CPU * 2; +static int mono_max_worker_threads; +static int mono_min_worker_threads; +static int mono_io_max_worker_threads; /* current number of worker threads */ static int mono_worker_threads = 0; @@ -96,14 +95,17 @@ static SocketIOData socket_io_data; static HANDLE job_added; static HANDLE io_job_added; +/* Keep in sync with the System.MonoAsyncCall class which provides GC tracking */ typedef struct { + MonoObject object; MonoMethodMessage *msg; - HANDLE wait_event; MonoMethod *cb_method; MonoDelegate *cb_target; MonoObject *state; MonoObject *res; MonoArray *out_args; + /* This is a HANDLE, we use guint64 so the managed object layout remains constant */ + guint64 wait_event; } ASyncCall; static void async_invoke_thread (gpointer data); @@ -115,7 +117,9 @@ static gpointer dequeue_job (CRITICAL_SECTION *cs, GList **plist); static GList *async_call_queue = NULL; static GList *async_io_queue = NULL; +static MonoClass *async_call_klass; static MonoClass *socket_async_call_klass; +static MonoClass *process_async_call_klass; #define INIT_POLLFD(a, b, c) {(a)->fd = b; (a)->events = c; (a)->revents = 0;} enum { @@ -128,6 +132,7 @@ enum { AIO_OP_SENDTO, AIO_OP_RECV_JUST_CALLBACK, AIO_OP_SEND_JUST_CALLBACK, + AIO_OP_READPIPE, AIO_OP_LAST }; @@ -177,6 +182,7 @@ get_event_from_state (MonoSocketAsyncResult *state) case AIO_OP_RECEIVE: case AIO_OP_RECV_JUST_CALLBACK: case AIO_OP_RECEIVEFROM: + case AIO_OP_READPIPE: return MONO_POLLIN; case AIO_OP_SEND: case AIO_OP_SEND_JUST_CALLBACK: @@ -219,7 +225,7 @@ async_invoke_io_thread (gpointer data) MonoThread *thread; thread = mono_thread_current (); thread->threadpool_thread = TRUE; - thread->state |= ThreadState_Background; + ves_icall_System_Threading_Thread_SetState (thread, ThreadState_Background); for (;;) { MonoSocketAsyncResult *state; @@ -229,8 +235,6 @@ async_invoke_io_thread (gpointer data) if (state) { InterlockedDecrement (&pending_io_items); ar = state->ares; - /* worker threads invokes methods in different domains, - * so we need to set the right domain here */ switch (state->operation) { case AIO_OP_RECEIVE: state->total = ICALL_RECV (state); @@ -240,17 +244,22 @@ async_invoke_io_thread (gpointer data) break; } + /* worker threads invokes methods in different domains, + * so we need to set the right domain here */ domain = ((MonoObject *)ar)->vtable->domain; + mono_thread_push_appdomain_ref (domain); if (mono_domain_set (domain, FALSE)) { ASyncCall *ac; - mono_thread_push_appdomain_ref (domain); mono_async_invoke (ar); - ac = (ASyncCall *) ar->data; + ac = (ASyncCall *) ar->object_data; + /* if (ac->msg->exc != NULL) mono_unhandled_exception (ac->msg->exc); - mono_thread_pop_appdomain_ref (); + */ + mono_domain_set (mono_get_root_domain (), TRUE); } + mono_thread_pop_appdomain_ref (); InterlockedDecrement (&busy_io_worker_threads); } @@ -304,7 +313,7 @@ start_io_thread_or_queue (MonoSocketAsyncResult *ares) InterlockedIncrement (&busy_io_worker_threads); InterlockedIncrement (&io_worker_threads); domain = ((ares) ? ((MonoObject *) ares)->vtable->domain : mono_domain_get ()); - mono_thread_create (domain, async_invoke_io_thread, ares); + mono_thread_create (mono_get_root_domain (), async_invoke_io_thread, ares); } else { append_job (&io_queue_lock, &async_io_queue, ares); ReleaseSemaphore (io_job_added, 1, NULL); @@ -352,7 +361,7 @@ mark_bad_fds (mono_pollfd *pfds, int nfds) if (pfd->fd == -1) continue; - ret = mono_poll (pfds, 1, 0); + ret = mono_poll (pfd, 1, 0); if (ret == -1 && errno == EBADF) { pfd->revents |= MONO_POLLNVAL; count++; @@ -378,7 +387,7 @@ socket_io_poll_main (gpointer p) thread = mono_thread_current (); thread->threadpool_thread = TRUE; - thread->state |= ThreadState_Background; + ves_icall_System_Threading_Thread_SetState (thread, ThreadState_Background); allocated = INITIAL_POLLFD_SIZE; pfds = g_new0 (mono_pollfd, allocated); @@ -471,6 +480,7 @@ socket_io_poll_main (gpointer p) EnterCriticalSection (&data->io_lock); if (data->inited == 0) { g_free (pfds); + LeaveCriticalSection (&data->io_lock); return; /* cleanup called */ } @@ -519,7 +529,7 @@ socket_io_epoll_main (gpointer p) epollfd = data->epollfd; thread = mono_thread_current (); thread->threadpool_thread = TRUE; - thread->state |= ThreadState_Background; + ves_icall_System_Threading_Thread_SetState (thread, ThreadState_Background); events = g_new0 (struct epoll_event, nevents); while (1) { @@ -732,12 +742,18 @@ socket_io_init (SocketIOData *data) g_assert (data->pipe [0] != INVALID_SOCKET); closesocket (srv); #endif + mono_io_max_worker_threads = mono_max_worker_threads / 2; + if (mono_io_max_worker_threads < 10) + mono_io_max_worker_threads = 10; data->sock_to_state = g_hash_table_new (g_direct_hash, g_direct_equal); - if (data->epoll_disabled) + if (data->epoll_disabled) { data->new_sem = CreateSemaphore (NULL, 1, 1, NULL); + g_assert (data->new_sem != NULL); + } io_job_added = CreateSemaphore (NULL, 0, 0x7fffffff, NULL); + g_assert (io_job_added != NULL); InitializeCriticalSection (&io_queue_lock); if (data->epoll_disabled) { mono_thread_create (mono_get_root_domain (), socket_io_poll_main, data); @@ -759,12 +775,23 @@ socket_io_add_poll (MonoSocketAsyncResult *state) GSList *list; SocketIOData *data = &socket_io_data; +#if defined(PLATFORM_MACOSX) || defined(PLATFORM_BSD6) || defined(PLATFORM_WIN32) + /* select() for connect() does not work well on the Mac. Bug #75436. */ + /* Bug #77637 for the BSD 6 case */ + /* Bug #78888 for the Windows case */ + if (state->operation == AIO_OP_CONNECT && state->blocking == TRUE) { + start_io_thread_or_queue (state); + return; + } +#endif WaitForSingleObject (data->new_sem, INFINITE); if (data->newpfd == NULL) data->newpfd = g_new0 (mono_pollfd, 1); EnterCriticalSection (&data->io_lock); + /* FIXME: 64 bit issue: handle can be a pointer on windows? */ list = g_hash_table_lookup (data->sock_to_state, GINT_TO_POINTER (state->handle)); + /* FIXME: GC issue: state is an object stored in a GList */ if (list == NULL) { list = g_slist_alloc (); list->data = state; @@ -798,6 +825,7 @@ socket_io_add_epoll (MonoSocketAsyncResult *state) fd = GPOINTER_TO_INT (state->handle); EnterCriticalSection (&data->io_lock); list = g_hash_table_lookup (data->sock_to_state, GINT_TO_POINTER (fd)); + /* FIXME: GC issue: state is an object stored in a GList */ if (list == NULL) { list = g_slist_alloc (); list->data = state; @@ -837,7 +865,7 @@ static void socket_io_add (MonoAsyncResult *ares, MonoSocketAsyncResult *state) { socket_io_init (&socket_io_data); - state->ares = ares; + MONO_OBJECT_SETREF (state, ares, ares); #ifdef HAVE_EPOLL if (socket_io_data.epoll_disabled == FALSE) { if (socket_io_add_epoll (state)) @@ -857,21 +885,31 @@ socket_io_filter (MonoObject *target, MonoObject *state) if (target == NULL || state == NULL) return FALSE; - klass = InterlockedCompareExchangePointer ((gpointer *) &socket_async_call_klass, NULL, NULL); - if (klass == NULL) { - MonoImage *system_assembly = mono_image_loaded ("System"); - - if (system_assembly == NULL) - g_assert_not_reached (); - - klass = mono_class_from_name (system_assembly, "System.Net.Sockets", "Socket/SocketAsyncCall"); - if (klass == NULL) - g_assert_not_reached (); - - InterlockedCompareExchangePointer ((gpointer *) &socket_async_call_klass, klass, NULL); + if (socket_async_call_klass == NULL) { + klass = target->vtable->klass; + /* Check if it's SocketAsyncCall in System.Net.Sockets + * FIXME: check the assembly is signed correctly for extra care + */ + if (klass->name [0] == 'S' && strcmp (klass->name, "SocketAsyncCall") == 0 + && strcmp (mono_image_get_name (klass->image), "System") == 0 + && klass->nested_in && strcmp (klass->nested_in->name, "Socket") == 0) + socket_async_call_klass = klass; } - if (target->vtable->klass != klass) + if (process_async_call_klass == NULL) { + klass = target->vtable->klass; + /* Check if it's AsyncReadHandler in System.Diagnostics.Process + * FIXME: check the assembly is signed correctly for extra care + */ + if (klass->name [0] == 'A' && strcmp (klass->name, "AsyncReadHandler") == 0 + && strcmp (mono_image_get_name (klass->image), "System") == 0 + && klass->nested_in && strcmp (klass->nested_in->name, "Process") == 0) + process_async_call_klass = klass; + } + /* return both when socket_async_call_klass has not been seen yet and when + * the object is not an instance of the class. + */ + if (target->vtable->klass != socket_async_call_klass && target->vtable->klass != process_async_call_klass) return FALSE; op = sock_res->operation; @@ -884,11 +922,25 @@ socket_io_filter (MonoObject *target, MonoObject *state) static void mono_async_invoke (MonoAsyncResult *ares) { - ASyncCall *ac = (ASyncCall *)ares->data; + ASyncCall *ac = (ASyncCall *)ares->object_data; + MonoThread *thread = NULL; + MonoObject *res, *exc = NULL; + MonoArray *out_args = NULL; + + if (ares->execution_context) { + /* use captured ExecutionContext (if available) */ + thread = mono_thread_current (); + MONO_OBJECT_SETREF (ares, original_context, thread->execution_context); + MONO_OBJECT_SETREF (thread, execution_context, ares->execution_context); + } else { + ares->original_context = NULL; + } ac->msg->exc = NULL; - ac->res = mono_message_invoke (ares->async_delegate, ac->msg, - &ac->msg->exc, &ac->out_args); + res = mono_message_invoke (ares->async_delegate, ac->msg, &exc, &out_args); + MONO_OBJECT_SETREF (ac, res, res); + MONO_OBJECT_SETREF (ac, msg->exc, exc); + MONO_OBJECT_SETREF (ac, out_args, out_args); ares->completed = 1; @@ -897,15 +949,23 @@ mono_async_invoke (MonoAsyncResult *ares) MonoObject *exc = NULL; void *pa = &ares; mono_runtime_invoke (ac->cb_method, ac->cb_target, pa, &exc); - if (!ac->msg->exc) - ac->msg->exc = exc; + /* 'exc' will be the previous ac->msg->exc if not NULL and not + * catched. If catched, this will be set to NULL and the + * exception will not be printed. */ + MONO_OBJECT_SETREF (ac->msg, exc, exc); + } + + /* restore original thread execution context if flow isn't suppressed, i.e. non null */ + if (ares->original_context) { + MONO_OBJECT_SETREF (thread, execution_context, ares->original_context); + ares->original_context = NULL; } /* notify listeners */ mono_monitor_enter ((MonoObject *) ares); if (ares->handle != NULL) { - ac->wait_event = ((MonoWaitHandle *) ares->handle)->handle; - SetEvent (ac->wait_event); + ac->wait_event = (gsize)((MonoWaitHandle *) ares->handle)->handle; + SetEvent ((gpointer)(gsize)ac->wait_event); } mono_monitor_exit ((MonoObject *) ares); @@ -926,16 +986,20 @@ mono_thread_pool_init () MONO_GC_REGISTER_ROOT (ares_htable); InitializeCriticalSection (&socket_io_data.io_lock); InitializeCriticalSection (&ares_lock); - ares_htable = mono_g_hash_table_new (NULL, NULL); + ares_htable = mono_g_hash_table_new_type (NULL, NULL, MONO_HASH_KEY_VALUE_GC); job_added = CreateSemaphore (NULL, 0, 0x7fffffff, NULL); + g_assert (job_added != NULL); GetSystemInfo (&info); - if (getenv ("MONO_THREADS_PER_CPU") != NULL) { - threads_per_cpu = atoi (getenv ("MONO_THREADS_PER_CPU")); + if (g_getenv ("MONO_THREADS_PER_CPU") != NULL) { + threads_per_cpu = atoi (g_getenv ("MONO_THREADS_PER_CPU")); if (threads_per_cpu <= 0) threads_per_cpu = THREADS_PER_CPU; } - mono_max_worker_threads = threads_per_cpu * info.dwNumberOfProcessors; + mono_max_worker_threads = 20 + threads_per_cpu * info.dwNumberOfProcessors; + + async_call_klass = mono_class_from_name (mono_defaults.corlib, "System", "MonoAsyncCall"); + g_assert (async_call_klass); } MonoAsyncResult * @@ -946,23 +1010,17 @@ mono_thread_pool_add (MonoObject *target, MonoMethodMessage *msg, MonoDelegate * MonoAsyncResult *ares; ASyncCall *ac; -#ifdef HAVE_BOEHM_GC - ac = GC_MALLOC (sizeof (ASyncCall)); -#else - /* We'll leak the event if creaated... */ - ac = g_new0 (ASyncCall, 1); -#endif - ac->wait_event = NULL; - ac->msg = msg; - ac->state = state; + ac = (ASyncCall*)mono_object_new (mono_domain_get (), async_call_klass); + MONO_OBJECT_SETREF (ac, msg, msg); + MONO_OBJECT_SETREF (ac, state, state); if (async_callback) { ac->cb_method = mono_get_delegate_invoke (((MonoObject *)async_callback)->vtable->klass); - ac->cb_target = async_callback; + MONO_OBJECT_SETREF (ac, cb_target, async_callback); } - ares = mono_async_result_new (domain, NULL, ac->state, ac); - ares->async_delegate = target; + ares = mono_async_result_new (domain, NULL, ac->state, NULL, (MonoObject*)ac); + MONO_OBJECT_SETREF (ares, async_delegate, target); EnterCriticalSection (&ares_lock); mono_g_hash_table_insert (ares_htable, ares, ares); @@ -981,7 +1039,6 @@ static void start_thread_or_queue (MonoAsyncResult *ares) { int busy, worker; - MonoDomain *domain; busy = (int) InterlockedCompareExchange (&busy_worker_threads, 0, -1); worker = (int) InterlockedCompareExchange (&mono_worker_threads, 0, -1); @@ -989,8 +1046,7 @@ start_thread_or_queue (MonoAsyncResult *ares) worker < mono_max_worker_threads) { InterlockedIncrement (&mono_worker_threads); InterlockedIncrement (&busy_worker_threads); - domain = ((MonoObject *) ares)->vtable->domain; - mono_thread_create (domain, async_invoke_thread, ares); + mono_thread_create (mono_get_root_domain (), async_invoke_thread, ares); } else { append_job (&mono_delegate_section, &async_call_queue, ares); ReleaseSemaphore (job_added, 1, NULL); @@ -1016,23 +1072,24 @@ mono_thread_pool_finish (MonoAsyncResult *ares, MonoArray **out_args, MonoObject } ares->endinvoke_called = 1; - ac = (ASyncCall *)ares->data; + ac = (ASyncCall *)ares->object_data; g_assert (ac != NULL); /* wait until we are really finished */ if (!ares->completed) { if (ares->handle == NULL) { - ac->wait_event = CreateEvent (NULL, TRUE, FALSE, NULL); - ares->handle = (MonoObject *) mono_wait_handle_new (mono_object_domain (ares), ac->wait_event); + ac->wait_event = (gsize)CreateEvent (NULL, TRUE, FALSE, NULL); + g_assert(ac->wait_event != 0); + MONO_OBJECT_SETREF (ares, handle, (MonoObject *) mono_wait_handle_new (mono_object_domain (ares), (gpointer)(gsize)ac->wait_event)); } mono_monitor_exit ((MonoObject *) ares); - WaitForSingleObjectEx (ac->wait_event, INFINITE, TRUE); + WaitForSingleObjectEx ((gpointer)(gsize)ac->wait_event, INFINITE, TRUE); } else { mono_monitor_exit ((MonoObject *) ares); } - *exc = ac->msg->exc; + *exc = ac->msg->exc; /* FIXME: GC add write barrier */ *out_args = ac->out_args; return ac->res; @@ -1054,6 +1111,7 @@ mono_thread_pool_cleanup (void) socket_io_cleanup (&socket_io_data); } +/* FIXME: GC: adds managed objects to the list... */ static void append_job (CRITICAL_SECTION *cs, GList **plist, gpointer ar) { @@ -1111,7 +1169,7 @@ async_invoke_thread (gpointer data) thread = mono_thread_current (); thread->threadpool_thread = TRUE; - thread->state |= ThreadState_Background; + ves_icall_System_Threading_Thread_SetState (thread, ThreadState_Background); for (;;) { MonoAsyncResult *ar; @@ -1121,21 +1179,24 @@ async_invoke_thread (gpointer data) /* worker threads invokes methods in different domains, * so we need to set the right domain here */ domain = ((MonoObject *)ar)->vtable->domain; + mono_thread_push_appdomain_ref (domain); if (mono_domain_set (domain, FALSE)) { ASyncCall *ac; - mono_thread_push_appdomain_ref (domain); mono_async_invoke (ar); - ac = (ASyncCall *) ar->data; + ac = (ASyncCall *) ar->object_data; + /* if (ac->msg->exc != NULL) mono_unhandled_exception (ac->msg->exc); - mono_thread_pop_appdomain_ref (); + */ + mono_domain_set (mono_get_root_domain (), TRUE); } + mono_thread_pop_appdomain_ref (); InterlockedDecrement (&busy_worker_threads); } data = dequeue_job (&mono_delegate_section, &async_call_queue); - + if (!data) { guint32 wr; int timeout = 10000;