Merge pull request #3826 from monojenkins/update-csprojs
[mono.git] / mono / metadata / threadpool-ms.c
index 4cda0e814d52d17bb32ba12f4bc0abc5816b1a85..d22470415e84e1891759e5a31df748eacde76531 100644 (file)
@@ -33,6 +33,7 @@
 #include <mono/metadata/object-internals.h>
 #include <mono/metadata/threadpool-ms.h>
 #include <mono/metadata/threadpool-ms-io.h>
+#include <mono/metadata/w32event.h>
 #include <mono/utils/atomic.h>
 #include <mono/utils/mono-compiler.h>
 #include <mono/utils/mono-complex.h>
@@ -159,6 +160,11 @@ typedef struct {
        gboolean suspended;
 } ThreadPool;
 
+typedef struct {
+       gint32 ref;
+       MonoCoopCond cond;
+} ThreadPoolDomainCleanupSemaphore;
+
 typedef enum {
        TRANSITION_WARMUP,
        TRANSITION_INITIALIZING,
@@ -430,6 +436,14 @@ domain_get (MonoDomain *domain, gboolean create)
        }
 
        if (create) {
+               ThreadPoolDomainCleanupSemaphore *cleanup_semaphore;
+               cleanup_semaphore = g_new0 (ThreadPoolDomainCleanupSemaphore, 1);
+               cleanup_semaphore->ref = 2;
+               mono_coop_cond_init (&cleanup_semaphore->cond);
+
+               g_assert(!domain->cleanup_semaphore);
+               domain->cleanup_semaphore = cleanup_semaphore;
+
                tpdomain = g_new0 (ThreadPoolDomain, 1);
                tpdomain->domain = domain;
                domain_add (tpdomain);
@@ -476,7 +490,7 @@ domain_get_next (ThreadPoolDomain *current)
                                        break;
                                }
                        }
-                       g_assert (current_idx >= 0);
+                       g_assert (current_idx != (guint)-1);
                }
                for (i = current_idx + 1; i < len + current_idx + 1; ++i) {
                        ThreadPoolDomain *tmp = (ThreadPoolDomain *)g_ptr_array_index (threadpool->domains, i % len);
@@ -635,13 +649,16 @@ worker_thread (gpointer data)
                        if (retire)
                                retire = FALSE;
 
+                       /* The tpdomain->domain might have unloaded, while this thread was parked */
+                       previous_tpdomain = NULL;
+
                        continue;
                }
 
                tpdomain->outstanding_request --;
                g_assert (tpdomain->outstanding_request >= 0);
 
-               mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] worker running in domain %p",
+               mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] worker running in domain %p (outstanding requests %d) ",
                        mono_native_thread_id_get (), tpdomain->domain, tpdomain->outstanding_request);
 
                g_assert (tpdomain->domain);
@@ -678,10 +695,23 @@ worker_thread (gpointer data)
                g_assert (tpdomain->domain->threadpool_jobs >= 0);
 
                if (tpdomain->domain->threadpool_jobs == 0 && mono_domain_is_unloading (tpdomain->domain)) {
-                       gboolean removed = domain_remove (tpdomain);
+                       ThreadPoolDomainCleanupSemaphore *cleanup_semaphore;
+                       gboolean removed;
+
+                       removed = domain_remove(tpdomain);
                        g_assert (removed);
-                       if (tpdomain->domain->cleanup_semaphore)
-                               ReleaseSemaphore (tpdomain->domain->cleanup_semaphore, 1, NULL);
+
+                       cleanup_semaphore = (ThreadPoolDomainCleanupSemaphore*) tpdomain->domain->cleanup_semaphore;
+                       g_assert (cleanup_semaphore);
+
+                       mono_coop_cond_signal (&cleanup_semaphore->cond);
+
+                       if (InterlockedDecrement (&cleanup_semaphore->ref) == 0) {
+                               mono_coop_cond_destroy (&cleanup_semaphore->cond);
+                               g_free (cleanup_semaphore);
+                               tpdomain->domain->cleanup_semaphore = NULL;
+                       }
+
                        domain_free (tpdomain);
                        tpdomain = NULL;
                }
@@ -708,13 +738,15 @@ worker_try_create (void)
 {
        ThreadPoolCounter counter;
        MonoInternalThread *thread;
+       gint64 current_ticks;
        gint32 now;
 
        mono_coop_mutex_lock (&threadpool->worker_creation_lock);
 
        mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker", mono_native_thread_id_get ());
-
-       if ((now = mono_100ns_ticks () / 10 / 1000 / 1000) == 0) {
+       current_ticks = mono_100ns_ticks ();
+       now = current_ticks / (10 * 1000 * 1000);
+       if (0 == current_ticks) {
                g_warning ("failed to get 100ns ticks");
        } else {
                if (threadpool->worker_creation_current_second != now) {
@@ -746,7 +778,7 @@ worker_try_create (void)
        if ((thread = mono_thread_create_internal (mono_get_root_domain (), worker_thread, NULL, TRUE, 0, &error)) != NULL) {
                threadpool->worker_creation_current_count += 1;
 
-               mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker, created %p, now = %d count = %d", mono_native_thread_id_get (), thread->tid, now, threadpool->worker_creation_current_count);
+               mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker, created %p, now = %d count = %d", mono_native_thread_id_get (), GUINT_TO_POINTER(thread->tid), now, threadpool->worker_creation_current_count);
                mono_coop_mutex_unlock (&threadpool->worker_creation_lock);
                return TRUE;
        }
@@ -1366,11 +1398,11 @@ mono_threadpool_ms_begin_invoke (MonoDomain *domain, MonoObject *target, MonoMet
 }
 
 MonoObject *
-mono_threadpool_ms_end_invoke (MonoAsyncResult *ares, MonoArray **out_args, MonoObject **exc)
+mono_threadpool_ms_end_invoke (MonoAsyncResult *ares, MonoArray **out_args, MonoObject **exc, MonoError *error)
 {
-       MonoError error;
        MonoAsyncCall *ac;
 
+       mono_error_init (error);
        g_assert (exc);
        g_assert (out_args);
 
@@ -1381,7 +1413,7 @@ mono_threadpool_ms_end_invoke (MonoAsyncResult *ares, MonoArray **out_args, Mono
        mono_monitor_enter ((MonoObject*) ares);
 
        if (ares->endinvoke_called) {
-               *exc = (MonoObject*) mono_get_exception_invalid_operation (NULL);
+               mono_error_set_invalid_operation(error, "Delegate EndInvoke method called more than once");
                mono_monitor_exit ((MonoObject*) ares);
                return NULL;
        }
@@ -1396,10 +1428,13 @@ mono_threadpool_ms_end_invoke (MonoAsyncResult *ares, MonoArray **out_args, Mono
                if (ares->handle) {
                        wait_event = mono_wait_handle_get_handle ((MonoWaitHandle*) ares->handle);
                } else {
-                       wait_event = CreateEvent (NULL, TRUE, FALSE, NULL);
+                       wait_event = mono_w32event_create (TRUE, FALSE);
                        g_assert(wait_event);
-                       MonoWaitHandle *wait_handle = mono_wait_handle_new (mono_object_domain (ares), wait_event, &error);
-                       mono_error_raise_exception (&error); /* FIXME don't raise here */
+                       MonoWaitHandle *wait_handle = mono_wait_handle_new (mono_object_domain (ares), wait_event, error);
+                       if (!is_ok (error)) {
+                               CloseHandle (wait_event);
+                               return NULL;
+                       }
                        MONO_OBJECT_SETREF (ares, handle, (MonoObject*) wait_handle);
                }
                mono_monitor_exit ((MonoObject*) ares);
@@ -1419,9 +1454,10 @@ mono_threadpool_ms_end_invoke (MonoAsyncResult *ares, MonoArray **out_args, Mono
 gboolean
 mono_threadpool_ms_remove_domain_jobs (MonoDomain *domain, int timeout)
 {
-       gboolean res = TRUE;
        gint64 end;
-       gpointer sem;
+       ThreadPoolDomain *tpdomain;
+       ThreadPoolDomainCleanupSemaphore *cleanup_semaphore;
+       gboolean ret;
 
        g_assert (domain);
        g_assert (timeout >= -1);
@@ -1440,38 +1476,59 @@ mono_threadpool_ms_remove_domain_jobs (MonoDomain *domain, int timeout)
 #endif
 
        /*
-        * There might be some threads out that could be about to execute stuff from the given domain.
-        * We avoid that by setting up a semaphore to be pulsed by the thread that reaches zero.
-        */
-       sem = domain->cleanup_semaphore = CreateSemaphore (NULL, 0, 1, NULL);
+       * There might be some threads out that could be about to execute stuff from the given domain.
+       * We avoid that by waiting on a semaphore to be pulsed by the thread that reaches zero.
+       * The semaphore is only created for domains which queued threadpool jobs.
+       * We always wait on the semaphore rather than ensuring domain->threadpool_jobs is 0.
+       * There may be pending outstanding requests which will create new jobs.
+       * The semaphore is signaled the threadpool domain has been removed from list
+       * and we know no more jobs for the domain will be processed.
+       */
+
+       mono_lazy_initialize(&status, initialize);
+       mono_coop_mutex_lock(&threadpool->domains_lock);
+
+       tpdomain = domain_get (domain, FALSE);
+       if (!tpdomain || tpdomain->outstanding_request == 0) {
+               mono_coop_mutex_unlock(&threadpool->domains_lock);
+               return TRUE;
+       }
 
-       /*
-        * The memory barrier here is required to have global ordering between assigning to cleanup_semaphone
-        * and reading threadpool_jobs. Otherwise this thread could read a stale version of threadpool_jobs
-        * and wait forever.
-        */
-       mono_memory_write_barrier ();
+       g_assert (domain->cleanup_semaphore);
+       cleanup_semaphore = (ThreadPoolDomainCleanupSemaphore*) domain->cleanup_semaphore;
+
+       ret = TRUE;
 
-       while (domain->threadpool_jobs) {
-               gint64 now;
+       do {
+               if (timeout == -1) {
+                       mono_coop_cond_wait (&cleanup_semaphore->cond, &threadpool->domains_lock);
+               } else {
+                       gint64 now;
+                       gint res;
 
-               if (timeout != -1) {
-                       now = mono_msec_ticks ();
+                       now = mono_msec_ticks();
                        if (now > end) {
-                               res = FALSE;
+                               ret = FALSE;
+                               break;
+                       }
+
+                       res = mono_coop_cond_timedwait (&cleanup_semaphore->cond, &threadpool->domains_lock, end - now);
+                       if (res != 0) {
+                               ret = FALSE;
                                break;
                        }
                }
+       } while (tpdomain->outstanding_request != 0);
 
-               MONO_ENTER_GC_SAFE;
-               WaitForSingleObject (sem, timeout != -1 ? end - now : timeout);
-               MONO_EXIT_GC_SAFE;
+       if (InterlockedDecrement (&cleanup_semaphore->ref) == 0) {
+               mono_coop_cond_destroy (&cleanup_semaphore->cond);
+               g_free (cleanup_semaphore);
+               domain->cleanup_semaphore = NULL;
        }
 
-       domain->cleanup_semaphore = NULL;
-       CloseHandle (sem);
+       mono_coop_mutex_unlock(&threadpool->domains_lock);
 
-       return res;
+       return ret;
 }
 
 void