#include <unistd.h>
#endif
#include <string.h>
+#include <math.h>
#ifdef HAVE_SYS_SOCKET_H
#include <sys/socket.h>
#endif
#define THREAD_WANTS_A_BREAK(t) ((t->state & (ThreadState_StopRequested | \
ThreadState_SuspendRequested)) != 0)
-#define SPIN_TRYLOCK(i) (InterlockedCompareExchange (&(i), 1, 0) == 0)
-#define SPIN_LOCK(i) do { \
- if (SPIN_TRYLOCK (i)) \
- break; \
- } while (1)
-
-#define SPIN_UNLOCK(i) i = 0
#define SMALL_STACK (128 * (sizeof (gpointer) / 4) * 1024)
/* DEBUG: prints tp data every 2s */
KQUEUE_BACKEND
};
+enum { /* States of the threadpool monitor thread, stored in monitor_state. */
+ MONITOR_STATE_AWAKE, /* set together with the monitor_sem initialization and restored by threadpool_append_jobs; the monitor resets its idle-iteration counter while in this state */
+ MONITOR_STATE_FALLING_ASLEEP, /* set when monitor_njobs drops to 0; the monitor counts iterations and, after NUM_WAITING_ITERATIONS of them, tries to switch to SLEEPING */
+ MONITOR_STATE_SLEEPING /* the monitor is blocked on monitor_sem until threadpool_append_jobs switches the state back to AWAKE and posts the semaphore */
+};
+
typedef struct {
mono_mutex_t io_lock; /* access to sock_to_state */
int inited; // 0 -> not initialized , 1->initializing, 2->initialized, 3->cleaned up
void *pc_nthreads; /* Performance counter for total number of active threads */
/**/
volatile gint destroy_thread;
- volatile gint ignore_times; /* Used when there's a thread being created or destroyed */
- volatile gint sp_lock; /* spin lock used to protect ignore_times */
- volatile gint64 last_check;
- volatile gint64 time_sum;
- volatile gint n_sum;
- gint64 averages [2];
+#if DEBUG
+ volatile gint32 njobs;
+#endif
+ volatile gint32 nexecuted;
gboolean is_io;
} ThreadPool;
static void threadpool_start_idle_threads (ThreadPool *tp);
static void threadpool_kill_idle_threads (ThreadPool *tp);
static gboolean threadpool_start_thread (ThreadPool *tp);
+static void threadpool_kill_thread (ThreadPool *tp);
static void monitor_thread (gpointer data);
static void socket_io_cleanup (SocketIOData *data);
static MonoObject *get_io_event (MonoMList **list, gint event);
mono_mutex_t wsqs_lock;
static gboolean suspended;
+static volatile gint32 monitor_njobs = 0;
+static volatile gint32 monitor_state;
+static MonoSemType monitor_sem;
+
/* Hooks */
static MonoThreadPoolFunc tp_start_func;
static MonoThreadPoolFunc tp_finish_func;
mono_thread_set_execution_context (ares->original_context);
ares->original_context = NULL;
}
+
+#if DEBUG
+ InterlockedDecrement (&tp->njobs);
+#endif
+ if (!tp->is_io)
+ InterlockedIncrement (&tp->nexecuted);
+
+ if (InterlockedDecrement (&monitor_njobs) == 0)
+ monitor_state = MONITOR_STATE_FALLING_ASLEEP;
+
return exc;
}
}
#endif
+#define SAMPLES_PERIOD 500
+#define HISTORY_SIZE 10
+/* number of iterations without any jobs
+ in the queue before going to sleep */
+#define NUM_WAITING_ITERATIONS 10
+
+typedef struct { /* One entry of the sample history filled in by monitor_heuristic on each call. */
+ gint32 nexecuted; /* value of tp->nexecuted at sampling time (the counter is swapped back to 0 when sampled) */
+ gint32 nthreads; /* value of tp->nthreads at sampling time */
+ gint8 nthreads_diff; /* decision taken for this sample: 1, 0 or -1; the caller starts a thread when positive and kills one when negative */
+} SamplesHistory;
+
+/*
+ * monitor_heuristic:
+ * @current: in/out, index of the latest sample in @history; advanced with wrap-around on every call
+ * @history_size: in/out, number of valid samples in @history; grows by one per call, capped at HISTORY_SIZE
+ * @history: sample buffer, only entries 0 .. *history_size - 1 are read
+ * @tp: pool being tuned; its nexecuted counter is atomically swapped with 0
+ *
+ * Records a new sample (threads count and jobs executed since the previous
+ * sample) and decides whether the pool should gain or lose a thread.
+ *
+ * Returns: the decision stored in the new sample: 1 (add a thread),
+ * -1 (remove a thread) or 0 (leave the pool as it is).
+ */
+static gint8
+monitor_heuristic (gint16 *current, gint16 *history_size, SamplesHistory *history, ThreadPool *tp)
+{
+	gint16 slot, best = 0;
+	gint8 decision;
+	SamplesHistory *sample;
+	int idx;
+
+	/*
+	 * The goal is to converge towards the number of threads that maximizes the
+	 * jobs throughput. For each of the last HISTORY_SIZE calls we keep the number
+	 * of jobs executed (nexecuted), the number of threads (nthreads) and the
+	 * decision that was taken (nthreads_diff). The newest sample is then compared
+	 * with the best of the other samples, i.e. the one with the highest nexecuted:
+	 *  - the newest sample is at least as good as the best one: move further away
+	 *    from the best sample's thread count (add a thread if we have at least as
+	 *    many threads as it had, remove one otherwise);
+	 *  - the newest sample is worse than the best one: move back towards the best
+	 *    sample's thread count (remove a thread if we have at least as many
+	 *    threads as it had, add one otherwise).
+	 */
+
+	/* grow the window first, then step to the next slot inside it */
+	*history_size = MIN (*history_size + 1, HISTORY_SIZE);
+	*current = (*current + 1) % *history_size;
+	slot = *current;
+
+	sample = &history [slot];
+	sample->nthreads = tp->nthreads;
+	sample->nexecuted = InterlockedExchange (&tp->nexecuted, 0);
+
+	if (tp->waiting) {
+		/* some threads are already waiting for work: never add one, and drop one if several are waiting */
+		if (tp->waiting > 1)
+			sample->nthreads_diff = -1;
+		else
+			sample->nthreads_diff = 0;
+		decision = 0;
+	} else if (tp->nthreads < tp->min_threads) {
+		/* below the configured minimum */
+		sample->nthreads_diff = 1;
+		decision = 1;
+	} else if (*history_size <= 1) {
+		/* nothing to compare with yet, add a thread by default */
+		sample->nthreads_diff = 1;
+		decision = 2;
+	} else {
+		gboolean improved, more_threads;
+
+		/* look for the best sample other than the one just recorded */
+		best = (slot == 0) ? 1 : 0;
+		for (idx = 0; idx < *history_size; idx++) {
+			if (idx != slot && history [idx].nexecuted > history [best].nexecuted)
+				best = idx;
+		}
+
+		improved = sample->nexecuted >= history [best].nexecuted;
+		more_threads = sample->nthreads >= history [best].nthreads;
+
+		if (improved) {
+			/* throughput did not degrade: keep going in the same direction */
+			sample->nthreads_diff = more_threads ? 1 : -1;
+			decision = 3;
+		} else {
+			/* throughput degraded: head back towards the best sample */
+			sample->nthreads_diff = more_threads ? -1 : 1;
+			decision = 4;
+		}
+	}
+
+#if DEBUG
+	printf ("monitor_thread: decision: %1d, history [current]: {nexecuted: %5d, nthreads: %3d, waiting: %2d, nthreads_diff: %2d}, history [max]: {nexecuted: %5d, nthreads: %3d}\n",
+		decision, sample->nexecuted, sample->nthreads, tp->waiting, sample->nthreads_diff, history [best].nexecuted, history [best].nthreads);
+#endif
+
+	return sample->nthreads_diff;
+}
+
static void
monitor_thread (gpointer unused)
{
ThreadPool *pools [2];
MonoInternalThread *thread;
- guint32 ms;
- gboolean need_one;
int i;
+ guint32 ms;
+ gint8 num_waiting_iterations = 0;
+
+ gint16 history_size = 0, current = -1;
+ SamplesHistory *history = malloc (sizeof (SamplesHistory) * HISTORY_SIZE);
+
pools [0] = &async_tp;
pools [1] = &async_io_tp;
thread = mono_thread_internal_current ();
ves_icall_System_Threading_Thread_SetName_internal (thread, mono_string_new (mono_domain_get (), "Threadpool monitor"));
while (1) {
- ms = 500;
+ ms = SAMPLES_PERIOD;
i = 10; //number of spurious awakes we tolerate before doing a round of rebalancing.
do {
guint32 ts;
if (suspended)
continue;
+ switch (monitor_state) {
+ case MONITOR_STATE_AWAKE:
+ num_waiting_iterations = 0;
+ break;
+ case MONITOR_STATE_FALLING_ASLEEP:
+ if (++num_waiting_iterations == NUM_WAITING_ITERATIONS) {
+ if (monitor_state == MONITOR_STATE_FALLING_ASLEEP && InterlockedCompareExchange (&monitor_state, MONITOR_STATE_SLEEPING, MONITOR_STATE_FALLING_ASLEEP) == MONITOR_STATE_FALLING_ASLEEP) {
+ MONO_SEM_WAIT (&monitor_sem);
+
+ num_waiting_iterations = 0;
+ current = -1;
+ history_size = 0;
+ }
+ }
+ break;
+ case MONITOR_STATE_SLEEPING:
+ g_assert_not_reached ();
+ }
+
for (i = 0; i < 2; i++) {
ThreadPool *tp;
tp = pools [i];
- if (tp->waiting > 0)
- continue;
- need_one = (mono_cq_count (tp->queue) > 0);
- if (!need_one && !tp->is_io) {
- mono_mutex_lock (&wsqs_lock);
- for (i = 0; wsqs != NULL && i < wsqs->len; i++) {
- MonoWSQ *wsq;
- wsq = g_ptr_array_index (wsqs, i);
- if (mono_wsq_count (wsq) != 0) {
- need_one = TRUE;
- break;
- }
- }
- mono_mutex_unlock (&wsqs_lock);
+
+ if (tp->is_io) {
+ if (!tp->waiting && mono_cq_count (tp->queue) > 0)
+ threadpool_start_thread (tp);
+ } else {
+ gint8 nthreads_diff = monitor_heuristic (&current, &history_size, history, tp);
+
+ if (nthreads_diff > 0)
+ threadpool_start_thread (tp);
+ else if (nthreads_diff < 0)
+ threadpool_kill_thread (tp);
}
- if (need_one)
- threadpool_start_thread (tp);
}
}
}
signal (SIGALRM, signal_handler);
alarm (2);
#endif
+
+ MONO_SEM_INIT (&monitor_sem, 0);
+ monitor_state = MONITOR_STATE_AWAKE;
+ monitor_njobs = 0;
}
static MonoAsyncResult *
mono_mutex_unlock (&wsqs_lock);
MONO_SEM_DESTROY (&async_tp.new_job);
}
+
+ MONO_SEM_DESTROY (&monitor_sem);
}
static gboolean
MONO_SEM_POST (&tp->new_job);
}
+/*
+ * threadpool_kill_thread:
+ * @tp: pool that should lose one thread
+ *
+ * Raises tp->destroy_thread from 0 to 1 and, only if this call performed the
+ * transition, calls pulse_on_new_job. Does nothing when the flag is already set.
+ */
+static void
+threadpool_kill_thread (ThreadPool *tp)
+{
+	/* plain read first so the interlocked operation is skipped when the flag is already raised */
+	if (tp->destroy_thread != 0)
+		return;
+
+	/* somebody else won the race to raise the flag */
+	if (InterlockedCompareExchange (&tp->destroy_thread, 1, 0) != 0)
+		return;
+
+	pulse_on_new_job (tp);
+}
+
void
icall_append_job (MonoObject *ar)
{
static void
threadpool_append_jobs (ThreadPool *tp, MonoObject **jobs, gint njobs)
{
- static int job_counter;
MonoObject *ar;
gint i;
}
}
+ InterlockedAdd (&monitor_njobs, njobs);
+
+ if (monitor_state == MONITOR_STATE_SLEEPING && InterlockedCompareExchange (&monitor_state, MONITOR_STATE_AWAKE, MONITOR_STATE_SLEEPING) == MONITOR_STATE_SLEEPING)
+ MONO_SEM_POST (&monitor_sem);
+
+ if (monitor_state == MONITOR_STATE_FALLING_ASLEEP)
+ InterlockedCompareExchange (&monitor_state, MONITOR_STATE_AWAKE, MONITOR_STATE_FALLING_ASLEEP);
+
for (i = 0; i < njobs; i++) {
ar = jobs [i];
if (ar == NULL || mono_domain_is_unloading (ar->vtable->domain))
continue; /* Might happen when cleaning domain jobs */
- if (!tp->is_io && (InterlockedIncrement (&job_counter) % 10) == 0) {
- MonoAsyncResult *o = (MonoAsyncResult *) ar;
- o->add_time = mono_100ns_ticks ();
- }
threadpool_jobs_inc (ar);
#ifndef DISABLE_PERFCOUNTERS
mono_perfcounter_update_value (tp->pc_nitems, TRUE, 1);
mono_cq_enqueue (tp->queue, ar);
}
+#if DEBUG
+ InterlockedAdd (&tp->njobs, njobs);
+#endif
+
for (i = 0; tp->waiting > 0 && i < MIN(njobs, tp->max_threads); i++)
pulse_on_new_job (tp);
}
return (*data != NULL);
}
-static void
-process_idle_times (ThreadPool *tp, gint64 t)
-{
- gint64 ticks;
- gint64 avg;
- gboolean compute_avg;
- gint new_threads;
- gint64 per1;
-
- if (tp->ignore_times || t <= 0)
- return;
-
- compute_avg = FALSE;
- ticks = mono_100ns_ticks ();
- t = ticks - t;
- SPIN_LOCK (tp->sp_lock);
- if (tp->ignore_times) {
- SPIN_UNLOCK (tp->sp_lock);
- return;
- }
- tp->time_sum += t;
- tp->n_sum++;
- if (tp->last_check == 0)
- tp->last_check = ticks;
- else if (tp->last_check > 0 && (ticks - tp->last_check) > 5000000) {
- tp->ignore_times = 1;
- compute_avg = TRUE;
- }
- SPIN_UNLOCK (tp->sp_lock);
-
- if (!compute_avg)
- return;
-
- //printf ("Items: %d Time elapsed: %.3fs\n", tp->n_sum, (ticks - tp->last_check) / 10000.0);
- tp->last_check = ticks;
- new_threads = 0;
- avg = tp->time_sum / tp->n_sum;
- if (tp->averages [1] == 0) {
- tp->averages [1] = avg;
- } else {
- per1 = ((100 * (ABS (avg - tp->averages [1]))) / tp->averages [1]);
- if (per1 > 5) {
- if (avg > tp->averages [1]) {
- if (tp->averages [1] < tp->averages [0]) {
- new_threads = -1;
- } else {
- new_threads = 1;
- }
- } else if (avg < tp->averages [1] && tp->averages [1] < tp->averages [0]) {
- new_threads = 1;
- }
- } else {
- int min, n;
- min = tp->min_threads;
- n = tp->nthreads;
- if ((n - min) < min && tp->busy_threads == n)
- new_threads = 1;
- }
- /*
- if (new_threads != 0) {
- printf ("n: %d per1: %lld avg=%lld avg1=%lld avg0=%lld\n", new_threads, per1, avg, tp->averages [1], tp->averages [0]);
- }
- */
- }
-
- tp->time_sum = 0;
- tp->n_sum = 0;
-
- tp->averages [0] = tp->averages [1];
- tp->averages [1] = avg;
- tp->ignore_times = 0;
-
- if (new_threads == -1) {
- if (tp->destroy_thread == 0 && InterlockedCompareExchange (&tp->destroy_thread, 1, 0) == 0)
- pulse_on_new_job (tp);
- }
-}
-
static gboolean
should_i_die (ThreadPool *tp)
{
if (tp_item_begin_func)
tp_item_begin_func (tp_item_user_data);
- if (!is_io_task && ar->add_time > 0)
- process_idle_times (tp, ar->add_time);
exc = mono_async_invoke (tp, ar);
if (tp_item_end_func)
tp_item_end_func (tp_item_user_data);
ar = NULL;
data = NULL;
must_die = should_i_die (tp);
- if (!must_die && (tp->is_io || !mono_wsq_local_pop (&data)))
- dequeue_or_steal (tp, &data, wsq);
+ if (must_die) {
+ mono_wsq_suspend (wsq);
+ } else {
+ if (tp->is_io || !mono_wsq_local_pop (&data))
+ dequeue_or_steal (tp, &data, wsq);
+ }
n_naps = 0;
while (!must_die && !data && n_naps < 4) {