From: Ludovic Henry Date: Tue, 28 Mar 2017 21:03:31 +0000 (-0400) Subject: [threadpool] Pass callback at initialization (#4546) X-Git-Url: http://wien.tomnetworks.com/gitweb/?p=mono.git;a=commitdiff_plain;h=a9e73e7370307c2fb346b645c890bb91fd875d33 [threadpool] Pass callback at initialization (#4546) This remove the possibility to execute different callbacks on the threadpool, but this allows us to remove the work_items_lock, which is called on the hot path of the ThreadPool. --- diff --git a/mono/metadata/threadpool-worker-default.c b/mono/metadata/threadpool-worker-default.c index f410534223d..cfcecf95a85 100644 --- a/mono/metadata/threadpool-worker-default.c +++ b/mono/metadata/threadpool-worker-default.c @@ -110,11 +110,6 @@ typedef struct { gdouble accumulated_sample_duration; } ThreadPoolHillClimbing; -typedef struct { - MonoThreadPoolWorkerCallback callback; - gpointer data; -} ThreadPoolWorkItem; - typedef union { struct { gint16 max_working; /* determined by heuristic */ @@ -128,16 +123,15 @@ typedef union { typedef struct { MonoRefCount ref; + MonoThreadPoolWorkerCallback callback; + ThreadPoolWorkerCounter counters; MonoCoopMutex parked_threads_lock; gint32 parked_threads_count; MonoCoopCond parked_threads_cond; - ThreadPoolWorkItem *work_items; // ThreadPoolWorkItem [] - gint32 work_items_count; - gint32 work_items_size; - MonoCoopMutex work_items_lock; + volatile gint32 work_items_count; guint32 worker_creation_current_second; guint32 worker_creation_current_count; @@ -221,8 +215,6 @@ destroy (gpointer data) mono_coop_mutex_destroy (&worker.parked_threads_lock); mono_coop_cond_destroy (&worker.parked_threads_cond); - mono_coop_mutex_destroy (&worker.work_items_lock); - mono_coop_mutex_destroy (&worker.worker_creation_lock); mono_coop_mutex_destroy (&worker.heuristic_lock); @@ -231,7 +223,7 @@ destroy (gpointer data) } void -mono_threadpool_worker_init (void) +mono_threadpool_worker_init (MonoThreadPoolWorkerCallback callback) { ThreadPoolHillClimbing *hc; const char *threads_per_cpu_env; @@ -240,13 +232,12 @@ mono_threadpool_worker_init (void) mono_refcount_init (&worker, destroy); + worker.callback = callback; + mono_coop_mutex_init (&worker.parked_threads_lock); worker.parked_threads_count = 0; mono_coop_cond_init (&worker.parked_threads_cond); - /* worker.work_items_size is inited to 0 */ - mono_coop_mutex_init (&worker.work_items_lock); - worker.worker_creation_current_second = -1; mono_coop_mutex_init (&worker.worker_creation_lock); @@ -313,69 +304,32 @@ mono_threadpool_worker_cleanup (void) } static void -work_item_lock (void) -{ - mono_coop_mutex_lock (&worker.work_items_lock); -} - -static void -work_item_unlock (void) +work_item_push (void) { - mono_coop_mutex_unlock (&worker.work_items_lock); -} - -static void -work_item_push (MonoThreadPoolWorkerCallback callback, gpointer data) -{ - ThreadPoolWorkItem work_item; - - g_assert (callback); - - work_item.callback = callback; - work_item.data = data; + gint32 old, new; - work_item_lock (); - - g_assert (worker.work_items_count <= worker.work_items_size); - - if (G_UNLIKELY (worker.work_items_count == worker.work_items_size)) { - worker.work_items_size += 64; - worker.work_items = g_renew (ThreadPoolWorkItem, worker.work_items, worker.work_items_size); - } - - g_assert (worker.work_items); - - worker.work_items [worker.work_items_count ++] = work_item; - - // printf ("[push] worker.work_items = %p, worker.work_items_count = %d, worker.work_items_size = %d\n", - // worker.work_items, worker.work_items_count, worker.work_items_size); + do { + old = InterlockedRead (&worker.work_items_count); + g_assert (old >= 0); - work_item_unlock (); + new = old + 1; + } while (InterlockedCompareExchange (&worker.work_items_count, new, old) != old); } static gboolean -work_item_try_pop (ThreadPoolWorkItem *work_item) +work_item_try_pop (void) { - g_assert (work_item); - - work_item_lock (); - - // printf ("[pop] worker.work_items = %p, worker.work_items_count = %d, worker.work_items_size = %d\n", - // worker.work_items, worker.work_items_count, worker.work_items_size); + gint32 old, new; - if (worker.work_items_count == 0) { - work_item_unlock (); - return FALSE; - } - - *work_item = worker.work_items [-- worker.work_items_count]; + do { + old = InterlockedRead (&worker.work_items_count); + g_assert (old >= 0); - if (G_UNLIKELY (worker.work_items_count >= 64 * 3 && worker.work_items_count < worker.work_items_size / 2)) { - worker.work_items_size -= 64; - worker.work_items = g_renew (ThreadPoolWorkItem, worker.work_items, worker.work_items_size); - } + if (old == 0) + return FALSE; - work_item_unlock (); + new = old - 1; + } while (InterlockedCompareExchange (&worker.work_items_count, new, old) != old); return TRUE; } @@ -383,24 +337,18 @@ work_item_try_pop (ThreadPoolWorkItem *work_item) static gint32 work_item_count (void) { - gint32 count; - - work_item_lock (); - count = worker.work_items_count; - work_item_unlock (); - - return count; + return InterlockedRead (&worker.work_items_count); } static void worker_request (void); void -mono_threadpool_worker_enqueue (MonoThreadPoolWorkerCallback callback, gpointer data) +mono_threadpool_worker_request (void) { if (!mono_refcount_tryinc (&worker)) return; - work_item_push (callback, data); + work_item_push (); worker_request (); @@ -523,12 +471,10 @@ worker_thread (gpointer unused) g_assert (thread); while (!mono_runtime_is_shutting_down ()) { - ThreadPoolWorkItem work_item; - if (mono_thread_interruption_checkpoint ()) continue; - if (!work_item_try_pop (&work_item)) { + if (!work_item_try_pop ()) { gboolean timeout; timeout = worker_park (); @@ -538,10 +484,10 @@ worker_thread (gpointer unused) continue; } - mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] worker executing %p (%p)", - mono_native_thread_id_get (), work_item.callback, work_item.data); + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] worker executing", + mono_native_thread_id_get ()); - work_item.callback (work_item.data); + worker.callback (); } COUNTER_ATOMIC (counter, { diff --git a/mono/metadata/threadpool-worker.h b/mono/metadata/threadpool-worker.h index e6f6e10eb42..9b2c51d7f65 100644 --- a/mono/metadata/threadpool-worker.h +++ b/mono/metadata/threadpool-worker.h @@ -7,16 +7,16 @@ #include -typedef void (*MonoThreadPoolWorkerCallback)(gpointer); +typedef void (*MonoThreadPoolWorkerCallback)(void); void -mono_threadpool_worker_init (void); +mono_threadpool_worker_init (MonoThreadPoolWorkerCallback callback); void mono_threadpool_worker_cleanup (void); void -mono_threadpool_worker_enqueue (MonoThreadPoolWorkerCallback callback, gpointer data); +mono_threadpool_worker_request (void); gboolean mono_threadpool_worker_notify_completed (void); diff --git a/mono/metadata/threadpool.c b/mono/metadata/threadpool.c index cf9a8e69deb..3c0fa3d7f6d 100644 --- a/mono/metadata/threadpool.c +++ b/mono/metadata/threadpool.c @@ -122,6 +122,9 @@ destroy (gpointer unused) mono_coop_mutex_destroy (&threadpool.domains_lock); } +static void +worker_callback (void); + static void initialize (void) { @@ -135,7 +138,7 @@ initialize (void) threadpool.limit_io_min = mono_cpu_count (); threadpool.limit_io_max = CLAMP (threadpool.limit_io_min * 100, MIN (threadpool.limit_io_min, 200), MAX (threadpool.limit_io_min, 200)); - mono_threadpool_worker_init (); + mono_threadpool_worker_init (worker_callback); } static void @@ -277,7 +280,7 @@ try_invoke_perform_wait_callback (MonoObject** exc, MonoError *error) } static void -worker_callback (gpointer unused) +worker_callback (void) { MonoError error; ThreadPoolDomain *tpdomain, *previous_tpdomain; @@ -779,7 +782,7 @@ ves_icall_System_Threading_ThreadPool_RequestWorkerThread (void) counter._.starting ++; }); - mono_threadpool_worker_enqueue (worker_callback, NULL); + mono_threadpool_worker_request (); mono_refcount_dec (&threadpool); return TRUE;