(SOCKET)(gssize)x->handle, x->buffer, x->offset, x->size,\
x->socket_flags, &x->error);
+
+static void
+unregister_job (MonoAsyncResult *obj)
+{
+ EnterCriticalSection (&ares_lock);
+ mono_g_hash_table_remove (ares_htable, obj);
+ LeaveCriticalSection (&ares_lock);
+}
+
+static void
+threadpool_jobs_inc (MonoObject *obj)
+{
+ if (obj)
+ InterlockedIncrement (&obj->vtable->domain->threadpool_jobs);
+}
+
+static gboolean
+threadpool_jobs_dec (MonoObject *obj)
+{
+ MonoDomain *domain = obj->vtable->domain;
+ int remaining_jobs = InterlockedDecrement (&domain->threadpool_jobs);
+ if (remaining_jobs == 0 && domain->cleanup_semaphore) {
+ ReleaseSemaphore (domain->cleanup_semaphore, 1, NULL);
+ return TRUE;
+ }
+ return FALSE;
+}
+
static void
async_invoke_io_thread (gpointer data)
{
/* worker threads invokes methods in different domains,
* so we need to set the right domain here */
domain = ((MonoObject *)ar)->vtable->domain;
- mono_thread_push_appdomain_ref (domain);
- if (mono_domain_set (domain, FALSE)) {
- ASyncCall *ac;
-
- mono_async_invoke (ar);
- ac = (ASyncCall *) ar->object_data;
- /*
- if (ac->msg->exc != NULL)
- mono_unhandled_exception (ac->msg->exc);
- */
- mono_domain_set (mono_get_root_domain (), TRUE);
+
+ g_assert (domain);
+
+ if (domain->state == MONO_APPDOMAIN_UNLOADED || domain->state == MONO_APPDOMAIN_UNLOADING) {
+ threadpool_jobs_dec ((MonoObject *)ar);
+ unregister_job (ar);
+ data = NULL;
+ } else {
+ mono_thread_push_appdomain_ref (domain);
+ if (threadpool_jobs_dec ((MonoObject *)ar)) {
+ unregister_job (ar);
+ data = NULL;
+ mono_thread_pop_appdomain_ref ();
+ continue;
+ }
+ if (mono_domain_set (domain, FALSE)) {
+ ASyncCall *ac;
+
+ mono_async_invoke (ar);
+ ac = (ASyncCall *) ar->object_data;
+ /*
+ if (ac->msg->exc != NULL)
+ mono_unhandled_exception (ac->msg->exc);
+ */
+ mono_domain_set (mono_get_root_domain (), TRUE);
+ }
+ mono_thread_pop_appdomain_ref ();
+ InterlockedDecrement (&busy_io_worker_threads);
+ /* If the callee changes the background status, set it back to TRUE */
+ if (*version != '1' && !mono_thread_test_state (thread , ThreadState_Background))
+ ves_icall_System_Threading_Thread_SetState (thread, ThreadState_Background);
}
- mono_thread_pop_appdomain_ref ();
- InterlockedDecrement (&busy_io_worker_threads);
- /* If the callee changes the background status, set it back to TRUE */
- if (*version != '1' && !mono_thread_test_state (thread , ThreadState_Background))
- ves_icall_System_Threading_Thread_SetState (thread, ThreadState_Background);
}
data = dequeue_job (&io_queue_lock, &async_io_queue);
worker < mono_io_max_worker_threads) {
InterlockedIncrement (&busy_io_worker_threads);
InterlockedIncrement (&io_worker_threads);
+ threadpool_jobs_inc ((MonoObject *)ares);
mono_thread_create_internal (mono_get_root_domain (), async_invoke_io_thread, ares, TRUE);
} else {
append_job (&io_queue_lock, &async_io_queue, (MonoObject*)ares);
existing = (int) InterlockedCompareExchange (&mono_worker_threads, 0, -1);
if ((needed - existing) > 0) {
start_tpthread (data);
+ if (data)
+ threadpool_jobs_dec ((MonoObject*)data);
data = NULL;
Sleep (500);
}
} while ((needed - existing) > 0);
/* If we don't start any thread here, make sure 'data' is processed. */
- if (data != NULL)
+ if (data != NULL) {
start_thread_or_queue (data);
+ threadpool_jobs_dec ((MonoObject*)data);
+ }
}
static void
{
InterlockedIncrement (&mono_worker_threads);
InterlockedIncrement (&busy_worker_threads);
+ threadpool_jobs_inc ((MonoObject *)data);
mono_thread_create_internal (mono_get_root_domain (), async_invoke_thread, data, TRUE);
}
int busy, worker;
if ((int) InterlockedCompareExchange (&tp_idle_started, 1, 0) == 0) {
+ threadpool_jobs_inc ((MonoObject*)ares);
mono_thread_create_internal (mono_get_root_domain (), start_idle_threads, ares, TRUE);
return;
}
static void
append_job (CRITICAL_SECTION *cs, TPQueue *list, MonoObject *ar)
{
+ threadpool_jobs_inc (ar);
+
EnterCriticalSection (cs);
if (list->array && (list->next_elem < mono_array_length (list->array))) {
mono_array_setref (list->array, list->next_elem, ar);
LeaveCriticalSection (cs);
}
+
+static void
+clear_queue (CRITICAL_SECTION *cs, TPQueue *list, MonoDomain *domain)
+{
+ int i, count = 0;
+ EnterCriticalSection (cs);
+ /*remove*/
+ for (i = list->first_elem; i < list->next_elem; ++i) {
+ MonoObject *obj = mono_array_get (list->array, MonoObject*, i);
+ if (obj->vtable->domain == domain) {
+ unregister_job ((MonoAsyncResult*)obj);
+
+ mono_array_set (list->array, MonoObject*, i, NULL);
+ InterlockedDecrement (&domain->threadpool_jobs);
+ ++count;
+ }
+ }
+ /*compact*/
+ if (count) {
+ int idx = 0;
+ for (i = list->first_elem; i < list->next_elem; ++i) {
+ MonoObject *obj = mono_array_get (list->array, MonoObject*, i);
+ if (obj)
+ mono_array_set (list->array, MonoObject*, idx++, obj);
+ }
+ list->first_elem = 0;
+ list->next_elem = count;
+ }
+ LeaveCriticalSection (cs);
+}
+
+/*
+ * Clean up the threadpool of all domain jobs.
+ * Can only be called as part of the domain unloading process as
+ * it will wait for all jobs to be visible to the interruption code.
+ */
+gboolean
+mono_thread_pool_remove_domain_jobs (MonoDomain *domain, int timeout)
+{
+ HANDLE sem_handle;
+ int result = TRUE;
+ guint32 start_time = 0;
+
+ clear_queue (&mono_delegate_section, &async_call_queue, domain);
+ clear_queue (&io_queue_lock, &async_io_queue, domain);
+
+ /*
+ * There might be some threads out that could be about to execute stuff from the given domain.
+ * We avoid that by setting up a semaphore to be pulsed by the thread that reaches zero.
+ */
+ sem_handle = CreateSemaphore (NULL, 0, 1, NULL);
+
+ domain->cleanup_semaphore = sem_handle;
+ /*
+ * The memory barrier here is required to have global ordering between assigning to cleanup_semaphone
+ * and reading threadpool_jobs.
+ * Otherwise this thread could read a stale version of threadpool_jobs and wait forever.
+ */
+ mono_memory_write_barrier ();
+
+ if (domain->threadpool_jobs && timeout != -1)
+ start_time = mono_msec_ticks ();
+ while (domain->threadpool_jobs) {
+ WaitForSingleObject (sem_handle, timeout);
+ if (timeout != -1 && (mono_msec_ticks () - start_time) > timeout) {
+ result = FALSE;
+ break;
+ }
+ }
+
+ domain->cleanup_semaphore = NULL;
+ CloseHandle (sem_handle);
+ return result;
+}
+
+
static MonoObject*
dequeue_job (CRITICAL_SECTION *cs, TPQueue *list)
{
/* worker threads invokes methods in different domains,
* so we need to set the right domain here */
domain = ((MonoObject *)ar)->vtable->domain;
- mono_thread_push_appdomain_ref (domain);
- if (mono_domain_set (domain, FALSE)) {
- ASyncCall *ac;
-
- mono_async_invoke (ar);
- ac = (ASyncCall *) ar->object_data;
- /*
- if (ac->msg->exc != NULL)
- mono_unhandled_exception (ac->msg->exc);
- */
- mono_domain_set (mono_get_root_domain (), TRUE);
+
+ g_assert (domain);
+
+ if (domain->state == MONO_APPDOMAIN_UNLOADED || domain->state == MONO_APPDOMAIN_UNLOADING) {
+ threadpool_jobs_dec ((MonoObject *)ar);
+ unregister_job (ar);
+ data = NULL;
+ } else {
+ mono_thread_push_appdomain_ref (domain);
+ if (threadpool_jobs_dec ((MonoObject *)ar)) {
+ unregister_job (ar);
+ data = NULL;
+ mono_thread_pop_appdomain_ref ();
+ continue;
+ }
+
+ if (mono_domain_set (domain, FALSE)) {
+ ASyncCall *ac;
+
+ mono_async_invoke (ar);
+ ac = (ASyncCall *) ar->object_data;
+ /*
+ if (ac->msg->exc != NULL)
+ mono_unhandled_exception (ac->msg->exc);
+ */
+ mono_domain_set (mono_get_root_domain (), TRUE);
+ }
+ mono_thread_pop_appdomain_ref ();
+ InterlockedDecrement (&busy_worker_threads);
+ /* If the callee changes the background status, set it back to TRUE */
+ if (*version != '1' && !mono_thread_test_state (thread , ThreadState_Background))
+ ves_icall_System_Threading_Thread_SetState (thread, ThreadState_Background);
}
- mono_thread_pop_appdomain_ref ();
- InterlockedDecrement (&busy_worker_threads);
- /* If the callee changes the background status, set it back to TRUE */
- if (*version != '1' && !mono_thread_test_state (thread , ThreadState_Background))
- ves_icall_System_Threading_Thread_SetState (thread, ThreadState_Background);
}
data = dequeue_job (&mono_delegate_section, &async_call_queue);