[threadpool] Fix alignment of ThreadPoolWorkerCounter (#4871)
[mono.git] / mono / metadata / threadpool-worker-default.c
1 /**
2  * \file
3  * native threadpool worker
4  *
5  * Author:
6  *      Ludovic Henry (ludovic.henry@xamarin.com)
7  *
8  * Licensed under the MIT license. See LICENSE file in the project root for full license information.
9  */
10
11 #include <stdlib.h>
12 #define _USE_MATH_DEFINES // needed by MSVC to define math constants
13 #include <math.h>
14 #include <config.h>
15 #include <glib.h>
16
17 #include <mono/metadata/class-internals.h>
18 #include <mono/metadata/exception.h>
19 #include <mono/metadata/gc-internals.h>
20 #include <mono/metadata/object.h>
21 #include <mono/metadata/object-internals.h>
22 #include <mono/metadata/threadpool.h>
23 #include <mono/metadata/threadpool-worker.h>
24 #include <mono/metadata/threadpool-io.h>
25 #include <mono/metadata/w32event.h>
26 #include <mono/utils/atomic.h>
27 #include <mono/utils/mono-compiler.h>
28 #include <mono/utils/mono-complex.h>
29 #include <mono/utils/mono-logger.h>
30 #include <mono/utils/mono-logger-internals.h>
31 #include <mono/utils/mono-proclib.h>
32 #include <mono/utils/mono-threads.h>
33 #include <mono/utils/mono-time.h>
34 #include <mono/utils/mono-rand.h>
35 #include <mono/utils/refcount.h>
36 #include <mono/utils/w32api.h>
37
38 #define CPU_USAGE_LOW 80
39 #define CPU_USAGE_HIGH 95
40
41 #define MONITOR_INTERVAL 500 // ms
42 #define MONITOR_MINIMAL_LIFETIME 60 * 1000 // ms
43
44 #define WORKER_CREATION_MAX_PER_SEC 10
45
46 /* The exponent to apply to the gain. 1.0 means to use linear gain,
47  * higher values will enhance large moves and damp small ones.
48  * default: 2.0 */
49 #define HILL_CLIMBING_GAIN_EXPONENT 2.0
50
51 /* The 'cost' of a thread. 0 means drive for increased throughput regardless
52  * of thread count, higher values bias more against higher thread counts.
53  * default: 0.15 */
54 #define HILL_CLIMBING_BIAS 0.15
55
56 #define HILL_CLIMBING_WAVE_PERIOD 4
57 #define HILL_CLIMBING_MAX_WAVE_MAGNITUDE 20
58 #define HILL_CLIMBING_WAVE_MAGNITUDE_MULTIPLIER 1.0
59 #define HILL_CLIMBING_WAVE_HISTORY_SIZE 8
60 #define HILL_CLIMBING_TARGET_SIGNAL_TO_NOISE_RATIO 3.0
61 #define HILL_CLIMBING_MAX_CHANGE_PER_SECOND 4
62 #define HILL_CLIMBING_MAX_CHANGE_PER_SAMPLE 20
63 #define HILL_CLIMBING_SAMPLE_INTERVAL_LOW 10
64 #define HILL_CLIMBING_SAMPLE_INTERVAL_HIGH 200
65 #define HILL_CLIMBING_ERROR_SMOOTHING_FACTOR 0.01
66 #define HILL_CLIMBING_MAX_SAMPLE_ERROR_PERCENT 0.15
67
68 typedef enum {
69         TRANSITION_WARMUP,
70         TRANSITION_INITIALIZING,
71         TRANSITION_RANDOM_MOVE,
72         TRANSITION_CLIMBING_MOVE,
73         TRANSITION_CHANGE_POINT,
74         TRANSITION_STABILIZING,
75         TRANSITION_STARVATION,
76         TRANSITION_THREAD_TIMED_OUT,
77         TRANSITION_UNDEFINED,
78 } ThreadPoolHeuristicStateTransition;
79
80 typedef struct {
81         gint32 wave_period;
82         gint32 samples_to_measure;
83         gdouble target_throughput_ratio;
84         gdouble target_signal_to_noise_ratio;
85         gdouble max_change_per_second;
86         gdouble max_change_per_sample;
87         gint32 max_thread_wave_magnitude;
88         gint32 sample_interval_low;
89         gdouble thread_magnitude_multiplier;
90         gint32 sample_interval_high;
91         gdouble throughput_error_smoothing_factor;
92         gdouble gain_exponent;
93         gdouble max_sample_error;
94
95         gdouble current_control_setting;
96         gint64 total_samples;
97         gint16 last_thread_count;
98         gdouble elapsed_since_last_change;
99         gdouble completions_since_last_change;
100
101         gdouble average_throughput_noise;
102
103         gdouble *samples;
104         gdouble *thread_counts;
105
106         guint32 current_sample_interval;
107         gpointer random_interval_generator;
108
109         gint32 accumulated_completion_count;
110         gdouble accumulated_sample_duration;
111 } ThreadPoolHillClimbing;
112
113 typedef union {
114         struct {
115                 gint16 max_working; /* determined by heuristic */
116                 gint16 starting; /* starting, but not yet in worker_thread */
117                 gint16 working; /* executing worker_thread */
118                 gint16 parked; /* parked */
119         } _;
120         gint64 as_gint64;
121 } ThreadPoolWorkerCounter
122 #ifdef __GNUC__
123 __attribute__((aligned(64)))
124 #endif
125 ;
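/* Note on the aligned(64) attribute above: it is presumably there to keep the
 * counter on its own cache line and to guarantee the 8-byte alignment required
 * by the 64-bit compare-and-swap in COUNTER_ATOMIC and by InterlockedRead64 in
 * COUNTER_READ; a misaligned or shared cache line would make those atomics
 * slower, and on some 32-bit targets a misaligned 64-bit CAS is outright
 * invalid. */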
126
127 typedef struct {
128         MonoRefCount ref;
129
130         MonoThreadPoolWorkerCallback callback;
131
132         ThreadPoolWorkerCounter counters;
133
134         MonoCoopMutex parked_threads_lock;
135         gint32 parked_threads_count;
136         MonoCoopCond parked_threads_cond;
137
138         volatile gint32 work_items_count;
139
140         guint32 worker_creation_current_second;
141         guint32 worker_creation_current_count;
142         MonoCoopMutex worker_creation_lock;
143
144         gint32 heuristic_completions;
145         gint64 heuristic_sample_start;
146         gint64 heuristic_last_dequeue; // ms
147         gint64 heuristic_last_adjustment; // ms
148         gint64 heuristic_adjustment_interval; // ms
149         ThreadPoolHillClimbing heuristic_hill_climbing;
150         MonoCoopMutex heuristic_lock;
151
152         gint32 limit_worker_min;
153         gint32 limit_worker_max;
154
155         MonoCpuUsageState *cpu_usage_state;
156         gint32 cpu_usage;
157
158         /* suspended by the debugger */
159         gboolean suspended;
160
161         gint32 monitor_status;
162 } ThreadPoolWorker;
163
164 enum {
165         MONITOR_STATUS_REQUESTED,
166         MONITOR_STATUS_WAITING_FOR_REQUEST,
167         MONITOR_STATUS_NOT_RUNNING,
168 };
169
170 static ThreadPoolWorker worker;
171
172 #define COUNTER_CHECK(counter) \
173         do { \
174                 g_assert (counter._.max_working > 0); \
175                 g_assert (counter._.starting >= 0); \
176                 g_assert (counter._.working >= 0); \
177         } while (0)
178
179 #define COUNTER_ATOMIC(var,block) \
180         do { \
181                 ThreadPoolWorkerCounter __old; \
182                 do { \
183                         __old = COUNTER_READ (); \
184                         (var) = __old; \
185                         { block; } \
186                         COUNTER_CHECK (var); \
187                 } while (InterlockedCompareExchange64 (&worker.counters.as_gint64, (var).as_gint64, __old.as_gint64) != __old.as_gint64); \
188         } while (0)
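
/*
 * COUNTER_ATOMIC implements a lock-free read-modify-write over the whole
 * 64-bit counter: snapshot the current value, let `block` mutate the copy
 * held in `var`, then retry the compare-and-swap until no other thread has
 * raced us. A typical use, mirroring the call sites below, is:
 *
 *     ThreadPoolWorkerCounter counter;
 *     COUNTER_ATOMIC (counter, {
 *             counter._.starting ++;
 *     });
 *
 * Under contention `block` may run several times, so it should do nothing
 * but mutate `var`.
 */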
189
190 static inline ThreadPoolWorkerCounter
191 COUNTER_READ (void)
192 {
193         ThreadPoolWorkerCounter counter;
194         counter.as_gint64 = InterlockedRead64 (&worker.counters.as_gint64);
195         return counter;
196 }
197
198 static gpointer
199 rand_create (void)
200 {
201         mono_rand_open ();
202         return mono_rand_init (NULL, 0);
203 }
204
205 static guint32
206 rand_next (gpointer *handle, guint32 min, guint32 max)
207 {
208         MonoError error;
209         guint32 val;
210         mono_rand_try_get_uint32 (handle, &val, min, max, &error);
211         // FIXME handle error
212         mono_error_assert_ok (&error);
213         return val;
214 }
215
216 static void
217 destroy (gpointer data)
218 {
219         mono_coop_mutex_destroy (&worker.parked_threads_lock);
220         mono_coop_cond_destroy (&worker.parked_threads_cond);
221
222         mono_coop_mutex_destroy (&worker.worker_creation_lock);
223
224         mono_coop_mutex_destroy (&worker.heuristic_lock);
225
226         g_free (worker.cpu_usage_state);
227 }
228
229 void
230 mono_threadpool_worker_init (MonoThreadPoolWorkerCallback callback)
231 {
232         ThreadPoolHillClimbing *hc;
233         const char *threads_per_cpu_env;
234         gint threads_per_cpu;
235         gint threads_count;
236
237         mono_refcount_init (&worker, destroy);
238
239         worker.callback = callback;
240
241         mono_coop_mutex_init (&worker.parked_threads_lock);
242         worker.parked_threads_count = 0;
243         mono_coop_cond_init (&worker.parked_threads_cond);
244
245         worker.worker_creation_current_second = -1;
246         mono_coop_mutex_init (&worker.worker_creation_lock);
247
248         worker.heuristic_adjustment_interval = 10;
249         mono_coop_mutex_init (&worker.heuristic_lock);
250
251         mono_rand_open ();
252
253         hc = &worker.heuristic_hill_climbing;
254
255         hc->wave_period = HILL_CLIMBING_WAVE_PERIOD;
256         hc->max_thread_wave_magnitude = HILL_CLIMBING_MAX_WAVE_MAGNITUDE;
257         hc->thread_magnitude_multiplier = (gdouble) HILL_CLIMBING_WAVE_MAGNITUDE_MULTIPLIER;
258         hc->samples_to_measure = hc->wave_period * HILL_CLIMBING_WAVE_HISTORY_SIZE;
259         hc->target_throughput_ratio = (gdouble) HILL_CLIMBING_BIAS;
260         hc->target_signal_to_noise_ratio = (gdouble) HILL_CLIMBING_TARGET_SIGNAL_TO_NOISE_RATIO;
261         hc->max_change_per_second = (gdouble) HILL_CLIMBING_MAX_CHANGE_PER_SECOND;
262         hc->max_change_per_sample = (gdouble) HILL_CLIMBING_MAX_CHANGE_PER_SAMPLE;
263         hc->sample_interval_low = HILL_CLIMBING_SAMPLE_INTERVAL_LOW;
264         hc->sample_interval_high = HILL_CLIMBING_SAMPLE_INTERVAL_HIGH;
265         hc->throughput_error_smoothing_factor = (gdouble) HILL_CLIMBING_ERROR_SMOOTHING_FACTOR;
266         hc->gain_exponent = (gdouble) HILL_CLIMBING_GAIN_EXPONENT;
267         hc->max_sample_error = (gdouble) HILL_CLIMBING_MAX_SAMPLE_ERROR_PERCENT;
268         hc->current_control_setting = 0;
269         hc->total_samples = 0;
270         hc->last_thread_count = 0;
271         hc->average_throughput_noise = 0;
272         hc->elapsed_since_last_change = 0;
273         hc->accumulated_completion_count = 0;
274         hc->accumulated_sample_duration = 0;
275         hc->samples = g_new0 (gdouble, hc->samples_to_measure);
276         hc->thread_counts = g_new0 (gdouble, hc->samples_to_measure);
277         hc->random_interval_generator = rand_create ();
278         hc->current_sample_interval = rand_next (&hc->random_interval_generator, hc->sample_interval_low, hc->sample_interval_high);
279
280         if (!(threads_per_cpu_env = g_getenv ("MONO_THREADS_PER_CPU")))
281                 threads_per_cpu = 1;
282         else
283                 threads_per_cpu = CLAMP (atoi (threads_per_cpu_env), 1, 50);
284
285         threads_count = mono_cpu_count () * threads_per_cpu;
286
287         worker.limit_worker_min = threads_count;
288
289 #if defined (PLATFORM_ANDROID) || defined (HOST_IOS)
290         worker.limit_worker_max = CLAMP (threads_count * 100, MIN (threads_count, 200), MAX (threads_count, 200));
291 #else
292         worker.limit_worker_max = threads_count * 100;
293 #endif
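        /* On mobile targets the cap is clamped to at most ~200 workers (the MIN/MAX
         * pair merely keeps CLAMP's bounds ordered in the unusual case where
         * threads_count itself exceeds 200); elsewhere the cap is simply 100 times
         * the computed minimum. */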
294
295         worker.counters._.max_working = worker.limit_worker_min;
296
297         worker.cpu_usage_state = g_new0 (MonoCpuUsageState, 1);
298
299         worker.suspended = FALSE;
300
301         worker.monitor_status = MONITOR_STATUS_NOT_RUNNING;
302 }
303
304 void
305 mono_threadpool_worker_cleanup (void)
306 {
307         mono_refcount_dec (&worker);
308 }
309
310 static void
311 work_item_push (void)
312 {
313         gint32 old, new;
314
315         do {
316                 old = InterlockedRead (&worker.work_items_count);
317                 g_assert (old >= 0);
318
319                 new = old + 1;
320         } while (InterlockedCompareExchange (&worker.work_items_count, new, old) != old);
321 }
322
323 static gboolean
324 work_item_try_pop (void)
325 {
326         gint32 old, new;
327
328         do {
329                 old = InterlockedRead (&worker.work_items_count);
330                 g_assert (old >= 0);
331
332                 if (old == 0)
333                         return FALSE;
334
335                 new = old - 1;
336         } while (InterlockedCompareExchange (&worker.work_items_count, new, old) != old);
337
338         return TRUE;
339 }
340
341 static gint32
342 work_item_count (void)
343 {
344         return InterlockedRead (&worker.work_items_count);
345 }
346
347 static void worker_request (void);
348
349 void
350 mono_threadpool_worker_request (void)
351 {
352         if (!mono_refcount_tryinc (&worker))
353                 return;
354
355         work_item_push ();
356
357         worker_request ();
358
359         mono_refcount_dec (&worker);
360 }
361
362 static void
363 worker_wait_interrupt (gpointer unused)
364 {
365         /* If the runtime is not shutting down, this mechanism is not used to wake up a parked thread; if the
366          * runtime is shutting down, then we need to wake up ALL the threads.
367          * It might be a bit wasteful, but I witnessed a shutdown hang where the main thread would abort and then wait for all
368          * background threads to exit (see mono_thread_manage). This would go wrong because not all threadpool threads would
369          * be unparked. They would eventually get unstuck because of the timeout, but that would delay shutdown by 5-60s. */
370         if (!mono_runtime_is_shutting_down ())
371                 return;
372
373         if (!mono_refcount_tryinc (&worker))
374                 return;
375
376         mono_coop_mutex_lock (&worker.parked_threads_lock);
377         mono_coop_cond_broadcast (&worker.parked_threads_cond);
378         mono_coop_mutex_unlock (&worker.parked_threads_lock);
379
380         mono_refcount_dec (&worker);
381 }
382
383 /* return TRUE if timeout, FALSE otherwise (worker unpark or interrupt) */
384 static gboolean
385 worker_park (void)
386 {
387         gboolean timeout = FALSE;
388         gboolean interrupted = FALSE;
389
390         mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] worker parking", mono_native_thread_id_get ());
391
392         mono_coop_mutex_lock (&worker.parked_threads_lock);
393
394         if (!mono_runtime_is_shutting_down ()) {
395                 static gpointer rand_handle = NULL;
396                 MonoInternalThread *thread;
397                 ThreadPoolWorkerCounter counter;
398
399                 if (!rand_handle)
400                         rand_handle = rand_create ();
401                 g_assert (rand_handle);
402
403                 thread = mono_thread_internal_current ();
404                 g_assert (thread);
405
406                 COUNTER_ATOMIC (counter, {
407                         counter._.working --;
408                         counter._.parked ++;
409                 });
410
411                 worker.parked_threads_count += 1;
412
413                 mono_thread_info_install_interrupt (worker_wait_interrupt, NULL, &interrupted);
414                 if (interrupted)
415                         goto done;
416
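                /* Park with a randomized 5-60 s timeout rather than a fixed one,
                 * presumably so that idle workers retire gradually instead of all
                 * timing out (and exiting) at the same instant. */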
417                 if (mono_coop_cond_timedwait (&worker.parked_threads_cond, &worker.parked_threads_lock, rand_next (&rand_handle, 5 * 1000, 60 * 1000)) != 0)
418                         timeout = TRUE;
419
420                 mono_thread_info_uninstall_interrupt (&interrupted);
421
422 done:
423                 worker.parked_threads_count -= 1;
424
425                 COUNTER_ATOMIC (counter, {
426                         counter._.working ++;
427                         counter._.parked --;
428                 });
429         }
430
431         mono_coop_mutex_unlock (&worker.parked_threads_lock);
432
433         mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] worker unparking, timeout? %s interrupted? %s",
434                 mono_native_thread_id_get (), timeout ? "yes" : "no", interrupted ? "yes" : "no");
435
436         return timeout;
437 }
438
439 static gboolean
440 worker_try_unpark (void)
441 {
442         gboolean res = FALSE;
443
444         mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try unpark worker", mono_native_thread_id_get ());
445
446         mono_coop_mutex_lock (&worker.parked_threads_lock);
447         if (worker.parked_threads_count > 0) {
448                 mono_coop_cond_signal (&worker.parked_threads_cond);
449                 res = TRUE;
450         }
451         mono_coop_mutex_unlock (&worker.parked_threads_lock);
452
453         mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try unpark worker, success? %s", mono_native_thread_id_get (), res ? "yes" : "no");
454
455         return res;
456 }
457
458 static gsize WINAPI
459 worker_thread (gpointer unused)
460 {
461         MonoInternalThread *thread;
462         ThreadPoolWorkerCounter counter;
463
464         mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] worker starting", mono_native_thread_id_get ());
465
466         if (!mono_refcount_tryinc (&worker))
467                 return 0;
468
469         COUNTER_ATOMIC (counter, {
470                 counter._.starting --;
471                 counter._.working ++;
472         });
473
474         thread = mono_thread_internal_current ();
475         g_assert (thread);
476
477         while (!mono_runtime_is_shutting_down ()) {
478                 if (mono_thread_interruption_checkpoint ())
479                         continue;
480
481                 if (!work_item_try_pop ()) {
482                         gboolean timeout;
483
484                         timeout = worker_park ();
485                         if (timeout)
486                                 break;
487
488                         continue;
489                 }
490
491                 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] worker executing",
492                         mono_native_thread_id_get ());
493
494                 worker.callback ();
495         }
496
497         COUNTER_ATOMIC (counter, {
498                 counter._.working --;
499         });
500
501         mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] worker finishing", mono_native_thread_id_get ());
502
503         mono_refcount_dec (&worker);
504
505         return 0;
506 }
507
508 static gboolean
509 worker_try_create (void)
510 {
511         MonoError error;
512         MonoInternalThread *thread;
513         gint64 current_ticks;
514         gint32 now;
515         ThreadPoolWorkerCounter counter;
516
517         if (mono_runtime_is_shutting_down ())
518                 return FALSE;
519
520         mono_coop_mutex_lock (&worker.worker_creation_lock);
521
522         mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker", mono_native_thread_id_get ());
523
524         current_ticks = mono_100ns_ticks ();
525         if (0 == current_ticks) {
526                 g_warning ("failed to get 100ns ticks");
527         } else {
528                 now = current_ticks / (10 * 1000 * 1000);
529                 if (worker.worker_creation_current_second != now) {
530                         worker.worker_creation_current_second = now;
531                         worker.worker_creation_current_count = 0;
532                 } else {
533                         g_assert (worker.worker_creation_current_count <= WORKER_CREATION_MAX_PER_SEC);
534                         if (worker.worker_creation_current_count == WORKER_CREATION_MAX_PER_SEC) {
535                                 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker, failed: maximum number of workers created per second reached, current count = %d",
536                                         mono_native_thread_id_get (), worker.worker_creation_current_count);
537                                 mono_coop_mutex_unlock (&worker.worker_creation_lock);
538                                 return FALSE;
539                         }
540                 }
541         }
542
543         COUNTER_ATOMIC (counter, {
544                 if (counter._.working >= counter._.max_working) {
545                         mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker, failed: maximum number of working threads reached",
546                                 mono_native_thread_id_get ());
547                         mono_coop_mutex_unlock (&worker.worker_creation_lock);
548                         return FALSE;
549                 }
550                 counter._.starting ++;
551         });
552
553         thread = mono_thread_create_internal (mono_get_root_domain (), worker_thread, NULL, MONO_THREAD_CREATE_FLAGS_THREADPOOL, &error);
554         if (!thread) {
555                 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker, failed: could not create thread due to %s", mono_native_thread_id_get (), mono_error_get_message (&error));
556                 mono_error_cleanup (&error);
557
558                 COUNTER_ATOMIC (counter, {
559                         counter._.starting --;
560                 });
561
562                 mono_coop_mutex_unlock (&worker.worker_creation_lock);
563
564                 return FALSE;
565         }
566
567         worker.worker_creation_current_count += 1;
568
569         mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker, created %p, now = %d count = %d",
570                 mono_native_thread_id_get (), (gpointer) thread->tid, now, worker.worker_creation_current_count);
571
572         mono_coop_mutex_unlock (&worker.worker_creation_lock);
573         return TRUE;
574 }
575
576 static void monitor_ensure_running (void);
577
578 static void
579 worker_request (void)
580 {
581         if (worker.suspended)
582                 return;
583
584         monitor_ensure_running ();
585
586         if (worker_try_unpark ()) {
587                 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, unparked", mono_native_thread_id_get ());
588                 return;
589         }
590
591         if (worker_try_create ()) {
592                 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, created", mono_native_thread_id_get ());
593                 return;
594         }
595
596         mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, failed", mono_native_thread_id_get ());
597 }
598
599 static gboolean
600 monitor_should_keep_running (void)
601 {
602         static gint64 last_should_keep_running = -1;
603
604         g_assert (worker.monitor_status == MONITOR_STATUS_WAITING_FOR_REQUEST || worker.monitor_status == MONITOR_STATUS_REQUESTED);
605
606         if (InterlockedExchange (&worker.monitor_status, MONITOR_STATUS_WAITING_FOR_REQUEST) == MONITOR_STATUS_WAITING_FOR_REQUEST) {
607                 gboolean should_keep_running = TRUE, force_should_keep_running = FALSE;
608
609                 if (mono_runtime_is_shutting_down ()) {
610                         should_keep_running = FALSE;
611                 } else {
612                         if (work_item_count () == 0)
613                                 should_keep_running = FALSE;
614
615                         if (!should_keep_running) {
616                                 if (last_should_keep_running == -1 || mono_100ns_ticks () - last_should_keep_running < MONITOR_MINIMAL_LIFETIME * 1000 * 10) {
617                                         should_keep_running = force_should_keep_running = TRUE;
618                                 }
619                         }
620                 }
621
622                 if (should_keep_running) {
623                         if (last_should_keep_running == -1 || !force_should_keep_running)
624                                 last_should_keep_running = mono_100ns_ticks ();
625                 } else {
626                         last_should_keep_running = -1;
627                         if (InterlockedCompareExchange (&worker.monitor_status, MONITOR_STATUS_NOT_RUNNING, MONITOR_STATUS_WAITING_FOR_REQUEST) == MONITOR_STATUS_WAITING_FOR_REQUEST)
628                                 return FALSE;
629                 }
630         }
631
632         g_assert (worker.monitor_status == MONITOR_STATUS_WAITING_FOR_REQUEST || worker.monitor_status == MONITOR_STATUS_REQUESTED);
633
634         return TRUE;
635 }
636
637 static gboolean
638 monitor_sufficient_delay_since_last_dequeue (void)
639 {
640         gint64 threshold;
641
642         if (worker.cpu_usage < CPU_USAGE_LOW) {
643                 threshold = MONITOR_INTERVAL;
644         } else {
645                 ThreadPoolWorkerCounter counter;
646                 counter = COUNTER_READ ();
647                 threshold = counter._.max_working * MONITOR_INTERVAL * 2;
648         }
649
650         return mono_msec_ticks () >= worker.heuristic_last_dequeue + threshold;
651 }
652
653 static void hill_climbing_force_change (gint16 new_thread_count, ThreadPoolHeuristicStateTransition transition);
654
655 static gsize WINAPI
656 monitor_thread (gpointer unused)
657 {
658         MonoInternalThread *internal;
659         guint i;
660
661         if (!mono_refcount_tryinc (&worker))
662                 return 0;
663
664         internal = mono_thread_internal_current ();
665         g_assert (internal);
666
667         mono_cpu_usage (worker.cpu_usage_state);
668
669         // printf ("monitor_thread: start\n");
670
671         mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, started", mono_native_thread_id_get ());
672
673         do {
674                 ThreadPoolWorkerCounter counter;
675                 gboolean limit_worker_max_reached;
676                 gint32 interval_left = MONITOR_INTERVAL;
677                 gint32 awake = 0; /* number of spurious awakes we tolerate before doing a round of rebalancing */
678
679                 g_assert (worker.monitor_status != MONITOR_STATUS_NOT_RUNNING);
680
681                 // counter = COUNTER_READ ();
682                 // printf ("monitor_thread: starting = %d working = %d parked = %d max_working = %d\n",
683                 //      counter._.starting, counter._.working, counter._.parked, counter._.max_working);
684
685                 do {
686                         gint64 ts;
687                         gboolean alerted = FALSE;
688
689                         if (mono_runtime_is_shutting_down ())
690                                 break;
691
692                         ts = mono_msec_ticks ();
693                         if (mono_thread_info_sleep (interval_left, &alerted) == 0)
694                                 break;
695                         interval_left -= mono_msec_ticks () - ts;
696
697                         mono_thread_interruption_checkpoint ();
698                 } while (interval_left > 0 && ++awake < 10);
699
700                 if (mono_runtime_is_shutting_down ())
701                         continue;
702
703                 if (worker.suspended)
704                         continue;
705
706                 if (work_item_count () == 0)
707                         continue;
708
709                 worker.cpu_usage = mono_cpu_usage (worker.cpu_usage_state);
710
711                 if (!monitor_sufficient_delay_since_last_dequeue ())
712                         continue;
713
714                 limit_worker_max_reached = FALSE;
715
716                 COUNTER_ATOMIC (counter, {
717                         if (counter._.max_working >= worker.limit_worker_max) {
718                                 limit_worker_max_reached = TRUE;
719                                 break;
720                         }
721                         counter._.max_working ++;
722                 });
723
724                 if (limit_worker_max_reached)
725                         continue;
726
727                 hill_climbing_force_change (counter._.max_working, TRANSITION_STARVATION);
728
729                 for (i = 0; i < 5; ++i) {
730                         if (mono_runtime_is_shutting_down ())
731                                 break;
732
733                         if (worker_try_unpark ()) {
734                                 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, unparked", mono_native_thread_id_get ());
735                                 break;
736                         }
737
738                         if (worker_try_create ()) {
739                                 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, created", mono_native_thread_id_get ());
740                                 break;
741                         }
742                 }
743         } while (monitor_should_keep_running ());
744
745         // printf ("monitor_thread: stop\n");
746
747         mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, finished", mono_native_thread_id_get ());
748
749         mono_refcount_dec (&worker);
750         return 0;
751 }
752
753 static void
754 monitor_ensure_running (void)
755 {
756         MonoError error;
757         for (;;) {
758                 switch (worker.monitor_status) {
759                 case MONITOR_STATUS_REQUESTED:
760                         // printf ("monitor_thread: requested\n");
761                         return;
762                 case MONITOR_STATUS_WAITING_FOR_REQUEST:
763                         // printf ("monitor_thread: waiting for request\n");
764                         InterlockedCompareExchange (&worker.monitor_status, MONITOR_STATUS_REQUESTED, MONITOR_STATUS_WAITING_FOR_REQUEST);
765                         break;
766                 case MONITOR_STATUS_NOT_RUNNING:
767                         // printf ("monitor_thread: not running\n");
768                         if (mono_runtime_is_shutting_down ())
769                                 return;
770                         if (InterlockedCompareExchange (&worker.monitor_status, MONITOR_STATUS_REQUESTED, MONITOR_STATUS_NOT_RUNNING) == MONITOR_STATUS_NOT_RUNNING) {
771                                 // printf ("monitor_thread: creating\n");
772                                 if (!mono_thread_create_internal (mono_get_root_domain (), monitor_thread, NULL, MONO_THREAD_CREATE_FLAGS_THREADPOOL | MONO_THREAD_CREATE_FLAGS_SMALL_STACK, &error)) {
773                                         // printf ("monitor_thread: creating failed\n");
774                                         worker.monitor_status = MONITOR_STATUS_NOT_RUNNING;
775                                         mono_error_cleanup (&error);
776                                         mono_refcount_dec (&worker);
777                                 }
778                                 return;
779                         }
780                         break;
781                 default: g_assert_not_reached ();
782                 }
783         }
784 }
785
786 static void
787 hill_climbing_change_thread_count (gint16 new_thread_count, ThreadPoolHeuristicStateTransition transition)
788 {
789         ThreadPoolHillClimbing *hc;
790
791         hc = &worker.heuristic_hill_climbing;
792
793         mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] hill climbing, change max number of threads %d", mono_native_thread_id_get (), new_thread_count);
794
795         hc->last_thread_count = new_thread_count;
796         hc->current_sample_interval = rand_next (&hc->random_interval_generator, hc->sample_interval_low, hc->sample_interval_high);
797         hc->elapsed_since_last_change = 0;
798         hc->completions_since_last_change = 0;
799 }
800
801 static void
802 hill_climbing_force_change (gint16 new_thread_count, ThreadPoolHeuristicStateTransition transition)
803 {
804         ThreadPoolHillClimbing *hc;
805
806         hc = &worker.heuristic_hill_climbing;
807
808         if (new_thread_count != hc->last_thread_count) {
809                 hc->current_control_setting += new_thread_count - hc->last_thread_count;
810                 hill_climbing_change_thread_count (new_thread_count, transition);
811         }
812 }
813
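/* Compute the complex amplitude of a single frequency component (with the
 * given period, in samples) over the most recent `sample_count` entries of
 * the circular sample buffer. The q0/q1/q2 recurrence is, in effect, the
 * Goertzel algorithm: a one-bin discrete Fourier transform, much cheaper
 * than a full FFT when only one frequency is of interest. */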
814 static double_complex
815 hill_climbing_get_wave_component (gdouble *samples, guint sample_count, gdouble period)
816 {
817         ThreadPoolHillClimbing *hc;
818         gdouble w, cosine, sine, coeff, q0, q1, q2;
819         guint i;
820
821         g_assert (sample_count >= period);
822         g_assert (period >= 2);
823
824         hc = &worker.heuristic_hill_climbing;
825
826         w = 2.0 * M_PI / period;
827         cosine = cos (w);
828         sine = sin (w);
829         coeff = 2.0 * cosine;
830         q0 = q1 = q2 = 0;
831
832         for (i = 0; i < sample_count; ++i) {
833                 q0 = coeff * q1 - q2 + samples [(hc->total_samples - sample_count + i) % hc->samples_to_measure];
834                 q2 = q1;
835                 q1 = q0;
836         }
837
838         return mono_double_complex_scalar_div (mono_double_complex_make (q1 - q2 * cosine, (q2 * sine)), ((gdouble)sample_count));
839 }
840
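/* The controller below closely follows the .NET ThreadPool hill-climbing
 * heuristic: it superimposes a small square wave (period `wave_period`
 * samples, magnitude bounded by `max_thread_wave_magnitude`) on the thread
 * count, measures the throughput response at that same frequency via
 * hill_climbing_get_wave_component, and compares it against the noise seen
 * in the adjacent frequency bands. A strong in-phase response means extra
 * threads are helping and the control setting moves up; an out-of-phase
 * response moves it down; a response that is weak relative to the noise
 * leaves it essentially unchanged. */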
841 static gint16
842 hill_climbing_update (gint16 current_thread_count, guint32 sample_duration, gint32 completions, gint64 *adjustment_interval)
843 {
844         ThreadPoolHillClimbing *hc;
845         ThreadPoolHeuristicStateTransition transition;
846         gdouble throughput;
847         gdouble throughput_error_estimate;
848         gdouble confidence;
849         gdouble move;
850         gdouble gain;
851         gint sample_index;
852         gint sample_count;
853         gint new_thread_wave_magnitude;
854         gint new_thread_count;
855         double_complex thread_wave_component;
856         double_complex throughput_wave_component;
857         double_complex ratio;
858
859         g_assert (adjustment_interval);
860
861         hc = &worker.heuristic_hill_climbing;
862
863         /* If someone changed the thread count without telling us, update our records accordingly. */
864         if (current_thread_count != hc->last_thread_count)
865                 hill_climbing_force_change (current_thread_count, TRANSITION_INITIALIZING);
866
867         /* Update the cumulative stats for this thread count */
868         hc->elapsed_since_last_change += sample_duration;
869         hc->completions_since_last_change += completions;
870
871         /* Add in any data we've already collected about this sample */
872         sample_duration += hc->accumulated_sample_duration;
873         completions += hc->accumulated_completion_count;
874
875         /* We need to make sure we're collecting reasonably accurate data. Since we're just counting the end
876          * of each work item, we are going to be missing some data about what really happened during the
877          * sample interval. The count produced by each thread includes an initial work item that may have
878          * started well before the start of the interval, and each thread may have been running some new
879          * work item for some time before the end of the interval, which did not yet get counted. So
880          * our count is going to be off by +/- threadCount workitems.
881          *
882          * The exception is that the thread that reported to us last time definitely wasn't running any work
883          * at that time, and the thread that's reporting now definitely isn't running a work item now. So
884          * we really only need to consider threadCount-1 threads.
885          *
886          * Thus the percent error in our count is +/- (threadCount-1)/numCompletions.
887          *
888          * We cannot rely on the frequency-domain analysis we'll be doing later to filter out this error, because
889          * of the way it accumulates over time. If this sample is off by, say, 33% in the negative direction,
890          * then the next one likely will be too. The one after that will include the sum of the completions
891          * we missed in the previous samples, and so will be 33% positive. So every three samples we'll have
892          * two "low" samples and one "high" sample. This will appear as periodic variation right in the frequency
893          * range we're targeting, which will not be filtered by the frequency-domain translation. */
894         if (hc->total_samples > 0 && ((current_thread_count - 1.0) / completions) >= hc->max_sample_error) {
895                 /* Not accurate enough yet. Let's accumulate the data so
896                  * far, and tell the ThreadPoolWorker to collect a little more. */
897                 hc->accumulated_sample_duration = sample_duration;
898                 hc->accumulated_completion_count = completions;
899                 *adjustment_interval = 10;
900                 return current_thread_count;
901         }
902
903         /* We've got enough data for our sample; reset our accumulators for next time. */
904         hc->accumulated_sample_duration = 0;
905         hc->accumulated_completion_count = 0;
906
907         /* Add the current thread count and throughput sample to our history. */
908         throughput = ((gdouble) completions) / sample_duration;
909
910         sample_index = hc->total_samples % hc->samples_to_measure;
911         hc->samples [sample_index] = throughput;
912         hc->thread_counts [sample_index] = current_thread_count;
913         hc->total_samples ++;
914
915         /* Set up defaults for our metrics. */
916         thread_wave_component = mono_double_complex_make(0, 0);
917         throughput_wave_component = mono_double_complex_make(0, 0);
918         throughput_error_estimate = 0;
919         ratio = mono_double_complex_make(0, 0);
920         confidence = 0;
921
922         transition = TRANSITION_WARMUP;
923
924         /* How many samples will we use? It must be at least the three wave periods we're looking for, and it must also
925          * be a whole multiple of the primary wave's period; otherwise the frequency we're looking for will fall between
926          * two frequency bands in the Fourier analysis, and we won't be able to measure it accurately. */
927         sample_count = ((gint) MIN (hc->total_samples - 1, hc->samples_to_measure) / hc->wave_period) * hc->wave_period;
928
929         if (sample_count > hc->wave_period) {
930                 guint i;
931                 gdouble average_throughput;
932                 gdouble average_thread_count;
933                 gdouble sample_sum = 0;
934                 gdouble thread_sum = 0;
935
936                 /* Average the throughput and thread count samples, so we can scale the wave magnitudes later. */
937                 for (i = 0; i < sample_count; ++i) {
938                         guint j = (hc->total_samples - sample_count + i) % hc->samples_to_measure;
939                         sample_sum += hc->samples [j];
940                         thread_sum += hc->thread_counts [j];
941                 }
942
943                 average_throughput = sample_sum / sample_count;
944                 average_thread_count = thread_sum / sample_count;
945
946                 if (average_throughput > 0 && average_thread_count > 0) {
947                         gdouble noise_for_confidence, adjacent_period_1, adjacent_period_2;
948
949                         /* Calculate the periods of the adjacent frequency bands we'll be using to
950                          * measure noise levels. We want the two adjacent Fourier frequency bands. */
951                         adjacent_period_1 = sample_count / (((gdouble) sample_count) / ((gdouble) hc->wave_period) + 1);
952                         adjacent_period_2 = sample_count / (((gdouble) sample_count) / ((gdouble) hc->wave_period) - 1);
953
954                         /* Get the three different frequency components of the throughput (scaled by average
955                          * throughput). Our "error" estimate (the amount of noise that might be present in the
956                          * frequency band we're really interested in) is the average of the adjacent bands. */
957                         throughput_wave_component = mono_double_complex_scalar_div (hill_climbing_get_wave_component (hc->samples, sample_count, hc->wave_period), average_throughput);
958                         throughput_error_estimate = cabs (mono_double_complex_scalar_div (hill_climbing_get_wave_component (hc->samples, sample_count, adjacent_period_1), average_throughput));
959
960                         if (adjacent_period_2 <= sample_count) {
961                                 throughput_error_estimate = MAX (throughput_error_estimate, cabs (mono_double_complex_scalar_div (hill_climbing_get_wave_component (
962                                         hc->samples, sample_count, adjacent_period_2), average_throughput)));
963                         }
964
965                         /* Do the same for the thread counts, so we have something to compare to. We don't
966                          * measure thread count noise, because there is none; these are exact measurements. */
967                         thread_wave_component = mono_double_complex_scalar_div (hill_climbing_get_wave_component (hc->thread_counts, sample_count, hc->wave_period), average_thread_count);
968
969                         /* Update our moving average of the throughput noise. We'll use this
970                          * later as feedback to determine the new size of the thread wave. */
971                         if (hc->average_throughput_noise == 0) {
972                                 hc->average_throughput_noise = throughput_error_estimate;
973                         } else {
974                                 hc->average_throughput_noise = (hc->throughput_error_smoothing_factor * throughput_error_estimate)
975                                         + ((1.0 + hc->throughput_error_smoothing_factor) * hc->average_throughput_noise);
976                         }
977
978                         if (cabs (thread_wave_component) > 0) {
979                                 /* Adjust the throughput wave so it's centered around the target wave,
980                                  * and then calculate the adjusted throughput/thread ratio. */
981                                 ratio = mono_double_complex_div (mono_double_complex_sub (throughput_wave_component, mono_double_complex_scalar_mul(thread_wave_component, hc->target_throughput_ratio)), thread_wave_component);
982                                 transition = TRANSITION_CLIMBING_MOVE;
983                         } else {
984                                 ratio = mono_double_complex_make (0, 0);
985                                 transition = TRANSITION_STABILIZING;
986                         }
987
988                         noise_for_confidence = MAX (hc->average_throughput_noise, throughput_error_estimate);
989                         if (noise_for_confidence > 0) {
990                                 confidence = cabs (thread_wave_component) / noise_for_confidence / hc->target_signal_to_noise_ratio;
991                         } else {
992                                 /* there is no noise! */
993                                 confidence = 1.0;
994                         }
995                 }
996         }
997
998         /* We use just the real part of the complex ratio we just calculated. If the throughput signal
999          * is exactly in phase with the thread signal, this will be the same as taking the magnitude of
1000          * the complex move and moving that far up. If they're 180 degrees out of phase, we'll move
1001          * backward (because this indicates that our changes are having the opposite of the intended effect).
1002          * If they're 90 degrees out of phase, we won't move at all, because we can't tell whether we're
1003          * having a negative or positive effect on throughput. */
1004         move = creal (ratio);
1005         move = CLAMP (move, -1.0, 1.0);
1006
1007         /* Apply our confidence multiplier. */
1008         move *= CLAMP (confidence, -1.0, 1.0);
1009
1010         /* Now apply non-linear gain, such that values around zero are attenuated, while higher values
1011          * are enhanced. This allows us to move quickly if we're far away from the target, but more slowly
1012          * if we're getting close, giving us rapid ramp-up without wild oscillations around the target. */
1013         gain = hc->max_change_per_second * sample_duration;
1014         move = pow (fabs (move), hc->gain_exponent) * (move >= 0.0 ? 1 : -1) * gain;
1015         move = MIN (move, hc->max_change_per_sample);
1016
1017         /* If the result was positive, and CPU is > 95%, refuse the move. */
1018         if (move > 0.0 && worker.cpu_usage > CPU_USAGE_HIGH)
1019                 move = 0.0;
1020
1021         /* Apply the move to our control setting. */
1022         hc->current_control_setting += move;
1023
1024         /* Calculate the new thread wave magnitude, which is based on the moving average we've been keeping of the
1025          * throughput error.  This average starts at zero, so we'll start with a nice safe little wave at first. */
1026         new_thread_wave_magnitude = (gint)(0.5 + (hc->current_control_setting * hc->average_throughput_noise
1027                 * hc->target_signal_to_noise_ratio * hc->thread_magnitude_multiplier * 2.0));
1028         new_thread_wave_magnitude = CLAMP (new_thread_wave_magnitude, 1, hc->max_thread_wave_magnitude);
1029
1030         /* Make sure our control setting is within the ThreadPoolWorker's limits. */
1031         hc->current_control_setting = CLAMP (hc->current_control_setting, worker.limit_worker_min, worker.limit_worker_max - new_thread_wave_magnitude);
1032
1033         /* Calculate the new thread count (control setting + square wave). */
1034         new_thread_count = (gint)(hc->current_control_setting + new_thread_wave_magnitude * ((hc->total_samples / (hc->wave_period / 2)) % 2));
1035
1036         /* Make sure the new thread count doesn't exceed the ThreadPoolWorker's limits. */
1037         new_thread_count = CLAMP (new_thread_count, worker.limit_worker_min, worker.limit_worker_max);
1038
1039         if (new_thread_count != current_thread_count)
1040                 hill_climbing_change_thread_count (new_thread_count, transition);
1041
1042         if (creal (ratio) < 0.0 && new_thread_count == worker.limit_worker_min)
1043                 *adjustment_interval = (gint)(0.5 + hc->current_sample_interval * (10.0 * MAX (-1.0 * creal (ratio), 1.0)));
1044         else
1045                 *adjustment_interval = hc->current_sample_interval;
1046
1047         return new_thread_count;
1048 }
1049
1050 static gboolean
1051 heuristic_should_adjust (void)
1052 {
1053         if (worker.heuristic_last_dequeue > worker.heuristic_last_adjustment + worker.heuristic_adjustment_interval) {
1054                 ThreadPoolWorkerCounter counter;
1055                 counter = COUNTER_READ ();
1056                 if (counter._.working <= counter._.max_working)
1057                         return TRUE;
1058         }
1059
1060         return FALSE;
1061 }
1062
1063 static void
1064 heuristic_adjust (void)
1065 {
1066         if (mono_coop_mutex_trylock (&worker.heuristic_lock) == 0) {
1067                 gint32 completions = InterlockedExchange (&worker.heuristic_completions, 0);
1068                 gint64 sample_end = mono_msec_ticks ();
1069                 gint64 sample_duration = sample_end - worker.heuristic_sample_start;
1070
1071                 if (sample_duration >= worker.heuristic_adjustment_interval / 2) {
1072                         ThreadPoolWorkerCounter counter;
1073                         gint16 new_thread_count;
1074
1075                         counter = COUNTER_READ ();
1076                         new_thread_count = hill_climbing_update (counter._.max_working, sample_duration, completions, &worker.heuristic_adjustment_interval);
1077
1078                         COUNTER_ATOMIC (counter, {
1079                                 counter._.max_working = new_thread_count;
1080                         });
1081
1082                         if (new_thread_count > counter._.max_working)
1083                                 worker_request ();
1084
1085                         worker.heuristic_sample_start = sample_end;
1086                         worker.heuristic_last_adjustment = mono_msec_ticks ();
1087                 }
1088
1089                 mono_coop_mutex_unlock (&worker.heuristic_lock);
1090         }
1091 }
1092
1093 static void
1094 heuristic_notify_work_completed (void)
1095 {
1096         InterlockedIncrement (&worker.heuristic_completions);
1097         worker.heuristic_last_dequeue = mono_msec_ticks ();
1098
1099         if (heuristic_should_adjust ())
1100                 heuristic_adjust ();
1101 }
1102
1103 gboolean
1104 mono_threadpool_worker_notify_completed (void)
1105 {
1106         ThreadPoolWorkerCounter counter;
1107
1108         heuristic_notify_work_completed ();
1109
1110         counter = COUNTER_READ ();
1111         return counter._.working <= counter._.max_working;
1112 }
1113
1114 gint32
1115 mono_threadpool_worker_get_min (void)
1116 {
1117         gint32 ret;
1118
1119         if (!mono_refcount_tryinc (&worker))
1120                 return 0;
1121
1122         ret = worker.limit_worker_min;
1123
1124         mono_refcount_dec (&worker);
1125         return ret;
1126 }
1127
1128 gboolean
1129 mono_threadpool_worker_set_min (gint32 value)
1130 {
1131         if (value <= 0 || value > worker.limit_worker_max)
1132                 return FALSE;
1133
1134         if (!mono_refcount_tryinc (&worker))
1135                 return FALSE;
1136
1137         worker.limit_worker_min = value;
1138
1139         mono_refcount_dec (&worker);
1140         return TRUE;
1141 }
1142
1143 gint32
1144 mono_threadpool_worker_get_max (void)
1145 {
1146         gint32 ret;
1147
1148         if (!mono_refcount_tryinc (&worker))
1149                 return 0;
1150
1151         ret = worker.limit_worker_max;
1152
1153         mono_refcount_dec (&worker);
1154         return ret;
1155 }
1156
1157 gboolean
1158 mono_threadpool_worker_set_max (gint32 value)
1159 {
1160         gint32 cpu_count;
1161
1162         cpu_count = mono_cpu_count ();
1163         if (value < worker.limit_worker_min || value < cpu_count)
1164                 return FALSE;
1165
1166         if (!mono_refcount_tryinc (&worker))
1167                 return FALSE;
1168
1169         worker.limit_worker_max = value;
1170
1171         mono_refcount_dec (&worker);
1172         return TRUE;
1173 }
1174
1175 void
1176 mono_threadpool_worker_set_suspended (gboolean suspended)
1177 {
1178         if (!mono_refcount_tryinc (&worker))
1179                 return;
1180
1181         worker.suspended = suspended;
1182         if (!suspended)
1183                 worker_request ();
1184
1185         mono_refcount_dec (&worker);
1186 }