X-Git-Url: http://wien.tomnetworks.com/gitweb/?a=blobdiff_plain;f=mono%2Fmetadata%2Fthreadpool-worker-default.c;h=b0ff471fe4ee92f0f8fda2cd3ea423c2f701c807;hb=44fee4ff97aff7a337fb9afc603a0a5f6933f646;hp=f410534223dacd272fa9c95d6320d0426eab7438;hpb=a3304910e9e08ec965fb06ef6fa4a19257dddf8c;p=mono.git diff --git a/mono/metadata/threadpool-worker-default.c b/mono/metadata/threadpool-worker-default.c index f410534223d..b0ff471fe4e 100644 --- a/mono/metadata/threadpool-worker-default.c +++ b/mono/metadata/threadpool-worker-default.c @@ -110,11 +110,6 @@ typedef struct { gdouble accumulated_sample_duration; } ThreadPoolHillClimbing; -typedef struct { - MonoThreadPoolWorkerCallback callback; - gpointer data; -} ThreadPoolWorkItem; - typedef union { struct { gint16 max_working; /* determined by heuristic */ @@ -123,21 +118,24 @@ typedef union { gint16 parked; /* parked */ } _; gint64 as_gint64; -} ThreadPoolWorkerCounter; +} ThreadPoolWorkerCounter +#ifdef __GNUC__ +__attribute__((aligned(64))) +#endif +; typedef struct { MonoRefCount ref; + MonoThreadPoolWorkerCallback callback; + ThreadPoolWorkerCounter counters; MonoCoopMutex parked_threads_lock; gint32 parked_threads_count; MonoCoopCond parked_threads_cond; - ThreadPoolWorkItem *work_items; // ThreadPoolWorkItem [] - gint32 work_items_count; - gint32 work_items_size; - MonoCoopMutex work_items_lock; + volatile gint32 work_items_count; guint32 worker_creation_current_second; guint32 worker_creation_current_count; @@ -221,8 +219,6 @@ destroy (gpointer data) mono_coop_mutex_destroy (&worker.parked_threads_lock); mono_coop_cond_destroy (&worker.parked_threads_cond); - mono_coop_mutex_destroy (&worker.work_items_lock); - mono_coop_mutex_destroy (&worker.worker_creation_lock); mono_coop_mutex_destroy (&worker.heuristic_lock); @@ -231,7 +227,7 @@ destroy (gpointer data) } void -mono_threadpool_worker_init (void) +mono_threadpool_worker_init (MonoThreadPoolWorkerCallback callback) { ThreadPoolHillClimbing *hc; const char *threads_per_cpu_env; @@ -240,13 +236,12 @@ mono_threadpool_worker_init (void) mono_refcount_init (&worker, destroy); + worker.callback = callback; + mono_coop_mutex_init (&worker.parked_threads_lock); worker.parked_threads_count = 0; mono_coop_cond_init (&worker.parked_threads_cond); - /* worker.work_items_size is inited to 0 */ - mono_coop_mutex_init (&worker.work_items_lock); - worker.worker_creation_current_second = -1; mono_coop_mutex_init (&worker.worker_creation_lock); @@ -291,7 +286,7 @@ mono_threadpool_worker_init (void) worker.limit_worker_min = threads_count; -#if defined (PLATFORM_ANDROID) || defined (HOST_IOS) +#if defined (HOST_ANDROID) || defined (HOST_IOS) worker.limit_worker_max = CLAMP (threads_count * 100, MIN (threads_count, 200), MAX (threads_count, 200)); #else worker.limit_worker_max = threads_count * 100; @@ -313,69 +308,32 @@ mono_threadpool_worker_cleanup (void) } static void -work_item_lock (void) +work_item_push (void) { - mono_coop_mutex_lock (&worker.work_items_lock); -} - -static void -work_item_unlock (void) -{ - mono_coop_mutex_unlock (&worker.work_items_lock); -} - -static void -work_item_push (MonoThreadPoolWorkerCallback callback, gpointer data) -{ - ThreadPoolWorkItem work_item; - - g_assert (callback); - - work_item.callback = callback; - work_item.data = data; + gint32 old, new; - work_item_lock (); - - g_assert (worker.work_items_count <= worker.work_items_size); - - if (G_UNLIKELY (worker.work_items_count == worker.work_items_size)) { - worker.work_items_size += 64; - worker.work_items = g_renew (ThreadPoolWorkItem, worker.work_items, worker.work_items_size); - } - - g_assert (worker.work_items); - - worker.work_items [worker.work_items_count ++] = work_item; - - // printf ("[push] worker.work_items = %p, worker.work_items_count = %d, worker.work_items_size = %d\n", - // worker.work_items, worker.work_items_count, worker.work_items_size); + do { + old = InterlockedRead (&worker.work_items_count); + g_assert (old >= 0); - work_item_unlock (); + new = old + 1; + } while (InterlockedCompareExchange (&worker.work_items_count, new, old) != old); } static gboolean -work_item_try_pop (ThreadPoolWorkItem *work_item) +work_item_try_pop (void) { - g_assert (work_item); - - work_item_lock (); - - // printf ("[pop] worker.work_items = %p, worker.work_items_count = %d, worker.work_items_size = %d\n", - // worker.work_items, worker.work_items_count, worker.work_items_size); + gint32 old, new; - if (worker.work_items_count == 0) { - work_item_unlock (); - return FALSE; - } - - *work_item = worker.work_items [-- worker.work_items_count]; + do { + old = InterlockedRead (&worker.work_items_count); + g_assert (old >= 0); - if (G_UNLIKELY (worker.work_items_count >= 64 * 3 && worker.work_items_count < worker.work_items_size / 2)) { - worker.work_items_size -= 64; - worker.work_items = g_renew (ThreadPoolWorkItem, worker.work_items, worker.work_items_size); - } + if (old == 0) + return FALSE; - work_item_unlock (); + new = old - 1; + } while (InterlockedCompareExchange (&worker.work_items_count, new, old) != old); return TRUE; } @@ -383,24 +341,18 @@ work_item_try_pop (ThreadPoolWorkItem *work_item) static gint32 work_item_count (void) { - gint32 count; - - work_item_lock (); - count = worker.work_items_count; - work_item_unlock (); - - return count; + return InterlockedRead (&worker.work_items_count); } static void worker_request (void); void -mono_threadpool_worker_enqueue (MonoThreadPoolWorkerCallback callback, gpointer data) +mono_threadpool_worker_request (void) { if (!mono_refcount_tryinc (&worker)) return; - work_item_push (callback, data); + work_item_push (); worker_request (); @@ -435,7 +387,8 @@ worker_park (void) gboolean timeout = FALSE; gboolean interrupted = FALSE; - mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] worker parking", mono_native_thread_id_get ()); + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] worker parking", + GUINT_TO_POINTER (MONO_NATIVE_THREAD_ID_TO_UINT (mono_native_thread_id_get ()))); mono_coop_mutex_lock (&worker.parked_threads_lock); @@ -478,8 +431,8 @@ done: mono_coop_mutex_unlock (&worker.parked_threads_lock); - mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] worker unparking, timeout? %s interrupted? %s", - mono_native_thread_id_get (), timeout ? "yes" : "no", interrupted ? "yes" : "no"); + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] worker unparking, timeout? %s interrupted? %s", + GUINT_TO_POINTER (MONO_NATIVE_THREAD_ID_TO_UINT (mono_native_thread_id_get ())), timeout ? "yes" : "no", interrupted ? "yes" : "no"); return timeout; } @@ -489,7 +442,8 @@ worker_try_unpark (void) { gboolean res = FALSE; - mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try unpark worker", mono_native_thread_id_get ()); + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try unpark worker", + GUINT_TO_POINTER (MONO_NATIVE_THREAD_ID_TO_UINT (mono_native_thread_id_get ()))); mono_coop_mutex_lock (&worker.parked_threads_lock); if (worker.parked_threads_count > 0) { @@ -498,7 +452,8 @@ worker_try_unpark (void) } mono_coop_mutex_unlock (&worker.parked_threads_lock); - mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try unpark worker, success? %s", mono_native_thread_id_get (), res ? "yes" : "no"); + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try unpark worker, success? %s", + GUINT_TO_POINTER (MONO_NATIVE_THREAD_ID_TO_UINT (mono_native_thread_id_get ())), res ? "yes" : "no"); return res; } @@ -509,7 +464,8 @@ worker_thread (gpointer unused) MonoInternalThread *thread; ThreadPoolWorkerCounter counter; - mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] worker starting", mono_native_thread_id_get ()); + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] worker starting", + GUINT_TO_POINTER (MONO_NATIVE_THREAD_ID_TO_UINT (mono_native_thread_id_get ()))); if (!mono_refcount_tryinc (&worker)) return 0; @@ -523,12 +479,10 @@ worker_thread (gpointer unused) g_assert (thread); while (!mono_runtime_is_shutting_down ()) { - ThreadPoolWorkItem work_item; - if (mono_thread_interruption_checkpoint ()) continue; - if (!work_item_try_pop (&work_item)) { + if (!work_item_try_pop ()) { gboolean timeout; timeout = worker_park (); @@ -538,17 +492,18 @@ worker_thread (gpointer unused) continue; } - mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] worker executing %p (%p)", - mono_native_thread_id_get (), work_item.callback, work_item.data); + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] worker executing", + GUINT_TO_POINTER (MONO_NATIVE_THREAD_ID_TO_UINT (mono_native_thread_id_get ()))); - work_item.callback (work_item.data); + worker.callback (); } COUNTER_ATOMIC (counter, { counter._.working --; }); - mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] worker finishing", mono_native_thread_id_get ()); + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] worker finishing", + GUINT_TO_POINTER (MONO_NATIVE_THREAD_ID_TO_UINT (mono_native_thread_id_get ()))); mono_refcount_dec (&worker); @@ -569,7 +524,8 @@ worker_try_create (void) mono_coop_mutex_lock (&worker.worker_creation_lock); - mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker", mono_native_thread_id_get ()); + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker", + GUINT_TO_POINTER (MONO_NATIVE_THREAD_ID_TO_UINT (mono_native_thread_id_get ()))); current_ticks = mono_100ns_ticks (); if (0 == current_ticks) { @@ -583,7 +539,7 @@ worker_try_create (void) g_assert (worker.worker_creation_current_count <= WORKER_CREATION_MAX_PER_SEC); if (worker.worker_creation_current_count == WORKER_CREATION_MAX_PER_SEC) { mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker, failed: maximum number of worker created per second reached, current count = %d", - mono_native_thread_id_get (), worker.worker_creation_current_count); + GUINT_TO_POINTER (MONO_NATIVE_THREAD_ID_TO_UINT (mono_native_thread_id_get ())), worker.worker_creation_current_count); mono_coop_mutex_unlock (&worker.worker_creation_lock); return FALSE; } @@ -593,7 +549,7 @@ worker_try_create (void) COUNTER_ATOMIC (counter, { if (counter._.working >= counter._.max_working) { mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker, failed: maximum number of working threads reached", - mono_native_thread_id_get ()); + GUINT_TO_POINTER (MONO_NATIVE_THREAD_ID_TO_UINT (mono_native_thread_id_get ()))); mono_coop_mutex_unlock (&worker.worker_creation_lock); return FALSE; } @@ -602,7 +558,8 @@ worker_try_create (void) thread = mono_thread_create_internal (mono_get_root_domain (), worker_thread, NULL, MONO_THREAD_CREATE_FLAGS_THREADPOOL, &error); if (!thread) { - mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker, failed: could not create thread due to %s", mono_native_thread_id_get (), mono_error_get_message (&error)); + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker, failed: could not create thread due to %s", + GUINT_TO_POINTER (MONO_NATIVE_THREAD_ID_TO_UINT (mono_native_thread_id_get ())), mono_error_get_message (&error)); mono_error_cleanup (&error); COUNTER_ATOMIC (counter, { @@ -617,7 +574,7 @@ worker_try_create (void) worker.worker_creation_current_count += 1; mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker, created %p, now = %d count = %d", - mono_native_thread_id_get (), (gpointer) thread->tid, now, worker.worker_creation_current_count); + GUINT_TO_POINTER (MONO_NATIVE_THREAD_ID_TO_UINT (mono_native_thread_id_get ())), (gpointer) thread->tid, now, worker.worker_creation_current_count); mono_coop_mutex_unlock (&worker.worker_creation_lock); return TRUE; @@ -634,16 +591,19 @@ worker_request (void) monitor_ensure_running (); if (worker_try_unpark ()) { - mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, unparked", mono_native_thread_id_get ()); + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, unparked", + GUINT_TO_POINTER (MONO_NATIVE_THREAD_ID_TO_UINT (mono_native_thread_id_get ()))); return; } if (worker_try_create ()) { - mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, created", mono_native_thread_id_get ()); + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, created", + GUINT_TO_POINTER (MONO_NATIVE_THREAD_ID_TO_UINT (mono_native_thread_id_get ()))); return; } - mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, failed", mono_native_thread_id_get ()); + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, failed", + GUINT_TO_POINTER (MONO_NATIVE_THREAD_ID_TO_UINT (mono_native_thread_id_get ()))); } static gboolean @@ -718,7 +678,8 @@ monitor_thread (gpointer unused) // printf ("monitor_thread: start\n"); - mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, started", mono_native_thread_id_get ()); + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, started", + GUINT_TO_POINTER (MONO_NATIVE_THREAD_ID_TO_UINT (mono_native_thread_id_get ()))); do { ThreadPoolWorkerCounter counter; @@ -781,12 +742,14 @@ monitor_thread (gpointer unused) break; if (worker_try_unpark ()) { - mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, unparked", mono_native_thread_id_get ()); + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, unparked", + GUINT_TO_POINTER (MONO_NATIVE_THREAD_ID_TO_UINT (mono_native_thread_id_get ()))); break; } if (worker_try_create ()) { - mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, created", mono_native_thread_id_get ()); + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, created", + GUINT_TO_POINTER (MONO_NATIVE_THREAD_ID_TO_UINT (mono_native_thread_id_get ()))); break; } } @@ -794,7 +757,8 @@ monitor_thread (gpointer unused) // printf ("monitor_thread: stop\n"); - mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, finished", mono_native_thread_id_get ()); + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, finished", + GUINT_TO_POINTER (MONO_NATIVE_THREAD_ID_TO_UINT (mono_native_thread_id_get ()))); mono_refcount_dec (&worker); return 0; @@ -840,7 +804,8 @@ hill_climbing_change_thread_count (gint16 new_thread_count, ThreadPoolHeuristicS hc = &worker.heuristic_hill_climbing; - mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] hill climbing, change max number of threads %d", mono_native_thread_id_get (), new_thread_count); + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] hill climbing, change max number of threads %d", + GUINT_TO_POINTER (MONO_NATIVE_THREAD_ID_TO_UINT (mono_native_thread_id_get ())), new_thread_count); hc->last_thread_count = new_thread_count; hc->current_sample_interval = rand_next (&hc->random_interval_generator, hc->sample_interval_low, hc->sample_interval_high);