Merge pull request #1179 from ludovic-henry/pr25-threadpool
author Rodrigo Kumpera <kumpera@gmail.com>
Tue, 7 Oct 2014 14:34:43 +0000 (10:34 -0400)
committer Rodrigo Kumpera <kumpera@gmail.com>
Tue, 7 Oct 2014 14:34:43 +0000 (10:34 -0400)
[threadpool] Improve heuristic to approach optimal number of threads based on workload

mono/metadata/mono-wsq.c
mono/metadata/mono-wsq.h
mono/metadata/threadpool.c
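
The heart of the patch is monitor_heuristic() in threadpool.c below: every SAMPLES_PERIOD ms the monitor records how many jobs were executed and with how many threads, compares the period that just ended against the best period in a small history, and then nudges the pool size away from the best period's thread count if throughput improved, or back toward it if it regressed. What follows is a hedged, standalone sketch of that hill-climbing idea; Sample, heuristic_step and the toy workload model are illustrative only, and the real code additionally handles waiting threads, min_threads and the DEBUG counters.

#include <stdio.h>

#define HISTORY_SIZE 10

/* One sample per monitoring period (SAMPLES_PERIOD ms in the patch). */
typedef struct {
	int nexecuted;     /* jobs completed during the period        */
	int nthreads;      /* pool size while those jobs ran          */
	int nthreads_diff; /* decision taken at the end of the period */
} Sample;

/*
 * Hill-climbing step: compare the period that just ended against the best
 * period on record. If throughput improved, keep moving in the same
 * direction (away from the best period's thread count); if it regressed,
 * move back toward it. Returns +1 or -1 (the waiting-thread and
 * min_threads cases of the real patch are omitted here).
 */
static int
heuristic_step (Sample *history, int history_size, int cur)
{
	int i, best = (cur == 0) ? 1 : 0;

	if (history_size <= 1)
		return 1; /* no history yet: add a thread by default */

	for (i = 0; i < history_size; i++) {
		if (i != cur && history [i].nexecuted > history [best].nexecuted)
			best = i;
	}

	if (history [cur].nexecuted >= history [best].nexecuted)
		return history [cur].nthreads >= history [best].nthreads ? 1 : -1;
	return history [cur].nthreads >= history [best].nthreads ? -1 : 1;
}

int
main (void)
{
	Sample history [HISTORY_SIZE];
	int nthreads = 1, cur = -1, history_size = 0, period;

	for (period = 0; period < 12; period++) {
		history_size = history_size < HISTORY_SIZE ? history_size + 1 : HISTORY_SIZE;
		cur = (cur + 1) % history_size;
		history [cur].nthreads = nthreads;
		/* toy workload: throughput peaks at around 3 threads */
		history [cur].nexecuted = 100 * nthreads - 15 * nthreads * nthreads;
		history [cur].nthreads_diff = heuristic_step (history, history_size, cur);
		nthreads += history [cur].nthreads_diff;
		printf ("period %2d: nthreads=%d nexecuted=%d diff=%+d\n",
		        period, history [cur].nthreads, history [cur].nexecuted,
		        history [cur].nthreads_diff);
	}
	return 0;
}

Running the sketch shows the thread count climbing toward the toy optimum and then oscillating around it, which is the steady state this kind of hill climbing settles into.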

index 7a9d2258d12eae120e02b9ceb83e293625d3dec6..859fb69c55bf16d3e46548a37aeba73973c843e8 100644
@@ -24,6 +24,7 @@ struct _MonoWSQ {
        volatile gint tail;
        MonoArray *queue;
        gint32 mask;
+       gint32 suspended;
        MonoSemType lock;
 };
 
@@ -61,6 +62,7 @@ mono_wsq_create ()
 
        wsq = g_new0 (MonoWSQ, 1);
        wsq->mask = INITIAL_LENGTH - 1;
+       wsq->suspended = 0;
        MONO_GC_REGISTER_ROOT_SINGLE (wsq->queue);
        root = mono_get_root_domain ();
        wsq->queue = mono_array_new_cached (root, mono_defaults.object_class, INITIAL_LENGTH);
@@ -72,6 +74,12 @@ mono_wsq_create ()
        return wsq;
 }
 
+gboolean
+mono_wsq_suspend (MonoWSQ *wsq)
+{
+       return InterlockedCompareExchange (&wsq->suspended, 1, 0) == 0;
+}
+
 void
 mono_wsq_destroy (MonoWSQ *wsq)
 {
@@ -112,6 +120,11 @@ mono_wsq_local_push (void *obj)
                return FALSE;
        }
 
+       if (wsq->suspended) {
+               WSQ_DEBUG ("local_push: wsq suspended\n");
+               return FALSE;
+       }
+
        tail = wsq->tail;
        if (tail < wsq->head + wsq->mask) {
                mono_array_setref (wsq->queue, tail & wsq->mask, (MonoObject *) obj);
index 7208dadb6b99da6700a0db4041a2e5031e76977a..ccb064d4f180f2db73e5fce54802c500242a3b49 100644
@@ -20,6 +20,7 @@ gboolean mono_wsq_local_push (void *obj) MONO_INTERNAL;
 gboolean mono_wsq_local_pop (void **ptr) MONO_INTERNAL;
 void mono_wsq_try_steal (MonoWSQ *wsq, void **ptr, guint32 ms_timeout) MONO_INTERNAL;
 gint mono_wsq_count (MonoWSQ *wsq) MONO_INTERNAL;
+gboolean mono_wsq_suspend (MonoWSQ *wsq) MONO_INTERNAL;
 
 G_END_DECLS
 
index 39ade54daa14f83a12c5982a15be9d014e30f289..37c4789668321dee9933a96b6f88e30e3d0ea5cb 100644
@@ -41,6 +41,7 @@
 #include <unistd.h>
 #endif
 #include <string.h>
+#include <math.h>
 #ifdef HAVE_SYS_SOCKET_H
 #include <sys/socket.h>
 #endif
 #define THREAD_WANTS_A_BREAK(t) ((t->state & (ThreadState_StopRequested | \
                                                ThreadState_SuspendRequested)) != 0)
 
-#define SPIN_TRYLOCK(i) (InterlockedCompareExchange (&(i), 1, 0) == 0)
-#define SPIN_LOCK(i) do { \
-                               if (SPIN_TRYLOCK (i)) \
-                                       break; \
-                       } while (1)
-
-#define SPIN_UNLOCK(i) i = 0
 #define SMALL_STACK (128 * (sizeof (gpointer) / 4) * 1024)
 
 /* DEBUG: prints tp data every 2s */
@@ -83,6 +77,12 @@ enum {
        KQUEUE_BACKEND
 };
 
+enum {
+       MONITOR_STATE_AWAKE,
+       MONITOR_STATE_FALLING_ASLEEP,
+       MONITOR_STATE_SLEEPING
+};
+
 typedef struct {
        mono_mutex_t io_lock; /* access to sock_to_state */
        int inited; // 0 -> not initialized , 1->initializing, 2->initialized, 3->cleaned up
@@ -127,12 +127,10 @@ typedef struct {
        void *pc_nthreads; /* Performance counter for total number of active threads */
        /**/
        volatile gint destroy_thread;
-       volatile gint ignore_times; /* Used when there's a thread being created or destroyed */
-       volatile gint sp_lock; /* spin lock used to protect ignore_times */
-       volatile gint64 last_check;
-       volatile gint64 time_sum;
-       volatile gint n_sum;
-       gint64 averages [2];
+#if DEBUG
+       volatile gint32 njobs;
+#endif
+       volatile gint32 nexecuted;
        gboolean is_io;
 } ThreadPool;
 
@@ -148,6 +146,7 @@ static void threadpool_init (ThreadPool *tp, int min_threads, int max_threads, v
 static void threadpool_start_idle_threads (ThreadPool *tp);
 static void threadpool_kill_idle_threads (ThreadPool *tp);
 static gboolean threadpool_start_thread (ThreadPool *tp);
+static void threadpool_kill_thread (ThreadPool *tp);
 static void monitor_thread (gpointer data);
 static void socket_io_cleanup (SocketIOData *data);
 static MonoObject *get_io_event (MonoMList **list, gint event);
@@ -163,6 +162,10 @@ static GPtrArray *wsqs;
 mono_mutex_t wsqs_lock;
 static gboolean suspended;
 
+static volatile gint32 monitor_njobs = 0;
+static volatile gint32 monitor_state;
+static MonoSemType monitor_sem;
+
 /* Hooks */
 static MonoThreadPoolFunc tp_start_func;
 static MonoThreadPoolFunc tp_finish_func;
@@ -656,6 +659,16 @@ mono_async_invoke (ThreadPool *tp, MonoAsyncResult *ares)
                mono_thread_set_execution_context (ares->original_context);
                ares->original_context = NULL;
        }
+
+#if DEBUG
+       InterlockedDecrement (&tp->njobs);
+#endif
+       if (!tp->is_io)
+               InterlockedIncrement (&tp->nexecuted);
+
+       if (InterlockedDecrement (&monitor_njobs) == 0)
+               monitor_state = MONITOR_STATE_FALLING_ASLEEP;
+
        return exc;
 }
 
@@ -757,21 +770,105 @@ signal_handler (int signo)
 }
 #endif
 
+#define SAMPLES_PERIOD 500
+#define HISTORY_SIZE 10
+/* number of iterations without any jobs
+   in the queue before going to sleep */
+#define NUM_WAITING_ITERATIONS 10
+
+typedef struct {
+       gint32 nexecuted;
+       gint32 nthreads;
+       gint8 nthreads_diff;
+} SamplesHistory;
+
+static gint8
+monitor_heuristic (gint16 *current, gint16 *history_size, SamplesHistory *history, ThreadPool *tp)
+{
+       int i;
+       gint8 decision;
+       gint16 cur, max = 0;
+
+       /*
+        * The following heuristic tries to approach the optimal number of threads to maximize job throughput. To
+        * achieve this, it simply stores the number of jobs executed (nexecuted), the number of threads (nthreads)
+        * and the decision taken (nthreads_diff) for the past HISTORY_SIZE periods of time, each period being of
+        * duration SAMPLES_PERIOD ms. This history gives us an insight into what happened, and tells us whether we
+        * should increase or reduce the number of threads by comparing the last period (current) to the best one.
+        *
+        * The algorithm can be described as follows:
+        *  - if we have a better throughput than the best period: either increase the number of threads if we
+        *     already have more threads than the best period, or reduce it if we have fewer; this is equivalent
+        *     to moving away from the best period's thread count, because we are currently doing better
+        *  - if we have a worse throughput than the best period: either decrease the number of threads if we
+        *     have more threads than the best period, or increase it if we have fewer; this is equivalent to
+        *     moving closer to the best period's thread count, because we are currently doing worse
+        */
+
+       *history_size = MIN (*history_size + 1, HISTORY_SIZE);
+       cur = *current = (*current + 1) % *history_size;
+
+       history [cur].nthreads = tp->nthreads;
+       history [cur].nexecuted = InterlockedExchange (&tp->nexecuted, 0);
+
+       if (tp->waiting) {
+               /* if we have waiting threads in the pool, then do not create a new one */
+               history [cur].nthreads_diff = tp->waiting > 1 ? -1 : 0;
+               decision = 0;
+       } else if (tp->nthreads < tp->min_threads) {
+               history [cur].nthreads_diff = 1;
+               decision = 1;
+       } else if (*history_size <= 1) {
+               /* first iteration, let's add a thread by default */
+               history [cur].nthreads_diff = 1;
+               decision = 2;
+       } else {
+               max = cur == 0 ? 1 : 0;
+               for (i = 0; i < *history_size; i++) {
+                       if (i == cur)
+                               continue;
+                       if (history [i].nexecuted > history [max].nexecuted)
+                               max = i;
+               }
+
+               if (history [cur].nexecuted >= history [max].nexecuted) {
+                       /* we improved the situation, let's continue! */
+                       history [cur].nthreads_diff = history [cur].nthreads >= history [max].nthreads ? 1 : -1;
+                       decision = 3;
+               } else {
+                       /* we made it worse, let's return to previous situation */
+                       history [cur].nthreads_diff = history [cur].nthreads >= history [max].nthreads ? -1 : 1;
+                       decision = 4;
+               }
+       }
+
+#if DEBUG
+       printf ("monitor_thread: decision: %1d, history [current]: {nexecuted: %5d, nthreads: %3d, waiting: %2d, nthreads_diff: %2d}, history [max]: {nexecuted: %5d, nthreads: %3d}\n",
+                       decision, history [cur].nexecuted, history [cur].nthreads, tp->waiting, history [cur].nthreads_diff, history [max].nexecuted, history [max].nthreads);
+#endif
+       
+       return history [cur].nthreads_diff;
+}
+
 static void
 monitor_thread (gpointer unused)
 {
        ThreadPool *pools [2];
        MonoInternalThread *thread;
-       guint32 ms;
-       gboolean need_one;
        int i;
 
+       guint32 ms;
+       gint8 num_waiting_iterations = 0;
+
+       gint16 history_size = 0, current = -1;
+       SamplesHistory *history = malloc (sizeof (SamplesHistory) * HISTORY_SIZE);
+
        pools [0] = &async_tp;
        pools [1] = &async_io_tp;
        thread = mono_thread_internal_current ();
        ves_icall_System_Threading_Thread_SetName_internal (thread, mono_string_new (mono_domain_get (), "Threadpool monitor"));
        while (1) {
-               ms = 500;
+               ms = SAMPLES_PERIOD;
                i = 10; //number of spurious awakes we tolerate before doing a round of rebalancing.
                do {
                        guint32 ts;
@@ -791,26 +888,40 @@ monitor_thread (gpointer unused)
                if (suspended)
                        continue;
 
+               switch (monitor_state) {
+               case MONITOR_STATE_AWAKE:
+                       num_waiting_iterations = 0;
+                       break;
+               case MONITOR_STATE_FALLING_ASLEEP:
+                       if (++num_waiting_iterations == NUM_WAITING_ITERATIONS) {
+                               if (monitor_state == MONITOR_STATE_FALLING_ASLEEP && InterlockedCompareExchange (&monitor_state, MONITOR_STATE_SLEEPING, MONITOR_STATE_FALLING_ASLEEP) == MONITOR_STATE_FALLING_ASLEEP) {
+                                       MONO_SEM_WAIT (&monitor_sem);
+
+                                       num_waiting_iterations = 0;
+                                       current = -1;
+                                       history_size = 0;
+                               }
+                       }
+                       break;
+               case MONITOR_STATE_SLEEPING:
+                       g_assert_not_reached ();
+               }
+
                for (i = 0; i < 2; i++) {
                        ThreadPool *tp;
                        tp = pools [i];
-                       if (tp->waiting > 0)
-                               continue;
-                       need_one = (mono_cq_count (tp->queue) > 0);
-                       if (!need_one && !tp->is_io) {
-                               mono_mutex_lock (&wsqs_lock);
-                               for (i = 0; wsqs != NULL && i < wsqs->len; i++) {
-                                       MonoWSQ *wsq;
-                                       wsq = g_ptr_array_index (wsqs, i);
-                                       if (mono_wsq_count (wsq) != 0) {
-                                               need_one = TRUE;
-                                               break;
-                                       }
-                               }
-                               mono_mutex_unlock (&wsqs_lock);
+
+                       if (tp->is_io) {
+                               if (!tp->waiting && mono_cq_count (tp->queue) > 0)
+                                       threadpool_start_thread (tp);
+                       } else {
+                               gint8 nthreads_diff = monitor_heuristic (&current, &history_size, history, tp);
+
+                               if (nthreads_diff > 0)
+                                       threadpool_start_thread (tp);
+                               else if (nthreads_diff < 0)
+                                       threadpool_kill_thread (tp);
                        }
-                       if (need_one)
-                               threadpool_start_thread (tp);
                }
        }
 }
@@ -878,6 +989,10 @@ mono_thread_pool_init (void)
        signal (SIGALRM, signal_handler);
        alarm (2);
 #endif
+
+       MONO_SEM_INIT (&monitor_sem, 0);
+       monitor_state = MONITOR_STATE_AWAKE;
+       monitor_njobs = 0;
 }
 
 static MonoAsyncResult *
@@ -1014,6 +1129,8 @@ mono_thread_pool_cleanup (void)
                mono_mutex_unlock (&wsqs_lock);
                MONO_SEM_DESTROY (&async_tp.new_job);
        }
+
+       MONO_SEM_DESTROY (&monitor_sem);
 }
 
 static gboolean
@@ -1043,6 +1160,13 @@ pulse_on_new_job (ThreadPool *tp)
                MONO_SEM_POST (&tp->new_job);
 }
 
+static void
+threadpool_kill_thread (ThreadPool *tp)
+{
+       if (tp->destroy_thread == 0 && InterlockedCompareExchange (&tp->destroy_thread, 1, 0) == 0)
+               pulse_on_new_job (tp);
+}
+
 void
 icall_append_job (MonoObject *ar)
 {
@@ -1058,7 +1182,6 @@ threadpool_append_job (ThreadPool *tp, MonoObject *ar)
 static void
 threadpool_append_jobs (ThreadPool *tp, MonoObject **jobs, gint njobs)
 {
-       static int job_counter;
        MonoObject *ar;
        gint i;
 
@@ -1078,14 +1201,18 @@ threadpool_append_jobs (ThreadPool *tp, MonoObject **jobs, gint njobs)
                }
        }
 
+       InterlockedAdd (&monitor_njobs, njobs);
+
+       if (monitor_state == MONITOR_STATE_SLEEPING && InterlockedCompareExchange (&monitor_state, MONITOR_STATE_AWAKE, MONITOR_STATE_SLEEPING) == MONITOR_STATE_SLEEPING)
+               MONO_SEM_POST (&monitor_sem);
+
+       if (monitor_state == MONITOR_STATE_FALLING_ASLEEP)
+               InterlockedCompareExchange (&monitor_state, MONITOR_STATE_AWAKE, MONITOR_STATE_FALLING_ASLEEP);
+
        for (i = 0; i < njobs; i++) {
                ar = jobs [i];
                if (ar == NULL || mono_domain_is_unloading (ar->vtable->domain))
                        continue; /* Might happen when cleaning domain jobs */
-               if (!tp->is_io && (InterlockedIncrement (&job_counter) % 10) == 0) {
-                       MonoAsyncResult *o = (MonoAsyncResult *) ar;
-                       o->add_time = mono_100ns_ticks ();
-               }
                threadpool_jobs_inc (ar); 
 #ifndef DISABLE_PERFCOUNTERS
                mono_perfcounter_update_value (tp->pc_nitems, TRUE, 1);
@@ -1096,6 +1223,10 @@ threadpool_append_jobs (ThreadPool *tp, MonoObject **jobs, gint njobs)
                mono_cq_enqueue (tp->queue, ar);
        }
 
+#if DEBUG
+       InterlockedAdd (&tp->njobs, njobs);
+#endif
+
        for (i = 0; tp->waiting > 0 && i < MIN(njobs, tp->max_threads); i++)
                pulse_on_new_job (tp);
 }
@@ -1309,84 +1440,6 @@ dequeue_or_steal (ThreadPool *tp, gpointer *data, MonoWSQ *local_wsq)
        return (*data != NULL);
 }
 
-static void
-process_idle_times (ThreadPool *tp, gint64 t)
-{
-       gint64 ticks;
-       gint64 avg;
-       gboolean compute_avg;
-       gint new_threads;
-       gint64 per1;
-
-       if (tp->ignore_times || t <= 0)
-               return;
-
-       compute_avg = FALSE;
-       ticks = mono_100ns_ticks ();
-       t = ticks - t;
-       SPIN_LOCK (tp->sp_lock);
-       if (tp->ignore_times) {
-               SPIN_UNLOCK (tp->sp_lock);
-               return;
-       }
-       tp->time_sum += t;
-       tp->n_sum++;
-       if (tp->last_check == 0)
-               tp->last_check = ticks;
-       else if (tp->last_check > 0 && (ticks - tp->last_check) > 5000000) {
-               tp->ignore_times = 1;
-               compute_avg = TRUE;
-       }
-       SPIN_UNLOCK (tp->sp_lock);
-
-       if (!compute_avg)
-               return;
-
-       //printf ("Items: %d Time elapsed: %.3fs\n", tp->n_sum, (ticks - tp->last_check) / 10000.0);
-       tp->last_check = ticks;
-       new_threads = 0;
-       avg = tp->time_sum / tp->n_sum;
-       if (tp->averages [1] == 0) {
-               tp->averages [1] = avg;
-       } else {
-               per1 = ((100 * (ABS (avg - tp->averages [1]))) / tp->averages [1]);
-               if (per1 > 5) {
-                       if (avg > tp->averages [1]) {
-                               if (tp->averages [1] < tp->averages [0]) {
-                                       new_threads = -1;
-                               } else {
-                                       new_threads = 1;
-                               }
-                       } else if (avg < tp->averages [1] && tp->averages [1] < tp->averages [0]) {
-                               new_threads = 1;
-                       }
-               } else {
-                       int min, n;
-                       min = tp->min_threads;
-                       n = tp->nthreads;
-                       if ((n - min) < min && tp->busy_threads == n)
-                               new_threads = 1;
-               }
-               /*
-               if (new_threads != 0) {
-                       printf ("n: %d per1: %lld avg=%lld avg1=%lld avg0=%lld\n", new_threads, per1, avg, tp->averages [1], tp->averages [0]);
-               }
-               */
-       }
-
-       tp->time_sum = 0;
-       tp->n_sum = 0;
-
-       tp->averages [0] = tp->averages [1];
-       tp->averages [1] = avg;
-       tp->ignore_times = 0;
-
-       if (new_threads == -1) {
-               if (tp->destroy_thread == 0 && InterlockedCompareExchange (&tp->destroy_thread, 1, 0) == 0)
-                       pulse_on_new_job (tp);
-       }
-}
-
 static gboolean
 should_i_die (ThreadPool *tp)
 {
@@ -1513,8 +1566,6 @@ async_invoke_thread (gpointer data)
                                        if (tp_item_begin_func)
                                                tp_item_begin_func (tp_item_user_data);
 
-                                       if (!is_io_task && ar->add_time > 0)
-                                               process_idle_times (tp, ar->add_time);
                                        exc = mono_async_invoke (tp, ar);
                                        if (tp_item_end_func)
                                                tp_item_end_func (tp_item_user_data);
@@ -1541,8 +1592,12 @@ async_invoke_thread (gpointer data)
                ar = NULL;
                data = NULL;
                must_die = should_i_die (tp);
-               if (!must_die && (tp->is_io || !mono_wsq_local_pop (&data)))
-                       dequeue_or_steal (tp, &data, wsq);
+               if (must_die) {
+                       mono_wsq_suspend (wsq);
+               } else {
+                       if (tp->is_io || !mono_wsq_local_pop (&data))
+                               dequeue_or_steal (tp, &data, wsq);
+               }
 
                n_naps = 0;
                while (!must_die && !data && n_naps < 4) {
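
The second moving part of the patch is the monitor's sleep/wake handshake: threadpool_append_jobs() adds to monitor_njobs and wakes (or keeps awake) the monitor, mono_async_invoke() decrements the counter and flags MONITOR_STATE_FALLING_ASLEEP when it hits zero, and monitor_thread() only commits to sleeping on monitor_sem after NUM_WAITING_ITERATIONS idle periods, via a compare-and-swap. Below is a hedged sketch of that protocol using C11 atomics and POSIX semaphores in place of Mono's Interlocked*/MONO_SEM_* wrappers; append_jobs, job_done and monitor_iteration are illustrative stand-ins for the functions touched by the diff.

#include <semaphore.h>
#include <stdatomic.h>

enum { AWAKE, FALLING_ASLEEP, SLEEPING };

#define NUM_WAITING_ITERATIONS 10

static _Atomic int monitor_state = AWAKE;
static _Atomic int monitor_njobs = 0;
static sem_t monitor_sem; /* sem_init (&monitor_sem, 0, 0) at pool init */

/* Producer side: mirrors the checks added to threadpool_append_jobs (). */
static void
append_jobs (int njobs)
{
	int expected;

	atomic_fetch_add (&monitor_njobs, njobs);

	/* if the monitor is already asleep, wake it exactly once */
	expected = SLEEPING;
	if (atomic_compare_exchange_strong (&monitor_state, &expected, AWAKE))
		sem_post (&monitor_sem);

	/* if it was about to fall asleep, cancel the transition */
	expected = FALLING_ASLEEP;
	atomic_compare_exchange_strong (&monitor_state, &expected, AWAKE);

	/* ... enqueue the jobs themselves ... */
}

/* Worker side: mirrors the end of mono_async_invoke (). */
static void
job_done (void)
{
	if (atomic_fetch_sub (&monitor_njobs, 1) == 1) /* counter just reached 0 */
		atomic_store (&monitor_state, FALLING_ASLEEP);
}

/* Monitor side: one iteration of the loop in monitor_thread (). */
static void
monitor_iteration (int *num_waiting_iterations)
{
	int expected;

	switch (atomic_load (&monitor_state)) {
	case AWAKE:
		*num_waiting_iterations = 0;
		break;
	case FALLING_ASLEEP:
		if (++(*num_waiting_iterations) == NUM_WAITING_ITERATIONS) {
			expected = FALLING_ASLEEP;
			if (atomic_compare_exchange_strong (&monitor_state, &expected, SLEEPING)) {
				sem_wait (&monitor_sem); /* block until append_jobs () posts */
				*num_waiting_iterations = 0;
			}
		}
		break;
	}
	/* ... then run monitor_heuristic () and rebalance ... */
}

int
main (void)
{
	int idle = 0, i;

	sem_init (&monitor_sem, 0, 0);

	append_jobs (2);   /* monitor stays AWAKE */
	job_done ();
	job_done ();       /* counter hits 0: FALLING_ASLEEP */
	for (i = 0; i < 5; i++)
		monitor_iteration (&idle);
	append_jobs (1);   /* cancels FALLING_ASLEEP before the monitor can sleep */
	monitor_iteration (&idle); /* back to AWAKE, idle counter resets */
	return 0;
}

The semaphore is only posted when the producer's CAS from SLEEPING to AWAKE succeeds, so at most one wake-up is issued per sleep, matching the pair of checks added to threadpool_append_jobs in the diff above.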