X-Git-Url: http://wien.tomnetworks.com/gitweb/?a=blobdiff_plain;f=mono%2Fmetadata%2Fthreadpool.c;h=6d0996e444ae93f529fcb6ec567e7c5cf8eb9a94;hb=62a953fddf5fa49c3e0377503f6225951287c021;hp=1eb527e00fc72013b136c9e4660051d0580fb778;hpb=c133ae2dc82c0533f5790333a3c93b2635b9c43c;p=mono.git diff --git a/mono/metadata/threadpool.c b/mono/metadata/threadpool.c index 1eb527e00fc..6d0996e444a 100644 --- a/mono/metadata/threadpool.c +++ b/mono/metadata/threadpool.c @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include @@ -29,6 +30,7 @@ #include #include #include +#include #include #ifdef HAVE_SYS_TIME_H #include @@ -88,7 +90,7 @@ typedef struct { gint event_system; gpointer event_data; - void (*modify) (gpointer event_data, int fd, int operation, int events, gboolean is_new); + void (*modify) (gpointer p, int fd, int operation, int events, gboolean is_new); void (*wait) (gpointer sock_data); void (*shutdown) (gpointer event_data); } SocketIOData; @@ -151,6 +153,7 @@ static void socket_io_cleanup (SocketIOData *data); static MonoObject *get_io_event (MonoMList **list, gint event); static int get_events_from_list (MonoMList *list); static int get_event_from_state (MonoSocketAsyncResult *state); +static void check_for_interruption_critical (void); static MonoClass *async_call_klass; static MonoClass *socket_async_call_klass; @@ -158,6 +161,7 @@ static MonoClass *process_async_call_klass; static GPtrArray *wsqs; CRITICAL_SECTION wsqs_lock; +static gboolean suspended; /* Hooks */ static MonoThreadPoolFunc tp_start_func; @@ -189,7 +193,7 @@ enum { #include #ifdef HAVE_EPOLL #include -#elif defined(HAVE_KQUEUE) +#elif defined(USE_KQUEUE_FOR_THREADPOOL) #include #endif /* @@ -215,30 +219,11 @@ is_corlib_type (MonoDomain *domain, MonoClass *klass) /* * Note that we call it is_socket_type() where 'socket' refers to the image * that contains the System.Net.Sockets.Socket type. - * For moonlight there is a System.Net.Sockets.Socket class in both System.dll and System.Net.dll. */ static gboolean is_socket_type (MonoDomain *domain, MonoClass *klass) { - static const char *version = NULL; - static gboolean moonlight; - - if (is_system_type (domain, klass)) - return TRUE; - - /* If moonlight, check if the type is in System.Net.dll too */ - if (version == NULL) { - version = mono_get_runtime_info ()->framework_version; - moonlight = !strcmp (version, "2.1"); - } - - if (!moonlight) - return FALSE; - - if (domain->system_net_dll == NULL) - domain->system_net_dll = mono_image_loaded ("System.Net"); - - return klass->image == domain->system_net_dll; + return is_system_type (domain, klass); } #define check_type_cached(domain, ASSEMBLY, _class, _namespace, _name, loc) do { \ @@ -496,7 +481,7 @@ init_event_system (SocketIOData *data) data->event_system = POLL_BACKEND; } } -#elif defined(HAVE_KQUEUE) +#elif defined(USE_KQUEUE_FOR_THREADPOOL) if (data->event_system == KQUEUE_BACKEND) data->event_data = tp_kqueue_init (data); #endif @@ -525,7 +510,7 @@ socket_io_init (SocketIOData *data) data->sock_to_state = mono_g_hash_table_new_type (g_direct_hash, g_direct_equal, MONO_HASH_VALUE_GC); #ifdef HAVE_EPOLL data->event_system = EPOLL_BACKEND; -#elif defined(HAVE_KQUEUE) +#elif defined(USE_KQUEUE_FOR_THREADPOOL) data->event_system = KQUEUE_BACKEND; #else data->event_system = POLL_BACKEND; @@ -534,7 +519,7 @@ socket_io_init (SocketIOData *data) data->event_system = POLL_BACKEND; init_event_system (data); - mono_thread_create_internal (mono_get_root_domain (), data->wait, data, TRUE, SMALL_STACK); + mono_thread_create_internal (mono_get_root_domain (), data->wait, data, TRUE, FALSE, SMALL_STACK); LeaveCriticalSection (&data->io_lock); data->inited = 2; threadpool_start_thread (&async_io_tp); @@ -574,8 +559,8 @@ socket_io_add (MonoAsyncResult *ares, MonoSocketAsyncResult *state) mono_g_hash_table_replace (data->sock_to_state, state->handle, list); ievt = get_events_from_list (list); - data->modify (data->event_data, fd, state->operation, ievt, is_new); - LeaveCriticalSection (&data->io_lock); + /* The modify function leaves the io_lock critical section. */ + data->modify (data, fd, state->operation, ievt, is_new); } #ifndef DISABLE_SOCKETS @@ -618,6 +603,7 @@ mono_async_invoke (ThreadPool *tp, MonoAsyncResult *ares) MonoObject *res, *exc = NULL; MonoArray *out_args = NULL; HANDLE wait_event = NULL; + MonoInternalThread *thread = mono_thread_internal_current (); if (ares->execution_context) { /* use captured ExecutionContext (if available) */ @@ -630,7 +616,10 @@ mono_async_invoke (ThreadPool *tp, MonoAsyncResult *ares) if (ac == NULL) { /* Fast path from ThreadPool.*QueueUserWorkItem */ void *pa = ares->async_state; + /* The debugger needs this */ + thread->async_invoke_method = ((MonoDelegate*)ares->async_delegate)->method; res = mono_runtime_delegate_invoke (ares->async_delegate, &pa, &exc); + thread->async_invoke_method = NULL; } else { MonoObject *cb_exc = NULL; @@ -653,7 +642,9 @@ mono_async_invoke (ThreadPool *tp, MonoAsyncResult *ares) if (ac != NULL && ac->cb_method) { void *pa = &ares; cb_exc = NULL; + thread->async_invoke_method = ac->cb_method; mono_runtime_invoke (ac->cb_method, ac->cb_target, pa, &cb_exc); + thread->async_invoke_method = NULL; exc = cb_exc; } else { exc = NULL; @@ -683,8 +674,10 @@ threadpool_start_idle_threads (ThreadPool *tp) if (InterlockedCompareExchange (&tp->nthreads, n + 1, n) == n) break; } +#ifndef DISABLE_PERFCOUNTERS mono_perfcounter_update_value (tp->pc_nthreads, TRUE, 1); - mono_thread_create_internal (mono_get_root_domain (), tp->async_invoke, tp, TRUE, stack_size); +#endif + mono_thread_create_internal (mono_get_root_domain (), tp->async_invoke, tp, TRUE, FALSE, stack_size); SleepEx (100, TRUE); } while (1); } @@ -700,6 +693,7 @@ threadpool_init (ThreadPool *tp, int min_threads, int max_threads, void (*async_ MONO_SEM_INIT (&tp->new_job, 0); } +#ifndef DISABLE_PERFCOUNTERS static void * init_perf_counter (const char *category, const char *counter) { @@ -718,6 +712,7 @@ init_perf_counter (const char *category, const char *counter) machine = mono_string_new (root, "."); return mono_perfcounter_get_impl (category_str, counter_str, NULL, machine, &type, &custom); } +#endif #ifdef DEBUG static void @@ -777,6 +772,7 @@ monitor_thread (gpointer unused) ves_icall_System_Threading_Thread_SetName_internal (thread, mono_string_new (mono_domain_get (), "Threadpool monitor")); while (1) { ms = 500; + i = 10; //number of spurious awakes we tolerate before doing a round of rebalancing. do { guint32 ts; ts = mono_msec_ticks (); @@ -787,11 +783,14 @@ monitor_thread (gpointer unused) break; if (THREAD_WANTS_A_BREAK (thread)) mono_thread_interruption_checkpoint (); - } while (ms > 0); + } while (ms > 0 && i--); if (mono_runtime_is_shutting_down ()) break; + if (suspended) + continue; + for (i = 0; i < 2; i++) { ThreadPool *tp; tp = pools [i]; @@ -817,7 +816,13 @@ monitor_thread (gpointer unused) } void -mono_thread_pool_init () +mono_thread_pool_init_tls (void) +{ + mono_wsq_init (); +} + +void +mono_thread_pool_init (void) { gint threads_per_cpu = 1; gint thread_count; @@ -854,8 +859,8 @@ mono_thread_pool_init () InitializeCriticalSection (&wsqs_lock); wsqs = g_ptr_array_sized_new (MAX (100 * cpu_count, thread_count)); - mono_wsq_init (); +#ifndef DISABLE_PERFCOUNTERS async_tp.pc_nitems = init_perf_counter ("Mono Threadpool", "Work Items Added"); g_assert (async_tp.pc_nitems); @@ -867,6 +872,7 @@ mono_thread_pool_init () async_io_tp.pc_nthreads = init_perf_counter ("Mono Threadpool", "# of IO Threads"); g_assert (async_io_tp.pc_nthreads); +#endif tp_inited = 2; #ifdef DEBUG signal (SIGALRM, signal_handler); @@ -1019,8 +1025,10 @@ threadpool_start_thread (ThreadPool *tp) stack_size = (!tp->is_io) ? 0 : SMALL_STACK; while (!mono_runtime_is_shutting_down () && (n = tp->nthreads) < tp->max_threads) { if (InterlockedCompareExchange (&tp->nthreads, n + 1, n) == n) { +#ifndef DISABLE_PERFCOUNTERS mono_perfcounter_update_value (tp->pc_nthreads, TRUE, 1); - mono_thread_create_internal (mono_get_root_domain (), tp->async_invoke, tp, TRUE, stack_size); +#endif + mono_thread_create_internal (mono_get_root_domain (), tp->async_invoke, tp, TRUE, FALSE, stack_size); return TRUE; } } @@ -1059,13 +1067,15 @@ threadpool_append_jobs (ThreadPool *tp, MonoObject **jobs, gint njobs) if (tp->pool_status == 0 && InterlockedCompareExchange (&tp->pool_status, 1, 0) == 0) { if (!tp->is_io) { - mono_thread_create_internal (mono_get_root_domain (), monitor_thread, NULL, TRUE, SMALL_STACK); + mono_thread_create_internal (mono_get_root_domain (), monitor_thread, NULL, TRUE, FALSE, SMALL_STACK); threadpool_start_thread (tp); } /* Create on demand up to min_threads to avoid startup penalty for apps that don't use * the threadpool that much - * mono_thread_create_internal (mono_get_root_domain (), threadpool_start_idle_threads, tp, TRUE, SMALL_STACK); - */ + */ + if (mono_config_is_server_mode ()) { + mono_thread_create_internal (mono_get_root_domain (), threadpool_start_idle_threads, tp, TRUE, FALSE, SMALL_STACK); + } } for (i = 0; i < njobs; i++) { @@ -1077,7 +1087,9 @@ threadpool_append_jobs (ThreadPool *tp, MonoObject **jobs, gint njobs) o->add_time = mono_100ns_ticks (); } threadpool_jobs_inc (ar); +#ifndef DISABLE_PERFCOUNTERS mono_perfcounter_update_value (tp->pc_nitems, TRUE, 1); +#endif if (!tp->is_io && mono_wsq_local_push (ar)) continue; @@ -1092,10 +1104,13 @@ static void threadpool_clear_queue (ThreadPool *tp, MonoDomain *domain) { MonoObject *obj; - MonoMList *other; + MonoMList *other = NULL; + MonoCQ *queue = tp->queue; - other = NULL; - while (mono_cq_dequeue (tp->queue, &obj)) { + if (!queue) + return; + + while (mono_cq_dequeue (queue, &obj)) { if (obj == NULL) continue; if (obj->vtable->domain != domain) @@ -1103,6 +1118,9 @@ threadpool_clear_queue (ThreadPool *tp, MonoDomain *domain) threadpool_jobs_dec (obj); } + if (mono_runtime_is_shutting_down ()) + return; + while (other) { threadpool_append_job (tp, (MonoObject *) mono_mlist_get_data (other)); other = mono_mlist_next (other); @@ -1282,9 +1300,10 @@ try_steal (MonoWSQ *local_wsq, gpointer *data, gboolean retry) static gboolean dequeue_or_steal (ThreadPool *tp, gpointer *data, MonoWSQ *local_wsq) { - if (mono_runtime_is_shutting_down ()) + MonoCQ *queue = tp->queue; + if (mono_runtime_is_shutting_down () || !queue) return FALSE; - mono_cq_dequeue (tp->queue, (MonoObject **) data); + mono_cq_dequeue (queue, (MonoObject **) data); if (!tp->is_io && !*data) try_steal (local_wsq, data, FALSE); return (*data != NULL); @@ -1377,26 +1396,63 @@ should_i_die (ThreadPool *tp) return result; } +static void +set_tp_thread_info (ThreadPool *tp) +{ + const gchar *name; + MonoInternalThread *thread = mono_thread_internal_current (); + + mono_profiler_thread_start (thread->tid); + name = (tp->is_io) ? "IO Threadpool worker" : "Threadpool worker"; + mono_thread_set_name_internal (thread, mono_string_new (mono_domain_get (), name), FALSE); +} + +static void +clear_thread_state (void) +{ + MonoInternalThread *thread = mono_thread_internal_current (); + /* If the callee changes the background status, set it back to TRUE */ + mono_thread_clr_state (thread , ~ThreadState_Background); + if (!mono_thread_test_state (thread , ThreadState_Background)) + ves_icall_System_Threading_Thread_SetState (thread, ThreadState_Background); +} + +static void +check_for_interruption_critical (void) +{ + MonoInternalThread *thread; + /*RULE NUMBER ONE OF SKIP_THREAD: NEVER POKE MANAGED STATE.*/ + mono_gc_set_skip_thread (FALSE); + + thread = mono_thread_internal_current (); + if (THREAD_WANTS_A_BREAK (thread)) + mono_thread_interruption_checkpoint (); + + /*RULE NUMBER TWO OF SKIP_THREAD: READ RULE NUMBER ONE.*/ + mono_gc_set_skip_thread (TRUE); +} + +static void +fire_profiler_thread_end (void) +{ + MonoInternalThread *thread = mono_thread_internal_current (); + mono_profiler_thread_end (thread->tid); +} + static void async_invoke_thread (gpointer data) { MonoDomain *domain; - MonoInternalThread *thread; MonoWSQ *wsq; ThreadPool *tp; gboolean must_die; - const gchar *name; tp = data; wsq = NULL; if (!tp->is_io) wsq = add_wsq (); - thread = mono_thread_internal_current (); - - mono_profiler_thread_start (thread->tid); - name = (tp->is_io) ? "IO Threadpool worker" : "Threadpool worker"; - mono_thread_set_name_internal (thread, mono_string_new (mono_domain_get (), name), FALSE); + set_tp_thread_info (tp); if (tp_start_func) tp_start_func (tp_hooks_user_data); @@ -1478,10 +1534,7 @@ async_invoke_thread (gpointer data) } mono_thread_pop_appdomain_ref (); InterlockedDecrement (&tp->busy_threads); - /* If the callee changes the background status, set it back to TRUE */ - mono_thread_clr_state (thread , ~ThreadState_Background); - if (!mono_thread_test_state (thread , ThreadState_Background)) - ves_icall_System_Threading_Thread_SetState (thread, ThreadState_Background); + clear_thread_state (); } } @@ -1514,8 +1567,7 @@ async_invoke_thread (gpointer data) #endif if (mono_runtime_is_shutting_down ()) break; - if (THREAD_WANTS_A_BREAK (thread)) - mono_thread_interruption_checkpoint (); + check_for_interruption_critical (); } InterlockedDecrement (&tp->waiting); @@ -1545,12 +1597,14 @@ async_invoke_thread (gpointer data) if (!down && nt <= tp->min_threads) break; if (down || InterlockedCompareExchange (&tp->nthreads, nt - 1, nt) == nt) { +#ifndef DISABLE_PERFCOUNTERS mono_perfcounter_update_value (tp->pc_nthreads, TRUE, -1); +#endif if (!tp->is_io) { remove_wsq (wsq); } - mono_profiler_thread_end (thread->tid); + fire_profiler_thread_end (); if (tp_finish_func) tp_finish_func (tp_hooks_user_data); @@ -1601,9 +1655,9 @@ ves_icall_System_Threading_ThreadPool_SetMinThreads (gint workerThreads, gint co InterlockedExchange (&async_tp.min_threads, workerThreads); InterlockedExchange (&async_io_tp.min_threads, completionPortThreads); if (workerThreads > async_tp.nthreads) - mono_thread_create_internal (mono_get_root_domain (), threadpool_start_idle_threads, &async_tp, TRUE, SMALL_STACK); + mono_thread_create_internal (mono_get_root_domain (), threadpool_start_idle_threads, &async_tp, TRUE, FALSE, SMALL_STACK); if (completionPortThreads > async_io_tp.nthreads) - mono_thread_create_internal (mono_get_root_domain (), threadpool_start_idle_threads, &async_io_tp, TRUE, SMALL_STACK); + mono_thread_create_internal (mono_get_root_domain (), threadpool_start_idle_threads, &async_io_tp, TRUE, FALSE, SMALL_STACK); return TRUE; } @@ -1682,3 +1736,21 @@ mono_internal_thread_unhandled_exception (MonoObject* exc) mono_thread_internal_reset_abort (mono_thread_internal_current ()); } } + +/* + * Suspend creation of new threads. + */ +void +mono_thread_pool_suspend (void) +{ + suspended = TRUE; +} + +/* + * Resume creation of new threads. + */ +void +mono_thread_pool_resume (void) +{ + suspended = FALSE; +}