From: Rodrigo Kumpera Date: Tue, 7 Oct 2014 14:34:43 +0000 (-0400) Subject: Merge pull request #1179 from ludovic-henry/pr25-threadpool X-Git-Url: http://wien.tomnetworks.com/gitweb/?a=commitdiff_plain;h=bd002446fc59b1c7f2e69f15a99f8f70da8c8756;hp=fa2c2870e56fa25ee1142ebdba9910a07504eda8;p=mono.git Merge pull request #1179 from ludovic-henry/pr25-threadpool [threadpool] Improve heuristic to approach optimal number of threads based on workload --- diff --git a/mono/metadata/mono-wsq.c b/mono/metadata/mono-wsq.c index 7a9d2258d12..859fb69c55b 100644 --- a/mono/metadata/mono-wsq.c +++ b/mono/metadata/mono-wsq.c @@ -24,6 +24,7 @@ struct _MonoWSQ { volatile gint tail; MonoArray *queue; gint32 mask; + gint32 suspended; MonoSemType lock; }; @@ -61,6 +62,7 @@ mono_wsq_create () wsq = g_new0 (MonoWSQ, 1); wsq->mask = INITIAL_LENGTH - 1; + wsq->suspended = 0; MONO_GC_REGISTER_ROOT_SINGLE (wsq->queue); root = mono_get_root_domain (); wsq->queue = mono_array_new_cached (root, mono_defaults.object_class, INITIAL_LENGTH); @@ -72,6 +74,12 @@ mono_wsq_create () return wsq; } +gboolean +mono_wsq_suspend (MonoWSQ *wsq) +{ + return InterlockedCompareExchange (&wsq->suspended, 1, 0) == 0; +} + void mono_wsq_destroy (MonoWSQ *wsq) { @@ -112,6 +120,11 @@ mono_wsq_local_push (void *obj) return FALSE; } + if (wsq->suspended) { + WSQ_DEBUG ("local_push: wsq suspended\n"); + return FALSE; + } + tail = wsq->tail; if (tail < wsq->head + wsq->mask) { mono_array_setref (wsq->queue, tail & wsq->mask, (MonoObject *) obj); diff --git a/mono/metadata/mono-wsq.h b/mono/metadata/mono-wsq.h index 7208dadb6b9..ccb064d4f18 100644 --- a/mono/metadata/mono-wsq.h +++ b/mono/metadata/mono-wsq.h @@ -20,6 +20,7 @@ gboolean mono_wsq_local_push (void *obj) MONO_INTERNAL; gboolean mono_wsq_local_pop (void **ptr) MONO_INTERNAL; void mono_wsq_try_steal (MonoWSQ *wsq, void **ptr, guint32 ms_timeout) MONO_INTERNAL; gint mono_wsq_count (MonoWSQ *wsq) MONO_INTERNAL; +gboolean mono_wsq_suspend (MonoWSQ *wsq) MONO_INTERNAL; G_END_DECLS diff --git a/mono/metadata/threadpool.c b/mono/metadata/threadpool.c index 39ade54daa1..37c47896683 100644 --- a/mono/metadata/threadpool.c +++ b/mono/metadata/threadpool.c @@ -41,6 +41,7 @@ #include #endif #include +#include #ifdef HAVE_SYS_SOCKET_H #include #endif @@ -62,13 +63,6 @@ #define THREAD_WANTS_A_BREAK(t) ((t->state & (ThreadState_StopRequested | \ ThreadState_SuspendRequested)) != 0) -#define SPIN_TRYLOCK(i) (InterlockedCompareExchange (&(i), 1, 0) == 0) -#define SPIN_LOCK(i) do { \ - if (SPIN_TRYLOCK (i)) \ - break; \ - } while (1) - -#define SPIN_UNLOCK(i) i = 0 #define SMALL_STACK (128 * (sizeof (gpointer) / 4) * 1024) /* DEBUG: prints tp data every 2s */ @@ -83,6 +77,12 @@ enum { KQUEUE_BACKEND }; +enum { + MONITOR_STATE_AWAKE, + MONITOR_STATE_FALLING_ASLEEP, + MONITOR_STATE_SLEEPING +}; + typedef struct { mono_mutex_t io_lock; /* access to sock_to_state */ int inited; // 0 -> not initialized , 1->initializing, 2->initialized, 3->cleaned up @@ -127,12 +127,10 @@ typedef struct { void *pc_nthreads; /* Performance counter for total number of active threads */ /**/ volatile gint destroy_thread; - volatile gint ignore_times; /* Used when there's a thread being created or destroyed */ - volatile gint sp_lock; /* spin lock used to protect ignore_times */ - volatile gint64 last_check; - volatile gint64 time_sum; - volatile gint n_sum; - gint64 averages [2]; +#if DEBUG + volatile gint32 njobs; +#endif + volatile gint32 nexecuted; gboolean is_io; } ThreadPool; @@ -148,6 +146,7 @@ static void 
 threadpool_init (ThreadPool *tp, int min_threads, int max_threads, v
 static void threadpool_start_idle_threads (ThreadPool *tp);
 static void threadpool_kill_idle_threads (ThreadPool *tp);
 static gboolean threadpool_start_thread (ThreadPool *tp);
+static void threadpool_kill_thread (ThreadPool *tp);
 static void monitor_thread (gpointer data);
 static void socket_io_cleanup (SocketIOData *data);
 static MonoObject *get_io_event (MonoMList **list, gint event);
@@ -163,6 +162,10 @@ static GPtrArray *wsqs;
 mono_mutex_t wsqs_lock;
 static gboolean suspended;
+static volatile gint32 monitor_njobs = 0;
+static volatile gint32 monitor_state;
+static MonoSemType monitor_sem;
+
 /* Hooks */
 static MonoThreadPoolFunc tp_start_func;
 static MonoThreadPoolFunc tp_finish_func;
@@ -656,6 +659,16 @@ mono_async_invoke (ThreadPool *tp, MonoAsyncResult *ares)
 		mono_thread_set_execution_context (ares->original_context);
 		ares->original_context = NULL;
 	}
+
+#if DEBUG
+	InterlockedDecrement (&tp->njobs);
+#endif
+	if (!tp->is_io)
+		InterlockedIncrement (&tp->nexecuted);
+
+	if (InterlockedDecrement (&monitor_njobs) == 0)
+		monitor_state = MONITOR_STATE_FALLING_ASLEEP;
+
 	return exc;
 }
@@ -757,21 +770,105 @@ signal_handler (int signo)
 }
 #endif
+#define SAMPLES_PERIOD 500
+#define HISTORY_SIZE 10
+/* number of iterations without any jobs
+   in the queue before going to sleep */
+#define NUM_WAITING_ITERATIONS 10
+
+typedef struct {
+	gint32 nexecuted;
+	gint32 nthreads;
+	gint8 nthreads_diff;
+} SamplesHistory;
+
+static gint8
+monitor_heuristic (gint16 *current, gint16 *history_size, SamplesHistory *history, ThreadPool *tp)
+{
+	int i;
+	gint8 decision;
+	gint16 cur, max = 0;
+
+	/*
+	 * The following heuristic tries to approach the optimal number of threads to maximize job throughput. To
+	 * achieve this, it simply stores the number of jobs executed (nexecuted), the number of threads (nthreads)
+	 * and the decision (nthreads_diff) for the past HISTORY_SIZE periods of time, each period being
+	 * SAMPLES_PERIOD ms long. This history gives us an insight into what happened, and lets us see whether we
+	 * should increase or reduce the number of threads by comparing the last period (current) to the best one.
+	 *
+	 * The algorithm can be described as follows:
+	 * - if we have a better throughput than the best period: we should increase the number of threads if we
+	 *   already have more threads than the best period, or reduce it if we have fewer; this is equivalent to
+	 *   moving away from the number of threads of the best period, because we are currently doing better
+	 * - if we have a worse throughput than the best period: we should decrease the number of threads if we
+	 *   have more threads than the best period, or increase it if we have fewer; this is equivalent to moving
+	 *   closer to the number of threads of the best period, because we are currently doing worse
+	 */
+
+	*history_size = MIN (*history_size + 1, HISTORY_SIZE);
+	cur = *current = (*current + 1) % *history_size;
+
+	history [cur].nthreads = tp->nthreads;
+	history [cur].nexecuted = InterlockedExchange (&tp->nexecuted, 0);
+
+	if (tp->waiting) {
+		/* if we have waiting threads in the pool, do not create a new one */
+		history [cur].nthreads_diff = tp->waiting > 1 ? -1 : 0;
+		decision = 0;
+	} else if (tp->nthreads < tp->min_threads) {
+		history [cur].nthreads_diff = 1;
+		decision = 1;
+	} else if (*history_size <= 1) {
+		/* first iteration, let's add a thread by default */
+		history [cur].nthreads_diff = 1;
+		decision = 2;
+	} else {
+		max = cur == 0 ? 1 : 0;
+		for (i = 0; i < *history_size; i++) {
+			if (i == cur)
+				continue;
+			if (history [i].nexecuted > history [max].nexecuted)
+				max = i;
+		}
+
+		if (history [cur].nexecuted >= history [max].nexecuted) {
+			/* we improved the situation, let's continue! */
+			history [cur].nthreads_diff = history [cur].nthreads >= history [max].nthreads ? 1 : -1;
+			decision = 3;
+		} else {
+			/* we made it worse, let's return to the previous situation */
+			history [cur].nthreads_diff = history [cur].nthreads >= history [max].nthreads ? -1 : 1;
+			decision = 4;
+		}
+	}
+
+#if DEBUG
+	printf ("monitor_thread: decision: %1d, history [current]: {nexecuted: %5d, nthreads: %3d, waiting: %2d, nthreads_diff: %2d}, history [max]: {nexecuted: %5d, nthreads: %3d}\n",
+		decision, history [cur].nexecuted, history [cur].nthreads, tp->waiting, history [cur].nthreads_diff, history [max].nexecuted, history [max].nthreads);
+#endif
+
+	return history [cur].nthreads_diff;
+}
+
 static void
 monitor_thread (gpointer unused)
 {
 	ThreadPool *pools [2];
 	MonoInternalThread *thread;
-	guint32 ms;
-	gboolean need_one;
 	int i;
+	guint32 ms;
+	gint8 num_waiting_iterations = 0;
+
+	gint16 history_size = 0, current = -1;
+	SamplesHistory *history = malloc (sizeof (SamplesHistory) * HISTORY_SIZE);
+
 	pools [0] = &async_tp;
 	pools [1] = &async_io_tp;
 	thread = mono_thread_internal_current ();
 	ves_icall_System_Threading_Thread_SetName_internal (thread, mono_string_new (mono_domain_get (), "Threadpool monitor"));
 	while (1) {
-		ms = 500;
+		ms = SAMPLES_PERIOD;
 		i = 10; //number of spurious awakes we tolerate before doing a round of rebalancing.
 		do {
 			guint32 ts;
@@ -791,26 +888,40 @@ monitor_thread (gpointer unused)
 		if (suspended)
 			continue;
+		switch (monitor_state) {
+		case MONITOR_STATE_AWAKE:
+			num_waiting_iterations = 0;
+			break;
+		case MONITOR_STATE_FALLING_ASLEEP:
+			if (++num_waiting_iterations == NUM_WAITING_ITERATIONS) {
+				if (monitor_state == MONITOR_STATE_FALLING_ASLEEP && InterlockedCompareExchange (&monitor_state, MONITOR_STATE_SLEEPING, MONITOR_STATE_FALLING_ASLEEP) == MONITOR_STATE_FALLING_ASLEEP) {
+					MONO_SEM_WAIT (&monitor_sem);
+
+					num_waiting_iterations = 0;
+					current = -1;
+					history_size = 0;
+				}
+			}
+			break;
+		case MONITOR_STATE_SLEEPING:
+			g_assert_not_reached ();
+		}
+
 		for (i = 0; i < 2; i++) {
 			ThreadPool *tp;
 			tp = pools [i];
-			if (tp->waiting > 0)
-				continue;
-			need_one = (mono_cq_count (tp->queue) > 0);
-			if (!need_one && !tp->is_io) {
-				mono_mutex_lock (&wsqs_lock);
-				for (i = 0; wsqs != NULL && i < wsqs->len; i++) {
-					MonoWSQ *wsq;
-					wsq = g_ptr_array_index (wsqs, i);
-					if (mono_wsq_count (wsq) != 0) {
-						need_one = TRUE;
-						break;
-					}
-				}
-				mono_mutex_unlock (&wsqs_lock);
+
+			if (tp->is_io) {
+				if (!tp->waiting && mono_cq_count (tp->queue) > 0)
+					threadpool_start_thread (tp);
+			} else {
+				gint8 nthreads_diff = monitor_heuristic (&current, &history_size, history, tp);
+
+				if (nthreads_diff > 0)
+					threadpool_start_thread (tp);
+				else if (nthreads_diff < 0)
+					threadpool_kill_thread (tp);
 			}
-			if (need_one)
-				threadpool_start_thread (tp);
 		}
 	}
 }
@@ -878,6 +989,10 @@ mono_thread_pool_init (void)
 	signal (SIGALRM, signal_handler);
 	alarm (2);
 #endif
+
+	MONO_SEM_INIT (&monitor_sem, 0);
+	monitor_state = MONITOR_STATE_AWAKE;
+	monitor_njobs = 0;
 }
 static MonoAsyncResult *
@@ -1014,6 +1129,8 @@ mono_thread_pool_cleanup (void)
 		mono_mutex_unlock (&wsqs_lock);
 		MONO_SEM_DESTROY (&async_tp.new_job);
 	}
+
+	MONO_SEM_DESTROY (&monitor_sem);
 }
 static gboolean
@@ -1043,6 +1160,13 @@ pulse_on_new_job (ThreadPool *tp)
 	MONO_SEM_POST (&tp->new_job);
 }
+static void
+threadpool_kill_thread (ThreadPool *tp)
+{
+	if (tp->destroy_thread == 0 && InterlockedCompareExchange (&tp->destroy_thread, 1, 0) == 0)
+		pulse_on_new_job (tp);
+}
+
 void
 icall_append_job (MonoObject *ar)
 {
@@ -1058,7 +1182,6 @@ threadpool_append_job (ThreadPool *tp, MonoObject *ar)
 static void
 threadpool_append_jobs (ThreadPool *tp, MonoObject **jobs, gint njobs)
 {
-	static int job_counter;
 	MonoObject *ar;
 	gint i;
@@ -1078,14 +1201,18 @@ threadpool_append_jobs (ThreadPool *tp, MonoObject **jobs, gint njobs)
 		}
 	}
+	InterlockedAdd (&monitor_njobs, njobs);
+
+	if (monitor_state == MONITOR_STATE_SLEEPING && InterlockedCompareExchange (&monitor_state, MONITOR_STATE_AWAKE, MONITOR_STATE_SLEEPING) == MONITOR_STATE_SLEEPING)
+		MONO_SEM_POST (&monitor_sem);
+
+	if (monitor_state == MONITOR_STATE_FALLING_ASLEEP)
+		InterlockedCompareExchange (&monitor_state, MONITOR_STATE_AWAKE, MONITOR_STATE_FALLING_ASLEEP);
+
 	for (i = 0; i < njobs; i++) {
 		ar = jobs [i];
 		if (ar == NULL || mono_domain_is_unloading (ar->vtable->domain))
 			continue; /* Might happen when cleaning domain jobs */
-		if (!tp->is_io && (InterlockedIncrement (&job_counter) % 10) == 0) {
-			MonoAsyncResult *o = (MonoAsyncResult *) ar;
-			o->add_time = mono_100ns_ticks ();
-		}
 		threadpool_jobs_inc (ar);
#ifndef DISABLE_PERFCOUNTERS
 		mono_perfcounter_update_value (tp->pc_nitems, TRUE, 1);
@@ -1096,6 +1223,10 @@ threadpool_append_jobs (ThreadPool *tp, MonoObject **jobs, gint njobs)
 		mono_cq_enqueue (tp->queue, ar);
 	}
+#if DEBUG
+	InterlockedAdd (&tp->njobs, njobs);
+#endif
+
 	for (i = 0; tp->waiting > 0 && i < MIN(njobs, tp->max_threads);
i++) pulse_on_new_job (tp); } @@ -1309,84 +1440,6 @@ dequeue_or_steal (ThreadPool *tp, gpointer *data, MonoWSQ *local_wsq) return (*data != NULL); } -static void -process_idle_times (ThreadPool *tp, gint64 t) -{ - gint64 ticks; - gint64 avg; - gboolean compute_avg; - gint new_threads; - gint64 per1; - - if (tp->ignore_times || t <= 0) - return; - - compute_avg = FALSE; - ticks = mono_100ns_ticks (); - t = ticks - t; - SPIN_LOCK (tp->sp_lock); - if (tp->ignore_times) { - SPIN_UNLOCK (tp->sp_lock); - return; - } - tp->time_sum += t; - tp->n_sum++; - if (tp->last_check == 0) - tp->last_check = ticks; - else if (tp->last_check > 0 && (ticks - tp->last_check) > 5000000) { - tp->ignore_times = 1; - compute_avg = TRUE; - } - SPIN_UNLOCK (tp->sp_lock); - - if (!compute_avg) - return; - - //printf ("Items: %d Time elapsed: %.3fs\n", tp->n_sum, (ticks - tp->last_check) / 10000.0); - tp->last_check = ticks; - new_threads = 0; - avg = tp->time_sum / tp->n_sum; - if (tp->averages [1] == 0) { - tp->averages [1] = avg; - } else { - per1 = ((100 * (ABS (avg - tp->averages [1]))) / tp->averages [1]); - if (per1 > 5) { - if (avg > tp->averages [1]) { - if (tp->averages [1] < tp->averages [0]) { - new_threads = -1; - } else { - new_threads = 1; - } - } else if (avg < tp->averages [1] && tp->averages [1] < tp->averages [0]) { - new_threads = 1; - } - } else { - int min, n; - min = tp->min_threads; - n = tp->nthreads; - if ((n - min) < min && tp->busy_threads == n) - new_threads = 1; - } - /* - if (new_threads != 0) { - printf ("n: %d per1: %lld avg=%lld avg1=%lld avg0=%lld\n", new_threads, per1, avg, tp->averages [1], tp->averages [0]); - } - */ - } - - tp->time_sum = 0; - tp->n_sum = 0; - - tp->averages [0] = tp->averages [1]; - tp->averages [1] = avg; - tp->ignore_times = 0; - - if (new_threads == -1) { - if (tp->destroy_thread == 0 && InterlockedCompareExchange (&tp->destroy_thread, 1, 0) == 0) - pulse_on_new_job (tp); - } -} - static gboolean should_i_die (ThreadPool *tp) { @@ -1513,8 +1566,6 @@ async_invoke_thread (gpointer data) if (tp_item_begin_func) tp_item_begin_func (tp_item_user_data); - if (!is_io_task && ar->add_time > 0) - process_idle_times (tp, ar->add_time); exc = mono_async_invoke (tp, ar); if (tp_item_end_func) tp_item_end_func (tp_item_user_data); @@ -1541,8 +1592,12 @@ async_invoke_thread (gpointer data) ar = NULL; data = NULL; must_die = should_i_die (tp); - if (!must_die && (tp->is_io || !mono_wsq_local_pop (&data))) - dequeue_or_steal (tp, &data, wsq); + if (must_die) { + mono_wsq_suspend (wsq); + } else { + if (tp->is_io || !mono_wsq_local_pop (&data)) + dequeue_or_steal (tp, &data, wsq); + } n_naps = 0; while (!must_die && !data && n_naps < 4) {
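
The decision rule documented above monitor_heuristic can be read in isolation from the rest of the pool machinery. The following is a minimal, self-contained C sketch of that rule under simplifying assumptions: the names sample_t and decide_nthreads_diff, the plain non-atomic int counters, and the example history values are illustrative only and do not appear in the patch; the real function additionally forces growth while nthreads < min_threads and declines to grow while threads are already waiting.

#include <stdio.h>

#define HISTORY_SIZE 10

/* illustrative stand-in for one SAMPLES_PERIOD worth of measurements */
typedef struct {
	int nexecuted; /* jobs completed during the period */
	int nthreads;  /* pool size during the period */
} sample_t;

/*
 * Simplified model of the heuristic: compare the current period against the
 * best period in the history and move the thread count away from the best
 * period's size when throughput improved, or back towards it when it got
 * worse. Returns +1 (grow) or -1 (shrink).
 */
static int
decide_nthreads_diff (const sample_t *history, int history_size, int cur)
{
	int i, max = (cur == 0) ? 1 : 0;

	if (history_size <= 1)
		return 1; /* not enough data yet: grow by default */

	for (i = 0; i < history_size; i++) {
		if (i == cur)
			continue;
		if (history [i].nexecuted > history [max].nexecuted)
			max = i;
	}

	if (history [cur].nexecuted >= history [max].nexecuted)
		/* better than the best period: keep moving in the same direction */
		return history [cur].nthreads >= history [max].nthreads ? 1 : -1;

	/* worse than the best period: move back towards its thread count */
	return history [cur].nthreads >= history [max].nthreads ? -1 : 1;
}

int
main (void)
{
	/* two fabricated periods: the pool grew from 4 to 6 threads and throughput went up */
	sample_t history [HISTORY_SIZE] = { { 100, 4 }, { 150, 6 } };

	printf ("nthreads_diff = %+d\n", decide_nthreads_diff (history, 2, 1));
	return 0;
}

Compiling and running this prints "nthreads_diff = +1": the model keeps growing the pool because the last period had both more threads and more completed jobs than the best earlier period, which mirrors decision 3 in the patch.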