[corlib] ParseNumber.StringToInt now check for overflows.
[mono.git] / mono / metadata / threadpool-microsoft.c
1 /*
2  * threadpool-microsoft.c: Microsoft threadpool runtime support
3  *
4  * Author:
5  *      Ludovic Henry (ludovic.henry@xamarin.com)
6  *
7  * Copyright 2015 Xamarin, Inc (http://www.xamarin.com)
8  */
9
10 //
11 // Copyright (c) Microsoft. All rights reserved.
12 // Licensed under the MIT license. See LICENSE file in the project root for full license information.
13 //
14 // Files:
15 //  - src/vm/comthreadpool.cpp
//  - src/vm/win32threadpool.cpp
17 //  - src/vm/threadpoolrequest.cpp
18 //  - src/vm/hillclimbing.cpp
19 //
20 // Ported from C++ to C and adjusted to Mono runtime
21
22 #include <config.h>
23 #include <stdlib.h>
24 #include <math.h>
25
26 #ifdef PLATFORM_ANDROID
27 #include <../../support/libm/complex.h>
28 #else
29 #include <complex.h>
30 #endif
31
32 #include <glib.h>
33
34 #include "class-internals.h"
35 #include "exception.h"
36 #include "object.h"
37 #include "object-internals.h"
38 #include "threadpool-microsoft.h"
39 #include "threadpool-internals.h"
40 #include "utils/atomic.h"
41 #include "utils/mono-compiler.h"
42 #include "utils/mono-proclib.h"
43 #include "utils/mono-threads.h"
44 #include "utils/mono-time.h"
45 #include "utils/mono-rand.h"
46
/* Stack size presumably requested for threadpool-owned threads: 32 Ki
 * pointers, i.e. 128 KiB on 32-bit and 256 KiB on 64-bit targets.
 * NOTE(review): not referenced in the visible part of this file. */
#define SMALL_STACK (sizeof (gpointer) * 32 * 1024)

/* Thresholds compared against ThreadPool.cpu_usage (presumably a percentage
 * derived from mono_cpu_usage — TODO confirm). CPU_USAGE_LOW selects the
 * short threshold in tp_monitor_sufficient_delay_since_last_dequeue. */
#define CPU_USAGE_LOW 80
#define CPU_USAGE_HIGH 95

/* Sleep period of the monitor thread's main loop. */
#define MONITOR_INTERVAL 100 // ms

/* Hill-climbing tuning parameters below are copied into the
 * ThreadPoolHillClimbing fields by tp_ensure_initialized. */

/* The exponent to apply to the gain. 1.0 means to use linear gain,
 * higher values will enhance large moves and damp small ones.
 * default: 2.0 */
#define HILL_CLIMBING_GAIN_EXPONENT 2.0

/* The 'cost' of a thread. 0 means drive for increased throughput regardless
 * of thread count, higher values bias more against higher thread counts.
 * default: 0.15 */
#define HILL_CLIMBING_BIAS 0.15

#define HILL_CLIMBING_WAVE_PERIOD 4
#define HILL_CLIMBING_MAX_WAVE_MAGNITUDE 20
#define HILL_CLIMBING_WAVE_MAGNITUDE_MULTIPLIER 1.0
/* samples_to_measure = WAVE_PERIOD * WAVE_HISTORY_SIZE (see tp_ensure_initialized). */
#define HILL_CLIMBING_WAVE_HISTORY_SIZE 8
#define HILL_CLIMBING_TARGET_SIGNAL_TO_NOISE_RATIO 3.0
#define HILL_CLIMBING_MAX_CHANGE_PER_SECOND 4
#define HILL_CLIMBING_MAX_CHANGE_PER_SAMPLE 20
/* Bounds for the randomly drawn current_sample_interval. */
#define HILL_CLIMBING_SAMPLE_INTERVAL_LOW 10
#define HILL_CLIMBING_SAMPLE_INTERVAL_HIGH 200
#define HILL_CLIMBING_ERROR_SMOOTHING_FACTOR 0.01
/* Stored in max_sample_error; presumably a fraction (0.15 == 15%) despite
 * the _PERCENT suffix — TODO confirm. */
#define HILL_CLIMBING_MAX_SAMPLE_ERROR_PERCENT 0.15
75
/* Keep in sync with System.Threading.NativeOverlapped */
/* Native view of a managed type: field order and types must match the
 * managed declaration exactly, so never reorder or retype these fields. */
struct _MonoNativeOverlapped {
	gpointer internal_low;
	gpointer internal_high;
	gint offset_low;
	gint offset_high;
	gpointer event_handle;
};

/* Keep in sync with System.Threading.RegisteredWaitHandleSafe */
typedef struct _MonoRegisteredWaitHandleSafe MonoRegisteredWaitHandleSafe;
struct _MonoRegisteredWaitHandleSafe {
	MonoObject object;
	gpointer registered_wait_handle;
	MonoWaitHandle *internal_wait_object;
	gboolean release_needed; // init: false
	volatile gint32 lock; // init: 0
};

/* Keep in sync with System.Threading.RegisteredWaitHandle */
struct _MonoRegisteredWaitHandle {
	MonoObject object;
	MonoRegisteredWaitHandleSafe *internal_registered_wait;
};

/* Keep in sync with the System.MonoAsyncCall class which provides GC tracking */
typedef struct _MonoAsyncCall MonoAsyncCall;
struct _MonoAsyncCall {
	MonoObject object;
	MonoMethodMessage *msg;
	MonoMethod *cb_method;
	MonoDelegate *cb_target;
	MonoObject *state;
	MonoObject *res;
	MonoArray *out_args;
};

/* Keep in sync with System.Threading.RuntimeWorkItem */
/* Passed to ThreadPool.UnsafeQueueCustomWorkItem by tp_queue_work_item. */
struct _MonoRuntimeWorkItem {
	MonoObject object;
	MonoAsyncResult *ares;
};
118
/* Thread counts packed into 64 bits so they can be read and updated together
 * with one atomic compare-and-swap on `as_gint64` (see the TP_COUNTER_*
 * macros). Each count is a gint16, so it cannot exceed 32767.
 *  - working: threads currently running a managed callback
 *    (tp_worker_thread increments it around PerformWaitCallback).
 *  - parked:  threads blocked in tp_worker_park. */
typedef union _ThreadPoolCounter ThreadPoolCounter;
union _ThreadPoolCounter {
	struct {
		gint16 max_working; /* determined by heuristic */
		gint16 active; /* working or waiting on thread_work_sem; warm threads */
		gint16 working;
		gint16 parked;
	} _;
	gint64 as_gint64;
};

/* Per-MonoDomain bookkeeping. outstanding_request is incremented by
 * tp_worker_request and decremented by tp_domain_get_next, both under
 * ThreadPool.domains_lock. */
typedef struct _ThreadPoolDomain ThreadPoolDomain;
struct _ThreadPoolDomain {
	MonoDomain *domain;
	gint32 outstanding_request;
};

/* A parked worker. It lives on the worker's own stack while it waits in
 * tp_worker_park and is woken through `cond`. */
typedef struct _ThreadPoolThread ThreadPoolThread;
struct _ThreadPoolThread {
	mono_cond_t cond;
};

/* Hill-climbing heuristic. The first group holds tuning parameters copied
 * from the HILL_CLIMBING_* constants by tp_ensure_initialized; the second
 * group is mutable state. */
typedef struct _ThreadPoolHillClimbing ThreadPoolHillClimbing;
struct _ThreadPoolHillClimbing {
	gint32 wave_period;
	gint32 samples_to_measure;
	gdouble target_throughput_ratio;
	gdouble target_signal_to_noise_ratio;
	gdouble max_change_per_second;
	gdouble max_change_per_sample;
	gint32 max_thread_wave_magnitude;
	gint32 sample_interval_low;
	gdouble thread_magnitude_multiplier;
	gint32 sample_interval_high;
	gdouble throughput_error_smoothing_factor;
	gdouble gain_exponent;
	gdouble max_sample_error;

	gdouble current_control_setting;
	gint64 total_samples;
	gint16 last_thread_count;
	gdouble elapsed_since_last_change;
	gdouble completions_since_last_change;

	gdouble average_throughput_noise;

	/* Both arrays have samples_to_measure entries (allocated in tp_ensure_initialized). */
	gdouble *samples;
	gdouble *thread_counts;

	guint32 current_sample_interval;
	gpointer random_interval_generator; /* handle from tp_rand_create */

	gint32 accumulated_completion_count;
	gdouble accumulated_sample_duration;
};

/* Global threadpool state: the `threadpool` singleton is created by
 * tp_ensure_initialized and freed by tp_ensure_cleanedup. */
typedef struct _ThreadPool ThreadPool;
struct _ThreadPool {
	ThreadPoolCounter counters;

	GPtrArray *domains; // ThreadPoolDomain* []
	mono_mutex_t domains_lock; /* recursive: tp_domain_get_or_create re-enters it through tp_domain_add */

	GPtrArray *working_threads; // MonoInternalThread* []
	mono_mutex_t working_threads_lock;

	GPtrArray *parked_threads; // ThreadPoolThread* []
	mono_mutex_t parked_threads_lock;

	gint32 heuristic_completions;
	guint32 heuristic_sample_start;
	guint32 heuristic_last_dequeue; // ms
	guint32 heuristic_last_adjustment; // ms
	guint32 heuristic_adjustment_interval; // ms
	ThreadPoolHillClimbing heuristic_hill_climbing;
	mono_mutex_t heuristic_lock;

	gint32 limit_worker_min;
	gint32 limit_worker_max;
	gint32 limit_io_min;
	gint32 limit_io_max;

	MonoCpuUsageState *cpu_usage_state;
	gint32 cpu_usage;

	/* suspended by the debugger */
	gboolean suspended;
};
207
/* Reasons for a hill-climbing thread-count change (for example the
 * `transition` argument of tp_hill_climbing_force_change). */
typedef enum _ThreadPoolHeuristicStateTransition ThreadPoolHeuristicStateTransition;
enum _ThreadPoolHeuristicStateTransition {
	TRANSITION_WARMUP,
	TRANSITION_INITIALIZING,
	TRANSITION_RANDOM_MOVE,
	TRANSITION_CLIMBING_MOVE,
	TRANSITION_CHANGE_POINT,
	TRANSITION_STABILIZING,
	TRANSITION_STARVATION,
	TRANSITION_THREAD_TIMED_OUT,
	TRANSITION_UNDEFINED,
};

/* Lifecycle of the global threadpool. The values are ordered:
 * `status >= STATUS_INITIALIZED` is how tp_ensure_initialized tests for
 * completed setup. Transitions use InterlockedCompareExchange in
 * tp_ensure_initialized and tp_ensure_cleanedup. */
enum {
	STATUS_NOT_INITIALIZED,
	STATUS_INITIALIZING,
	STATUS_INITIALIZED,
	STATUS_CLEANING_UP,
	STATUS_CLEANED_UP,
};

/* State of the monitor thread, updated with Interlocked* operations
 * (see tp_monitor_should_keep_running). */
enum {
	MONITOR_STATUS_REQUESTED,
	MONITOR_STATUS_WAITING_FOR_REQUEST,
	MONITOR_STATUS_NOT_RUNNING,
};

static gint32 status = STATUS_NOT_INITIALIZED;
static gint32 monitor_status = MONITOR_STATUS_NOT_RUNNING;

/* The singleton: NULL before initialization and after cleanup. */
static ThreadPool* threadpool;
239
/* Sanity checks applied to a counter value before it is published. */
#define TP_COUNTER_CHECK(counter) \
	do { \
		g_assert (counter._.max_working > 0); \
		g_assert (counter._.active >= 0); \
	} while (0)

/* Atomically read the packed counters. Casting the gint64 to the union
 * type relies on the GCC "cast to a union type" extension. */
#define TP_COUNTER_READ() ((ThreadPoolCounter) InterlockedRead64 (&threadpool->counters.as_gint64))

/* Atomic read-modify-write of threadpool->counters:
 *  - `var` is loaded with the current value;
 *  - `block` edits `var`;
 *  - the result is published with a CAS, and all of this is retried until
 *    the CAS succeeds.
 * A `break` inside `block` leaves the retry loop WITHOUT publishing
 * (tp_worker_thread relies on this). A `return` inside `block` returns
 * from the enclosing function (tp_worker_request relies on this). */
#define TP_COUNTER_ATOMIC(var,block) \
	do { \
		ThreadPoolCounter __old; \
		do { \
			g_assert (threadpool); \
			(var) = __old = TP_COUNTER_READ (); \
			{ block; } \
			TP_COUNTER_CHECK (var); \
		} while (InterlockedCompareExchange64 (&threadpool->counters.as_gint64, (var).as_gint64, __old.as_gint64) != __old.as_gint64); \
	} while (0)

/* Single-attempt variant: `res` becomes TRUE only if the CAS succeeds on
 * the first try. A `break` inside `block` skips the CAS and leaves `res`
 * FALSE. */
#define TP_COUNTER_TRY_ATOMIC(res,var,block) \
	do { \
		ThreadPoolCounter __old; \
		do { \
			g_assert (threadpool); \
			(var) = __old = TP_COUNTER_READ (); \
			(res) = FALSE; \
			{ block; } \
			TP_COUNTER_CHECK (var); \
			(res) = InterlockedCompareExchange64 (&threadpool->counters.as_gint64, (var).as_gint64, __old.as_gint64) == __old.as_gint64; \
		} while (0); \
	} while (0)
271
272 static gpointer
273 tp_rand_create (void)
274 {
275         mono_rand_open ();
276         return mono_rand_init (NULL, 0);
277 }
278
279 static guint32
280 tp_rand_next (gpointer *handle, guint32 min, guint32 max)
281 {
282         guint32 val;
283         if (!mono_rand_try_get_uint32 (handle, &val, min, max)) {
284                 // FIXME handle error
285                 g_assert_not_reached ();
286         }
287         return val;
288 }
289
290 static void
291 tp_rand_free (gpointer handle)
292 {
293         mono_rand_close (handle);
294 }
295
296 static void
297 tp_ensure_initialized (gboolean *enable_worker_tracking)
298 {
299         ThreadPoolHillClimbing *hc;
300         const char *threads_per_cpu_env;
301         gint threads_per_cpu;
302         gint threads_count;
303
304         if (enable_worker_tracking) {
305                 // TODO implement some kind of switch to have the possibily to use it
306                 *enable_worker_tracking = FALSE;
307         }
308
309         if (status >= STATUS_INITIALIZED)
310                 return;
311         if (status == STATUS_INITIALIZING || InterlockedCompareExchange (&status, STATUS_INITIALIZING, STATUS_NOT_INITIALIZED) != STATUS_NOT_INITIALIZED) {
312                 while (status == STATUS_INITIALIZING)
313                         mono_thread_info_yield ();
314                 g_assert (status >= STATUS_INITIALIZED);
315                 return;
316         }
317
318         g_assert (!threadpool);
319         threadpool = g_new0 (ThreadPool, 1);
320         g_assert (threadpool);
321
322         threadpool->domains = g_ptr_array_new ();
323         mono_mutex_init_recursive (&threadpool->domains_lock);
324
325         threadpool->parked_threads = g_ptr_array_new ();
326         mono_mutex_init (&threadpool->parked_threads_lock);
327
328         threadpool->working_threads = g_ptr_array_new ();
329         mono_mutex_init (&threadpool->working_threads_lock);
330
331         threadpool->heuristic_adjustment_interval = 10;
332         mono_mutex_init (&threadpool->heuristic_lock);
333
334         mono_rand_open ();
335
336         hc = &threadpool->heuristic_hill_climbing;
337
338         hc->wave_period = HILL_CLIMBING_WAVE_PERIOD;
339         hc->max_thread_wave_magnitude = HILL_CLIMBING_MAX_WAVE_MAGNITUDE;
340         hc->thread_magnitude_multiplier = (gdouble) HILL_CLIMBING_WAVE_MAGNITUDE_MULTIPLIER;
341         hc->samples_to_measure = hc->wave_period * HILL_CLIMBING_WAVE_HISTORY_SIZE;
342         hc->target_throughput_ratio = (gdouble) HILL_CLIMBING_BIAS;
343         hc->target_signal_to_noise_ratio = (gdouble) HILL_CLIMBING_TARGET_SIGNAL_TO_NOISE_RATIO;
344         hc->max_change_per_second = (gdouble) HILL_CLIMBING_MAX_CHANGE_PER_SECOND;
345         hc->max_change_per_sample = (gdouble) HILL_CLIMBING_MAX_CHANGE_PER_SAMPLE;
346         hc->sample_interval_low = HILL_CLIMBING_SAMPLE_INTERVAL_LOW;
347         hc->sample_interval_high = HILL_CLIMBING_SAMPLE_INTERVAL_HIGH;
348         hc->throughput_error_smoothing_factor = (gdouble) HILL_CLIMBING_ERROR_SMOOTHING_FACTOR;
349         hc->gain_exponent = (gdouble) HILL_CLIMBING_GAIN_EXPONENT;
350         hc->max_sample_error = (gdouble) HILL_CLIMBING_MAX_SAMPLE_ERROR_PERCENT;
351         hc->current_control_setting = 0;
352         hc->total_samples = 0;
353         hc->last_thread_count = 0;
354         hc->average_throughput_noise = 0;
355         hc->elapsed_since_last_change = 0;
356         hc->accumulated_completion_count = 0;
357         hc->accumulated_sample_duration = 0;
358         hc->samples = g_new0 (gdouble, hc->samples_to_measure);
359         hc->thread_counts = g_new0 (gdouble, hc->samples_to_measure);
360         hc->random_interval_generator = tp_rand_create ();
361         hc->current_sample_interval = tp_rand_next (&hc->random_interval_generator, hc->sample_interval_low, hc->sample_interval_high);
362
363         if (!(threads_per_cpu_env = g_getenv ("MONO_THREADS_PER_CPU")))
364                 threads_per_cpu = 1;
365         else
366                 threads_per_cpu = CLAMP (atoi (threads_per_cpu_env), 1, 50);
367
368         threads_count = mono_cpu_count () * threads_per_cpu;
369
370         threadpool->limit_worker_min = threadpool->limit_io_min = threads_count;
371         threadpool->limit_worker_max = threadpool->limit_io_max = threads_count * 100;
372
373         threadpool->counters._.max_working = threadpool->limit_worker_min;
374
375         threadpool->cpu_usage_state = g_new0 (MonoCpuUsageState, 1);
376
377         threadpool->suspended = FALSE;
378
379         status = STATUS_INITIALIZED;
380 }
381
382 static void
383 tp_ensure_cleanedup (void)
384 {
385         if (status == STATUS_NOT_INITIALIZED && InterlockedCompareExchange (&status, STATUS_CLEANED_UP, STATUS_NOT_INITIALIZED) == STATUS_NOT_INITIALIZED)
386                 return;
387         if (status == STATUS_INITIALIZING) {
388                 while (status == STATUS_INITIALIZING)
389                         mono_thread_info_yield ();
390         }
391         if (status == STATUS_CLEANED_UP)
392                 return;
393         if (status == STATUS_CLEANING_UP || InterlockedCompareExchange (&status, STATUS_CLEANING_UP, STATUS_INITIALIZED) != STATUS_INITIALIZED) {
394                 while (status == STATUS_CLEANING_UP)
395                         mono_thread_info_yield ();
396                 g_assert (status == STATUS_CLEANED_UP);
397                 return;
398         }
399
400         /* we make the assumption along the code that we are
401          * cleaning up only if the runtime is shutting down */
402         g_assert (mono_runtime_is_shutting_down ());
403
404         /* Unpark all worker threads */
405         mono_mutex_lock (&threadpool->parked_threads_lock);
406         for (;;) {
407                 guint i;
408                 ThreadPoolCounter counter = TP_COUNTER_READ ();
409                 if (counter._.active == 0 && counter._.parked == 0)
410                         break;
411                 for (i = 0; i < threadpool->parked_threads->len; ++i) {
412                         ThreadPoolThread *tpthread = g_ptr_array_index (threadpool->parked_threads, i);
413                         mono_cond_signal (&tpthread->cond);
414                 }
415                 mono_mutex_unlock (&threadpool->parked_threads_lock);
416                 usleep (1000);
417                 mono_mutex_lock (&threadpool->parked_threads_lock);
418         }
419         mono_mutex_unlock (&threadpool->parked_threads_lock);
420
421         while (monitor_status != MONITOR_STATUS_NOT_RUNNING)
422                 usleep (1000);
423
424         g_ptr_array_free (threadpool->domains, TRUE);
425         mono_mutex_destroy (&threadpool->domains_lock);
426
427         g_ptr_array_free (threadpool->parked_threads, TRUE);
428         mono_mutex_destroy (&threadpool->parked_threads_lock);
429
430         g_ptr_array_free (threadpool->working_threads, TRUE);
431         mono_mutex_destroy (&threadpool->working_threads_lock);
432
433         mono_mutex_destroy (&threadpool->heuristic_lock);
434         g_free (threadpool->heuristic_hill_climbing.samples);
435         g_free (threadpool->heuristic_hill_climbing.thread_counts);
436         tp_rand_free (threadpool->heuristic_hill_climbing.random_interval_generator);
437
438         g_free (threadpool->cpu_usage_state);
439
440         g_assert (threadpool);
441         g_free (threadpool);
442         threadpool = NULL;
443         g_assert (!threadpool);
444
445         status = STATUS_CLEANED_UP;
446 }
447
448 static void
449 tp_queue_work_item (MonoRuntimeWorkItem *rwi)
450 {
451         static MonoClass *threadpool_class = NULL;
452         static MonoMethod *unsafe_queue_custom_work_item_method = NULL;
453         MonoObject *exc = NULL;
454         MonoBoolean f;
455         gpointer args [2];
456
457         g_assert (rwi);
458
459         if (!threadpool_class)
460                 threadpool_class = mono_class_from_name (mono_defaults.corlib, "System.Threading", "ThreadPool");
461         g_assert (threadpool_class);
462
463         if (!unsafe_queue_custom_work_item_method)
464                 unsafe_queue_custom_work_item_method = mono_class_get_method_from_name (threadpool_class, "UnsafeQueueCustomWorkItem", 2);
465         g_assert (unsafe_queue_custom_work_item_method);
466
467         f = FALSE;
468
469         args [0] = (gpointer) rwi;
470         args [1] = (gpointer) mono_value_box (mono_domain_get (), mono_defaults.boolean_class, &f);
471
472         mono_runtime_invoke (unsafe_queue_custom_work_item_method, rwi, args, &exc);
473         if (exc)
474                 mono_raise_exception ((MonoException*) exc);
475 }
476
477 static void
478 tp_domain_add (ThreadPoolDomain *tpdomain)
479 {
480         guint i, len;
481
482         g_assert (tpdomain);
483
484         mono_mutex_lock (&threadpool->domains_lock);
485         len = threadpool->domains->len;
486         for (i = 0; i < len; ++i) {
487                 if (g_ptr_array_index (threadpool->domains, i) == tpdomain)
488                         break;
489         }
490         if (i == len)
491                 g_ptr_array_add (threadpool->domains, tpdomain);
492         mono_mutex_unlock (&threadpool->domains_lock);
493 }
494
495 static gboolean
496 tp_domain_remove (ThreadPoolDomain *tpdomain)
497 {
498         gboolean res;
499
500         g_assert (tpdomain);
501
502         mono_mutex_lock (&threadpool->domains_lock);
503         res = g_ptr_array_remove (threadpool->domains, tpdomain);
504         mono_mutex_unlock (&threadpool->domains_lock);
505
506         return res;
507 }
508
509 static ThreadPoolDomain *
510 tp_domain_get_or_create (MonoDomain *domain)
511 {
512         ThreadPoolDomain *tpdomain = NULL;
513         guint i;
514
515         g_assert (domain);
516
517         mono_mutex_lock (&threadpool->domains_lock);
518         for (i = 0; i < threadpool->domains->len; ++i) {
519                 ThreadPoolDomain *tmp = g_ptr_array_index (threadpool->domains, i);
520                 if (tmp->domain == domain) {
521                         tpdomain = tmp;
522                         break;
523                 }
524         }
525         if (!tpdomain) {
526                 tpdomain = g_new0 (ThreadPoolDomain, 1);
527                 tpdomain->domain = domain;
528                 tp_domain_add (tpdomain);
529         }
530         mono_mutex_unlock (&threadpool->domains_lock);
531         return tpdomain;
532 }
533
534 static gboolean
535 tp_domain_any_has_request ()
536 {
537         gboolean res = FALSE;
538         guint i;
539
540         mono_mutex_lock (&threadpool->domains_lock);
541         for (i = 0; i < threadpool->domains->len; ++i) {
542                 ThreadPoolDomain *tmp = g_ptr_array_index (threadpool->domains, i);
543                 if (tmp->outstanding_request > 0) {
544                         res = TRUE;
545                         break;
546                 }
547         }
548         mono_mutex_unlock (&threadpool->domains_lock);
549         return res;
550 }
551
552 static ThreadPoolDomain *
553 tp_domain_get_next (ThreadPoolDomain *current)
554 {
555         ThreadPoolDomain *tpdomain = NULL;
556         guint len;
557
558         mono_mutex_lock (&threadpool->domains_lock);
559         len = threadpool->domains->len;
560         if (len > 0) {
561                 guint i, current_idx = -1;
562                 if (current) {
563                         for (i = 0; i < len; ++i) {
564                                 if (current == g_ptr_array_index (threadpool->domains, i)) {
565                                         current_idx = i;
566                                         break;
567                                 }
568                         }
569                         g_assert (current_idx >= 0);
570                 }
571                 for (i = current_idx + 1; i < len + current_idx + 1; ++i) {
572                         ThreadPoolDomain *tmp = g_ptr_array_index (threadpool->domains, i % len);
573                         if (tmp->outstanding_request > 0) {
574                                 tpdomain = tmp;
575                                 tpdomain->outstanding_request --;
576                                 g_assert (tpdomain->outstanding_request >= 0);
577                                 break;
578                         }
579                 }
580         }
581         mono_mutex_unlock (&threadpool->domains_lock);
582         return tpdomain;
583 }
584
585 static void
586 tp_worker_park (void)
587 {
588         ThreadPoolThread tpthread;
589         mono_cond_init (&tpthread.cond, NULL);
590
591         mono_mutex_lock (&threadpool->parked_threads_lock);
592         g_ptr_array_add (threadpool->parked_threads, &tpthread);
593         mono_cond_wait (&tpthread.cond, &threadpool->parked_threads_lock);
594         g_ptr_array_remove (threadpool->parked_threads, &tpthread);
595         mono_mutex_unlock (&threadpool->parked_threads_lock);
596
597         mono_cond_destroy (&tpthread.cond);
598 }
599
600 static gboolean
601 tp_worker_try_unpark (void)
602 {
603         gboolean res = FALSE;
604         guint len;
605
606         mono_mutex_lock (&threadpool->parked_threads_lock);
607         len = threadpool->parked_threads->len;
608         if (len > 0) {
609                 ThreadPoolThread *tpthread = g_ptr_array_index (threadpool->parked_threads, len - 1);
610                 mono_cond_signal (&tpthread->cond);
611                 res = TRUE;
612         }
613         mono_mutex_unlock (&threadpool->parked_threads_lock);
614         return res;
615 }
616
/* Body of a threadpool worker thread. It is started by tp_worker_try_create
 * with a ThreadPoolDomain* as `data`.
 *
 * The creator already counted this thread in counters._.active, and every
 * exit path below decrements it again.
 *
 * Each iteration of the outer loop:
 *  - runs the managed
 *    System.Threading.Microsoft._ThreadPoolWaitCallback.PerformWaitCallback
 *    in the current domain;
 *  - then looks for another domain with an outstanding request.
 *
 * The thread exits when no such domain is found or the runtime is shutting
 * down. */
static void
tp_worker_thread (gpointer data)
{
	static MonoClass *threadpool_wait_callback_class = NULL;
	static MonoMethod *perform_wait_callback_method = NULL;
	MonoInternalThread *thread;
	ThreadPoolDomain *tpdomain;
	ThreadPoolCounter counter;
	gboolean retire = FALSE;

	g_assert (status >= STATUS_INITIALIZED);

	tpdomain = data;
	g_assert (tpdomain);
	g_assert (tpdomain->domain);

	/* Too late to do any work: undo the creator's `active` increment. */
	if (mono_runtime_is_shutting_down () || mono_domain_is_unloading (tpdomain->domain)) {
		TP_COUNTER_ATOMIC (counter, { counter._.active --; });
		return;
	}

	/* Resolve the managed callback once; cached in function-local statics. */
	if (!threadpool_wait_callback_class)
		threadpool_wait_callback_class = mono_class_from_name (mono_defaults.corlib, "System.Threading.Microsoft", "_ThreadPoolWaitCallback");
	g_assert (threadpool_wait_callback_class);

	if (!perform_wait_callback_method)
		perform_wait_callback_method = mono_class_get_method_from_name (threadpool_wait_callback_class, "PerformWaitCallback", 0);
	g_assert (perform_wait_callback_method);

	g_assert (threadpool);

	thread = mono_thread_internal_current ();
	g_assert (thread);

	/* domains_lock is held at the top and bottom of each iteration. It is
	 * released while the callback runs and while the thread is parked. */
	mono_mutex_lock (&threadpool->domains_lock);

	do {
		guint i, c;

		g_assert (tpdomain);
		g_assert (tpdomain->domain);

		/* Count this job against the domain; the count is read back below
		 * to decide when an unloading domain can be released. */
		tpdomain->domain->threadpool_jobs ++;

		mono_mutex_unlock (&threadpool->domains_lock);

		mono_mutex_lock (&threadpool->working_threads_lock);
		g_ptr_array_add (threadpool->working_threads, thread);
		mono_mutex_unlock (&threadpool->working_threads_lock);

		TP_COUNTER_ATOMIC (counter, { counter._.working ++; });

		mono_thread_push_appdomain_ref (tpdomain->domain);
		if (mono_domain_set (tpdomain->domain, FALSE)) {
			MonoObject *exc = NULL;
			MonoObject *res = mono_runtime_invoke (perform_wait_callback_method, NULL, NULL, &exc);
			if (exc)
				mono_internal_thread_unhandled_exception (exc);
			/* A boxed `false` result asks this worker to retire. */
			else if (res && *(MonoBoolean*) mono_object_unbox (res) == FALSE)
				retire = TRUE;

			/* NOTE(review): assuming mono_thread_clr_state clears the given
			 * bits, this clears every state flag except ThreadState_Background.
			 * The next lines then make sure Background is set. */
			mono_thread_clr_state (thread , ~ThreadState_Background);
			if (!mono_thread_test_state (thread , ThreadState_Background))
				ves_icall_System_Threading_Thread_SetState (thread, ThreadState_Background);
		}
		mono_thread_pop_appdomain_ref ();

		TP_COUNTER_ATOMIC (counter, { counter._.working --; });

		mono_mutex_lock (&threadpool->working_threads_lock);
		g_ptr_array_remove_fast (threadpool->working_threads, thread);
		mono_mutex_unlock (&threadpool->working_threads_lock);

		mono_mutex_lock (&threadpool->domains_lock);

		tpdomain->domain->threadpool_jobs --;
		g_assert (tpdomain->domain->threadpool_jobs >= 0);

		/* Last job of an unloading domain: drop our bookkeeping for it and
		 * release the domain's cleanup semaphore, if it has one. */
		if (tpdomain->domain->threadpool_jobs == 0 && mono_domain_is_unloading (tpdomain->domain)) {
			gboolean removed = tp_domain_remove (tpdomain);
			g_assert (removed);
			if (tpdomain->domain->cleanup_semaphore)
				ReleaseSemaphore (tpdomain->domain->cleanup_semaphore, 1, NULL);
			g_free (tpdomain);
			tpdomain = NULL;
		}

		/* Up to `c` attempts to find more work.
		 * Between attempts (never after the last one) the thread parks, but
		 * only if more threads are active than max_working allows; otherwise
		 * it retries immediately.
		 * NOTE(review): `retire` is reset at the end of every attempt, so a
		 * retiring thread only skips the lookup on the first attempt.
		 * Confirm this is intended. */
		for (i = 0, c = 5; i < c; ++i) {
			if (mono_runtime_is_shutting_down ())
				break;

			if (!retire) {
				tpdomain = tp_domain_get_next (tpdomain);
				if (tpdomain)
					break;
			}

			if (i < c - 1) {
				gboolean park = TRUE;

				/* This `break` leaves the macro's CAS loop without publishing. */
				TP_COUNTER_ATOMIC (counter, {
					if (counter._.active <= counter._.max_working) {
						park = FALSE;
						break;
					}
					counter._.active --;
					counter._.parked ++;
				});

				if (park) {
					mono_mutex_unlock (&threadpool->domains_lock);
					tp_worker_park ();
					mono_mutex_lock (&threadpool->domains_lock);

					TP_COUNTER_ATOMIC (counter, {
						counter._.active ++;
						counter._.parked --;
					});
				}
			}

			retire = FALSE;
		}
	} while (tpdomain && !mono_runtime_is_shutting_down ());

	mono_mutex_unlock (&threadpool->domains_lock);

	TP_COUNTER_ATOMIC (counter, { counter._.active --; });
}
746
747 static gboolean
748 tp_worker_try_create (ThreadPoolDomain *tpdomain)
749 {
750         g_assert (tpdomain);
751         g_assert (tpdomain->domain);
752
753         return mono_thread_create_internal (tpdomain->domain, tp_worker_thread, tpdomain, TRUE, 0) != NULL;
754 }
755
/* Forward declaration; presumably defined later in this file (outside the
 * visible part). */
static void tp_monitor_ensure_running (void);

/* Record a work request for `domain` and try to get a worker to serve it.
 *
 * Unless the runtime is shutting down or the domain is unloading, the
 * request is counted in the domain's outstanding_request. Then, unless the
 * pool is suspended, the function:
 *  - makes sure the monitor is running;
 *  - tries to wake a parked worker;
 *  - otherwise starts a new thread, if counters._.active is below
 *    counters._.max_working.
 *
 * Returns TRUE if a worker was woken or created, FALSE otherwise. When it
 * returns FALSE after the request was counted, the request stays pending
 * in outstanding_request. */
static gboolean
tp_worker_request (MonoDomain *domain)
{
	ThreadPoolDomain *tpdomain;
	ThreadPoolCounter counter;

	g_assert (domain);
	g_assert (threadpool);

	if (mono_runtime_is_shutting_down () || mono_domain_is_unloading (domain))
		return FALSE;

	/* tp_domain_get_or_create takes domains_lock again; it is recursive. */
	mono_mutex_lock (&threadpool->domains_lock);
	tpdomain = tp_domain_get_or_create (domain);
	g_assert (tpdomain);
	tpdomain->outstanding_request ++;
	mono_mutex_unlock (&threadpool->domains_lock);

	if (threadpool->suspended)
		return FALSE;

	tp_monitor_ensure_running ();

	if (tp_worker_try_unpark ())
		return TRUE;

	/* Reserve an `active` slot before creating the thread. The `return`
	 * inside the macro leaves this function without publishing a change. */
	TP_COUNTER_ATOMIC (counter, {
		if (counter._.active >= counter._.max_working)
			return FALSE;
		counter._.active ++;
	});

	if (tp_worker_try_create (tpdomain))
		return TRUE;

	/* Thread creation failed: give the reserved slot back. */
	TP_COUNTER_ATOMIC (counter, { counter._.active --; });
	return FALSE;
}
796
797 static gboolean
798 tp_monitor_should_keep_running (void)
799 {
800         g_assert (monitor_status == MONITOR_STATUS_WAITING_FOR_REQUEST || monitor_status == MONITOR_STATUS_REQUESTED);
801
802         if (InterlockedExchange (&monitor_status, MONITOR_STATUS_WAITING_FOR_REQUEST) == MONITOR_STATUS_WAITING_FOR_REQUEST) {
803                 if (mono_runtime_is_shutting_down () || !tp_domain_any_has_request ()) {
804                         if (InterlockedExchange (&monitor_status, MONITOR_STATUS_NOT_RUNNING) == MONITOR_STATUS_WAITING_FOR_REQUEST)
805                                 return FALSE;
806                 }
807         }
808
809         g_assert (monitor_status == MONITOR_STATUS_WAITING_FOR_REQUEST || monitor_status == MONITOR_STATUS_REQUESTED);
810
811         return TRUE;
812 }
813
814 static gboolean
815 tp_monitor_sufficient_delay_since_last_dequeue (void)
816 {
817         guint32 threshold;
818
819         g_assert (threadpool);
820
821         if (threadpool->cpu_usage < CPU_USAGE_LOW) {
822                 threshold = MONITOR_INTERVAL;
823         } else {
824                 ThreadPoolCounter counter = TP_COUNTER_READ ();
825                 threshold = counter._.max_working * MONITOR_INTERVAL * 2;
826         }
827
828         return mono_msec_ticks () >= threadpool->heuristic_last_dequeue + threshold;
829 }
830
/* Forward declaration for the hill-climbing helper; presumably defined
 * later in this file (outside the visible part). */
static void tp_hill_climbing_force_change (gint16 new_thread_count, ThreadPoolHeuristicStateTransition transition);
832
833 static void
834 tp_monitor_thread (void)
835 {
836         MonoInternalThread *current_thread = mono_thread_internal_current ();
837         guint i;
838
839         mono_cpu_usage (threadpool->cpu_usage_state);
840
841         do {
842                 MonoInternalThread *thread;
843                 gboolean all_waitsleepjoin = TRUE;
844                 gint32 interval_left = MONITOR_INTERVAL;
845                 gint32 awake = 0; /* number of spurious awakes we tolerate before doing a round of rebalancing */
846
847                 g_assert (monitor_status != MONITOR_STATUS_NOT_RUNNING);
848
849                 do {
850                         guint32 ts;
851
852                         if (mono_runtime_is_shutting_down ())
853                                 break;
854
855                         ts = mono_msec_ticks ();
856                         if (SleepEx (interval_left, TRUE) == 0)
857                                 break;
858                         interval_left -= mono_msec_ticks () - ts;
859
860                         if ((current_thread->state & (ThreadState_StopRequested | ThreadState_SuspendRequested)) != 0)
861                                 mono_thread_interruption_checkpoint ();
862                 } while (interval_left > 0 && ++awake < 10);
863
864                 if (threadpool->suspended)
865                         continue;
866
867                 if (mono_runtime_is_shutting_down () || !tp_domain_any_has_request ())
868                         continue;
869
870                 mono_mutex_lock (&threadpool->working_threads_lock);
871                 for (i = 0; i < threadpool->working_threads->len; ++i) {
872                         thread = g_ptr_array_index (threadpool->working_threads, i);
873                         if ((thread->state & ThreadState_WaitSleepJoin) == 0) {
874                                 all_waitsleepjoin = FALSE;
875                                 break;
876                         }
877                 }
878                 mono_mutex_unlock (&threadpool->working_threads_lock);
879
880                 if (all_waitsleepjoin) {
881                         ThreadPoolCounter counter;
882                         TP_COUNTER_ATOMIC (counter, { counter._.max_working ++; });
883                         tp_hill_climbing_force_change (counter._.max_working, TRANSITION_STARVATION);
884                 }
885
886                 threadpool->cpu_usage = mono_cpu_usage (threadpool->cpu_usage_state);
887
888                 if (tp_monitor_sufficient_delay_since_last_dequeue ()) {
889                         for (i = 0; i < 5; ++i) {
890                                 ThreadPoolDomain *tpdomain;
891                                 ThreadPoolCounter counter;
892                                 gboolean success;
893
894                                 if (mono_runtime_is_shutting_down ())
895                                         break;
896
897                                 if (tp_worker_try_unpark ())
898                                         break;
899
900                                 TP_COUNTER_TRY_ATOMIC (success, counter, {
901                                         if (counter._.active >= counter._.max_working)
902                                                 break;
903                                         counter._.active ++;
904                                 });
905
906                                 if (!success)
907                                         continue;
908
909                                 tpdomain = tp_domain_get_next (NULL);
910                                 if (tpdomain && tp_worker_try_create (tpdomain))
911                                         break;
912
913                                 TP_COUNTER_ATOMIC (counter, { counter._.active --; });
914                         }
915                 }
916         } while (tp_monitor_should_keep_running ());
917 }
918
919 static void
920 tp_monitor_ensure_running (void)
921 {
922         for (;;) {
923                 switch (monitor_status) {
924                 case MONITOR_STATUS_REQUESTED:
925                         return;
926                 case MONITOR_STATUS_WAITING_FOR_REQUEST:
927                         InterlockedCompareExchange (&monitor_status, MONITOR_STATUS_REQUESTED, MONITOR_STATUS_WAITING_FOR_REQUEST);
928                         break;
929                 case MONITOR_STATUS_NOT_RUNNING:
930                         if (InterlockedCompareExchange (&monitor_status, MONITOR_STATUS_REQUESTED, MONITOR_STATUS_NOT_RUNNING) == MONITOR_STATUS_NOT_RUNNING) {
931                                 if (!mono_thread_create_internal (mono_get_root_domain (), tp_monitor_thread, NULL, TRUE, SMALL_STACK))
932                                         monitor_status = MONITOR_STATUS_NOT_RUNNING;
933                                 return;
934                         }
935                         break;
936                 default: g_assert_not_reached ();
937                 }
938         }
939 }
940
941 static void
942 tp_hill_climbing_change_thread_count (gint16 new_thread_count, ThreadPoolHeuristicStateTransition transition)
943 {
944         ThreadPoolHillClimbing *hc;
945
946         g_assert (threadpool);
947
948         hc = &threadpool->heuristic_hill_climbing;
949
950         hc->last_thread_count = new_thread_count;
951         hc->current_sample_interval = tp_rand_next (&hc->random_interval_generator, hc->sample_interval_low, hc->sample_interval_high);
952         hc->elapsed_since_last_change = 0;
953         hc->completions_since_last_change = 0;
954 }
955
956 static void
957 tp_hill_climbing_force_change (gint16 new_thread_count, ThreadPoolHeuristicStateTransition transition)
958 {
959         ThreadPoolHillClimbing *hc;
960
961         g_assert (threadpool);
962
963         hc = &threadpool->heuristic_hill_climbing;
964
965         if (new_thread_count != hc->last_thread_count) {
966                 hc->current_control_setting += new_thread_count - hc->last_thread_count;
967                 tp_hill_climbing_change_thread_count (new_thread_count, transition);
968         }
969 }
970
971 static double complex
972 tp_hill_climbing_get_wave_component (gdouble *samples, guint sample_count, gdouble period)
973 {
974         ThreadPoolHillClimbing *hc;
975         gdouble w, cosine, sine, coeff, q0, q1, q2;
976         guint i;
977
978         g_assert (threadpool);
979         g_assert (sample_count >= period);
980         g_assert (period >= 2);
981
982         hc = &threadpool->heuristic_hill_climbing;
983
984         w = 2.0 * M_PI / period;
985         cosine = cos (w);
986         sine = sin (w);
987         coeff = 2.0 * cosine;
988         q0 = q1 = q2 = 0;
989
990         for (i = 0; i < sample_count; ++i) {
991                 q0 = coeff * q1 - q2 + samples [(hc->total_samples - sample_count + i) % hc->samples_to_measure];
992                 q2 = q1;
993                 q1 = q0;
994         }
995
996         return ((q1 - q2 * cosine) + (q2 * sine) * I) / ((gdouble) sample_count);
997 }
998
999 static gint16
1000 tp_hill_climbing_update (gint16 current_thread_count, guint32 sample_duration, gint32 completions, guint32 *adjustment_interval)
1001 {
1002         ThreadPoolHillClimbing *hc;
1003         ThreadPoolHeuristicStateTransition transition;
1004         gdouble throughput;
1005         gdouble throughput_error_estimate;
1006         gdouble confidence;
1007         gdouble move;
1008         gdouble gain;
1009         gint sample_index;
1010         gint sample_count;
1011         gint new_thread_wave_magnitude;
1012         gint new_thread_count;
1013         double complex thread_wave_component;
1014         double complex throughput_wave_component;
1015         double complex ratio;
1016
1017         g_assert (threadpool);
1018         g_assert (adjustment_interval);
1019
1020         hc = &threadpool->heuristic_hill_climbing;
1021
1022         /* If someone changed the thread count without telling us, update our records accordingly. */
1023         if (current_thread_count != hc->last_thread_count)
1024                 tp_hill_climbing_force_change (current_thread_count, TRANSITION_INITIALIZING);
1025
1026         /* Update the cumulative stats for this thread count */
1027         hc->elapsed_since_last_change += sample_duration;
1028         hc->completions_since_last_change += completions;
1029
1030         /* Add in any data we've already collected about this sample */
1031         sample_duration += hc->accumulated_sample_duration;
1032         completions += hc->accumulated_completion_count;
1033
1034         /* We need to make sure we're collecting reasonably accurate data. Since we're just counting the end
1035          * of each work item, we are goinng to be missing some data about what really happened during the
1036          * sample interval. The count produced by each thread includes an initial work item that may have
1037          * started well before the start of the interval, and each thread may have been running some new
1038          * work item for some time before the end of the interval, which did not yet get counted. So
1039          * our count is going to be off by +/- threadCount workitems.
1040          *
1041          * The exception is that the thread that reported to us last time definitely wasn't running any work
1042          * at that time, and the thread that's reporting now definitely isn't running a work item now. So
1043          * we really only need to consider threadCount-1 threads.
1044          *
1045          * Thus the percent error in our count is +/- (threadCount-1)/numCompletions.
1046          *
1047          * We cannot rely on the frequency-domain analysis we'll be doing later to filter out this error, because
1048          * of the way it accumulates over time. If this sample is off by, say, 33% in the negative direction,
1049          * then the next one likely will be too. The one after that will include the sum of the completions
1050          * we missed in the previous samples, and so will be 33% positive. So every three samples we'll have
1051          * two "low" samples and one "high" sample. This will appear as periodic variation right in the frequency
1052          * range we're targeting, which will not be filtered by the frequency-domain translation. */
1053         if (hc->total_samples > 0 && ((current_thread_count - 1.0) / completions) >= hc->max_sample_error) {
1054                 /* Not accurate enough yet. Let's accumulate the data so
1055                  * far, and tell the ThreadPool to collect a little more. */
1056                 hc->accumulated_sample_duration = sample_duration;
1057                 hc->accumulated_completion_count = completions;
1058                 *adjustment_interval = 10;
1059                 return current_thread_count;
1060         }
1061
1062         /* We've got enouugh data for our sample; reset our accumulators for next time. */
1063         hc->accumulated_sample_duration = 0;
1064         hc->accumulated_completion_count = 0;
1065
1066         /* Add the current thread count and throughput sample to our history. */
1067         throughput = ((gdouble) completions) / sample_duration;
1068
1069         sample_index = hc->total_samples % hc->samples_to_measure;
1070         hc->samples [sample_index] = throughput;
1071         hc->thread_counts [sample_index] = current_thread_count;
1072         hc->total_samples ++;
1073
1074         /* Set up defaults for our metrics. */
1075         thread_wave_component = 0;
1076         throughput_wave_component = 0;
1077         throughput_error_estimate = 0;
1078         ratio = 0;
1079         confidence = 0;
1080
1081         transition = TRANSITION_WARMUP;
1082
1083         /* How many samples will we use? It must be at least the three wave periods we're looking for, and it must also
1084          * be a whole multiple of the primary wave's period; otherwise the frequency we're looking for will fall between
1085          * two frequency bands in the Fourier analysis, and we won't be able to measure it accurately. */
1086         sample_count = ((gint) MIN (hc->total_samples - 1, hc->samples_to_measure) / hc->wave_period) * hc->wave_period;
1087
1088         if (sample_count > hc->wave_period) {
1089                 guint i;
1090                 gdouble average_throughput;
1091                 gdouble average_thread_count;
1092                 gdouble sample_sum = 0;
1093                 gdouble thread_sum = 0;
1094
1095                 /* Average the throughput and thread count samples, so we can scale the wave magnitudes later. */
1096                 for (i = 0; i < sample_count; ++i) {
1097                         guint j = (hc->total_samples - sample_count + i) % hc->samples_to_measure;
1098                         sample_sum += hc->samples [j];
1099                         thread_sum += hc->thread_counts [j];
1100                 }
1101
1102                 average_throughput = sample_sum / sample_count;
1103                 average_thread_count = thread_sum / sample_count;
1104
1105                 if (average_throughput > 0 && average_thread_count > 0) {
1106                         gdouble noise_for_confidence, adjacent_period_1, adjacent_period_2;
1107
1108                         /* Calculate the periods of the adjacent frequency bands we'll be using to
1109                          * measure noise levels. We want the two adjacent Fourier frequency bands. */
1110                         adjacent_period_1 = sample_count / (((gdouble) sample_count) / ((gdouble) hc->wave_period) + 1);
1111                         adjacent_period_2 = sample_count / (((gdouble) sample_count) / ((gdouble) hc->wave_period) - 1);
1112
1113                         /* Get the the three different frequency components of the throughput (scaled by average
1114                          * throughput). Our "error" estimate (the amount of noise that might be present in the
1115                          * frequency band we're really interested in) is the average of the adjacent bands. */
1116                         throughput_wave_component = tp_hill_climbing_get_wave_component (hc->samples, sample_count, hc->wave_period) / average_throughput;
1117                         throughput_error_estimate = cabs (tp_hill_climbing_get_wave_component (hc->samples, sample_count, adjacent_period_1) / average_throughput);
1118
1119                         if (adjacent_period_2 <= sample_count) {
1120                                 throughput_error_estimate = MAX (throughput_error_estimate, cabs (tp_hill_climbing_get_wave_component (
1121                                         hc->samples, sample_count, adjacent_period_2) / average_throughput));
1122                         }
1123
1124                         /* Do the same for the thread counts, so we have something to compare to. We don't
1125                          * measure thread count noise, because there is none; these are exact measurements. */
1126                         thread_wave_component = tp_hill_climbing_get_wave_component (hc->thread_counts, sample_count, hc->wave_period) / average_thread_count;
1127
1128                         /* Update our moving average of the throughput noise. We'll use this
1129                          * later as feedback to determine the new size of the thread wave. */
1130                         if (hc->average_throughput_noise == 0) {
1131                                 hc->average_throughput_noise = throughput_error_estimate;
1132                         } else {
1133                                 hc->average_throughput_noise = (hc->throughput_error_smoothing_factor * throughput_error_estimate)
1134                                         + ((1.0 + hc->throughput_error_smoothing_factor) * hc->average_throughput_noise);
1135                         }
1136
1137                         if (cabs (thread_wave_component) > 0) {
1138                                 /* Adjust the throughput wave so it's centered around the target wave,
1139                                  * and then calculate the adjusted throughput/thread ratio. */
1140                                 ratio = (throughput_wave_component - (hc->target_throughput_ratio * thread_wave_component)) / thread_wave_component;
1141                                 transition = TRANSITION_CLIMBING_MOVE;
1142                         } else {
1143                                 ratio = 0;
1144                                 transition = TRANSITION_STABILIZING;
1145                         }
1146
1147                         noise_for_confidence = MAX (hc->average_throughput_noise, throughput_error_estimate);
1148                         if (noise_for_confidence > 0) {
1149                                 confidence = cabs (thread_wave_component) / noise_for_confidence / hc->target_signal_to_noise_ratio;
1150                         } else {
1151                                 /* there is no noise! */
1152                                 confidence = 1.0;
1153                         }
1154                 }
1155         }
1156
1157         /* We use just the real part of the complex ratio we just calculated. If the throughput signal
1158          * is exactly in phase with the thread signal, this will be the same as taking the magnitude of
1159          * the complex move and moving that far up. If they're 180 degrees out of phase, we'll move
1160          * backward (because this indicates that our changes are having the opposite of the intended effect).
1161          * If they're 90 degrees out of phase, we won't move at all, because we can't tell wether we're
1162          * having a negative or positive effect on throughput. */
1163         move = creal (ratio);
1164         move = CLAMP (move, -1.0, 1.0);
1165
1166         /* Apply our confidence multiplier. */
1167         move *= CLAMP (confidence, -1.0, 1.0);
1168
1169         /* Now apply non-linear gain, such that values around zero are attenuated, while higher values
1170          * are enhanced. This allows us to move quickly if we're far away from the target, but more slowly
1171         * if we're getting close, giving us rapid ramp-up without wild oscillations around the target. */
1172         gain = hc->max_change_per_second * sample_duration;
1173         move = pow (fabs (move), hc->gain_exponent) * (move >= 0.0 ? 1 : -1) * gain;
1174         move = MIN (move, hc->max_change_per_sample);
1175
1176         /* If the result was positive, and CPU is > 95%, refuse the move. */
1177         if (move > 0.0 && threadpool->cpu_usage > CPU_USAGE_HIGH)
1178                 move = 0.0;
1179
1180         /* Apply the move to our control setting. */
1181         hc->current_control_setting += move;
1182
1183         /* Calculate the new thread wave magnitude, which is based on the moving average we've been keeping of the
1184          * throughput error.  This average starts at zero, so we'll start with a nice safe little wave at first. */
1185         new_thread_wave_magnitude = (gint)(0.5 + (hc->current_control_setting * hc->average_throughput_noise
1186                 * hc->target_signal_to_noise_ratio * hc->thread_magnitude_multiplier * 2.0));
1187         new_thread_wave_magnitude = CLAMP (new_thread_wave_magnitude, 1, hc->max_thread_wave_magnitude);
1188
1189         /* Make sure our control setting is within the ThreadPool's limits. */
1190         hc->current_control_setting = CLAMP (hc->current_control_setting, threadpool->limit_worker_min, threadpool->limit_worker_max - new_thread_wave_magnitude);
1191
1192         /* Calculate the new thread count (control setting + square wave). */
1193         new_thread_count = (gint)(hc->current_control_setting + new_thread_wave_magnitude * ((hc->total_samples / (hc->wave_period / 2)) % 2));
1194
1195         /* Make sure the new thread count doesn't exceed the ThreadPool's limits. */
1196         new_thread_count = CLAMP (new_thread_count, threadpool->limit_worker_min, threadpool->limit_worker_max);
1197
1198         if (new_thread_count != current_thread_count)
1199                 tp_hill_climbing_change_thread_count (new_thread_count, transition);
1200
1201         if (creal (ratio) < 0.0 && new_thread_count == threadpool->limit_worker_min)
1202                 *adjustment_interval = (gint)(0.5 + hc->current_sample_interval * (10.0 * MAX (-1.0 * creal (ratio), 1.0)));
1203         else
1204                 *adjustment_interval = hc->current_sample_interval;
1205
1206         return new_thread_count;
1207 }
1208
1209 static void
1210 tp_heuristic_notify_work_completed (void)
1211 {
1212         g_assert (threadpool);
1213
1214         InterlockedIncrement (&threadpool->heuristic_completions);
1215         threadpool->heuristic_last_dequeue = mono_msec_ticks ();
1216 }
1217
1218 static gboolean
1219 tp_heuristic_should_adjust ()
1220 {
1221         g_assert (threadpool);
1222
1223         if (threadpool->heuristic_last_dequeue > threadpool->heuristic_last_adjustment + threadpool->heuristic_adjustment_interval) {
1224                 ThreadPoolCounter counter = TP_COUNTER_READ ();
1225                 if (counter._.active <= counter._.max_working)
1226                         return TRUE;
1227         }
1228
1229         return FALSE;
1230 }
1231
1232 static void
1233 tp_heuristic_adjust ()
1234 {
1235         g_assert (threadpool);
1236
1237         if (mono_mutex_trylock (&threadpool->heuristic_lock) == 0) {
1238                 gint32 completions = InterlockedExchange (&threadpool->heuristic_completions, 0);
1239                 guint32 sample_end = mono_msec_ticks ();
1240                 guint32 sample_duration = sample_end - threadpool->heuristic_sample_start;
1241
1242                 if (sample_duration >= threadpool->heuristic_adjustment_interval / 2) {
1243                         ThreadPoolCounter counter;
1244                         gint16 new_thread_count;
1245
1246                         counter = TP_COUNTER_READ ();
1247                         new_thread_count = tp_hill_climbing_update (counter._.max_working, sample_duration, completions, &threadpool->heuristic_adjustment_interval);
1248
1249                         TP_COUNTER_ATOMIC (counter, { counter._.max_working = new_thread_count; });
1250
1251                         if (new_thread_count > counter._.max_working)
1252                                 tp_worker_request (mono_domain_get ());
1253
1254                         threadpool->heuristic_sample_start = sample_end;
1255                         threadpool->heuristic_last_adjustment = mono_msec_ticks ();
1256                 }
1257
1258                 mono_mutex_unlock (&threadpool->heuristic_lock);
1259         }
1260 }
1261
/* Public cleanup entry point for this thread pool implementation; all of the
 * work is delegated to tp_ensure_cleanedup (). */
void
mono_thread_pool_ms_cleanup (void)
{
	tp_ensure_cleanedup ();
}
1267
1268 MonoAsyncResult *
1269 mono_thread_pool_ms_add (MonoObject *target, MonoMethodMessage *msg, MonoDelegate *async_callback, MonoObject *state)
1270 {
1271         static MonoClass *async_call_klass = NULL;
1272         static MonoClass *runtime_work_item_class = NULL;
1273         MonoDomain *domain;
1274         MonoAsyncResult *ares;
1275         MonoAsyncCall *ac;
1276         MonoRuntimeWorkItem *rwi;
1277
1278         if (!async_call_klass)
1279                 async_call_klass = mono_class_from_name (mono_defaults.corlib, "System", "MonoAsyncCall");
1280         g_assert (async_call_klass);
1281
1282         if (!runtime_work_item_class)
1283                 runtime_work_item_class = mono_class_from_name (mono_defaults.corlib, "System.Threading", "MonoRuntimeWorkItem");
1284         g_assert (runtime_work_item_class);
1285
1286         tp_ensure_initialized (NULL);
1287
1288         domain = mono_domain_get ();
1289
1290         ac = (MonoAsyncCall*) mono_object_new (domain, async_call_klass);
1291         MONO_OBJECT_SETREF (ac, msg, msg);
1292         MONO_OBJECT_SETREF (ac, state, state);
1293
1294         if (async_callback) {
1295                 MONO_OBJECT_SETREF (ac, cb_method, mono_get_delegate_invoke (((MonoObject*) async_callback)->vtable->klass));
1296                 MONO_OBJECT_SETREF (ac, cb_target, async_callback);
1297         }
1298
1299         ares = mono_async_result_new (domain, NULL, ac->state, NULL, (MonoObject*) ac);
1300         MONO_OBJECT_SETREF (ares, async_delegate, target);
1301
1302         rwi = (MonoRuntimeWorkItem*) mono_object_new (domain, runtime_work_item_class);
1303         MONO_OBJECT_SETREF (rwi, ares, ares);
1304
1305         tp_queue_work_item (rwi);
1306
1307         return ares;
1308 }
1309
1310 MonoObject *
1311 mono_thread_pool_ms_finish (MonoAsyncResult *ares, MonoArray **out_args, MonoObject **exc)
1312 {
1313         MonoAsyncCall *ac;
1314
1315         g_assert (exc);
1316         g_assert (out_args);
1317
1318         *exc = NULL;
1319         *out_args = NULL;
1320
1321         /* check if already finished */
1322         mono_monitor_enter ((MonoObject*) ares);
1323
1324         if (ares->endinvoke_called) {
1325                 *exc = (MonoObject*) mono_get_exception_invalid_operation (NULL);
1326                 mono_monitor_exit ((MonoObject*) ares);
1327                 return NULL;
1328         }
1329
1330         MONO_OBJECT_SETREF (ares, endinvoke_called, 1);
1331
1332         /* wait until we are really finished */
1333         if (ares->completed) {
1334                 mono_monitor_exit ((MonoObject *) ares);
1335         } else {
1336                 gpointer wait_event;
1337                 if (ares->handle) {
1338                         wait_event = mono_wait_handle_get_handle ((MonoWaitHandle*) ares->handle);
1339                 } else {
1340                         wait_event = CreateEvent (NULL, TRUE, FALSE, NULL);
1341                         g_assert(wait_event);
1342                         MONO_OBJECT_SETREF (ares, handle, (MonoObject*) mono_wait_handle_new (mono_object_domain (ares), wait_event));
1343                 }
1344                 mono_monitor_exit ((MonoObject*) ares);
1345                 WaitForSingleObjectEx (wait_event, INFINITE, TRUE);
1346         }
1347
1348         ac = (MonoAsyncCall*) ares->object_data;
1349         g_assert (ac);
1350
1351         *exc = ac->msg->exc; /* FIXME: GC add write barrier */
1352         *out_args = ac->out_args;
1353         return ac->res;
1354 }
1355
1356 gboolean
1357 mono_thread_pool_ms_remove_domain_jobs (MonoDomain *domain, int timeout)
1358 {
1359         gboolean res = TRUE;
1360         guint32 start, now;
1361         gpointer sem;
1362
1363         sem = domain->cleanup_semaphore = CreateSemaphore (NULL, 0, 1, NULL);
1364         /*
1365          * The memory barrier here is required to have global ordering between assigning to cleanup_semaphone
1366          * and reading threadpool_jobs. Otherwise this thread could read a stale version of threadpool_jobs
1367          * and wait forever.
1368          */
1369         mono_memory_write_barrier ();
1370
1371         if (timeout != -1 && domain->threadpool_jobs)
1372                 start = mono_msec_ticks ();
1373         while (domain->threadpool_jobs) {
1374                 WaitForSingleObject (sem, timeout);
1375                 if (timeout != -1) {
1376                         now = mono_msec_ticks ();
1377                         if (now - start > timeout) {
1378                                 res = FALSE;
1379                                 break;
1380                         }
1381                         timeout -= now - start;
1382                 }
1383         }
1384
1385         domain->cleanup_semaphore = NULL;
1386         CloseHandle (sem);
1387         return res;
1388 }
1389
1390 void
1391 mono_thread_pool_ms_remove_socket (int sock)
1392 {
1393         // FIXME
1394         mono_raise_exception (mono_get_exception_not_implemented (NULL));
1395 }
1396
1397 void
1398 mono_thread_pool_ms_suspend (void)
1399 {
1400         threadpool->suspended = TRUE;
1401 }
1402
1403 void
1404 mono_thread_pool_ms_resume (void)
1405 {
1406         threadpool->suspended = FALSE;
1407 }
1408
1409 void
1410 ves_icall_System_Threading_MonoRuntimeWorkItem_ExecuteWorkItem (MonoRuntimeWorkItem *rwi)
1411 {
1412         MonoAsyncResult *ares;
1413         MonoAsyncCall *ac;
1414         MonoObject *res;
1415         MonoObject *exc = NULL;
1416         MonoArray *out_args = NULL;
1417         gpointer wait_event = NULL;
1418         MonoInternalThread *thread = mono_thread_internal_current ();
1419
1420         g_assert (rwi);
1421         ares = rwi->ares;
1422         g_assert (ares);
1423
1424         if (!ares->execution_context) {
1425                 MONO_OBJECT_SETREF (ares, original_context, NULL);
1426         } else {
1427                 /* use captured ExecutionContext (if available) */
1428                 MONO_OBJECT_SETREF (ares, original_context, mono_thread_get_execution_context ());
1429                 mono_thread_set_execution_context (ares->execution_context);
1430         }
1431
1432         ac = (MonoAsyncCall*) ares->object_data;
1433
1434         if (!ac) {
1435                 g_assert (ares->async_delegate);
1436                 /* The debugger needs this */
1437                 thread->async_invoke_method = ((MonoDelegate*) ares->async_delegate)->method;
1438                 res = mono_runtime_delegate_invoke (ares->async_delegate, (gpointer*) &ares->async_state, &exc);
1439                 thread->async_invoke_method = NULL;
1440         } else {
1441                 MONO_OBJECT_SETREF (ac, msg->exc, NULL);
1442                 res = mono_message_invoke (ares->async_delegate, ac->msg, &exc, &out_args);
1443                 MONO_OBJECT_SETREF (ac, res, res);
1444                 MONO_OBJECT_SETREF (ac, msg->exc, exc);
1445                 MONO_OBJECT_SETREF (ac, out_args, out_args);
1446
1447                 mono_monitor_enter ((MonoObject*) ares);
1448                 MONO_OBJECT_SETREF (ares, completed, 1);
1449                 if (ares->handle)
1450                         wait_event = mono_wait_handle_get_handle ((MonoWaitHandle*) ares->handle);
1451                 mono_monitor_exit ((MonoObject*) ares);
1452
1453                 if (wait_event != NULL)
1454                         SetEvent (wait_event);
1455
1456                 if (!ac->cb_method) {
1457                         exc = NULL;
1458                 } else {
1459                         thread->async_invoke_method = ac->cb_method;
1460                         mono_runtime_invoke (ac->cb_method, ac->cb_target, (gpointer*) &ares, &exc);
1461                         thread->async_invoke_method = NULL;
1462                 }
1463         }
1464
1465         /* restore original thread execution context if flow isn't suppressed, i.e. non null */
1466         if (ares->original_context) {
1467                 mono_thread_set_execution_context (ares->original_context);
1468                 MONO_OBJECT_SETREF (ares, original_context, NULL);
1469         }
1470
1471         if (exc)
1472                 mono_raise_exception ((MonoException*) exc);
1473 }
1474
1475 void
1476 ves_icall_System_Threading_Microsoft_ThreadPool_GetAvailableThreadsNative (gint *worker_threads, gint *completion_port_threads)
1477 {
1478         if (!worker_threads || !completion_port_threads)
1479                 return;
1480
1481         tp_ensure_initialized (NULL);
1482
1483         *worker_threads = threadpool->limit_worker_max;
1484         *completion_port_threads = threadpool->limit_io_max;
1485 }
1486
1487 void
1488 ves_icall_System_Threading_Microsoft_ThreadPool_GetMinThreadsNative (gint *worker_threads, gint *completion_port_threads)
1489 {
1490         if (!worker_threads || !completion_port_threads)
1491                 return;
1492
1493         tp_ensure_initialized (NULL);
1494
1495         *worker_threads = threadpool->limit_worker_min;
1496         *completion_port_threads = threadpool->limit_io_min;
1497 }
1498
1499 void
1500 ves_icall_System_Threading_Microsoft_ThreadPool_GetMaxThreadsNative (gint *worker_threads, gint *completion_port_threads)
1501 {
1502         if (!worker_threads || !completion_port_threads)
1503                 return;
1504
1505         tp_ensure_initialized (NULL);
1506
1507         *worker_threads = threadpool->limit_worker_max;
1508         *completion_port_threads = threadpool->limit_io_max;
1509 }
1510
1511 gboolean
1512 ves_icall_System_Threading_Microsoft_ThreadPool_SetMinThreadsNative (gint worker_threads, gint completion_port_threads)
1513 {
1514         tp_ensure_initialized (NULL);
1515
1516         if (worker_threads <= 0 || worker_threads > threadpool->limit_worker_max)
1517                 return FALSE;
1518         if (completion_port_threads <= 0 || completion_port_threads > threadpool->limit_io_max)
1519                 return FALSE;
1520
1521         threadpool->limit_worker_max = worker_threads;
1522         threadpool->limit_io_max = completion_port_threads;
1523
1524         return TRUE;
1525 }
1526
1527 gboolean
1528 ves_icall_System_Threading_Microsoft_ThreadPool_SetMaxThreadsNative (gint worker_threads, gint completion_port_threads)
1529 {
1530         gint cpu_count = mono_cpu_count ();
1531
1532         tp_ensure_initialized (NULL);
1533
1534         if (worker_threads < threadpool->limit_worker_min || worker_threads < cpu_count)
1535                 return FALSE;
1536         if (completion_port_threads < threadpool->limit_io_min || completion_port_threads < cpu_count)
1537                 return FALSE;
1538
1539         threadpool->limit_worker_max = worker_threads;
1540         threadpool->limit_io_max = completion_port_threads;
1541
1542         return TRUE;
1543 }
1544
1545 void
1546 ves_icall_System_Threading_Microsoft_ThreadPool_InitializeVMTp (gboolean *enable_worker_tracking)
1547 {
1548         tp_ensure_initialized (enable_worker_tracking);
1549 }
1550
1551 gboolean
1552 ves_icall_System_Threading_Microsoft_ThreadPool_NotifyWorkItemComplete (void)
1553 {
1554         ThreadPoolCounter counter;
1555
1556         if (mono_domain_is_unloading (mono_domain_get ()) || mono_runtime_is_shutting_down ())
1557                 return FALSE;
1558
1559         tp_heuristic_notify_work_completed ();
1560
1561         if (tp_heuristic_should_adjust ())
1562                 tp_heuristic_adjust ();
1563
1564         counter = TP_COUNTER_READ ();
1565         return counter._.active <= counter._.max_working;
1566 }
1567
/*
 * Reports progress on a work item to the heuristic.
 *
 * Progress is recorded via the same tp_heuristic_notify_work_completed call
 * used for completions. The heuristic then gets a chance to adjust.
 */
void
ves_icall_System_Threading_Microsoft_ThreadPool_NotifyWorkItemProgressNative (void)
{
	tp_heuristic_notify_work_completed ();

	if (!tp_heuristic_should_adjust ())
		return;

	tp_heuristic_adjust ();
}
1576
1577 void
1578 ves_icall_System_Threading_Microsoft_ThreadPool_ReportThreadStatus (gboolean is_working)
1579 {
1580         // TODO
1581         mono_raise_exception (mono_get_exception_not_implemented (NULL));
1582 }
1583
1584 gboolean
1585 ves_icall_System_Threading_Microsoft_ThreadPool_RequestWorkerThread (void)
1586 {
1587         return tp_worker_request (mono_domain_get ());
1588 }
1589
1590 gboolean
1591 ves_icall_System_Threading_Microsoft_ThreadPool_PostQueuedCompletionStatus (MonoNativeOverlapped *native_overlapped)
1592 {
1593         /* This copy the behavior of the current Mono implementation */
1594         mono_raise_exception (mono_get_exception_not_implemented (NULL));
1595         return FALSE;
1596 }
1597
1598 gpointer
1599 ves_icall_System_Threading_Microsoft_ThreadPool_RegisterWaitForSingleObjectNative (MonoWaitHandle *wait_handle, MonoObject *state, guint timeout_internal, gboolean execute_only_once,
1600         MonoRegisteredWaitHandle *registered_wait_handle, gint stack_mark, gboolean compress_stack)
1601 {
1602         // FIXME
1603         mono_raise_exception (mono_get_exception_not_implemented (NULL));
1604         return NULL;
1605 }
1606
1607 gboolean
1608 ves_icall_System_Threading_Microsoft_ThreadPool_BindIOCompletionCallbackNative (gpointer file_handle)
1609 {
1610         /* This copy the behavior of the current Mono implementation */
1611         return TRUE;
1612 }