1 /*
2  * threadpool-ms.c: Microsoft threadpool runtime support
3  *
4  * Author:
5  *      Ludovic Henry (ludovic.henry@xamarin.com)
6  *
7  * Copyright 2015 Xamarin, Inc (http://www.xamarin.com)
8  * Licensed under the MIT license. See LICENSE file in the project root for full license information.
9  */
10
11 //
12 // Copyright (c) Microsoft. All rights reserved.
13 // Licensed under the MIT license. See LICENSE file in the project root for full license information.
14 //
15 // Files:
16 //  - src/vm/comthreadpool.cpp
17 //  - src/vm/win32threadpool.cpp
18 //  - src/vm/threadpoolrequest.cpp
19 //  - src/vm/hillclimbing.cpp
20 //
21 // Ported from C++ to C and adjusted to the Mono runtime
22
23 #include <stdlib.h>
24 #define _USE_MATH_DEFINES // needed by MSVC to define math constants
25 #include <math.h>
26 #include <config.h>
27 #include <glib.h>
28
29 #include <mono/metadata/class-internals.h>
30 #include <mono/metadata/exception.h>
31 #include <mono/metadata/gc-internals.h>
32 #include <mono/metadata/object.h>
33 #include <mono/metadata/object-internals.h>
34 #include <mono/metadata/threadpool-ms.h>
35 #include <mono/metadata/threadpool-ms-io.h>
36 #include <mono/metadata/w32event.h>
37 #include <mono/utils/atomic.h>
38 #include <mono/utils/mono-compiler.h>
39 #include <mono/utils/mono-complex.h>
40 #include <mono/utils/mono-lazy-init.h>
41 #include <mono/utils/mono-logger.h>
42 #include <mono/utils/mono-logger-internals.h>
43 #include <mono/utils/mono-proclib.h>
44 #include <mono/utils/mono-threads.h>
45 #include <mono/utils/mono-time.h>
46 #include <mono/utils/mono-rand.h>
47 #include <mono/io-layer/io-layer.h>
48
49 #define CPU_USAGE_LOW 80
50 #define CPU_USAGE_HIGH 95
51
52 #define MONITOR_INTERVAL 500 // ms
53 #define MONITOR_MINIMAL_LIFETIME (60 * 1000) // ms
54
55 #define WORKER_CREATION_MAX_PER_SEC 10
56
57 /* The exponent to apply to the gain. 1.0 means to use linear gain,
58  * higher values will enhance large moves and damp small ones.
59  * default: 2.0 */
60 #define HILL_CLIMBING_GAIN_EXPONENT 2.0
61
62 /* The 'cost' of a thread. 0 means drive for increased throughput regardless
63  * of thread count, higher values bias more against higher thread counts.
64  * default: 0.15 */
65 #define HILL_CLIMBING_BIAS 0.15
66
67 #define HILL_CLIMBING_WAVE_PERIOD 4
68 #define HILL_CLIMBING_MAX_WAVE_MAGNITUDE 20
69 #define HILL_CLIMBING_WAVE_MAGNITUDE_MULTIPLIER 1.0
70 #define HILL_CLIMBING_WAVE_HISTORY_SIZE 8
71 #define HILL_CLIMBING_TARGET_SIGNAL_TO_NOISE_RATIO 3.0
72 #define HILL_CLIMBING_MAX_CHANGE_PER_SECOND 4
73 #define HILL_CLIMBING_MAX_CHANGE_PER_SAMPLE 20
74 #define HILL_CLIMBING_SAMPLE_INTERVAL_LOW 10
75 #define HILL_CLIMBING_SAMPLE_INTERVAL_HIGH 200
76 #define HILL_CLIMBING_ERROR_SMOOTHING_FACTOR 0.01
77 #define HILL_CLIMBING_MAX_SAMPLE_ERROR_PERCENT 0.15
78
79 typedef union {
80         struct {
81                 gint16 max_working; /* maximum number of working threads, determined by the hill-climbing heuristic */
82                 gint16 active; /* total number of threads executing worker_thread (working + parked) */
83                 gint16 working; /* threads executing worker_thread that are not parked */
84                 gint16 parked; /* threads executing worker_thread that are parked */
85         } _;
86         gint64 as_gint64;
87 } ThreadPoolCounter;
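/* Note: the four 16-bit fields above share storage with the 64-bit as_gint64
 * member, so the whole counter can be read and updated with a single 64-bit
 * compare-and-swap (see COUNTER_READ / COUNTER_ATOMIC below). */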
88
89 typedef struct {
90         MonoDomain *domain;
91         /* Number of outstanding work requests */
92         gint32 outstanding_request;
93         /* Number of currently executing jobs */
94         gint32 threadpool_jobs;
95         /* Signalled when threadpool_jobs + outstanding_request is 0 */
96         /* Protected by threadpool->domains_lock */
97         MonoCoopCond cleanup_cond;
98 } ThreadPoolDomain;
99
100 typedef MonoInternalThread ThreadPoolWorkingThread;
101
102 typedef struct {
103         gint32 wave_period;
104         gint32 samples_to_measure;
105         gdouble target_throughput_ratio;
106         gdouble target_signal_to_noise_ratio;
107         gdouble max_change_per_second;
108         gdouble max_change_per_sample;
109         gint32 max_thread_wave_magnitude;
110         gint32 sample_interval_low;
111         gdouble thread_magnitude_multiplier;
112         gint32 sample_interval_high;
113         gdouble throughput_error_smoothing_factor;
114         gdouble gain_exponent;
115         gdouble max_sample_error;
116
117         gdouble current_control_setting;
118         gint64 total_samples;
119         gint16 last_thread_count;
120         gdouble elapsed_since_last_change;
121         gdouble completions_since_last_change;
122
123         gdouble average_throughput_noise;
124
125         gdouble *samples;
126         gdouble *thread_counts;
127
128         guint32 current_sample_interval;
129         gpointer random_interval_generator;
130
131         gint32 accumulated_completion_count;
132         gdouble accumulated_sample_duration;
133 } ThreadPoolHillClimbing;
134
135 typedef struct {
136         ThreadPoolCounter counters;
137
138         GPtrArray *domains; // ThreadPoolDomain* []
139         MonoCoopMutex domains_lock;
140
141         GPtrArray *working_threads; // ThreadPoolWorkingThread* []
142         gint32 parked_threads_count;
143         MonoCoopCond parked_threads_cond;
144         MonoCoopMutex active_threads_lock; /* protect access to working_threads and parked_threads */
145
146         guint32 worker_creation_current_second;
147         guint32 worker_creation_current_count;
148         MonoCoopMutex worker_creation_lock;
149
150         gint32 heuristic_completions;
151         gint64 heuristic_sample_start;
152         gint64 heuristic_last_dequeue; // ms
153         gint64 heuristic_last_adjustment; // ms
154         gint64 heuristic_adjustment_interval; // ms
155         ThreadPoolHillClimbing heuristic_hill_climbing;
156         MonoCoopMutex heuristic_lock;
157
158         gint32 limit_worker_min;
159         gint32 limit_worker_max;
160         gint32 limit_io_min;
161         gint32 limit_io_max;
162
163         MonoCpuUsageState *cpu_usage_state;
164         gint32 cpu_usage;
165
166         /* suspended by the debugger */
167         gboolean suspended;
168 } ThreadPool;
169
170 typedef enum {
171         TRANSITION_WARMUP,
172         TRANSITION_INITIALIZING,
173         TRANSITION_RANDOM_MOVE,
174         TRANSITION_CLIMBING_MOVE,
175         TRANSITION_CHANGE_POINT,
176         TRANSITION_STABILIZING,
177         TRANSITION_STARVATION,
178         TRANSITION_THREAD_TIMED_OUT,
179         TRANSITION_UNDEFINED,
180 } ThreadPoolHeuristicStateTransition;
181
182 static mono_lazy_init_t status = MONO_LAZY_INIT_STATUS_NOT_INITIALIZED;
183
184 enum {
185         MONITOR_STATUS_REQUESTED,
186         MONITOR_STATUS_WAITING_FOR_REQUEST,
187         MONITOR_STATUS_NOT_RUNNING,
188 };
189
190 static gint32 monitor_status = MONITOR_STATUS_NOT_RUNNING;
191
192 static ThreadPool* threadpool;
193
194 #define COUNTER_CHECK(counter) \
195         do { \
196                 g_assert (counter._.max_working > 0); \
197                 g_assert (counter._.working >= 0); \
198                 g_assert (counter._.active >= 0); \
199         } while (0)
200
201 #define COUNTER_READ() (InterlockedRead64 (&threadpool->counters.as_gint64))
202
203 #define COUNTER_ATOMIC(var,block) \
204         do { \
205                 ThreadPoolCounter __old; \
206                 do { \
207                         g_assert (threadpool); \
208                         __old.as_gint64 = COUNTER_READ (); \
209                         (var) = __old; \
210                         { block; } \
211                         COUNTER_CHECK (var); \
212                 } while (InterlockedCompareExchange64 (&threadpool->counters.as_gint64, (var).as_gint64, __old.as_gint64) != __old.as_gint64); \
213         } while (0)
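/* Typical usage of COUNTER_ATOMIC, as in worker_thread below: the block mutates
 * a local copy of the counter and the macro retries the 64-bit CAS until it wins.
 *
 *   ThreadPoolCounter counter;
 *   COUNTER_ATOMIC (counter, {
 *           counter._.working --;
 *           counter._.parked ++;
 *   });
 */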
214
215 #define COUNTER_TRY_ATOMIC(res,var,block) \
216         do { \
217                 ThreadPoolCounter __old; \
218                 do { \
219                         g_assert (threadpool); \
220                         __old.as_gint64 = COUNTER_READ (); \
221                         (var) = __old; \
222                         (res) = FALSE; \
223                         { block; } \
224                         COUNTER_CHECK (var); \
225                         (res) = InterlockedCompareExchange64 (&threadpool->counters.as_gint64, (var).as_gint64, __old.as_gint64) == __old.as_gint64; \
226                 } while (0); \
227         } while (0)
228
229 static inline void
230 domains_lock (void)
231 {
232         mono_coop_mutex_lock (&threadpool->domains_lock);
233 }
234
235 static inline void
236 domains_unlock (void)
237 {
238         mono_coop_mutex_unlock (&threadpool->domains_lock);
239 }
240
241 static gpointer
242 rand_create (void)
243 {
244         mono_rand_open ();
245         return mono_rand_init (NULL, 0);
246 }
247
248 static guint32
249 rand_next (gpointer *handle, guint32 min, guint32 max)
250 {
251         MonoError error;
252         guint32 val;
253         mono_rand_try_get_uint32 (handle, &val, min, max, &error);
254         // FIXME handle error
255         mono_error_assert_ok (&error);
256         return val;
257 }
258
259 static void
260 rand_free (gpointer handle)
261 {
262         mono_rand_close (handle);
263 }
264
265 static void
266 initialize (void)
267 {
268         ThreadPoolHillClimbing *hc;
269         const char *threads_per_cpu_env;
270         gint threads_per_cpu;
271         gint threads_count;
272
273         g_assert (!threadpool);
274         threadpool = g_new0 (ThreadPool, 1);
275         g_assert (threadpool);
276
277         threadpool->domains = g_ptr_array_new ();
278         mono_coop_mutex_init (&threadpool->domains_lock);
279
280         threadpool->parked_threads_count = 0;
281         mono_coop_cond_init (&threadpool->parked_threads_cond);
282         threadpool->working_threads = g_ptr_array_new ();
283         mono_coop_mutex_init (&threadpool->active_threads_lock);
284
285         threadpool->worker_creation_current_second = -1;
286         mono_coop_mutex_init (&threadpool->worker_creation_lock);
287
288         threadpool->heuristic_adjustment_interval = 10;
289         mono_coop_mutex_init (&threadpool->heuristic_lock);
290
291         mono_rand_open ();
292
293         hc = &threadpool->heuristic_hill_climbing;
294
295         hc->wave_period = HILL_CLIMBING_WAVE_PERIOD;
296         hc->max_thread_wave_magnitude = HILL_CLIMBING_MAX_WAVE_MAGNITUDE;
297         hc->thread_magnitude_multiplier = (gdouble) HILL_CLIMBING_WAVE_MAGNITUDE_MULTIPLIER;
298         hc->samples_to_measure = hc->wave_period * HILL_CLIMBING_WAVE_HISTORY_SIZE;
299         hc->target_throughput_ratio = (gdouble) HILL_CLIMBING_BIAS;
300         hc->target_signal_to_noise_ratio = (gdouble) HILL_CLIMBING_TARGET_SIGNAL_TO_NOISE_RATIO;
301         hc->max_change_per_second = (gdouble) HILL_CLIMBING_MAX_CHANGE_PER_SECOND;
302         hc->max_change_per_sample = (gdouble) HILL_CLIMBING_MAX_CHANGE_PER_SAMPLE;
303         hc->sample_interval_low = HILL_CLIMBING_SAMPLE_INTERVAL_LOW;
304         hc->sample_interval_high = HILL_CLIMBING_SAMPLE_INTERVAL_HIGH;
305         hc->throughput_error_smoothing_factor = (gdouble) HILL_CLIMBING_ERROR_SMOOTHING_FACTOR;
306         hc->gain_exponent = (gdouble) HILL_CLIMBING_GAIN_EXPONENT;
307         hc->max_sample_error = (gdouble) HILL_CLIMBING_MAX_SAMPLE_ERROR_PERCENT;
308         hc->current_control_setting = 0;
309         hc->total_samples = 0;
310         hc->last_thread_count = 0;
311         hc->average_throughput_noise = 0;
312         hc->elapsed_since_last_change = 0;
313         hc->accumulated_completion_count = 0;
314         hc->accumulated_sample_duration = 0;
315         hc->samples = g_new0 (gdouble, hc->samples_to_measure);
316         hc->thread_counts = g_new0 (gdouble, hc->samples_to_measure);
317         hc->random_interval_generator = rand_create ();
318         hc->current_sample_interval = rand_next (&hc->random_interval_generator, hc->sample_interval_low, hc->sample_interval_high);
319
320         if (!(threads_per_cpu_env = g_getenv ("MONO_THREADS_PER_CPU")))
321                 threads_per_cpu = 1;
322         else
323                 threads_per_cpu = CLAMP (atoi (threads_per_cpu_env), 1, 50);
324
325         threads_count = mono_cpu_count () * threads_per_cpu;
326
327         threadpool->limit_worker_min = threadpool->limit_io_min = threads_count;
328
329 #if defined (PLATFORM_ANDROID) || defined (HOST_IOS)
330         threadpool->limit_worker_max = threadpool->limit_io_max = CLAMP (threads_count * 100, MIN (threads_count, 200), MAX (threads_count, 200));
331 #else
332         threadpool->limit_worker_max = threadpool->limit_io_max = threads_count * 100;
333 #endif
334
335         threadpool->counters._.max_working = threadpool->limit_worker_min;
336
337         threadpool->cpu_usage_state = g_new0 (MonoCpuUsageState, 1);
338
339         threadpool->suspended = FALSE;
340 }
341
342 static void worker_kill (ThreadPoolWorkingThread *thread);
343
344 static void
345 cleanup (void)
346 {
347         guint i;
348
349         /* throughout this code we assume that we are
350          * cleaning up only when the runtime is shutting down */
351         g_assert (mono_runtime_is_shutting_down ());
352
353         while (monitor_status != MONITOR_STATUS_NOT_RUNNING)
354                 mono_thread_info_sleep (1, NULL);
355
356         mono_coop_mutex_lock (&threadpool->active_threads_lock);
357
358         /* stop all threadpool->working_threads */
359         for (i = 0; i < threadpool->working_threads->len; ++i)
360                 worker_kill ((ThreadPoolWorkingThread*) g_ptr_array_index (threadpool->working_threads, i));
361
362         /* unpark all threadpool->parked_threads */
363         mono_coop_cond_broadcast (&threadpool->parked_threads_cond);
364
365         mono_coop_mutex_unlock (&threadpool->active_threads_lock);
366 }
367
368 gboolean
369 mono_threadpool_ms_enqueue_work_item (MonoDomain *domain, MonoObject *work_item, MonoError *error)
370 {
371         static MonoClass *threadpool_class = NULL;
372         static MonoMethod *unsafe_queue_custom_work_item_method = NULL;
373         MonoDomain *current_domain;
374         MonoBoolean f;
375         gpointer args [2];
376
377         mono_error_init (error);
378         g_assert (work_item);
379
380         if (!threadpool_class)
381                 threadpool_class = mono_class_load_from_name (mono_defaults.corlib, "System.Threading", "ThreadPool");
382
383         if (!unsafe_queue_custom_work_item_method)
384                 unsafe_queue_custom_work_item_method = mono_class_get_method_from_name (threadpool_class, "UnsafeQueueCustomWorkItem", 2);
385         g_assert (unsafe_queue_custom_work_item_method);
386
387         f = FALSE;
388
389         args [0] = (gpointer) work_item;
390         args [1] = (gpointer) &f;
391
392         current_domain = mono_domain_get ();
393         if (current_domain == domain) {
394                 mono_runtime_invoke_checked (unsafe_queue_custom_work_item_method, NULL, args, error);
395                 return_val_if_nok (error, FALSE);
396         } else {
397                 mono_thread_push_appdomain_ref (domain);
398                 if (mono_domain_set (domain, FALSE)) {
399                         mono_runtime_invoke_checked (unsafe_queue_custom_work_item_method, NULL, args, error);
400                         if (!is_ok (error)) {
401                                 mono_thread_pop_appdomain_ref ();
402                                 return FALSE;
403                         }
404                         mono_domain_set (current_domain, TRUE);
405                 }
406                 mono_thread_pop_appdomain_ref ();
407         }
408         return TRUE;
409 }
410
411 /* LOCKING: domains_lock must be held */
412 static void
413 tpdomain_add (ThreadPoolDomain *tpdomain)
414 {
415         guint i, len;
416
417         g_assert (tpdomain);
418
419         len = threadpool->domains->len;
420         for (i = 0; i < len; ++i) {
421                 if (g_ptr_array_index (threadpool->domains, i) == tpdomain)
422                         break;
423         }
424
425         if (i == len)
426                 g_ptr_array_add (threadpool->domains, tpdomain);
427 }
428
429 /* LOCKING: domains_lock must be held. */
430 static gboolean
431 tpdomain_remove (ThreadPoolDomain *tpdomain)
432 {
433         g_assert (tpdomain);
434         return g_ptr_array_remove (threadpool->domains, tpdomain);
435 }
436
437 /* LOCKING: domains_lock must be held */
438 static ThreadPoolDomain *
439 tpdomain_get (MonoDomain *domain, gboolean create)
440 {
441         guint i;
442         ThreadPoolDomain *tpdomain;
443
444         g_assert (domain);
445
446         for (i = 0; i < threadpool->domains->len; ++i) {
447                 ThreadPoolDomain *tpdomain;
448
449                 tpdomain = (ThreadPoolDomain *)g_ptr_array_index (threadpool->domains, i);
450                 if (tpdomain->domain == domain)
451                         return tpdomain;
452         }
453
454         if (!create)
455                 return NULL;
456
457         tpdomain = g_new0 (ThreadPoolDomain, 1);
458         tpdomain->domain = domain;
459         mono_coop_cond_init (&tpdomain->cleanup_cond);
460
461         tpdomain_add (tpdomain);
462
463         return tpdomain;
464 }
465
466 static void
467 tpdomain_free (ThreadPoolDomain *tpdomain)
468 {
469         g_free (tpdomain);
470 }
471
472 /* LOCKING: domains_lock must be held */
473 static gboolean
474 domain_any_has_request (void)
475 {
476         guint i;
477
478         for (i = 0; i < threadpool->domains->len; ++i) {
479                 ThreadPoolDomain *tmp = (ThreadPoolDomain *)g_ptr_array_index (threadpool->domains, i);
480                 if (tmp->outstanding_request > 0)
481                         return TRUE;
482         }
483
484         return FALSE;
485 }
486
487 /* LOCKING: domains_lock must be held */
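/* Scan the domains array round-robin, starting just after 'current' (or from
 * the beginning when 'current' is NULL), and return the first domain that
 * still has outstanding requests; return NULL if none do. */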
488 static ThreadPoolDomain *
489 tpdomain_get_next (ThreadPoolDomain *current)
490 {
491         ThreadPoolDomain *tpdomain = NULL;
492         guint len;
493
494         len = threadpool->domains->len;
495         if (len > 0) {
496                 guint i, current_idx = -1;
497                 if (current) {
498                         for (i = 0; i < len; ++i) {
499                                 if (current == g_ptr_array_index (threadpool->domains, i)) {
500                                         current_idx = i;
501                                         break;
502                                 }
503                         }
504                         g_assert (current_idx != (guint)-1);
505                 }
506                 for (i = current_idx + 1; i < len + current_idx + 1; ++i) {
507                         ThreadPoolDomain *tmp = (ThreadPoolDomain *)g_ptr_array_index (threadpool->domains, i % len);
508                         if (tmp->outstanding_request > 0) {
509                                 tpdomain = tmp;
510                                 break;
511                         }
512                 }
513         }
514
515         return tpdomain;
516 }
517
518 static void
519 worker_wait_interrupt (gpointer data)
520 {
521         mono_coop_mutex_lock (&threadpool->active_threads_lock);
522         mono_coop_cond_signal (&threadpool->parked_threads_cond);
523         mono_coop_mutex_unlock (&threadpool->active_threads_lock);
524 }
525
526 /* return TRUE on timeout, FALSE otherwise (the worker was unparked or interrupted) */
527 static gboolean
528 worker_park (void)
529 {
530         gboolean timeout = FALSE;
531
532         mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] current worker parking", mono_native_thread_id_get ());
533
534         mono_gc_set_skip_thread (TRUE);
535
536         mono_coop_mutex_lock (&threadpool->active_threads_lock);
537
538         if (!mono_runtime_is_shutting_down ()) {
539                 static gpointer rand_handle = NULL;
540                 MonoInternalThread *thread_internal;
541                 gboolean interrupted = FALSE;
542
543                 if (!rand_handle)
544                         rand_handle = rand_create ();
545                 g_assert (rand_handle);
546
547                 thread_internal = mono_thread_internal_current ();
548                 g_assert (thread_internal);
549
550                 threadpool->parked_threads_count += 1;
551                 g_ptr_array_remove_fast (threadpool->working_threads, thread_internal);
552
553                 mono_thread_info_install_interrupt (worker_wait_interrupt, NULL, &interrupted);
554                 if (interrupted)
555                         goto done;
556
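                /* Wait with a randomized timeout between 5 and 60 seconds; the
                 * randomization presumably avoids a herd of parked workers all
                 * timing out at the same instant. */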
557                 if (mono_coop_cond_timedwait (&threadpool->parked_threads_cond, &threadpool->active_threads_lock, rand_next (&rand_handle, 5 * 1000, 60 * 1000)) != 0)
558                         timeout = TRUE;
559
560                 mono_thread_info_uninstall_interrupt (&interrupted);
561
562 done:
563                 g_ptr_array_add (threadpool->working_threads, thread_internal);
564                 threadpool->parked_threads_count -= 1;
565         }
566
567         mono_coop_mutex_unlock (&threadpool->active_threads_lock);
568
569         mono_gc_set_skip_thread (FALSE);
570
571         mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] current worker unparking, timeout? %s", mono_native_thread_id_get (), timeout ? "yes" : "no");
572
573         return timeout;
574 }
575
576 static gboolean
577 worker_try_unpark (void)
578 {
579         gboolean res = FALSE;
580
581         mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try unpark worker", mono_native_thread_id_get ());
582
583         mono_coop_mutex_lock (&threadpool->active_threads_lock);
584         if (threadpool->parked_threads_count > 0) {
585                 mono_coop_cond_signal (&threadpool->parked_threads_cond);
586                 res = TRUE;
587         }
588         mono_coop_mutex_unlock (&threadpool->active_threads_lock);
589
590         mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try unpark worker, success? %s", mono_native_thread_id_get (), res ? "yes" : "no");
591
592         return res;
593 }
594
595 static void
596 worker_kill (ThreadPoolWorkingThread *thread)
597 {
598         if (thread == mono_thread_internal_current ())
599                 return;
600
601         mono_thread_internal_abort ((MonoInternalThread*) thread);
602 }
603
604 static void
605 worker_thread (gpointer data)
606 {
607         MonoError error;
608         MonoInternalThread *thread;
609         ThreadPoolDomain *tpdomain, *previous_tpdomain;
610         ThreadPoolCounter counter;
611         gboolean retire = FALSE;
612
613         mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] worker starting", mono_native_thread_id_get ());
614
615         g_assert (threadpool);
616
617         thread = mono_thread_internal_current ();
618         g_assert (thread);
619
620         mono_thread_set_name_internal (thread, mono_string_new (mono_get_root_domain (), "Threadpool worker"), FALSE, &error);
621         mono_error_assert_ok (&error);
622
623         mono_coop_mutex_lock (&threadpool->active_threads_lock);
624         g_ptr_array_add (threadpool->working_threads, thread);
625         mono_coop_mutex_unlock (&threadpool->active_threads_lock);
626
627         previous_tpdomain = NULL;
628
629         domains_lock ();
630
631         while (!mono_runtime_is_shutting_down ()) {
632                 tpdomain = NULL;
633
634                 if ((thread->state & (ThreadState_AbortRequested | ThreadState_SuspendRequested)) != 0) {
635                         domains_unlock ();
636                         mono_thread_interruption_checkpoint ();
637                         domains_lock ();
638                 }
639
640                 if (retire || !(tpdomain = tpdomain_get_next (previous_tpdomain))) {
641                         gboolean timeout;
642
643                         COUNTER_ATOMIC (counter, {
644                                 counter._.working --;
645                                 counter._.parked ++;
646                         });
647
648                         domains_unlock ();
649                         timeout = worker_park ();
650                         domains_lock ();
651
652                         COUNTER_ATOMIC (counter, {
653                                 counter._.working ++;
654                                 counter._.parked --;
655                         });
656
657                         if (timeout)
658                                 break;
659
660                         if (retire)
661                                 retire = FALSE;
662
663                         /* The tpdomain->domain might have been unloaded while this thread was parked */
664                         previous_tpdomain = NULL;
665
666                         continue;
667                 }
668
669                 tpdomain->outstanding_request --;
670                 g_assert (tpdomain->outstanding_request >= 0);
671
672                 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] worker running in domain %p (outstanding requests %d) ",
673                         mono_native_thread_id_get (), tpdomain->domain, tpdomain->outstanding_request);
674
675                 g_assert (tpdomain->domain);
676                 g_assert (tpdomain->threadpool_jobs >= 0);
677                 tpdomain->threadpool_jobs ++;
678
679                 /*
680                  * This is needed so there is always an lmf frame in the runtime invoke call below,
681                  * so ThreadAbortExceptions are caught even if the thread is in native code.
682                  */
683                 mono_defaults.threadpool_perform_wait_callback_method->save_lmf = TRUE;
684
685                 domains_unlock ();
686
687                 mono_thread_push_appdomain_ref (tpdomain->domain);
688                 if (mono_domain_set (tpdomain->domain, FALSE)) {
689                         MonoObject *exc = NULL, *res;
690
691                         res = mono_runtime_try_invoke (mono_defaults.threadpool_perform_wait_callback_method, NULL, NULL, &exc, &error);
692                         if (exc || !mono_error_ok(&error)) {
693                                 if (exc == NULL)
694                                         exc = (MonoObject *) mono_error_convert_to_exception (&error);
695                                 else
696                                         mono_error_cleanup (&error);
697                                 mono_thread_internal_unhandled_exception (exc);
698                         } else if (res && *(MonoBoolean*) mono_object_unbox (res) == FALSE)
699                                 retire = TRUE;
700
701                         mono_thread_clr_state (thread, (MonoThreadState)~ThreadState_Background);
702                         if (!mono_thread_test_state (thread , ThreadState_Background))
703                                 ves_icall_System_Threading_Thread_SetState (thread, ThreadState_Background);
704
705                         mono_domain_set (mono_get_root_domain (), TRUE);
706                 }
707                 mono_thread_pop_appdomain_ref ();
708
709                 domains_lock ();
710
711                 tpdomain->threadpool_jobs --;
712                 g_assert (tpdomain->threadpool_jobs >= 0);
713
714                 if (tpdomain->outstanding_request + tpdomain->threadpool_jobs == 0 && mono_domain_is_unloading (tpdomain->domain)) {
715                         gboolean removed;
716
717                         removed = tpdomain_remove (tpdomain);
718                         g_assert (removed);
719
720                         mono_coop_cond_signal (&tpdomain->cleanup_cond);
721                         tpdomain = NULL;
722                 }
723
724                 previous_tpdomain = tpdomain;
725         }
726
727         domains_unlock ();
728
729         mono_coop_mutex_lock (&threadpool->active_threads_lock);
730         g_ptr_array_remove_fast (threadpool->working_threads, thread);
731         mono_coop_mutex_unlock (&threadpool->active_threads_lock);
732
733         COUNTER_ATOMIC (counter, {
734                 counter._.working--;
735                 counter._.active --;
736         });
737
738         mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] worker finishing", mono_native_thread_id_get ());
739 }
740
741 static gboolean
742 worker_try_create (void)
743 {
744         ThreadPoolCounter counter;
745         MonoInternalThread *thread;
746         gint64 current_ticks;
747         gint32 now;
748
749         mono_coop_mutex_lock (&threadpool->worker_creation_lock);
750
751         mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker", mono_native_thread_id_get ());
752         current_ticks = mono_100ns_ticks ();
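        /* 100ns ticks: 10 * 1000 * 1000 ticks per second, so 'now' below is the
         * current time in whole seconds, used to rate-limit worker creation. */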
753         now = current_ticks / (10 * 1000 * 1000);
754         if (0 == current_ticks) {
755                 g_warning ("failed to get 100ns ticks");
756         } else {
757                 if (threadpool->worker_creation_current_second != now) {
758                         threadpool->worker_creation_current_second = now;
759                         threadpool->worker_creation_current_count = 0;
760                 } else {
761                         g_assert (threadpool->worker_creation_current_count <= WORKER_CREATION_MAX_PER_SEC);
762                         if (threadpool->worker_creation_current_count == WORKER_CREATION_MAX_PER_SEC) {
763                                 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker, failed: maximum number of workers created per second reached, current count = %d",
764                                         mono_native_thread_id_get (), threadpool->worker_creation_current_count);
765                                 mono_coop_mutex_unlock (&threadpool->worker_creation_lock);
766                                 return FALSE;
767                         }
768                 }
769         }
770
771         COUNTER_ATOMIC (counter, {
772                 if (counter._.working >= counter._.max_working) {
773                         mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker, failed: maximum number of working threads reached",
774                                 mono_native_thread_id_get ());
775                         mono_coop_mutex_unlock (&threadpool->worker_creation_lock);
776                         return FALSE;
777                 }
778                 counter._.working ++;
779                 counter._.active ++;
780         });
781
782         MonoError error;
783         if ((thread = mono_thread_create_internal (mono_get_root_domain (), worker_thread, NULL, TRUE, 0, &error)) != NULL) {
784                 threadpool->worker_creation_current_count += 1;
785
786                 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker, created %p, now = %d count = %d", mono_native_thread_id_get (), GUINT_TO_POINTER(thread->tid), now, threadpool->worker_creation_current_count);
787                 mono_coop_mutex_unlock (&threadpool->worker_creation_lock);
788                 return TRUE;
789         }
790
791         mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker, failed: could not create thread due to %s", mono_native_thread_id_get (), mono_error_get_message (&error));
792         mono_error_cleanup (&error);
793
794         COUNTER_ATOMIC (counter, {
795                 counter._.working --;
796                 counter._.active --;
797         });
798
799         mono_coop_mutex_unlock (&threadpool->worker_creation_lock);
800         return FALSE;
801 }
802
803 static void monitor_ensure_running (void);
804
805 static gboolean
806 worker_request (MonoDomain *domain)
807 {
808         ThreadPoolDomain *tpdomain;
809
810         g_assert (domain);
811         g_assert (threadpool);
812
813         if (mono_runtime_is_shutting_down ())
814                 return FALSE;
815
816         domains_lock ();
817
818         /* synchronize check with worker_thread */
819         if (mono_domain_is_unloading (domain)) {
820                 domains_unlock ();
821                 return FALSE;
822         }
823
824         tpdomain = tpdomain_get (domain, TRUE);
825         g_assert (tpdomain);
826         tpdomain->outstanding_request ++;
827
828         mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, domain = %p, outstanding_request = %d",
829                 mono_native_thread_id_get (), tpdomain->domain, tpdomain->outstanding_request);
830
831         domains_unlock ();
832
833         if (threadpool->suspended)
834                 return FALSE;
835
836         monitor_ensure_running ();
837
838         if (worker_try_unpark ()) {
839                 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, unparked", mono_native_thread_id_get ());
840                 return TRUE;
841         }
842
843         if (worker_try_create ()) {
844                 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, created", mono_native_thread_id_get ());
845                 return TRUE;
846         }
847
848         mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, failed", mono_native_thread_id_get ());
849         return FALSE;
850 }
851
852 static gboolean
853 monitor_should_keep_running (void)
854 {
855         static gint64 last_should_keep_running = -1;
856
857         g_assert (monitor_status == MONITOR_STATUS_WAITING_FOR_REQUEST || monitor_status == MONITOR_STATUS_REQUESTED);
858
859         if (InterlockedExchange (&monitor_status, MONITOR_STATUS_WAITING_FOR_REQUEST) == MONITOR_STATUS_WAITING_FOR_REQUEST) {
860                 gboolean should_keep_running = TRUE, force_should_keep_running = FALSE;
861
862                 if (mono_runtime_is_shutting_down ()) {
863                         should_keep_running = FALSE;
864                 } else {
865                         domains_lock ();
866                         if (!domain_any_has_request ())
867                                 should_keep_running = FALSE;
868                         domains_unlock ();
869
870                         if (!should_keep_running) {
871                                 if (last_should_keep_running == -1 || mono_100ns_ticks () - last_should_keep_running < MONITOR_MINIMAL_LIFETIME * 1000 * 10) {
872                                         should_keep_running = force_should_keep_running = TRUE;
873                                 }
874                         }
875                 }
876
877                 if (should_keep_running) {
878                         if (last_should_keep_running == -1 || !force_should_keep_running)
879                                 last_should_keep_running = mono_100ns_ticks ();
880                 } else {
881                         last_should_keep_running = -1;
882                         if (InterlockedCompareExchange (&monitor_status, MONITOR_STATUS_NOT_RUNNING, MONITOR_STATUS_WAITING_FOR_REQUEST) == MONITOR_STATUS_WAITING_FOR_REQUEST)
883                                 return FALSE;
884                 }
885         }
886
887         g_assert (monitor_status == MONITOR_STATUS_WAITING_FOR_REQUEST || monitor_status == MONITOR_STATUS_REQUESTED);
888
889         return TRUE;
890 }
891
892 static gboolean
893 monitor_sufficient_delay_since_last_dequeue (void)
894 {
895         gint64 threshold;
896
897         g_assert (threadpool);
898
899         if (threadpool->cpu_usage < CPU_USAGE_LOW) {
900                 threshold = MONITOR_INTERVAL;
901         } else {
902                 ThreadPoolCounter counter;
903                 counter.as_gint64 = COUNTER_READ();
904                 threshold = counter._.max_working * MONITOR_INTERVAL * 2;
905         }
906
907         return mono_msec_ticks () >= threadpool->heuristic_last_dequeue + threshold;
908 }
909
910 static void hill_climbing_force_change (gint16 new_thread_count, ThreadPoolHeuristicStateTransition transition);
911
912 static void
913 monitor_thread (void)
914 {
915         MonoInternalThread *current_thread = mono_thread_internal_current ();
916         guint i;
917
918         mono_cpu_usage (threadpool->cpu_usage_state);
919
920         mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, started", mono_native_thread_id_get ());
921
922         do {
923                 ThreadPoolCounter counter;
924                 gboolean limit_worker_max_reached;
925                 gint32 interval_left = MONITOR_INTERVAL;
926                 gint32 awake = 0; /* number of spurious wakeups so far; we tolerate up to 10 before forcing a round of rebalancing */
927
928                 g_assert (monitor_status != MONITOR_STATUS_NOT_RUNNING);
929
930                 mono_gc_set_skip_thread (TRUE);
931
932                 do {
933                         gint64 ts;
934                         gboolean alerted = FALSE;
935
936                         if (mono_runtime_is_shutting_down ())
937                                 break;
938
939                         ts = mono_msec_ticks ();
940                         if (mono_thread_info_sleep (interval_left, &alerted) == 0)
941                                 break;
942                         interval_left -= mono_msec_ticks () - ts;
943
944                         mono_gc_set_skip_thread (FALSE);
945                         if ((current_thread->state & (ThreadState_StopRequested | ThreadState_SuspendRequested)) != 0)
946                                 mono_thread_interruption_checkpoint ();
947                         mono_gc_set_skip_thread (TRUE);
948                 } while (interval_left > 0 && ++awake < 10);
949
950                 mono_gc_set_skip_thread (FALSE);
951
952                 if (threadpool->suspended)
953                         continue;
954
955                 if (mono_runtime_is_shutting_down ())
956                         continue;
957
958                 domains_lock ();
959                 if (!domain_any_has_request ()) {
960                         domains_unlock ();
961                         continue;
962                 }
963                 domains_unlock ();
964
965                 threadpool->cpu_usage = mono_cpu_usage (threadpool->cpu_usage_state);
966
967                 if (!monitor_sufficient_delay_since_last_dequeue ())
968                         continue;
969
970                 limit_worker_max_reached = FALSE;
971
972                 COUNTER_ATOMIC (counter, {
973                         if (counter._.max_working >= threadpool->limit_worker_max) {
974                                 limit_worker_max_reached = TRUE;
975                                 break;
976                         }
977                         counter._.max_working ++;
978                 });
979
980                 if (limit_worker_max_reached)
981                         continue;
982
983                 hill_climbing_force_change (counter._.max_working, TRANSITION_STARVATION);
984
985                 for (i = 0; i < 5; ++i) {
986                         if (mono_runtime_is_shutting_down ())
987                                 break;
988
989                         if (worker_try_unpark ()) {
990                                 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, unparked", mono_native_thread_id_get ());
991                                 break;
992                         }
993
994                         if (worker_try_create ()) {
995                                 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, created", mono_native_thread_id_get ());
996                                 break;
997                         }
998                 }
999         } while (monitor_should_keep_running ());
1000
1001         mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, finished", mono_native_thread_id_get ());
1002 }
1003
1004 static void
1005 monitor_ensure_running (void)
1006 {
1007         MonoError error;
1008         for (;;) {
1009                 switch (monitor_status) {
1010                 case MONITOR_STATUS_REQUESTED:
1011                         return;
1012                 case MONITOR_STATUS_WAITING_FOR_REQUEST:
1013                         InterlockedCompareExchange (&monitor_status, MONITOR_STATUS_REQUESTED, MONITOR_STATUS_WAITING_FOR_REQUEST);
1014                         break;
1015                 case MONITOR_STATUS_NOT_RUNNING:
1016                         if (mono_runtime_is_shutting_down ())
1017                                 return;
1018                         if (InterlockedCompareExchange (&monitor_status, MONITOR_STATUS_REQUESTED, MONITOR_STATUS_NOT_RUNNING) == MONITOR_STATUS_NOT_RUNNING) {
1019                                 if (!mono_thread_create_internal (mono_get_root_domain (), monitor_thread, NULL, TRUE, SMALL_STACK, &error)) {
1020                                         monitor_status = MONITOR_STATUS_NOT_RUNNING;
1021                                         mono_error_cleanup (&error);
1022                                 }
1023                                 return;
1024                         }
1025                         break;
1026                 default: g_assert_not_reached ();
1027                 }
1028         }
1029 }
1030
1031 static void
1032 hill_climbing_change_thread_count (gint16 new_thread_count, ThreadPoolHeuristicStateTransition transition)
1033 {
1034         ThreadPoolHillClimbing *hc;
1035
1036         g_assert (threadpool);
1037
1038         hc = &threadpool->heuristic_hill_climbing;
1039
1040         mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] hill climbing, change max number of threads %d", mono_native_thread_id_get (), new_thread_count);
1041
1042         hc->last_thread_count = new_thread_count;
1043         hc->current_sample_interval = rand_next (&hc->random_interval_generator, hc->sample_interval_low, hc->sample_interval_high);
1044         hc->elapsed_since_last_change = 0;
1045         hc->completions_since_last_change = 0;
1046 }
1047
1048 static void
1049 hill_climbing_force_change (gint16 new_thread_count, ThreadPoolHeuristicStateTransition transition)
1050 {
1051         ThreadPoolHillClimbing *hc;
1052
1053         g_assert (threadpool);
1054
1055         hc = &threadpool->heuristic_hill_climbing;
1056
1057         if (new_thread_count != hc->last_thread_count) {
1058                 hc->current_control_setting += new_thread_count - hc->last_thread_count;
1059                 hill_climbing_change_thread_count (new_thread_count, transition);
1060         }
1061 }
1062
1063 static double_complex
1064 hill_climbing_get_wave_component (gdouble *samples, guint sample_count, gdouble period)
1065 {
1066         ThreadPoolHillClimbing *hc;
1067         gdouble w, cosine, sine, coeff, q0, q1, q2;
1068         guint i;
1069
1070         g_assert (threadpool);
1071         g_assert (sample_count >= period);
1072         g_assert (period >= 2);
1073
1074         hc = &threadpool->heuristic_hill_climbing;
1075
1076         w = 2.0 * M_PI / period;
1077         cosine = cos (w);
1078         sine = sin (w);
1079         coeff = 2.0 * cosine;
1080         q0 = q1 = q2 = 0;
1081
1082         for (i = 0; i < sample_count; ++i) {
1083                 q0 = coeff * q1 - q2 + samples [(hc->total_samples - sample_count + i) % hc->samples_to_measure];
1084                 q2 = q1;
1085                 q1 = q0;
1086         }
1087
1088         return mono_double_complex_scalar_div (mono_double_complex_make (q1 - q2 * cosine, (q2 * sine)), ((gdouble)sample_count));
1089 }
1090
1091 static gint16
1092 hill_climbing_update (gint16 current_thread_count, guint32 sample_duration, gint32 completions, gint64 *adjustment_interval)
1093 {
1094         ThreadPoolHillClimbing *hc;
1095         ThreadPoolHeuristicStateTransition transition;
1096         gdouble throughput;
1097         gdouble throughput_error_estimate;
1098         gdouble confidence;
1099         gdouble move;
1100         gdouble gain;
1101         gint sample_index;
1102         gint sample_count;
1103         gint new_thread_wave_magnitude;
1104         gint new_thread_count;
1105         double_complex thread_wave_component;
1106         double_complex throughput_wave_component;
1107         double_complex ratio;
1108
1109         g_assert (threadpool);
1110         g_assert (adjustment_interval);
1111
1112         hc = &threadpool->heuristic_hill_climbing;
1113
1114         /* If someone changed the thread count without telling us, update our records accordingly. */
1115         if (current_thread_count != hc->last_thread_count)
1116                 hill_climbing_force_change (current_thread_count, TRANSITION_INITIALIZING);
1117
1118         /* Update the cumulative stats for this thread count */
1119         hc->elapsed_since_last_change += sample_duration;
1120         hc->completions_since_last_change += completions;
1121
1122         /* Add in any data we've already collected about this sample */
1123         sample_duration += hc->accumulated_sample_duration;
1124         completions += hc->accumulated_completion_count;
1125
1126         /* We need to make sure we're collecting reasonably accurate data. Since we're just counting the end
1127          * of each work item, we are going to be missing some data about what really happened during the
1128          * sample interval. The count produced by each thread includes an initial work item that may have
1129          * started well before the start of the interval, and each thread may have been running some new
1130          * work item for some time before the end of the interval, which did not yet get counted. So
1131          * our count is going to be off by +/- threadCount work items.
1132          *
1133          * The exception is that the thread that reported to us last time definitely wasn't running any work
1134          * at that time, and the thread that's reporting now definitely isn't running a work item now. So
1135          * we really only need to consider threadCount-1 threads.
1136          *
1137          * Thus the percent error in our count is +/- (threadCount-1)/numCompletions.
1138          *
1139          * We cannot rely on the frequency-domain analysis we'll be doing later to filter out this error, because
1140          * of the way it accumulates over time. If this sample is off by, say, 33% in the negative direction,
1141          * then the next one likely will be too. The one after that will include the sum of the completions
1142          * we missed in the previous samples, and so will be 33% positive. So every three samples we'll have
1143          * two "low" samples and one "high" sample. This will appear as periodic variation right in the frequency
1144          * range we're targeting, which will not be filtered by the frequency-domain translation. */
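        /* For example (illustrative numbers only): with 4 worker threads and 10
         * completions in the sample, the error bound is (4 - 1) / 10 = 30%, which
         * exceeds HILL_CLIMBING_MAX_SAMPLE_ERROR_PERCENT (15%), so we keep
         * accumulating instead of taking the sample. */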
1145         if (hc->total_samples > 0 && ((current_thread_count - 1.0) / completions) >= hc->max_sample_error) {
1146                 /* Not accurate enough yet. Let's accumulate the data so
1147                  * far, and tell the ThreadPool to collect a little more. */
1148                 hc->accumulated_sample_duration = sample_duration;
1149                 hc->accumulated_completion_count = completions;
1150                 *adjustment_interval = 10;
1151                 return current_thread_count;
1152         }
1153
1154         /* We've got enough data for our sample; reset our accumulators for next time. */
1155         hc->accumulated_sample_duration = 0;
1156         hc->accumulated_completion_count = 0;
1157
1158         /* Add the current thread count and throughput sample to our history. */
1159         throughput = ((gdouble) completions) / sample_duration;
1160
1161         sample_index = hc->total_samples % hc->samples_to_measure;
1162         hc->samples [sample_index] = throughput;
1163         hc->thread_counts [sample_index] = current_thread_count;
1164         hc->total_samples ++;
1165
1166         /* Set up defaults for our metrics. */
1167         thread_wave_component = mono_double_complex_make(0, 0);
1168         throughput_wave_component = mono_double_complex_make(0, 0);
1169         throughput_error_estimate = 0;
1170         ratio = mono_double_complex_make(0, 0);
1171         confidence = 0;
1172
1173         transition = TRANSITION_WARMUP;
1174
1175         /* How many samples will we use? It must be at least the three wave periods we're looking for, and it must also
1176          * be a whole multiple of the primary wave's period; otherwise the frequency we're looking for will fall between
1177          * two frequency bands in the Fourier analysis, and we won't be able to measure it accurately. */
1178         sample_count = ((gint) MIN (hc->total_samples - 1, hc->samples_to_measure) / hc->wave_period) * hc->wave_period;
1179
1180         if (sample_count > hc->wave_period) {
1181                 guint i;
1182                 gdouble average_throughput;
1183                 gdouble average_thread_count;
1184                 gdouble sample_sum = 0;
1185                 gdouble thread_sum = 0;
1186
1187                 /* Average the throughput and thread count samples, so we can scale the wave magnitudes later. */
1188                 for (i = 0; i < sample_count; ++i) {
1189                         guint j = (hc->total_samples - sample_count + i) % hc->samples_to_measure;
1190                         sample_sum += hc->samples [j];
1191                         thread_sum += hc->thread_counts [j];
1192                 }
1193
1194                 average_throughput = sample_sum / sample_count;
1195                 average_thread_count = thread_sum / sample_count;
1196
1197                 if (average_throughput > 0 && average_thread_count > 0) {
1198                         gdouble noise_for_confidence, adjacent_period_1, adjacent_period_2;
1199
1200                         /* Calculate the periods of the adjacent frequency bands we'll be using to
1201                          * measure noise levels. We want the two adjacent Fourier frequency bands. */
1202                         adjacent_period_1 = sample_count / (((gdouble) sample_count) / ((gdouble) hc->wave_period) + 1);
1203                         adjacent_period_2 = sample_count / (((gdouble) sample_count) / ((gdouble) hc->wave_period) - 1);
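                        /* For example, with sample_count = 32 and wave_period = 4 the primary
                         * band completes 32 / 4 = 8 cycles over the window; the adjacent bands
                         * have periods of 32 / 9 ~= 3.56 and 32 / 7 ~= 4.57 samples per cycle. */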
1204
1205                         /* Get the three different frequency components of the throughput (scaled by average
1206                          * throughput). Our "error" estimate (the amount of noise that might be present in the
1207                          * frequency band we're really interested in) is the average of the adjacent bands. */
1208                         throughput_wave_component = mono_double_complex_scalar_div (hill_climbing_get_wave_component (hc->samples, sample_count, hc->wave_period), average_throughput);
1209                         throughput_error_estimate = cabs (mono_double_complex_scalar_div (hill_climbing_get_wave_component (hc->samples, sample_count, adjacent_period_1), average_throughput));
1210
1211                         if (adjacent_period_2 <= sample_count) {
1212                                 throughput_error_estimate = MAX (throughput_error_estimate, cabs (mono_double_complex_scalar_div (hill_climbing_get_wave_component (
1213                                         hc->samples, sample_count, adjacent_period_2), average_throughput)));
1214                         }
1215
1216                         /* Do the same for the thread counts, so we have something to compare to. We don't
1217                          * measure thread count noise, because there is none; these are exact measurements. */
1218                         thread_wave_component = mono_double_complex_scalar_div (hill_climbing_get_wave_component (hc->thread_counts, sample_count, hc->wave_period), average_thread_count);
1219
1220                         /* Update our moving average of the throughput noise. We'll use this
1221                          * later as feedback to determine the new size of the thread wave. */
1222                         if (hc->average_throughput_noise == 0) {
1223                                 hc->average_throughput_noise = throughput_error_estimate;
1224                         } else {
1225                                 hc->average_throughput_noise = (hc->throughput_error_smoothing_factor * throughput_error_estimate)
1226                                         + ((1.0 - hc->throughput_error_smoothing_factor) * hc->average_throughput_noise);
1227                         }
1228
1229                         if (cabs (thread_wave_component) > 0) {
1230                                 /* Adjust the throughput wave so it's centered around the target wave,
1231                                  * and then calculate the adjusted throughput/thread ratio. */
1232                                 ratio = mono_double_complex_div (mono_double_complex_sub (throughput_wave_component, mono_double_complex_scalar_mul(thread_wave_component, hc->target_throughput_ratio)), thread_wave_component);
1233                                 transition = TRANSITION_CLIMBING_MOVE;
1234                         } else {
1235                                 ratio = mono_double_complex_make (0, 0);
1236                                 transition = TRANSITION_STABILIZING;
1237                         }
1238
1239                         noise_for_confidence = MAX (hc->average_throughput_noise, throughput_error_estimate);
1240                         if (noise_for_confidence > 0) {
1241                                 confidence = cabs (thread_wave_component) / noise_for_confidence / hc->target_signal_to_noise_ratio;
1242                         } else {
1243                                 /* there is no noise! */
1244                                 confidence = 1.0;
1245                         }
1246                 }
1247         }
1248
1249         /* We use just the real part of the complex ratio we just calculated. If the throughput signal
1250          * is exactly in phase with the thread signal, this will be the same as taking the magnitude of
1251          * the complex move and moving that far up. If they're 180 degrees out of phase, we'll move
1252          * backward (because this indicates that our changes are having the opposite of the intended effect).
1253          * If they're 90 degrees out of phase, we won't move at all, because we can't tell whether we're
1254          * having a negative or positive effect on throughput. */
1255         move = creal (ratio);
1256         move = CLAMP (move, -1.0, 1.0);
1257
1258         /* Apply our confidence multiplier. */
1259         move *= CLAMP (confidence, -1.0, 1.0);
1260
1261         /* Now apply non-linear gain, such that values around zero are attenuated, while higher values
1262          * are enhanced. This allows us to move quickly if we're far away from the target, but more slowly
1263          * if we're getting close, giving us rapid ramp-up without wild oscillations around the target. */
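        /* For instance, with the default gain_exponent of 2.0 a small move of 0.1
         * becomes 0.1^2 = 0.01 while a move of 1.0 stays 1.0 (the sign is reapplied
         * separately), before being scaled by 'gain' below. */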
1264         gain = hc->max_change_per_second * sample_duration;
1265         move = pow (fabs (move), hc->gain_exponent) * (move >= 0.0 ? 1 : -1) * gain;
1266         move = MIN (move, hc->max_change_per_sample);
1267
1268         /* If the result was positive, and CPU is > 95%, refuse the move. */
1269         if (move > 0.0 && threadpool->cpu_usage > CPU_USAGE_HIGH)
1270                 move = 0.0;
1271
1272         /* Apply the move to our control setting. */
1273         hc->current_control_setting += move;
1274
1275         /* Calculate the new thread wave magnitude, which is based on the moving average we've been keeping of the
1276          * throughput error.  This average starts at zero, so we'll start with a nice safe little wave at first. */
1277         new_thread_wave_magnitude = (gint)(0.5 + (hc->current_control_setting * hc->average_throughput_noise
1278                 * hc->target_signal_to_noise_ratio * hc->thread_magnitude_multiplier * 2.0));
1279         new_thread_wave_magnitude = CLAMP (new_thread_wave_magnitude, 1, hc->max_thread_wave_magnitude);
1280
1281         /* Make sure our control setting is within the ThreadPool's limits. */
1282         hc->current_control_setting = CLAMP (hc->current_control_setting, threadpool->limit_worker_min, threadpool->limit_worker_max - new_thread_wave_magnitude);
1283
1284         /* Calculate the new thread count (control setting + square wave). */
1285         new_thread_count = (gint)(hc->current_control_setting + new_thread_wave_magnitude * ((hc->total_samples / (hc->wave_period / 2)) % 2));
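        /* With the default wave_period of 4, (total_samples / 2) % 2 alternates
         * 0, 0, 1, 1, ... so the requested thread count toggles between the control
         * setting and setting + magnitude every two samples. */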
1286
1287         /* Make sure the new thread count doesn't exceed the ThreadPool's limits. */
1288         new_thread_count = CLAMP (new_thread_count, threadpool->limit_worker_min, threadpool->limit_worker_max);
1289
1290         if (new_thread_count != current_thread_count)
1291                 hill_climbing_change_thread_count (new_thread_count, transition);
1292
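        /* If throughput moved against our last change and we are already at the minimum thread count,
         * there is nothing useful left to try, so back off: the next adjustment interval is set to at
         * least ten times the current sample interval. */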
1293         if (creal (ratio) < 0.0 && new_thread_count == threadpool->limit_worker_min)
1294                 *adjustment_interval = (gint)(0.5 + hc->current_sample_interval * (10.0 * MAX (-1.0 * creal (ratio), 1.0)));
1295         else
1296                 *adjustment_interval = hc->current_sample_interval;
1297
1298         return new_thread_count;
1299 }
1300
1301 static void
1302 heuristic_notify_work_completed (void)
1303 {
1304         g_assert (threadpool);
1305
1306         InterlockedIncrement (&threadpool->heuristic_completions);
1307         threadpool->heuristic_last_dequeue = mono_msec_ticks ();
1308 }
1309
1310 static gboolean
1311 heuristic_should_adjust (void)
1312 {
1313         g_assert (threadpool);
1314
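        /* Only adjust when the last completion was dequeued more than one adjustment interval after the
         * previous adjustment, and only while the number of working threads does not exceed the
         * heuristic's current target. */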
1315         if (threadpool->heuristic_last_dequeue > threadpool->heuristic_last_adjustment + threadpool->heuristic_adjustment_interval) {
1316                 ThreadPoolCounter counter;
1317                 counter.as_gint64 = COUNTER_READ ();
1318                 if (counter._.working <= counter._.max_working)
1319                         return TRUE;
1320         }
1321
1322         return FALSE;
1323 }
1324
1325 static void
1326 heuristic_adjust (void)
1327 {
1328         g_assert (threadpool);
1329
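        /* Only one thread performs the adjustment at a time: if the heuristic lock is already held,
         * callers simply skip this round instead of blocking. */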
1330         if (mono_coop_mutex_trylock (&threadpool->heuristic_lock) == 0) {
1331                 gint32 completions = InterlockedExchange (&threadpool->heuristic_completions, 0);
1332                 gint64 sample_end = mono_msec_ticks ();
1333                 gint64 sample_duration = sample_end - threadpool->heuristic_sample_start;
1334
1335                 if (sample_duration >= threadpool->heuristic_adjustment_interval / 2) {
1336                         ThreadPoolCounter counter;
1337                         gint16 new_thread_count;
1338
1339                         counter.as_gint64 = COUNTER_READ ();
1340                         new_thread_count = hill_climbing_update (counter._.max_working, sample_duration, completions, &threadpool->heuristic_adjustment_interval);
1341
1342                         COUNTER_ATOMIC (counter, { counter._.max_working = new_thread_count; });
1343
1344                         if (new_thread_count > counter._.max_working)
1345                                 worker_request (mono_domain_get ());
1346
1347                         threadpool->heuristic_sample_start = sample_end;
1348                         threadpool->heuristic_last_adjustment = mono_msec_ticks ();
1349                 }
1350
1351                 mono_coop_mutex_unlock (&threadpool->heuristic_lock);
1352         }
1353 }
1354
1355 void
1356 mono_threadpool_ms_cleanup (void)
1357 {
1358 #ifndef DISABLE_SOCKETS
1359         mono_threadpool_ms_io_cleanup ();
1360 #endif
1361         mono_lazy_cleanup (&status, cleanup);
1362 }
1363
1364 MonoAsyncResult *
1365 mono_threadpool_ms_begin_invoke (MonoDomain *domain, MonoObject *target, MonoMethod *method, gpointer *params, MonoError *error)
1366 {
1367         static MonoClass *async_call_klass = NULL;
1368         MonoMethodMessage *message;
1369         MonoAsyncResult *async_result;
1370         MonoAsyncCall *async_call;
1371         MonoDelegate *async_callback = NULL;
1372         MonoObject *state = NULL;
1373
1374         if (!async_call_klass)
1375                 async_call_klass = mono_class_load_from_name (mono_defaults.corlib, "System", "MonoAsyncCall");
1376
1377         mono_lazy_initialize (&status, initialize);
1378
1379         mono_error_init (error);
1380
1381         message = mono_method_call_message_new (method, params, mono_get_delegate_invoke (method->klass), (params != NULL) ? (&async_callback) : NULL, (params != NULL) ? (&state) : NULL, error);
1382         return_val_if_nok (error, NULL);
1383
1384         async_call = (MonoAsyncCall*) mono_object_new_checked (domain, async_call_klass, error);
1385         return_val_if_nok (error, NULL);
1386
1387         MONO_OBJECT_SETREF (async_call, msg, message);
1388         MONO_OBJECT_SETREF (async_call, state, state);
1389
1390         if (async_callback) {
1391                 MONO_OBJECT_SETREF (async_call, cb_method, mono_get_delegate_invoke (((MonoObject*) async_callback)->vtable->klass));
1392                 MONO_OBJECT_SETREF (async_call, cb_target, async_callback);
1393         }
1394
1395         async_result = mono_async_result_new (domain, NULL, async_call->state, NULL, (MonoObject*) async_call, error);
1396         return_val_if_nok (error, NULL);
1397         MONO_OBJECT_SETREF (async_result, async_delegate, target);
1398
1399         mono_threadpool_ms_enqueue_work_item (domain, (MonoObject*) async_result, error);
1400         return_val_if_nok (error, NULL);
1401
1402         return async_result;
1403 }
1404
1405 MonoObject *
1406 mono_threadpool_ms_end_invoke (MonoAsyncResult *ares, MonoArray **out_args, MonoObject **exc, MonoError *error)
1407 {
1408         MonoAsyncCall *ac;
1409
1410         mono_error_init (error);
1411         g_assert (exc);
1412         g_assert (out_args);
1413
1414         *exc = NULL;
1415         *out_args = NULL;
1416
1417         /* check if already finished */
1418         mono_monitor_enter ((MonoObject*) ares);
1419
1420         if (ares->endinvoke_called) {
1421                 mono_error_set_invalid_operation (error, "Delegate EndInvoke method called more than once");
1422                 mono_monitor_exit ((MonoObject*) ares);
1423                 return NULL;
1424         }
1425
1426         ares->endinvoke_called = 1;
1427
1428         /* wait until we are really finished */
1429         if (ares->completed) {
1430                 mono_monitor_exit ((MonoObject *) ares);
1431         } else {
1432                 gpointer wait_event;
1433                 if (ares->handle) {
1434                         wait_event = mono_wait_handle_get_handle ((MonoWaitHandle*) ares->handle);
1435                 } else {
1436                         wait_event = mono_w32event_create (TRUE, FALSE);
1437                         g_assert (wait_event);
1438                         MonoWaitHandle *wait_handle = mono_wait_handle_new (mono_object_domain (ares), wait_event, error);
1439                         if (!is_ok (error)) {
1440                                 CloseHandle (wait_event);
1441                                 return NULL;
1442                         }
1443                         MONO_OBJECT_SETREF (ares, handle, (MonoObject*) wait_handle);
1444                 }
1445                 mono_monitor_exit ((MonoObject*) ares);
1446                 MONO_ENTER_GC_SAFE;
1447 #ifdef HOST_WIN32
1448                 WaitForSingleObjectEx (wait_event, INFINITE, TRUE);
1449 #else
1450                 mono_w32handle_wait_one (wait_event, MONO_INFINITE_WAIT, TRUE);
1451 #endif
1452                 MONO_EXIT_GC_SAFE;
1453         }
1454
1455         ac = (MonoAsyncCall*) ares->object_data;
1456         g_assert (ac);
1457
1458         *exc = ac->msg->exc; /* FIXME: GC add write barrier */
1459         *out_args = ac->out_args;
1460         return ac->res;
1461 }
1462
1463 gboolean
1464 mono_threadpool_ms_remove_domain_jobs (MonoDomain *domain, int timeout)
1465 {
1466         gint64 end;
1467         ThreadPoolDomain *tpdomain;
1468         gboolean ret;
1469
1470         g_assert (domain);
1471         g_assert (timeout >= -1);
1472
1473         g_assert (mono_domain_is_unloading (domain));
1474
1475         if (timeout != -1)
1476                 end = mono_msec_ticks () + timeout;
1477
1478 #ifndef DISABLE_SOCKETS
1479         mono_threadpool_ms_io_remove_domain_jobs (domain);
1480         if (timeout != -1) {
1481                 if (mono_msec_ticks () > end)
1482                         return FALSE;
1483         }
1484 #endif
1485
1486         /*
1487          * Wait for all threads which execute jobs in the domain to exit.
1488          * The is_unloading () check in worker_request () ensures that
1489          * no new jobs are added after we enter the lock below.
1490          */
1491         mono_lazy_initialize (&status, initialize);
1492         domains_lock ();
1493
1494         tpdomain = tpdomain_get (domain, FALSE);
1495         if (!tpdomain) {
1496                 domains_unlock ();
1497                 return TRUE;
1498         }
1499
1500         ret = TRUE;
1501
1502         while (tpdomain->outstanding_request + tpdomain->threadpool_jobs > 0) {
1503                 if (timeout == -1) {
1504                         mono_coop_cond_wait (&tpdomain->cleanup_cond, &threadpool->domains_lock);
1505                 } else {
1506                         gint64 now;
1507                         gint res;
1508
1509                         now = mono_msec_ticks ();
1510                         if (now > end) {
1511                                 ret = FALSE;
1512                                 break;
1513                         }
1514
1515                         res = mono_coop_cond_timedwait (&tpdomain->cleanup_cond, &threadpool->domains_lock, end - now);
1516                         if (res != 0) {
1517                                 ret = FALSE;
1518                                 break;
1519                         }
1520                 }
1521         }
1522
1523         /* Remove the domain from the list that the worker threads look at */
1524         tpdomain_remove (tpdomain);
1525
1526         domains_unlock ();
1527
1528         mono_coop_cond_destroy (&tpdomain->cleanup_cond);
1529         tpdomain_free (tpdomain);
1530
1531         return ret;
1532 }
1533
1534 void
1535 mono_threadpool_ms_suspend (void)
1536 {
1537         if (threadpool)
1538                 threadpool->suspended = TRUE;
1539 }
1540
1541 void
1542 mono_threadpool_ms_resume (void)
1543 {
1544         if (threadpool)
1545                 threadpool->suspended = FALSE;
1546 }
1547
1548 void
1549 ves_icall_System_Threading_ThreadPool_GetAvailableThreadsNative (gint32 *worker_threads, gint32 *completion_port_threads)
1550 {
1551         ThreadPoolCounter counter;
1552
1553         if (!worker_threads || !completion_port_threads)
1554                 return;
1555
1556         mono_lazy_initialize (&status, initialize);
1557
1558         counter.as_gint64 = COUNTER_READ ();
1559
1560         *worker_threads = MAX (0, threadpool->limit_worker_max - counter._.active);
1561         *completion_port_threads = threadpool->limit_io_max;
1562 }
1563
1564 void
1565 ves_icall_System_Threading_ThreadPool_GetMinThreadsNative (gint32 *worker_threads, gint32 *completion_port_threads)
1566 {
1567         if (!worker_threads || !completion_port_threads)
1568                 return;
1569
1570         mono_lazy_initialize (&status, initialize);
1571
1572         *worker_threads = threadpool->limit_worker_min;
1573         *completion_port_threads = threadpool->limit_io_min;
1574 }
1575
1576 void
1577 ves_icall_System_Threading_ThreadPool_GetMaxThreadsNative (gint32 *worker_threads, gint32 *completion_port_threads)
1578 {
1579         if (!worker_threads || !completion_port_threads)
1580                 return;
1581
1582         mono_lazy_initialize (&status, initialize);
1583
1584         *worker_threads = threadpool->limit_worker_max;
1585         *completion_port_threads = threadpool->limit_io_max;
1586 }
1587
1588 MonoBoolean
1589 ves_icall_System_Threading_ThreadPool_SetMinThreadsNative (gint32 worker_threads, gint32 completion_port_threads)
1590 {
1591         mono_lazy_initialize (&status, initialize);
1592
1593         if (worker_threads <= 0 || worker_threads > threadpool->limit_worker_max)
1594                 return FALSE;
1595         if (completion_port_threads <= 0 || completion_port_threads > threadpool->limit_io_max)
1596                 return FALSE;
1597
1598         threadpool->limit_worker_min = worker_threads;
1599         threadpool->limit_io_min = completion_port_threads;
1600
1601         return TRUE;
1602 }
1603
1604 MonoBoolean
1605 ves_icall_System_Threading_ThreadPool_SetMaxThreadsNative (gint32 worker_threads, gint32 completion_port_threads)
1606 {
1607         gint cpu_count = mono_cpu_count ();
1608
1609         mono_lazy_initialize (&status, initialize);
1610
1611         if (worker_threads < threadpool->limit_worker_min || worker_threads < cpu_count)
1612                 return FALSE;
1613         if (completion_port_threads < threadpool->limit_io_min || completion_port_threads < cpu_count)
1614                 return FALSE;
1615
1616         threadpool->limit_worker_max = worker_threads;
1617         threadpool->limit_io_max = completion_port_threads;
1618
1619         return TRUE;
1620 }
1621
1622 void
1623 ves_icall_System_Threading_ThreadPool_InitializeVMTp (MonoBoolean *enable_worker_tracking)
1624 {
1625         if (enable_worker_tracking) {
1626                 // TODO: implement some kind of switch to make it possible to enable worker tracking
1627                 *enable_worker_tracking = FALSE;
1628         }
1629
1630         mono_lazy_initialize (&status, initialize);
1631 }
1632
1633 MonoBoolean
1634 ves_icall_System_Threading_ThreadPool_NotifyWorkItemComplete (void)
1635 {
1636         ThreadPoolCounter counter;
1637
1638         if (mono_domain_is_unloading (mono_domain_get ()) || mono_runtime_is_shutting_down ())
1639                 return FALSE;
1640
1641         heuristic_notify_work_completed ();
1642
1643         if (heuristic_should_adjust ())
1644                 heuristic_adjust ();
1645
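        /* Report whether the pool is still within the heuristic's current target; the managed caller
         * uses this to decide whether this worker should keep processing work items. */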
1646         counter.as_gint64 = COUNTER_READ ();
1647         return counter._.working <= counter._.max_working;
1648 }
1649
1650 void
1651 ves_icall_System_Threading_ThreadPool_NotifyWorkItemProgressNative (void)
1652 {
1653         heuristic_notify_work_completed ();
1654
1655         if (heuristic_should_adjust ())
1656                 heuristic_adjust ();
1657 }
1658
1659 void
1660 ves_icall_System_Threading_ThreadPool_ReportThreadStatus (MonoBoolean is_working)
1661 {
1662         // TODO
1663         MonoError error;
1664         mono_error_set_not_implemented (&error, "");
1665         mono_error_set_pending_exception (&error);
1666 }
1667
1668 MonoBoolean
1669 ves_icall_System_Threading_ThreadPool_RequestWorkerThread (void)
1670 {
1671         return worker_request (mono_domain_get ());
1672 }
1673
1674 MonoBoolean G_GNUC_UNUSED
1675 ves_icall_System_Threading_ThreadPool_PostQueuedCompletionStatus (MonoNativeOverlapped *native_overlapped)
1676 {
1677         /* This copies the behavior of the current Mono implementation */
1678         MonoError error;
1679         mono_error_set_not_implemented (&error, "");
1680         mono_error_set_pending_exception (&error);
1681         return FALSE;
1682 }
1683
1684 MonoBoolean G_GNUC_UNUSED
1685 ves_icall_System_Threading_ThreadPool_BindIOCompletionCallbackNative (gpointer file_handle)
1686 {
1687         /* This copies the behavior of the current Mono implementation */
1688         return TRUE;
1689 }
1690
1691 MonoBoolean G_GNUC_UNUSED
1692 ves_icall_System_Threading_ThreadPool_IsThreadPoolHosted (void)
1693 {
1694         return FALSE;
1695 }