2008-11-10 Rodrigo Kumpera <rkumpera@novell.com>
authorRodrigo Kumpera <kumpera@gmail.com>
Mon, 10 Nov 2008 20:58:13 +0000 (20:58 -0000)
committerRodrigo Kumpera <kumpera@gmail.com>
Mon, 10 Nov 2008 20:58:13 +0000 (20:58 -0000)
* appdomain.c (unload_thread_main): Clean up threadpool by
calling mono_thread_pool_remove_domain_jobs.

* domain-internals.h (struct _MonoDomain): Add new fields to
help coordinate the cleanup of the threadpool.

* threadpool.c (mono_thread_pool_remove_domain_jobs): New fuction
that cleans up the threadpool of all jobs associated with an appdomain.
It does that by cleaning up the queues and making sure all active
threads are accounted.

* threadpool.c (async_invoke_io_thread): Ignore job if its domain is
unloaded or in the process of. Take this is such way that there is
no race condition between another thread starting the unload and the
current thread acknowledging it.

* threadpool.c (async_invoke_thread): Same.

* threadpool.c (start_io_thread_or_queue): Increment threadpool_jobs before
firing the new thread.

* threadpool.c (start_tpthread): Same.

* theadpool.c (append_job): Increment threadpool_jobs before queueing.

* threadpool.h: Add mono_thread_pool_remove_domain_jobs.

svn path=/trunk/mono/; revision=118413

mono/metadata/ChangeLog
mono/metadata/appdomain.c
mono/metadata/domain-internals.h
mono/metadata/threadpool.c
mono/metadata/threadpool.h

index b5fab23409bb9f13b8bce8e438ea4a57ba043d8a..59078aa358dae364f4628491a6302587f2e22216 100644 (file)
@@ -1,3 +1,32 @@
+2008-11-10  Rodrigo Kumpera  <rkumpera@novell.com>
+
+       * appdomain.c (unload_thread_main): Clean up threadpool by
+       calling mono_thread_pool_remove_domain_jobs.
+
+       * domain-internals.h (struct _MonoDomain): Add new fields to
+       help coordinate the cleanup of the threadpool.
+
+       * threadpool.c (mono_thread_pool_remove_domain_jobs): New fuction
+       that cleans up the threadpool of all jobs associated with an appdomain.
+       It does that by cleaning up the queues and making sure all active
+       threads are accounted.
+
+       * threadpool.c (async_invoke_io_thread): Ignore job if its domain is
+       unloaded or in the process of. Take this is such way that there is
+       no race condition between another thread starting the unload and the
+       current thread acknowledging it.
+
+       * threadpool.c (async_invoke_thread): Same.
+
+       * threadpool.c (start_io_thread_or_queue): Increment threadpool_jobs before
+       firing the new thread.
+
+       * threadpool.c (start_tpthread): Same.
+
+       * theadpool.c (append_job): Increment threadpool_jobs before queueing.
+
+       * threadpool.h: Add mono_thread_pool_remove_domain_jobs.
+
 2008-11-06  Jonathan Chambers  <joncham@gmail.com>
 
        * file-io.c (ves_icall_System_IO_MonoIO_DuplicateHandle): 
index d229ca018d4d6ab8601df0da15ca8423da5c1353..24362b5ab3ab1f44e458bf962fd7308ac6d803c9 100644 (file)
@@ -1858,6 +1858,7 @@ typedef struct unload_data {
        char *failure_reason;
 } unload_data;
 
+
 static guint32 WINAPI
 unload_thread_main (void *arg)
 {
@@ -1876,6 +1877,11 @@ unload_thread_main (void *arg)
                return 1;
        }
 
+       if (!mono_thread_pool_remove_domain_jobs (domain, -1)) {
+               data->failure_reason = g_strdup_printf ("Cleanup of threadpool jobs of domain %s timed out.", domain->friendly_name);
+               return 1;
+       }
+
        /* Finalize all finalizable objects in the doomed appdomain */
        if (!mono_domain_finalize (domain, -1)) {
                data->failure_reason = g_strdup_printf ("Finalization of domain %s timed out.", domain->friendly_name);
index ceb1c08b14ffb37a9d521d4f13cf1256d2549622..1594e0ecf1ec926447d11cc6be77da9771c45ec0 100644 (file)
@@ -215,6 +215,10 @@ struct _MonoDomain {
 
        /* Information maintained by the JIT engine */
        gpointer runtime_info;
+
+       /*thread pool jobs, used to coordinate shutdown.*/
+       int                                     threadpool_jobs;
+       HANDLE                          cleanup_semaphore;
 };
 
 typedef struct  {
index 727f47ffb2cb3008faaa3b445e5d42e6de1bdc2b..4df375eb5c18ef12b655381cd250f6cc377bcf05 100644 (file)
@@ -231,6 +231,34 @@ get_events_from_list (MonoMList *list)
                                (SOCKET)(gssize)x->handle, x->buffer, x->offset, x->size,\
                                 x->socket_flags, &x->error);
 
+
+static void
+unregister_job (MonoAsyncResult *obj)
+{
+       EnterCriticalSection (&ares_lock);
+       mono_g_hash_table_remove (ares_htable, obj);
+       LeaveCriticalSection (&ares_lock);      
+}
+
+static void
+threadpool_jobs_inc (MonoObject *obj)
+{
+       if (obj)
+               InterlockedIncrement (&obj->vtable->domain->threadpool_jobs);
+}
+
+static gboolean
+threadpool_jobs_dec (MonoObject *obj)
+{
+       MonoDomain *domain = obj->vtable->domain;
+       int remaining_jobs = InterlockedDecrement (&domain->threadpool_jobs);
+       if (remaining_jobs == 0 && domain->cleanup_semaphore) {
+               ReleaseSemaphore (domain->cleanup_semaphore, 1, NULL);
+               return TRUE;
+       }
+       return FALSE;
+}
+
 static void
 async_invoke_io_thread (gpointer data)
 {
@@ -261,23 +289,38 @@ async_invoke_io_thread (gpointer data)
                        /* worker threads invokes methods in different domains,
                         * so we need to set the right domain here */
                        domain = ((MonoObject *)ar)->vtable->domain;
-                       mono_thread_push_appdomain_ref (domain);
-                       if (mono_domain_set (domain, FALSE)) {
-                               ASyncCall *ac;
-
-                               mono_async_invoke (ar);
-                               ac = (ASyncCall *) ar->object_data;
-                               /*
-                               if (ac->msg->exc != NULL)
-                                       mono_unhandled_exception (ac->msg->exc);
-                               */
-                               mono_domain_set (mono_get_root_domain (), TRUE);
+
+                       g_assert (domain);
+
+                       if (domain->state == MONO_APPDOMAIN_UNLOADED || domain->state == MONO_APPDOMAIN_UNLOADING) {
+                               threadpool_jobs_dec ((MonoObject *)ar);
+                               unregister_job (ar);
+                               data = NULL;
+                       } else {
+                               mono_thread_push_appdomain_ref (domain);
+                               if (threadpool_jobs_dec ((MonoObject *)ar)) {
+                                       unregister_job (ar);
+                                       data = NULL;
+                                       mono_thread_pop_appdomain_ref ();
+                                       continue;
+                               }
+                               if (mono_domain_set (domain, FALSE)) {
+                                       ASyncCall *ac;
+
+                                       mono_async_invoke (ar);
+                                       ac = (ASyncCall *) ar->object_data;
+                                       /*
+                                       if (ac->msg->exc != NULL)
+                                               mono_unhandled_exception (ac->msg->exc);
+                                       */
+                                       mono_domain_set (mono_get_root_domain (), TRUE);
+                               }
+                               mono_thread_pop_appdomain_ref ();
+                               InterlockedDecrement (&busy_io_worker_threads);
+                               /* If the callee changes the background status, set it back to TRUE */
+                               if (*version != '1' && !mono_thread_test_state (thread , ThreadState_Background))
+                                       ves_icall_System_Threading_Thread_SetState (thread, ThreadState_Background);
                        }
-                       mono_thread_pop_appdomain_ref ();
-                       InterlockedDecrement (&busy_io_worker_threads);
-                       /* If the callee changes the background status, set it back to TRUE */
-                       if (*version != '1' && !mono_thread_test_state (thread , ThreadState_Background))
-                               ves_icall_System_Threading_Thread_SetState (thread, ThreadState_Background);
                }
 
                data = dequeue_job (&io_queue_lock, &async_io_queue);
@@ -328,6 +371,7 @@ start_io_thread_or_queue (MonoSocketAsyncResult *ares)
            worker < mono_io_max_worker_threads) {
                InterlockedIncrement (&busy_io_worker_threads);
                InterlockedIncrement (&io_worker_threads);
+               threadpool_jobs_inc ((MonoObject *)ares);
                mono_thread_create_internal (mono_get_root_domain (), async_invoke_io_thread, ares, TRUE);
        } else {
                append_job (&io_queue_lock, &async_io_queue, (MonoObject*)ares);
@@ -989,14 +1033,18 @@ start_idle_threads (MonoAsyncResult *data)
                existing = (int) InterlockedCompareExchange (&mono_worker_threads, 0, -1); 
                if ((needed - existing) > 0) {
                        start_tpthread (data);
+                       if (data) 
+                               threadpool_jobs_dec ((MonoObject*)data);
                        data = NULL;
                        Sleep (500);
                }
        } while ((needed - existing) > 0);
 
        /* If we don't start any thread here, make sure 'data' is processed. */
-       if (data != NULL)
+       if (data != NULL) {
                start_thread_or_queue (data);
+               threadpool_jobs_dec ((MonoObject*)data);
+       }
 }
 
 static void
@@ -1004,6 +1052,7 @@ start_tpthread (MonoAsyncResult *data)
 {
        InterlockedIncrement (&mono_worker_threads);
        InterlockedIncrement (&busy_worker_threads);
+       threadpool_jobs_inc ((MonoObject *)data);
        mono_thread_create_internal (mono_get_root_domain (), async_invoke_thread, data, TRUE);
 }
 
@@ -1076,6 +1125,7 @@ start_thread_or_queue (MonoAsyncResult *ares)
        int busy, worker;
 
        if ((int) InterlockedCompareExchange (&tp_idle_started, 1, 0) == 0) {
+               threadpool_jobs_inc ((MonoObject*)ares);
                mono_thread_create_internal (mono_get_root_domain (), start_idle_threads, ares, TRUE);
                return;
        }
@@ -1151,6 +1201,8 @@ mono_thread_pool_cleanup (void)
 static void
 append_job (CRITICAL_SECTION *cs, TPQueue *list, MonoObject *ar)
 {
+       threadpool_jobs_inc (ar); 
+
        EnterCriticalSection (cs);
        if (list->array && (list->next_elem < mono_array_length (list->array))) {
                mono_array_setref (list->array, list->next_elem, ar);
@@ -1179,6 +1231,82 @@ append_job (CRITICAL_SECTION *cs, TPQueue *list, MonoObject *ar)
        LeaveCriticalSection (cs);
 }
 
+
+static void
+clear_queue (CRITICAL_SECTION *cs, TPQueue *list, MonoDomain *domain)
+{
+       int i, count = 0;
+       EnterCriticalSection (cs);
+       /*remove*/
+       for (i = list->first_elem; i < list->next_elem; ++i) {
+               MonoObject *obj = mono_array_get (list->array, MonoObject*, i);
+               if (obj->vtable->domain == domain) {
+                       unregister_job ((MonoAsyncResult*)obj);
+
+                       mono_array_set (list->array, MonoObject*, i, NULL);
+                       InterlockedDecrement (&domain->threadpool_jobs);
+                       ++count;
+               }
+       }
+       /*compact*/
+       if (count) {
+               int idx = 0;
+               for (i = list->first_elem; i < list->next_elem; ++i) {
+                       MonoObject *obj = mono_array_get (list->array, MonoObject*, i);
+                       if (obj)
+                               mono_array_set (list->array, MonoObject*, idx++, obj);
+               }
+               list->first_elem = 0;
+               list->next_elem = count;
+       }
+       LeaveCriticalSection (cs);
+}
+
+/*
+ * Clean up the threadpool of all domain jobs.
+ * Can only be called as part of the domain unloading process as
+ * it will wait for all jobs to be visible to the interruption code. 
+ */
+gboolean
+mono_thread_pool_remove_domain_jobs (MonoDomain *domain, int timeout)
+{
+       HANDLE sem_handle;
+       int result = TRUE;
+       guint32 start_time = 0;
+
+       clear_queue (&mono_delegate_section, &async_call_queue, domain);
+       clear_queue (&io_queue_lock, &async_io_queue, domain);
+
+       /*
+        * There might be some threads out that could be about to execute stuff from the given domain.
+        * We avoid that by setting up a semaphore to be pulsed by the thread that reaches zero.
+        */
+       sem_handle = CreateSemaphore (NULL, 0, 1, NULL);
+       
+       domain->cleanup_semaphore = sem_handle;
+       /*
+        * The memory barrier here is required to have global ordering between assigning to cleanup_semaphone
+        * and reading threadpool_jobs.
+        * Otherwise this thread could read a stale version of threadpool_jobs and wait forever.
+        */
+       mono_memory_write_barrier ();
+
+       if (domain->threadpool_jobs && timeout != -1)
+               start_time = mono_msec_ticks ();
+       while (domain->threadpool_jobs) {
+               WaitForSingleObject (sem_handle, timeout);
+               if (timeout != -1 && (mono_msec_ticks () - start_time) > timeout) {
+                       result = FALSE;
+                       break;
+               }
+       }
+
+       domain->cleanup_semaphore = NULL;
+       CloseHandle (sem_handle);
+       return result;
+}
+
+
 static MonoObject*
 dequeue_job (CRITICAL_SECTION *cs, TPQueue *list)
 {
@@ -1231,23 +1359,39 @@ async_invoke_thread (gpointer data)
                        /* worker threads invokes methods in different domains,
                         * so we need to set the right domain here */
                        domain = ((MonoObject *)ar)->vtable->domain;
-                       mono_thread_push_appdomain_ref (domain);
-                       if (mono_domain_set (domain, FALSE)) {
-                               ASyncCall *ac;
-
-                               mono_async_invoke (ar);
-                               ac = (ASyncCall *) ar->object_data;
-                               /*
-                               if (ac->msg->exc != NULL)
-                                       mono_unhandled_exception (ac->msg->exc);
-                               */
-                               mono_domain_set (mono_get_root_domain (), TRUE);
+
+                       g_assert (domain);
+
+                       if (domain->state == MONO_APPDOMAIN_UNLOADED || domain->state == MONO_APPDOMAIN_UNLOADING) {
+                               threadpool_jobs_dec ((MonoObject *)ar);
+                               unregister_job (ar);
+                               data = NULL;
+                       } else {
+                               mono_thread_push_appdomain_ref (domain);
+                               if (threadpool_jobs_dec ((MonoObject *)ar)) {
+                                       unregister_job (ar);
+                                       data = NULL;
+                                       mono_thread_pop_appdomain_ref ();
+                                       continue;
+                               }
+
+                               if (mono_domain_set (domain, FALSE)) {
+                                       ASyncCall *ac;
+
+                                       mono_async_invoke (ar);
+                                       ac = (ASyncCall *) ar->object_data;
+                                       /*
+                                       if (ac->msg->exc != NULL)
+                                               mono_unhandled_exception (ac->msg->exc);
+                                       */
+                                       mono_domain_set (mono_get_root_domain (), TRUE);
+                               }
+                               mono_thread_pop_appdomain_ref ();
+                               InterlockedDecrement (&busy_worker_threads);
+                               /* If the callee changes the background status, set it back to TRUE */
+                               if (*version != '1' && !mono_thread_test_state (thread , ThreadState_Background))
+                                       ves_icall_System_Threading_Thread_SetState (thread, ThreadState_Background);
                        }
-                       mono_thread_pop_appdomain_ref ();
-                       InterlockedDecrement (&busy_worker_threads);
-                       /* If the callee changes the background status, set it back to TRUE */
-                       if (*version != '1' && !mono_thread_test_state (thread , ThreadState_Background))
-                               ves_icall_System_Threading_Thread_SetState (thread, ThreadState_Background);
                }
 
                data = dequeue_job (&mono_delegate_section, &async_call_queue);
index b24e17150456caaee4f9b1300582aa29852856d5..129addd3adaa740693f322d88f00e44f03461cbb 100644 (file)
@@ -17,6 +17,8 @@ mono_thread_pool_finish (MonoAsyncResult *ares, MonoArray **out_args,
 
 void mono_thread_pool_cleanup (void) MONO_INTERNAL;
 
+gboolean mono_thread_pool_remove_domain_jobs (MonoDomain *domain, int timeout) MONO_INTERNAL;
+
 void
 ves_icall_System_Threading_ThreadPool_GetAvailableThreads (int *workerThreads,
                                                           int *completionPortThreads) MONO_INTERNAL;