X-Git-Url: http://wien.tomnetworks.com/gitweb/?a=blobdiff_plain;f=mono%2Fmetadata%2Fthreadpool-ms.c;h=7849d4554f5646220aa302bcc00ae013dfcd837d;hb=76b32f3ee38e7b29cdbaa657a4826b65579f4e93;hp=4cda0e814d52d17bb32ba12f4bc0abc5816b1a85;hpb=56ad07531d611d0957134c6d60885f1f8b62845f;p=mono.git diff --git a/mono/metadata/threadpool-ms.c b/mono/metadata/threadpool-ms.c index 4cda0e814d5..7849d4554f5 100644 --- a/mono/metadata/threadpool-ms.c +++ b/mono/metadata/threadpool-ms.c @@ -33,6 +33,7 @@ #include #include #include +#include #include #include #include @@ -159,6 +160,11 @@ typedef struct { gboolean suspended; } ThreadPool; +typedef struct { + gint32 ref; + MonoCoopCond cond; +} ThreadPoolDomainCleanupSemaphore; + typedef enum { TRANSITION_WARMUP, TRANSITION_INITIALIZING, @@ -430,6 +436,14 @@ domain_get (MonoDomain *domain, gboolean create) } if (create) { + ThreadPoolDomainCleanupSemaphore *cleanup_semaphore; + cleanup_semaphore = g_new0 (ThreadPoolDomainCleanupSemaphore, 1); + cleanup_semaphore->ref = 2; + mono_coop_cond_init (&cleanup_semaphore->cond); + + g_assert(!domain->cleanup_semaphore); + domain->cleanup_semaphore = cleanup_semaphore; + tpdomain = g_new0 (ThreadPoolDomain, 1); tpdomain->domain = domain; domain_add (tpdomain); @@ -476,7 +490,7 @@ domain_get_next (ThreadPoolDomain *current) break; } } - g_assert (current_idx >= 0); + g_assert (current_idx != (guint)-1); } for (i = current_idx + 1; i < len + current_idx + 1; ++i) { ThreadPoolDomain *tmp = (ThreadPoolDomain *)g_ptr_array_index (threadpool->domains, i % len); @@ -635,6 +649,9 @@ worker_thread (gpointer data) if (retire) retire = FALSE; + /* The tpdomain->domain might have unloaded, while this thread was parked */ + previous_tpdomain = NULL; + continue; } @@ -678,10 +695,23 @@ worker_thread (gpointer data) g_assert (tpdomain->domain->threadpool_jobs >= 0); if (tpdomain->domain->threadpool_jobs == 0 && mono_domain_is_unloading (tpdomain->domain)) { - gboolean removed = domain_remove (tpdomain); + ThreadPoolDomainCleanupSemaphore *cleanup_semaphore; + gboolean removed; + + removed = domain_remove(tpdomain); g_assert (removed); - if (tpdomain->domain->cleanup_semaphore) - ReleaseSemaphore (tpdomain->domain->cleanup_semaphore, 1, NULL); + + cleanup_semaphore = (ThreadPoolDomainCleanupSemaphore*) tpdomain->domain->cleanup_semaphore; + g_assert (cleanup_semaphore); + + mono_coop_cond_signal (&cleanup_semaphore->cond); + + if (InterlockedDecrement (&cleanup_semaphore->ref) == 0) { + mono_coop_cond_destroy (&cleanup_semaphore->cond); + g_free (cleanup_semaphore); + tpdomain->domain->cleanup_semaphore = NULL; + } + domain_free (tpdomain); tpdomain = NULL; } @@ -708,13 +738,15 @@ worker_try_create (void) { ThreadPoolCounter counter; MonoInternalThread *thread; + gint64 current_ticks; gint32 now; mono_coop_mutex_lock (&threadpool->worker_creation_lock); mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker", mono_native_thread_id_get ()); - - if ((now = mono_100ns_ticks () / 10 / 1000 / 1000) == 0) { + current_ticks = mono_100ns_ticks (); + now = current_ticks / (10 * 1000 * 1000); + if (0 == current_ticks) { g_warning ("failed to get 100ns ticks"); } else { if (threadpool->worker_creation_current_second != now) { @@ -1366,11 +1398,11 @@ mono_threadpool_ms_begin_invoke (MonoDomain *domain, MonoObject *target, MonoMet } MonoObject * -mono_threadpool_ms_end_invoke (MonoAsyncResult *ares, MonoArray **out_args, MonoObject **exc) +mono_threadpool_ms_end_invoke (MonoAsyncResult *ares, MonoArray **out_args, MonoObject **exc, MonoError *error) { - MonoError error; MonoAsyncCall *ac; + mono_error_init (error); g_assert (exc); g_assert (out_args); @@ -1381,7 +1413,7 @@ mono_threadpool_ms_end_invoke (MonoAsyncResult *ares, MonoArray **out_args, Mono mono_monitor_enter ((MonoObject*) ares); if (ares->endinvoke_called) { - *exc = (MonoObject*) mono_get_exception_invalid_operation (NULL); + mono_error_set_invalid_operation(error, "Delegate EndInvoke method called more than once"); mono_monitor_exit ((MonoObject*) ares); return NULL; } @@ -1396,10 +1428,13 @@ mono_threadpool_ms_end_invoke (MonoAsyncResult *ares, MonoArray **out_args, Mono if (ares->handle) { wait_event = mono_wait_handle_get_handle ((MonoWaitHandle*) ares->handle); } else { - wait_event = CreateEvent (NULL, TRUE, FALSE, NULL); + wait_event = mono_w32event_create (TRUE, FALSE); g_assert(wait_event); - MonoWaitHandle *wait_handle = mono_wait_handle_new (mono_object_domain (ares), wait_event, &error); - mono_error_raise_exception (&error); /* FIXME don't raise here */ + MonoWaitHandle *wait_handle = mono_wait_handle_new (mono_object_domain (ares), wait_event, error); + if (!is_ok (error)) { + CloseHandle (wait_event); + return NULL; + } MONO_OBJECT_SETREF (ares, handle, (MonoObject*) wait_handle); } mono_monitor_exit ((MonoObject*) ares); @@ -1419,9 +1454,10 @@ mono_threadpool_ms_end_invoke (MonoAsyncResult *ares, MonoArray **out_args, Mono gboolean mono_threadpool_ms_remove_domain_jobs (MonoDomain *domain, int timeout) { - gboolean res = TRUE; gint64 end; - gpointer sem; + ThreadPoolDomain *tpdomain; + ThreadPoolDomainCleanupSemaphore *cleanup_semaphore; + gboolean ret; g_assert (domain); g_assert (timeout >= -1); @@ -1440,38 +1476,59 @@ mono_threadpool_ms_remove_domain_jobs (MonoDomain *domain, int timeout) #endif /* - * There might be some threads out that could be about to execute stuff from the given domain. - * We avoid that by setting up a semaphore to be pulsed by the thread that reaches zero. - */ - sem = domain->cleanup_semaphore = CreateSemaphore (NULL, 0, 1, NULL); + * There might be some threads out that could be about to execute stuff from the given domain. + * We avoid that by waiting on a semaphore to be pulsed by the thread that reaches zero. + * The semaphore is only created for domains which queued threadpool jobs. + * We always wait on the semaphore rather than ensuring domain->threadpool_jobs is 0. + * There may be pending outstanding requests which will create new jobs. + * The semaphore is signaled the threadpool domain has been removed from list + * and we know no more jobs for the domain will be processed. + */ + + mono_lazy_initialize(&status, initialize); + mono_coop_mutex_lock(&threadpool->domains_lock); + + tpdomain = domain_get (domain, FALSE); + if (!tpdomain || tpdomain->outstanding_request == 0) { + mono_coop_mutex_unlock(&threadpool->domains_lock); + return TRUE; + } - /* - * The memory barrier here is required to have global ordering between assigning to cleanup_semaphone - * and reading threadpool_jobs. Otherwise this thread could read a stale version of threadpool_jobs - * and wait forever. - */ - mono_memory_write_barrier (); + g_assert (domain->cleanup_semaphore); + cleanup_semaphore = (ThreadPoolDomainCleanupSemaphore*) domain->cleanup_semaphore; + + ret = TRUE; - while (domain->threadpool_jobs) { - gint64 now; + do { + if (timeout == -1) { + mono_coop_cond_wait (&cleanup_semaphore->cond, &threadpool->domains_lock); + } else { + gint64 now; + gint res; - if (timeout != -1) { - now = mono_msec_ticks (); + now = mono_msec_ticks(); if (now > end) { - res = FALSE; + ret = FALSE; + break; + } + + res = mono_coop_cond_timedwait (&cleanup_semaphore->cond, &threadpool->domains_lock, end - now); + if (res != 0) { + ret = FALSE; break; } } + } while (tpdomain->outstanding_request != 0); - MONO_ENTER_GC_SAFE; - WaitForSingleObject (sem, timeout != -1 ? end - now : timeout); - MONO_EXIT_GC_SAFE; + if (InterlockedDecrement (&cleanup_semaphore->ref) == 0) { + mono_coop_cond_destroy (&cleanup_semaphore->cond); + g_free (cleanup_semaphore); + domain->cleanup_semaphore = NULL; } - domain->cleanup_semaphore = NULL; - CloseHandle (sem); + mono_coop_mutex_unlock(&threadpool->domains_lock); - return res; + return ret; } void