Merge pull request #1685 from esdrubal/touint64
[mono.git] / mono / metadata / threadpool-ms.c
1 /*
2  * threadpool-ms.c: Microsoft threadpool runtime support
3  *
4  * Author:
5  *      Ludovic Henry (ludovic.henry@xamarin.com)
6  *
7  * Copyright 2015 Xamarin, Inc (http://www.xamarin.com)
8  */
9
10 //
11 // Copyright (c) Microsoft. All rights reserved.
12 // Licensed under the MIT license. See LICENSE file in the project root for full license information.
13 //
14 // Files:
15 //  - src/vm/comthreadpool.cpp
16 //  - src/vm/win32threadpoolcpp
17 //  - src/vm/threadpoolrequest.cpp
18 //  - src/vm/hillclimbing.cpp
19 //
20 // Ported from C++ to C and adjusted to Mono runtime
21
22 #include <stdlib.h>
23 #include <math.h>
24 #include <config.h>
25 #include <glib.h>
26
27 #if !defined (HAVE_COMPLEX_H)
28 #include <../../support/libm/complex.h>
29 #else
30 #include <complex.h>
31 #endif
32
33 #include <mono/metadata/class-internals.h>
34 #include <mono/metadata/exception.h>
35 #include <mono/metadata/object.h>
36 #include <mono/metadata/object-internals.h>
37 #include <mono/metadata/threadpool-ms.h>
38 #include <mono/metadata/threadpool-ms-io.h>
39 #include <mono/metadata/threadpool-internals.h>
40 #include <mono/utils/atomic.h>
41 #include <mono/utils/mono-compiler.h>
42 #include <mono/utils/mono-proclib.h>
43 #include <mono/utils/mono-threads.h>
44 #include <mono/utils/mono-time.h>
45 #include <mono/utils/mono-rand.h>
46
/* CPU usage thresholds (percent). CPU_USAGE_LOW gates the starvation-delay
 * heuristic in monitor_sufficient_delay_since_last_dequeue; CPU_USAGE_HIGH
 * is presumably the matching upper bound — its use is outside this chunk. */
#define CPU_USAGE_LOW 80
#define CPU_USAGE_HIGH 95

/* How often the monitor thread wakes up to rebalance the pool. */
#define MONITOR_INTERVAL 100 // ms

/* The exponent to apply to the gain. 1.0 means to use linear gain,
 * higher values will enhance large moves and damp small ones.
 * default: 2.0 */
#define HILL_CLIMBING_GAIN_EXPONENT 2.0

/* The 'cost' of a thread. 0 means drive for increased throughput regardless
 * of thread count, higher values bias more against higher thread counts.
 * default: 0.15 */
#define HILL_CLIMBING_BIAS 0.15

/* Remaining tuning constants for the hill-climbing heuristic; copied into
 * ThreadPoolHillClimbing at init time (see ensure_initialized). */
#define HILL_CLIMBING_WAVE_PERIOD 4
#define HILL_CLIMBING_MAX_WAVE_MAGNITUDE 20
#define HILL_CLIMBING_WAVE_MAGNITUDE_MULTIPLIER 1.0
#define HILL_CLIMBING_WAVE_HISTORY_SIZE 8
#define HILL_CLIMBING_TARGET_SIGNAL_TO_NOISE_RATIO 3.0
#define HILL_CLIMBING_MAX_CHANGE_PER_SECOND 4
#define HILL_CLIMBING_MAX_CHANGE_PER_SAMPLE 20
#define HILL_CLIMBING_SAMPLE_INTERVAL_LOW 10
#define HILL_CLIMBING_SAMPLE_INTERVAL_HIGH 200
#define HILL_CLIMBING_ERROR_SMOOTHING_FACTOR 0.01
#define HILL_CLIMBING_MAX_SAMPLE_ERROR_PERCENT 0.15
73
/* Unmanaged view of a queued async-result work item.
 * Keep in sync with System.Threading.RuntimeWorkItem: field order and
 * types must match the managed class exactly. */
struct _MonoRuntimeWorkItem {
        MonoObject object;
        MonoAsyncResult *async_result; /* the async result this item will run */
};
79
/* Packed thread-count state. All four fields are updated together by a
 * single 64-bit compare-exchange on `as_gint64` (see the COUNTER_* macros
 * below), so readers always see a consistent snapshot. */
typedef union {
        struct {
                gint16 max_working; /* determined by heuristic */
                gint16 active; /* working or waiting on thread_work_sem; warm threads */
                gint16 working; /* currently executing a work item */
                gint16 parked; /* blocked in worker_park, waiting to be signaled */
        } _;
        gint64 as_gint64; /* whole counter as one value, for atomic CAS */
} ThreadPoolCounter;
89
/* Per-appdomain bookkeeping: how many queued work requests this domain
 * still has outstanding (serviced round-robin by domain_get_next). */
typedef struct {
        MonoDomain *domain;
        gint32 outstanding_request; /* pending requests; guarded by domains_lock */
} ThreadPoolDomain;
94
/* State for the hill-climbing concurrency heuristic (ported from CoreCLR's
 * src/vm/hillclimbing.cpp): tuning parameters captured from the
 * HILL_CLIMBING_* defines at init time, plus the rolling sample history
 * used to pick the max_working thread-count target. */
typedef struct {
        gint32 wave_period;
        gint32 samples_to_measure; /* wave_period * HILL_CLIMBING_WAVE_HISTORY_SIZE */
        gdouble target_throughput_ratio;
        gdouble target_signal_to_noise_ratio;
        gdouble max_change_per_second;
        gdouble max_change_per_sample;
        gint32 max_thread_wave_magnitude;
        gint32 sample_interval_low;
        gdouble thread_magnitude_multiplier;
        gint32 sample_interval_high;
        gdouble throughput_error_smoothing_factor;
        gdouble gain_exponent;
        gdouble max_sample_error;

        /* mutable heuristic state */
        gdouble current_control_setting;
        gint64 total_samples;
        gint16 last_thread_count;
        gdouble elapsed_since_last_change;
        gdouble completions_since_last_change;

        gdouble average_throughput_noise;

        /* sample history; both arrays have samples_to_measure elements */
        gdouble *samples;
        gdouble *thread_counts;

        guint32 current_sample_interval;
        gpointer random_interval_generator; /* handle from rand_create */

        gint32 accumulated_completion_count;
        gdouble accumulated_sample_duration;
} ThreadPoolHillClimbing;
127
/* The process-wide threadpool instance: packed counters, per-domain
 * request queues, worker bookkeeping, heuristic state and thread limits.
 * Created once by ensure_initialized, freed by ensure_cleanedup. */
typedef struct {
        ThreadPoolCounter counters;

        GPtrArray *domains; // ThreadPoolDomain* []
        mono_mutex_t domains_lock;

        GPtrArray *working_threads; // MonoInternalThread* []
        mono_mutex_t working_threads_lock;

        GPtrArray *parked_threads; // mono_cond_t* []
        mono_mutex_t parked_threads_lock;

        /* sampling state for the hill-climbing heuristic */
        gint32 heuristic_completions;
        guint32 heuristic_sample_start;
        guint32 heuristic_last_dequeue; // ms
        guint32 heuristic_last_adjustment; // ms
        guint32 heuristic_adjustment_interval; // ms
        ThreadPoolHillClimbing heuristic_hill_climbing;
        mono_mutex_t heuristic_lock;

        /* user-configurable worker/IO thread limits */
        gint32 limit_worker_min;
        gint32 limit_worker_max;
        gint32 limit_io_min;
        gint32 limit_io_max;

        MonoCpuUsageState *cpu_usage_state;
        gint32 cpu_usage; /* percent, refreshed by the monitor thread */

        /* suspended by the debugger */
        gboolean suspended;
} ThreadPool;
159
/* Reason the heuristic last changed (or pinned) the thread-count target;
 * mirrors the state transitions of CoreCLR's hill-climbing code. */
typedef enum {
        TRANSITION_WARMUP,
        TRANSITION_INITIALIZING,
        TRANSITION_RANDOM_MOVE,
        TRANSITION_CLIMBING_MOVE,
        TRANSITION_CHANGE_POINT,
        TRANSITION_STABILIZING,
        TRANSITION_STARVATION, /* forced increase when all workers are blocked */
        TRANSITION_THREAD_TIMED_OUT,
        TRANSITION_UNDEFINED,
} ThreadPoolHeuristicStateTransition;
171
/* Lifecycle of the monitor (rebalancing) thread; see
 * monitor_should_keep_running for the transition protocol. */
enum {
        MONITOR_STATUS_REQUESTED,
        MONITOR_STATUS_WAITING_FOR_REQUEST,
        MONITOR_STATUS_NOT_RUNNING,
};

/* Global pool lifecycle (STATUS_* values); driven by interlocked
 * compare-exchange in ensure_initialized / ensure_cleanedup. */
static gint32 status = STATUS_NOT_INITIALIZED;
static gint32 monitor_status = MONITOR_STATUS_NOT_RUNNING;

/* Singleton instance, created lazily by ensure_initialized. */
static ThreadPool* threadpool;
182
/* Invariants every published counter snapshot must satisfy. */
#define COUNTER_CHECK(counter) \
        do { \
                g_assert (counter._.max_working > 0); \
                g_assert (counter._.active >= 0); \
        } while (0)

/* Atomically snapshot the packed counters as one 64-bit read. */
#define COUNTER_READ() ((ThreadPoolCounter) InterlockedRead64 (&threadpool->counters.as_gint64))

/* CAS retry loop: snapshot the counters into `var`, let `block` mutate
 * `var`, then try to publish with a 64-bit compare-exchange; repeats
 * until the CAS wins. NOTE: a `break` (or `return`) inside `block`
 * leaves the loop before the CAS, deliberately discarding the update —
 * several callers rely on this to bail out without touching counters. */
#define COUNTER_ATOMIC(var,block) \
        do { \
                ThreadPoolCounter __old; \
                do { \
                        g_assert (threadpool); \
                        (var) = __old = COUNTER_READ (); \
                        { block; } \
                        COUNTER_CHECK (var); \
                } while (InterlockedCompareExchange64 (&threadpool->counters.as_gint64, (var).as_gint64, __old.as_gint64) != __old.as_gint64); \
        } while (0)

/* Single-attempt variant: one CAS try, `res` reports whether it won. */
#define COUNTER_TRY_ATOMIC(res,var,block) \
        do { \
                ThreadPoolCounter __old; \
                do { \
                        g_assert (threadpool); \
                        (var) = __old = COUNTER_READ (); \
                        (res) = FALSE; \
                        { block; } \
                        COUNTER_CHECK (var); \
                        (res) = InterlockedCompareExchange64 (&threadpool->counters.as_gint64, (var).as_gint64, __old.as_gint64) == __old.as_gint64; \
                } while (0); \
        } while (0)
214
215 static gpointer
216 rand_create (void)
217 {
218         mono_rand_open ();
219         return mono_rand_init (NULL, 0);
220 }
221
222 static guint32
223 rand_next (gpointer *handle, guint32 min, guint32 max)
224 {
225         guint32 val;
226         if (!mono_rand_try_get_uint32 (handle, &val, min, max)) {
227                 // FIXME handle error
228                 g_assert_not_reached ();
229         }
230         return val;
231 }
232
233 static void
234 rand_free (gpointer handle)
235 {
236         mono_rand_close (handle);
237 }
238
/* Lazily create and initialize the global threadpool singleton. Safe to
 * call concurrently: the first caller to move `status` from
 * NOT_INITIALIZED to INITIALIZING does the work; everyone else spins
 * (yielding) until `status` reaches INITIALIZED.
 * If enable_worker_tracking is non-NULL it is always set to FALSE for now. */
static void
ensure_initialized (gboolean *enable_worker_tracking)
{
        ThreadPoolHillClimbing *hc;
        const char *threads_per_cpu_env;
        gint threads_per_cpu;
        gint threads_count;

        if (enable_worker_tracking) {
                // TODO implement some kind of switch to have the possibility to use it
                *enable_worker_tracking = FALSE;
        }

        if (status >= STATUS_INITIALIZED)
                return;
        /* race for the right to initialize; losers wait for the winner */
        if (status == STATUS_INITIALIZING || InterlockedCompareExchange (&status, STATUS_INITIALIZING, STATUS_NOT_INITIALIZED) != STATUS_NOT_INITIALIZED) {
                while (status == STATUS_INITIALIZING)
                        mono_thread_info_yield ();
                g_assert (status >= STATUS_INITIALIZED);
                return;
        }

        g_assert (!threadpool);
        threadpool = g_new0 (ThreadPool, 1);
        g_assert (threadpool);

        threadpool->domains = g_ptr_array_new ();
        mono_mutex_init_recursive (&threadpool->domains_lock);

        threadpool->parked_threads = g_ptr_array_new ();
        mono_mutex_init (&threadpool->parked_threads_lock);

        threadpool->working_threads = g_ptr_array_new ();
        mono_mutex_init (&threadpool->working_threads_lock);

        threadpool->heuristic_adjustment_interval = 10;
        mono_mutex_init (&threadpool->heuristic_lock);

        mono_rand_open ();

        /* seed the hill-climbing heuristic with the HILL_CLIMBING_* defaults */
        hc = &threadpool->heuristic_hill_climbing;

        hc->wave_period = HILL_CLIMBING_WAVE_PERIOD;
        hc->max_thread_wave_magnitude = HILL_CLIMBING_MAX_WAVE_MAGNITUDE;
        hc->thread_magnitude_multiplier = (gdouble) HILL_CLIMBING_WAVE_MAGNITUDE_MULTIPLIER;
        hc->samples_to_measure = hc->wave_period * HILL_CLIMBING_WAVE_HISTORY_SIZE;
        hc->target_throughput_ratio = (gdouble) HILL_CLIMBING_BIAS;
        hc->target_signal_to_noise_ratio = (gdouble) HILL_CLIMBING_TARGET_SIGNAL_TO_NOISE_RATIO;
        hc->max_change_per_second = (gdouble) HILL_CLIMBING_MAX_CHANGE_PER_SECOND;
        hc->max_change_per_sample = (gdouble) HILL_CLIMBING_MAX_CHANGE_PER_SAMPLE;
        hc->sample_interval_low = HILL_CLIMBING_SAMPLE_INTERVAL_LOW;
        hc->sample_interval_high = HILL_CLIMBING_SAMPLE_INTERVAL_HIGH;
        hc->throughput_error_smoothing_factor = (gdouble) HILL_CLIMBING_ERROR_SMOOTHING_FACTOR;
        hc->gain_exponent = (gdouble) HILL_CLIMBING_GAIN_EXPONENT;
        hc->max_sample_error = (gdouble) HILL_CLIMBING_MAX_SAMPLE_ERROR_PERCENT;
        hc->current_control_setting = 0;
        hc->total_samples = 0;
        hc->last_thread_count = 0;
        hc->average_throughput_noise = 0;
        hc->elapsed_since_last_change = 0;
        hc->accumulated_completion_count = 0;
        hc->accumulated_sample_duration = 0;
        hc->samples = g_new0 (gdouble, hc->samples_to_measure);
        hc->thread_counts = g_new0 (gdouble, hc->samples_to_measure);
        hc->random_interval_generator = rand_create ();
        hc->current_sample_interval = rand_next (&hc->random_interval_generator, hc->sample_interval_low, hc->sample_interval_high);

        /* MONO_THREADS_PER_CPU overrides the default of 1, clamped to [1,50] */
        if (!(threads_per_cpu_env = g_getenv ("MONO_THREADS_PER_CPU")))
                threads_per_cpu = 1;
        else
                threads_per_cpu = CLAMP (atoi (threads_per_cpu_env), 1, 50);

        threads_count = mono_cpu_count () * threads_per_cpu;

        threadpool->limit_worker_min = threadpool->limit_io_min = threads_count;
        threadpool->limit_worker_max = threadpool->limit_io_max = threads_count * 100;

        threadpool->counters._.max_working = threadpool->limit_worker_min;

        threadpool->cpu_usage_state = g_new0 (MonoCpuUsageState, 1);

        threadpool->suspended = FALSE;

        /* publish: waiters in the INITIALIZING branch above spin on this */
        status = STATUS_INITIALIZED;
}
324
/* Tear down the threadpool at runtime shutdown. Mirrors
 * ensure_initialized's state machine: the first caller to move `status`
 * from INITIALIZED to CLEANING_UP performs the teardown; concurrent
 * callers wait for it to finish. Asserts the runtime is shutting down. */
static void
ensure_cleanedup (void)
{
        /* never initialized: jump straight to CLEANED_UP */
        if (status == STATUS_NOT_INITIALIZED && InterlockedCompareExchange (&status, STATUS_CLEANED_UP, STATUS_NOT_INITIALIZED) == STATUS_NOT_INITIALIZED)
                return;
        if (status == STATUS_INITIALIZING) {
                /* wait for a concurrent initializer to finish first */
                while (status == STATUS_INITIALIZING)
                        mono_thread_info_yield ();
        }
        if (status == STATUS_CLEANED_UP)
                return;
        if (status == STATUS_CLEANING_UP || InterlockedCompareExchange (&status, STATUS_CLEANING_UP, STATUS_INITIALIZED) != STATUS_INITIALIZED) {
                while (status == STATUS_CLEANING_UP)
                        mono_thread_info_yield ();
                g_assert (status == STATUS_CLEANED_UP);
                return;
        }

        /* we make the assumption along the code that we are
         * cleaning up only if the runtime is shutting down */
        g_assert (mono_runtime_is_shutting_down ());

        /* Unpark all worker threads */
        mono_mutex_lock (&threadpool->parked_threads_lock);
        for (;;) {
                guint i;
                ThreadPoolCounter counter = COUNTER_READ ();
                if (counter._.active == 0 && counter._.parked == 0)
                        break;
                if (counter._.active == 1) {
                        MonoInternalThread *thread = mono_thread_internal_current ();
                        if (thread->threadpool_thread) {
                                /* if there is only one active thread
                                 * left and it's the current one */
                                break;
                        }
                }
                /* signal every parked thread, then give them a moment to exit */
                for (i = 0; i < threadpool->parked_threads->len; ++i) {
                        mono_cond_t *cond = (mono_cond_t*) g_ptr_array_index (threadpool->parked_threads, i);
                        mono_cond_signal (cond);
                }
                mono_mutex_unlock (&threadpool->parked_threads_lock);
                usleep (1000);
                mono_mutex_lock (&threadpool->parked_threads_lock);
        }
        mono_mutex_unlock (&threadpool->parked_threads_lock);

        /* wait for the monitor thread to notice shutdown and stop */
        while (monitor_status != MONITOR_STATUS_NOT_RUNNING)
                usleep (1000);

        g_ptr_array_free (threadpool->domains, TRUE);
        mono_mutex_destroy (&threadpool->domains_lock);

        g_ptr_array_free (threadpool->parked_threads, TRUE);
        mono_mutex_destroy (&threadpool->parked_threads_lock);

        g_ptr_array_free (threadpool->working_threads, TRUE);
        mono_mutex_destroy (&threadpool->working_threads_lock);

        mono_mutex_destroy (&threadpool->heuristic_lock);
        g_free (threadpool->heuristic_hill_climbing.samples);
        g_free (threadpool->heuristic_hill_climbing.thread_counts);
        rand_free (threadpool->heuristic_hill_climbing.random_interval_generator);

        g_free (threadpool->cpu_usage_state);

        g_assert (threadpool);
        g_free (threadpool);
        threadpool = NULL;
        g_assert (!threadpool);

        status = STATUS_CLEANED_UP;
}
398
/* Queue `work_item` on the managed threadpool of `domain` by invoking
 * ThreadPool.UnsafeQueueCustomWorkItem (work_item, <boxed FALSE>).
 * If the caller is currently in a different domain, temporarily switches
 * into `domain` (holding an appdomain ref) for the invoke. */
void
mono_threadpool_ms_enqueue_work_item (MonoDomain *domain, MonoObject *work_item)
{
        /* cached managed lookups; benign race on first initialization */
        static MonoClass *threadpool_class = NULL;
        static MonoMethod *unsafe_queue_custom_work_item_method = NULL;
        MonoDomain *current_domain;
        MonoBoolean f;
        gpointer args [2];

        g_assert (work_item);

        if (!threadpool_class)
                threadpool_class = mono_class_from_name (mono_defaults.corlib, "System.Threading", "ThreadPool");
        g_assert (threadpool_class);

        if (!unsafe_queue_custom_work_item_method)
                unsafe_queue_custom_work_item_method = mono_class_get_method_from_name (threadpool_class, "UnsafeQueueCustomWorkItem", 2);
        g_assert (unsafe_queue_custom_work_item_method);

        f = FALSE;

        args [0] = (gpointer) work_item;
        args [1] = (gpointer) mono_value_box (domain, mono_defaults.boolean_class, &f);

        current_domain = mono_domain_get ();
        if (current_domain == domain) {
                mono_runtime_invoke (unsafe_queue_custom_work_item_method, NULL, args, NULL);
        } else {
                /* switch into the target domain; skip the invoke if the
                 * domain is being unloaded (mono_domain_set fails) */
                mono_thread_push_appdomain_ref (domain);
                if (mono_domain_set (domain, FALSE)) {
                        mono_runtime_invoke (unsafe_queue_custom_work_item_method, NULL, args, NULL);
                        mono_domain_set (current_domain, TRUE);
                }
                mono_thread_pop_appdomain_ref ();
        }
}
435
436 void
437 mono_threadpool_ms_enqueue_async_result (MonoDomain *domain, MonoAsyncResult *ares)
438 {
439         static MonoClass *runtime_work_item_class = NULL;
440         MonoRuntimeWorkItem *rwi;
441
442         g_assert (ares);
443
444         if (!runtime_work_item_class)
445                 runtime_work_item_class = mono_class_from_name (mono_defaults.corlib, "System.Threading", "MonoRuntimeWorkItem");
446         g_assert (runtime_work_item_class);
447
448         rwi = (MonoRuntimeWorkItem*) mono_object_new (domain, runtime_work_item_class);
449         MONO_OBJECT_SETREF (rwi, async_result, ares);
450
451         mono_threadpool_ms_enqueue_work_item (domain, (MonoObject*) rwi);
452 }
453
454 static void
455 domain_add (ThreadPoolDomain *tpdomain)
456 {
457         guint i, len;
458
459         g_assert (tpdomain);
460
461         mono_mutex_lock (&threadpool->domains_lock);
462         len = threadpool->domains->len;
463         for (i = 0; i < len; ++i) {
464                 if (g_ptr_array_index (threadpool->domains, i) == tpdomain)
465                         break;
466         }
467         if (i == len)
468                 g_ptr_array_add (threadpool->domains, tpdomain);
469         mono_mutex_unlock (&threadpool->domains_lock);
470 }
471
472 static gboolean
473 domain_remove (ThreadPoolDomain *tpdomain)
474 {
475         gboolean res;
476
477         g_assert (tpdomain);
478
479         mono_mutex_lock (&threadpool->domains_lock);
480         res = g_ptr_array_remove (threadpool->domains, tpdomain);
481         mono_mutex_unlock (&threadpool->domains_lock);
482
483         return res;
484 }
485
486 static ThreadPoolDomain *
487 domain_get_or_create (MonoDomain *domain)
488 {
489         ThreadPoolDomain *tpdomain = NULL;
490         guint i;
491
492         g_assert (domain);
493
494         mono_mutex_lock (&threadpool->domains_lock);
495         for (i = 0; i < threadpool->domains->len; ++i) {
496                 ThreadPoolDomain *tmp = g_ptr_array_index (threadpool->domains, i);
497                 if (tmp->domain == domain) {
498                         tpdomain = tmp;
499                         break;
500                 }
501         }
502         if (!tpdomain) {
503                 tpdomain = g_new0 (ThreadPoolDomain, 1);
504                 tpdomain->domain = domain;
505                 domain_add (tpdomain);
506         }
507         mono_mutex_unlock (&threadpool->domains_lock);
508         return tpdomain;
509 }
510
511 static gboolean
512 domain_any_has_request ()
513 {
514         gboolean res = FALSE;
515         guint i;
516
517         mono_mutex_lock (&threadpool->domains_lock);
518         for (i = 0; i < threadpool->domains->len; ++i) {
519                 ThreadPoolDomain *tmp = g_ptr_array_index (threadpool->domains, i);
520                 if (tmp->outstanding_request > 0) {
521                         res = TRUE;
522                         break;
523                 }
524         }
525         mono_mutex_unlock (&threadpool->domains_lock);
526         return res;
527 }
528
529 static ThreadPoolDomain *
530 domain_get_next (ThreadPoolDomain *current)
531 {
532         ThreadPoolDomain *tpdomain = NULL;
533         guint len;
534
535         mono_mutex_lock (&threadpool->domains_lock);
536         len = threadpool->domains->len;
537         if (len > 0) {
538                 guint i, current_idx = -1;
539                 if (current) {
540                         for (i = 0; i < len; ++i) {
541                                 if (current == g_ptr_array_index (threadpool->domains, i)) {
542                                         current_idx = i;
543                                         break;
544                                 }
545                         }
546                         g_assert (current_idx >= 0);
547                 }
548                 for (i = current_idx + 1; i < len + current_idx + 1; ++i) {
549                         ThreadPoolDomain *tmp = g_ptr_array_index (threadpool->domains, i % len);
550                         if (tmp->outstanding_request > 0) {
551                                 tpdomain = tmp;
552                                 tpdomain->outstanding_request --;
553                                 g_assert (tpdomain->outstanding_request >= 0);
554                                 break;
555                         }
556                 }
557         }
558         mono_mutex_unlock (&threadpool->domains_lock);
559         return tpdomain;
560 }
561
/* Park the calling worker: publish a stack-allocated condition variable
 * in parked_threads and block until someone signals it (worker_try_unpark,
 * or the shutdown loop in ensure_cleanedup).
 * NOTE(review): a single mono_cond_wait with no predicate re-check means a
 * spurious wakeup also unparks the thread early; the caller fixes the
 * counters up afterwards either way, so this appears tolerated — confirm
 * before changing. */
static void
worker_park (void)
{
        mono_cond_t cond;
        mono_cond_init (&cond, NULL);

        mono_mutex_lock (&threadpool->parked_threads_lock);
        g_ptr_array_add (threadpool->parked_threads, &cond);
        mono_cond_wait (&cond, &threadpool->parked_threads_lock);
        g_ptr_array_remove (threadpool->parked_threads, &cond);
        mono_mutex_unlock (&threadpool->parked_threads_lock);

        mono_cond_destroy (&cond);
}
576
577 static gboolean
578 worker_try_unpark (void)
579 {
580         gboolean res = FALSE;
581         guint len;
582
583         mono_mutex_lock (&threadpool->parked_threads_lock);
584         len = threadpool->parked_threads->len;
585         if (len > 0) {
586                 mono_cond_t *cond = (mono_cond_t*) g_ptr_array_index (threadpool->parked_threads, len - 1);
587                 mono_cond_signal (cond);
588                 res = TRUE;
589         }
590         mono_mutex_unlock (&threadpool->parked_threads_lock);
591         return res;
592 }
593
/* Body of a threadpool worker thread; `data` is the ThreadPoolDomain the
 * thread was created for. Repeatedly invokes the managed
 * _ThreadPoolWaitCallback.PerformWaitCallback () in the target domain,
 * then rotates to the next domain with outstanding requests
 * (domain_get_next); parks itself when there are more active workers than
 * the heuristic's max_working target. The `active` counter was already
 * incremented on our behalf by worker_request. */
static void
worker_thread (gpointer data)
{
        /* cached managed lookups; benign race on first initialization */
        static MonoClass *threadpool_wait_callback_class = NULL;
        static MonoMethod *perform_wait_callback_method = NULL;
        MonoInternalThread *thread;
        ThreadPoolDomain *tpdomain;
        ThreadPoolCounter counter;
        gboolean retire = FALSE;

        g_assert (status >= STATUS_INITIALIZED);

        tpdomain = data;
        g_assert (tpdomain);
        g_assert (tpdomain->domain);

        /* bail out early, undoing worker_request's active ++ */
        if (mono_runtime_is_shutting_down () || mono_domain_is_unloading (tpdomain->domain)) {
                COUNTER_ATOMIC (counter, { counter._.active --; });
                return;
        }

        if (!threadpool_wait_callback_class)
                threadpool_wait_callback_class = mono_class_from_name (mono_defaults.corlib, "System.Threading.Microsoft", "_ThreadPoolWaitCallback");
        g_assert (threadpool_wait_callback_class);

        if (!perform_wait_callback_method)
                perform_wait_callback_method = mono_class_get_method_from_name (threadpool_wait_callback_class, "PerformWaitCallback", 0);
        g_assert (perform_wait_callback_method);

        g_assert (threadpool);

        thread = mono_thread_internal_current ();
        g_assert (thread);

        mono_mutex_lock (&threadpool->domains_lock);

        do {
                guint i, c;

                g_assert (tpdomain);
                g_assert (tpdomain->domain);

                tpdomain->domain->threadpool_jobs ++;

                mono_mutex_unlock (&threadpool->domains_lock);

                mono_mutex_lock (&threadpool->working_threads_lock);
                g_ptr_array_add (threadpool->working_threads, thread);
                mono_mutex_unlock (&threadpool->working_threads_lock);

                COUNTER_ATOMIC (counter, { counter._.working ++; });

                /* run one managed callback inside the target domain */
                mono_thread_push_appdomain_ref (tpdomain->domain);
                if (mono_domain_set (tpdomain->domain, FALSE)) {
                        MonoObject *exc = NULL;
                        MonoObject *res = mono_runtime_invoke (perform_wait_callback_method, NULL, NULL, &exc);
                        if (exc)
                                mono_internal_thread_unhandled_exception (exc);
                        else if (res && *(MonoBoolean*) mono_object_unbox (res) == FALSE)
                                /* managed side returned false: back off for a round */
                                retire = TRUE;

                        /* restore Background state the callback may have cleared */
                        mono_thread_clr_state (thread , ~ThreadState_Background);
                        if (!mono_thread_test_state (thread , ThreadState_Background))
                                ves_icall_System_Threading_Thread_SetState (thread, ThreadState_Background);
                }
                mono_thread_pop_appdomain_ref ();

                COUNTER_ATOMIC (counter, { counter._.working --; });

                mono_mutex_lock (&threadpool->working_threads_lock);
                g_ptr_array_remove_fast (threadpool->working_threads, thread);
                mono_mutex_unlock (&threadpool->working_threads_lock);

                mono_mutex_lock (&threadpool->domains_lock);

                tpdomain->domain->threadpool_jobs --;
                g_assert (tpdomain->domain->threadpool_jobs >= 0);

                /* last job of an unloading domain: unregister it and unblock
                 * the unload waiting on cleanup_semaphore */
                if (tpdomain->domain->threadpool_jobs == 0 && mono_domain_is_unloading (tpdomain->domain)) {
                        gboolean removed = domain_remove (tpdomain);
                        g_assert (removed);
                        if (tpdomain->domain->cleanup_semaphore)
                                ReleaseSemaphore (tpdomain->domain->cleanup_semaphore, 1, NULL);
                        g_free (tpdomain);
                        tpdomain = NULL;
                }

                /* look for more work, parking between attempts when we are
                 * above the max_working target */
                for (i = 0, c = 5; i < c; ++i) {
                        if (mono_runtime_is_shutting_down ())
                                break;

                        if (!retire) {
                                tpdomain = domain_get_next (tpdomain);
                                if (tpdomain)
                                        break;
                        }

                        if (i < c - 1) {
                                gboolean park = TRUE;

                                /* NB: the `break` below exits COUNTER_ATOMIC's
                                 * retry loop before the CAS, leaving the
                                 * counters unchanged */
                                COUNTER_ATOMIC (counter, {
                                        if (counter._.active <= counter._.max_working) {
                                                park = FALSE;
                                                break;
                                        }
                                        counter._.active --;
                                        counter._.parked ++;
                                });

                                if (park) {
                                        mono_mutex_unlock (&threadpool->domains_lock);
                                        worker_park ();
                                        mono_mutex_lock (&threadpool->domains_lock);

                                        COUNTER_ATOMIC (counter, {
                                                counter._.active ++;
                                                counter._.parked --;
                                        });
                                }
                        }

                        retire = FALSE;
                }
        } while (tpdomain && !mono_runtime_is_shutting_down ());

        mono_mutex_unlock (&threadpool->domains_lock);

        /* thread is exiting: drop our active slot */
        COUNTER_ATOMIC (counter, { counter._.active --; });
}
723
724 static gboolean
725 worker_try_create (ThreadPoolDomain *tpdomain)
726 {
727         g_assert (tpdomain);
728         g_assert (tpdomain->domain);
729
730         return mono_thread_create_internal (tpdomain->domain, worker_thread, tpdomain, TRUE, 0) != NULL;
731 }
732
733 static void monitor_ensure_running (void);
734
/* Record one outstanding work request for `domain` and make sure a worker
 * will service it: prefer unparking an idle worker, otherwise create a
 * new one while active < max_working. Returns TRUE when a worker is (or
 * will be) available; FALSE when suspended, shutting down, or at the
 * limit and creation failed.
 * NB: the `return FALSE` inside COUNTER_ATOMIC exits this function from
 * within the macro's retry loop, before the CAS — counters untouched. */
static gboolean
worker_request (MonoDomain *domain)
{
        ThreadPoolDomain *tpdomain;
        ThreadPoolCounter counter;

        g_assert (domain);
        g_assert (threadpool);

        if (mono_runtime_is_shutting_down () || mono_domain_is_unloading (domain))
                return FALSE;

        mono_mutex_lock (&threadpool->domains_lock);
        tpdomain = domain_get_or_create (domain);
        g_assert (tpdomain);
        tpdomain->outstanding_request ++;
        mono_mutex_unlock (&threadpool->domains_lock);

        /* request stays queued; it will be serviced on resume */
        if (threadpool->suspended)
                return FALSE;

        monitor_ensure_running ();

        if (worker_try_unpark ())
                return TRUE;

        COUNTER_ATOMIC (counter, {
                if (counter._.active >= counter._.max_working)
                        return FALSE;
                counter._.active ++;
        });

        if (worker_try_create (tpdomain))
                return TRUE;

        /* creation failed: undo the optimistic active ++ */
        COUNTER_ATOMIC (counter, { counter._.active --; });
        return FALSE;
}
773
/* Decide whether the monitor thread should run another iteration.
 * Each pass downgrades REQUESTED -> WAITING_FOR_REQUEST; if the status
 * was already WAITING_FOR_REQUEST (no new request arrived since the last
 * pass) and there is no outstanding work (or the runtime is shutting
 * down), try to transition to NOT_RUNNING and stop. The second exchange
 * can lose to a concurrent monitor_ensure_running re-request, in which
 * case we keep running. */
static gboolean
monitor_should_keep_running (void)
{
        g_assert (monitor_status == MONITOR_STATUS_WAITING_FOR_REQUEST || monitor_status == MONITOR_STATUS_REQUESTED);

        if (InterlockedExchange (&monitor_status, MONITOR_STATUS_WAITING_FOR_REQUEST) == MONITOR_STATUS_WAITING_FOR_REQUEST) {
                if (mono_runtime_is_shutting_down () || !domain_any_has_request ()) {
                        if (InterlockedExchange (&monitor_status, MONITOR_STATUS_NOT_RUNNING) == MONITOR_STATUS_WAITING_FOR_REQUEST)
                                return FALSE;
                }
        }

        g_assert (monitor_status == MONITOR_STATUS_WAITING_FOR_REQUEST || monitor_status == MONITOR_STATUS_REQUESTED);

        return TRUE;
}
790
791 static gboolean
792 monitor_sufficient_delay_since_last_dequeue (void)
793 {
794         guint32 threshold;
795
796         g_assert (threadpool);
797
798         if (threadpool->cpu_usage < CPU_USAGE_LOW) {
799                 threshold = MONITOR_INTERVAL;
800         } else {
801                 ThreadPoolCounter counter = COUNTER_READ ();
802                 threshold = counter._.max_working * MONITOR_INTERVAL * 2;
803         }
804
805         return mono_msec_ticks () >= threadpool->heuristic_last_dequeue + threshold;
806 }
807
808 static void hill_climbing_force_change (gint16 new_thread_count, ThreadPoolHeuristicStateTransition transition);
809
/*
 * monitor_thread:
 *
 * Body of the threadpool monitor thread. Wakes up roughly every
 * MONITOR_INTERVAL ms to (a) detect starvation (all working threads stuck in
 * WaitSleepJoin) and bump the max_working limit, (b) refresh the CPU usage
 * sample, and (c) inject extra workers when no work item has been dequeued
 * for a suspiciously long time. Loops until monitor_should_keep_running ()
 * returns FALSE.
 */
static void
monitor_thread (void)
{
	MonoInternalThread *current_thread = mono_thread_internal_current ();
	guint i;

	/* prime the CPU usage sampler so the next call returns a delta */
	mono_cpu_usage (threadpool->cpu_usage_state);

	do {
		MonoInternalThread *thread;
		gboolean all_waitsleepjoin = TRUE;
		gint32 interval_left = MONITOR_INTERVAL;
		gint32 awake = 0; /* number of spurious awakes we tolerate before doing a round of rebalancing */

		g_assert (monitor_status != MONITOR_STATUS_NOT_RUNNING);

		/* sleep out the interval, tolerating up to 10 alertable wakeups
		 * (SleepEx returns non-zero when interrupted by an APC) */
		do {
			guint32 ts;

			if (mono_runtime_is_shutting_down ())
				break;

			ts = mono_msec_ticks ();
			if (SleepEx (interval_left, TRUE) == 0)
				break;
			interval_left -= mono_msec_ticks () - ts;

			/* honor stop/suspend requests promptly */
			if ((current_thread->state & (ThreadState_StopRequested | ThreadState_SuspendRequested)) != 0)
				mono_thread_interruption_checkpoint ();
		} while (interval_left > 0 && ++awake < 10);

		if (threadpool->suspended)
			continue;

		if (mono_runtime_is_shutting_down () || !domain_any_has_request ())
			continue;

		/* starvation check: is every working thread blocked? */
		mono_mutex_lock (&threadpool->working_threads_lock);
		for (i = 0; i < threadpool->working_threads->len; ++i) {
			thread = g_ptr_array_index (threadpool->working_threads, i);
			if ((thread->state & ThreadState_WaitSleepJoin) == 0) {
				all_waitsleepjoin = FALSE;
				break;
			}
		}
		mono_mutex_unlock (&threadpool->working_threads_lock);

		if (all_waitsleepjoin) {
			/* all workers blocked: allow one more thread and tell the
			 * hill-climbing heuristic this was a starvation move */
			ThreadPoolCounter counter;
			COUNTER_ATOMIC (counter, { counter._.max_working ++; });
			hill_climbing_force_change (counter._.max_working, TRANSITION_STARVATION);
		}

		threadpool->cpu_usage = mono_cpu_usage (threadpool->cpu_usage_state);

		if (monitor_sufficient_delay_since_last_dequeue ()) {
			/* work is pending but nothing was dequeued recently: try
			 * (at most 5 times) to unpark or create one worker */
			for (i = 0; i < 5; ++i) {
				ThreadPoolDomain *tpdomain;
				ThreadPoolCounter counter;
				gboolean success;

				if (mono_runtime_is_shutting_down ())
					break;

				if (worker_try_unpark ())
					break;

				/* reserve an active slot, bailing out if we are at the limit */
				COUNTER_TRY_ATOMIC (success, counter, {
					if (counter._.active >= counter._.max_working)
						break;
					counter._.active ++;
				});

				if (!success)
					continue;

				tpdomain = domain_get_next (NULL);
				if (tpdomain && worker_try_create (tpdomain))
					break;

				/* creation failed: give the reserved slot back */
				COUNTER_ATOMIC (counter, { counter._.active --; });
			}
		}
	} while (monitor_should_keep_running ());
}
895
/*
 * monitor_ensure_running:
 *
 * Make sure the monitor thread exists and has a pending request: CAS the
 * status from WAITING_FOR_REQUEST or NOT_RUNNING to REQUESTED, spawning the
 * monitor thread in the latter case. Loops until one transition succeeds.
 */
static void
monitor_ensure_running (void)
{
	for (;;) {
		switch (monitor_status) {
		case MONITOR_STATUS_REQUESTED:
			/* a request is already pending; nothing to do */
			return;
		case MONITOR_STATUS_WAITING_FOR_REQUEST:
			InterlockedCompareExchange (&monitor_status, MONITOR_STATUS_REQUESTED, MONITOR_STATUS_WAITING_FOR_REQUEST);
			break;
		case MONITOR_STATUS_NOT_RUNNING:
			if (InterlockedCompareExchange (&monitor_status, MONITOR_STATUS_REQUESTED, MONITOR_STATUS_NOT_RUNNING) == MONITOR_STATUS_NOT_RUNNING) {
				/* we won the race to start it; on spawn failure roll the
				 * status back so a later caller can retry */
				if (!mono_thread_create_internal (mono_get_root_domain (), monitor_thread, NULL, TRUE, SMALL_STACK))
					monitor_status = MONITOR_STATUS_NOT_RUNNING;
				return;
			}
			break;
		default: g_assert_not_reached ();
		}
	}
}
917
918 static void
919 hill_climbing_change_thread_count (gint16 new_thread_count, ThreadPoolHeuristicStateTransition transition)
920 {
921         ThreadPoolHillClimbing *hc;
922
923         g_assert (threadpool);
924
925         hc = &threadpool->heuristic_hill_climbing;
926
927         hc->last_thread_count = new_thread_count;
928         hc->current_sample_interval = rand_next (&hc->random_interval_generator, hc->sample_interval_low, hc->sample_interval_high);
929         hc->elapsed_since_last_change = 0;
930         hc->completions_since_last_change = 0;
931 }
932
933 static void
934 hill_climbing_force_change (gint16 new_thread_count, ThreadPoolHeuristicStateTransition transition)
935 {
936         ThreadPoolHillClimbing *hc;
937
938         g_assert (threadpool);
939
940         hc = &threadpool->heuristic_hill_climbing;
941
942         if (new_thread_count != hc->last_thread_count) {
943                 hc->current_control_setting += new_thread_count - hc->last_thread_count;
944                 hill_climbing_change_thread_count (new_thread_count, transition);
945         }
946 }
947
/*
 * hill_climbing_get_wave_component:
 *
 * Extract the complex amplitude of the frequency component with the given
 * period from the most recent sample_count entries of the circular samples
 * buffer, using the Goertzel recurrence (q0 = coeff * q1 - q2 + x[i]).
 * The result is normalized by sample_count.
 */
static double complex
hill_climbing_get_wave_component (gdouble *samples, guint sample_count, gdouble period)
{
	ThreadPoolHillClimbing *hc;
	gdouble w, cosine, sine, coeff, q0, q1, q2;
	guint i;

	g_assert (threadpool);
	g_assert (sample_count >= period); /* can't measure a wave that doesn't fit in the window */
	g_assert (period >= 2); /* anything shorter is above the Nyquist limit */

	hc = &threadpool->heuristic_hill_climbing;

	w = 2.0 * M_PI / period;
	cosine = cos (w);
	sine = sin (w);
	coeff = 2.0 * cosine;
	q0 = q1 = q2 = 0;

	/* run the recurrence over the last sample_count entries of the ring buffer */
	for (i = 0; i < sample_count; ++i) {
		q0 = coeff * q1 - q2 + samples [(hc->total_samples - sample_count + i) % hc->samples_to_measure];
		q2 = q1;
		q1 = q0;
	}

	return ((q1 - q2 * cosine) + (q2 * sine) * I) / ((gdouble) sample_count);
}
975
976 static gint16
977 hill_climbing_update (gint16 current_thread_count, guint32 sample_duration, gint32 completions, guint32 *adjustment_interval)
978 {
979         ThreadPoolHillClimbing *hc;
980         ThreadPoolHeuristicStateTransition transition;
981         gdouble throughput;
982         gdouble throughput_error_estimate;
983         gdouble confidence;
984         gdouble move;
985         gdouble gain;
986         gint sample_index;
987         gint sample_count;
988         gint new_thread_wave_magnitude;
989         gint new_thread_count;
990         double complex thread_wave_component;
991         double complex throughput_wave_component;
992         double complex ratio;
993
994         g_assert (threadpool);
995         g_assert (adjustment_interval);
996
997         hc = &threadpool->heuristic_hill_climbing;
998
999         /* If someone changed the thread count without telling us, update our records accordingly. */
1000         if (current_thread_count != hc->last_thread_count)
1001                 hill_climbing_force_change (current_thread_count, TRANSITION_INITIALIZING);
1002
1003         /* Update the cumulative stats for this thread count */
1004         hc->elapsed_since_last_change += sample_duration;
1005         hc->completions_since_last_change += completions;
1006
1007         /* Add in any data we've already collected about this sample */
1008         sample_duration += hc->accumulated_sample_duration;
1009         completions += hc->accumulated_completion_count;
1010
1011         /* We need to make sure we're collecting reasonably accurate data. Since we're just counting the end
1012          * of each work item, we are goinng to be missing some data about what really happened during the
1013          * sample interval. The count produced by each thread includes an initial work item that may have
1014          * started well before the start of the interval, and each thread may have been running some new
1015          * work item for some time before the end of the interval, which did not yet get counted. So
1016          * our count is going to be off by +/- threadCount workitems.
1017          *
1018          * The exception is that the thread that reported to us last time definitely wasn't running any work
1019          * at that time, and the thread that's reporting now definitely isn't running a work item now. So
1020          * we really only need to consider threadCount-1 threads.
1021          *
1022          * Thus the percent error in our count is +/- (threadCount-1)/numCompletions.
1023          *
1024          * We cannot rely on the frequency-domain analysis we'll be doing later to filter out this error, because
1025          * of the way it accumulates over time. If this sample is off by, say, 33% in the negative direction,
1026          * then the next one likely will be too. The one after that will include the sum of the completions
1027          * we missed in the previous samples, and so will be 33% positive. So every three samples we'll have
1028          * two "low" samples and one "high" sample. This will appear as periodic variation right in the frequency
1029          * range we're targeting, which will not be filtered by the frequency-domain translation. */
1030         if (hc->total_samples > 0 && ((current_thread_count - 1.0) / completions) >= hc->max_sample_error) {
1031                 /* Not accurate enough yet. Let's accumulate the data so
1032                  * far, and tell the ThreadPool to collect a little more. */
1033                 hc->accumulated_sample_duration = sample_duration;
1034                 hc->accumulated_completion_count = completions;
1035                 *adjustment_interval = 10;
1036                 return current_thread_count;
1037         }
1038
1039         /* We've got enouugh data for our sample; reset our accumulators for next time. */
1040         hc->accumulated_sample_duration = 0;
1041         hc->accumulated_completion_count = 0;
1042
1043         /* Add the current thread count and throughput sample to our history. */
1044         throughput = ((gdouble) completions) / sample_duration;
1045
1046         sample_index = hc->total_samples % hc->samples_to_measure;
1047         hc->samples [sample_index] = throughput;
1048         hc->thread_counts [sample_index] = current_thread_count;
1049         hc->total_samples ++;
1050
1051         /* Set up defaults for our metrics. */
1052         thread_wave_component = 0;
1053         throughput_wave_component = 0;
1054         throughput_error_estimate = 0;
1055         ratio = 0;
1056         confidence = 0;
1057
1058         transition = TRANSITION_WARMUP;
1059
1060         /* How many samples will we use? It must be at least the three wave periods we're looking for, and it must also
1061          * be a whole multiple of the primary wave's period; otherwise the frequency we're looking for will fall between
1062          * two frequency bands in the Fourier analysis, and we won't be able to measure it accurately. */
1063         sample_count = ((gint) MIN (hc->total_samples - 1, hc->samples_to_measure) / hc->wave_period) * hc->wave_period;
1064
1065         if (sample_count > hc->wave_period) {
1066                 guint i;
1067                 gdouble average_throughput;
1068                 gdouble average_thread_count;
1069                 gdouble sample_sum = 0;
1070                 gdouble thread_sum = 0;
1071
1072                 /* Average the throughput and thread count samples, so we can scale the wave magnitudes later. */
1073                 for (i = 0; i < sample_count; ++i) {
1074                         guint j = (hc->total_samples - sample_count + i) % hc->samples_to_measure;
1075                         sample_sum += hc->samples [j];
1076                         thread_sum += hc->thread_counts [j];
1077                 }
1078
1079                 average_throughput = sample_sum / sample_count;
1080                 average_thread_count = thread_sum / sample_count;
1081
1082                 if (average_throughput > 0 && average_thread_count > 0) {
1083                         gdouble noise_for_confidence, adjacent_period_1, adjacent_period_2;
1084
1085                         /* Calculate the periods of the adjacent frequency bands we'll be using to
1086                          * measure noise levels. We want the two adjacent Fourier frequency bands. */
1087                         adjacent_period_1 = sample_count / (((gdouble) sample_count) / ((gdouble) hc->wave_period) + 1);
1088                         adjacent_period_2 = sample_count / (((gdouble) sample_count) / ((gdouble) hc->wave_period) - 1);
1089
1090                         /* Get the the three different frequency components of the throughput (scaled by average
1091                          * throughput). Our "error" estimate (the amount of noise that might be present in the
1092                          * frequency band we're really interested in) is the average of the adjacent bands. */
1093                         throughput_wave_component = hill_climbing_get_wave_component (hc->samples, sample_count, hc->wave_period) / average_throughput;
1094                         throughput_error_estimate = cabs (hill_climbing_get_wave_component (hc->samples, sample_count, adjacent_period_1) / average_throughput);
1095
1096                         if (adjacent_period_2 <= sample_count) {
1097                                 throughput_error_estimate = MAX (throughput_error_estimate, cabs (hill_climbing_get_wave_component (
1098                                         hc->samples, sample_count, adjacent_period_2) / average_throughput));
1099                         }
1100
1101                         /* Do the same for the thread counts, so we have something to compare to. We don't
1102                          * measure thread count noise, because there is none; these are exact measurements. */
1103                         thread_wave_component = hill_climbing_get_wave_component (hc->thread_counts, sample_count, hc->wave_period) / average_thread_count;
1104
1105                         /* Update our moving average of the throughput noise. We'll use this
1106                          * later as feedback to determine the new size of the thread wave. */
1107                         if (hc->average_throughput_noise == 0) {
1108                                 hc->average_throughput_noise = throughput_error_estimate;
1109                         } else {
1110                                 hc->average_throughput_noise = (hc->throughput_error_smoothing_factor * throughput_error_estimate)
1111                                         + ((1.0 + hc->throughput_error_smoothing_factor) * hc->average_throughput_noise);
1112                         }
1113
1114                         if (cabs (thread_wave_component) > 0) {
1115                                 /* Adjust the throughput wave so it's centered around the target wave,
1116                                  * and then calculate the adjusted throughput/thread ratio. */
1117                                 ratio = (throughput_wave_component - (hc->target_throughput_ratio * thread_wave_component)) / thread_wave_component;
1118                                 transition = TRANSITION_CLIMBING_MOVE;
1119                         } else {
1120                                 ratio = 0;
1121                                 transition = TRANSITION_STABILIZING;
1122                         }
1123
1124                         noise_for_confidence = MAX (hc->average_throughput_noise, throughput_error_estimate);
1125                         if (noise_for_confidence > 0) {
1126                                 confidence = cabs (thread_wave_component) / noise_for_confidence / hc->target_signal_to_noise_ratio;
1127                         } else {
1128                                 /* there is no noise! */
1129                                 confidence = 1.0;
1130                         }
1131                 }
1132         }
1133
1134         /* We use just the real part of the complex ratio we just calculated. If the throughput signal
1135          * is exactly in phase with the thread signal, this will be the same as taking the magnitude of
1136          * the complex move and moving that far up. If they're 180 degrees out of phase, we'll move
1137          * backward (because this indicates that our changes are having the opposite of the intended effect).
1138          * If they're 90 degrees out of phase, we won't move at all, because we can't tell wether we're
1139          * having a negative or positive effect on throughput. */
1140         move = creal (ratio);
1141         move = CLAMP (move, -1.0, 1.0);
1142
1143         /* Apply our confidence multiplier. */
1144         move *= CLAMP (confidence, -1.0, 1.0);
1145
1146         /* Now apply non-linear gain, such that values around zero are attenuated, while higher values
1147          * are enhanced. This allows us to move quickly if we're far away from the target, but more slowly
1148         * if we're getting close, giving us rapid ramp-up without wild oscillations around the target. */
1149         gain = hc->max_change_per_second * sample_duration;
1150         move = pow (fabs (move), hc->gain_exponent) * (move >= 0.0 ? 1 : -1) * gain;
1151         move = MIN (move, hc->max_change_per_sample);
1152
1153         /* If the result was positive, and CPU is > 95%, refuse the move. */
1154         if (move > 0.0 && threadpool->cpu_usage > CPU_USAGE_HIGH)
1155                 move = 0.0;
1156
1157         /* Apply the move to our control setting. */
1158         hc->current_control_setting += move;
1159
1160         /* Calculate the new thread wave magnitude, which is based on the moving average we've been keeping of the
1161          * throughput error.  This average starts at zero, so we'll start with a nice safe little wave at first. */
1162         new_thread_wave_magnitude = (gint)(0.5 + (hc->current_control_setting * hc->average_throughput_noise
1163                 * hc->target_signal_to_noise_ratio * hc->thread_magnitude_multiplier * 2.0));
1164         new_thread_wave_magnitude = CLAMP (new_thread_wave_magnitude, 1, hc->max_thread_wave_magnitude);
1165
1166         /* Make sure our control setting is within the ThreadPool's limits. */
1167         hc->current_control_setting = CLAMP (hc->current_control_setting, threadpool->limit_worker_min, threadpool->limit_worker_max - new_thread_wave_magnitude);
1168
1169         /* Calculate the new thread count (control setting + square wave). */
1170         new_thread_count = (gint)(hc->current_control_setting + new_thread_wave_magnitude * ((hc->total_samples / (hc->wave_period / 2)) % 2));
1171
1172         /* Make sure the new thread count doesn't exceed the ThreadPool's limits. */
1173         new_thread_count = CLAMP (new_thread_count, threadpool->limit_worker_min, threadpool->limit_worker_max);
1174
1175         if (new_thread_count != current_thread_count)
1176                 hill_climbing_change_thread_count (new_thread_count, transition);
1177
1178         if (creal (ratio) < 0.0 && new_thread_count == threadpool->limit_worker_min)
1179                 *adjustment_interval = (gint)(0.5 + hc->current_sample_interval * (10.0 * MAX (-1.0 * creal (ratio), 1.0)));
1180         else
1181                 *adjustment_interval = hc->current_sample_interval;
1182
1183         return new_thread_count;
1184 }
1185
/*
 * heuristic_notify_work_completed:
 *
 * Record one completed work item for the hill-climbing heuristic and stamp
 * the last-dequeue time consulted by the monitor's delay check.
 */
static void
heuristic_notify_work_completed (void)
{
	g_assert (threadpool);

	InterlockedIncrement (&threadpool->heuristic_completions);
	threadpool->heuristic_last_dequeue = mono_msec_ticks ();
}
1194
1195 static gboolean
1196 heuristic_should_adjust ()
1197 {
1198         g_assert (threadpool);
1199
1200         if (threadpool->heuristic_last_dequeue > threadpool->heuristic_last_adjustment + threadpool->heuristic_adjustment_interval) {
1201                 ThreadPoolCounter counter = COUNTER_READ ();
1202                 if (counter._.active <= counter._.max_working)
1203                         return TRUE;
1204         }
1205
1206         return FALSE;
1207 }
1208
/*
 * heuristic_adjust:
 *
 * Run one hill-climbing adjustment round: consume the completion counter,
 * feed the sample into hill_climbing_update () and publish the resulting
 * max_working. Guarded by a trylock so concurrent callers simply skip the
 * round instead of blocking.
 */
static void
heuristic_adjust ()
{
	g_assert (threadpool);

	if (mono_mutex_trylock (&threadpool->heuristic_lock) == 0) {
		gint32 completions = InterlockedExchange (&threadpool->heuristic_completions, 0);
		guint32 sample_end = mono_msec_ticks ();
		guint32 sample_duration = sample_end - threadpool->heuristic_sample_start;

		/* samples shorter than half the adjustment interval are too noisy */
		if (sample_duration >= threadpool->heuristic_adjustment_interval / 2) {
			ThreadPoolCounter counter;
			gint16 new_thread_count;

			counter = COUNTER_READ ();
			new_thread_count = hill_climbing_update (counter._.max_working, sample_duration, completions, &threadpool->heuristic_adjustment_interval);

			COUNTER_ATOMIC (counter, { counter._.max_working = new_thread_count; });

			/* NOTE(review): counter was just rewritten by COUNTER_ATOMIC with
			 * max_working = new_thread_count; whether this comparison can
			 * still observe the pre-update max_working depends on
			 * COUNTER_ATOMIC's semantics — verify against its definition. */
			if (new_thread_count > counter._.max_working)
				worker_request (mono_domain_get ());

			threadpool->heuristic_sample_start = sample_end;
			threadpool->heuristic_last_adjustment = mono_msec_ticks ();
		}

		mono_mutex_unlock (&threadpool->heuristic_lock);
	}
}
1238
/*
 * mono_threadpool_ms_cleanup:
 *
 * Runtime-shutdown hook: tear down the I/O threadpool (when sockets are
 * enabled) and then the threadpool itself.
 */
void
mono_threadpool_ms_cleanup (void)
{
#ifndef DISABLE_SOCKETS
	mono_threadpool_ms_io_cleanup ();
#endif
	ensure_cleanedup ();
}
1247
/*
 * mono_threadpool_ms_add:
 *
 * Queue an asynchronous delegate invocation on the threadpool: wrap msg,
 * async_callback and state into a MonoAsyncCall/MonoAsyncResult pair and
 * enqueue it in the current domain, routing socket work to the I/O
 * threadpool when sockets are enabled. Returns the MonoAsyncResult the
 * caller will wait on / pass to EndInvoke.
 */
MonoAsyncResult *
mono_threadpool_ms_add (MonoObject *target, MonoMethodMessage *msg, MonoDelegate *async_callback, MonoObject *state)
{
	static MonoClass *async_call_klass = NULL;
	MonoDomain *domain;
	MonoAsyncResult *ares;
	MonoAsyncCall *ac;

	/* lazily cache the System.MonoAsyncCall class */
	if (!async_call_klass)
		async_call_klass = mono_class_from_name (mono_defaults.corlib, "System", "MonoAsyncCall");
	g_assert (async_call_klass);

	ensure_initialized (NULL);

	domain = mono_domain_get ();

	/* MONO_OBJECT_SETREF is used for all reference stores so the GC write
	 * barriers are honored */
	ac = (MonoAsyncCall*) mono_object_new (domain, async_call_klass);
	MONO_OBJECT_SETREF (ac, msg, msg);
	MONO_OBJECT_SETREF (ac, state, state);

	if (async_callback) {
		MONO_OBJECT_SETREF (ac, cb_method, mono_get_delegate_invoke (((MonoObject*) async_callback)->vtable->klass));
		MONO_OBJECT_SETREF (ac, cb_target, async_callback);
	}

	ares = mono_async_result_new (domain, NULL, ac->state, NULL, (MonoObject*) ac);
	MONO_OBJECT_SETREF (ares, async_delegate, target);

#ifndef DISABLE_SOCKETS
	/* socket operations go to the dedicated I/O threadpool */
	if (mono_threadpool_ms_is_io (target, state))
		return mono_threadpool_ms_io_add (ares, (MonoSocketAsyncResult*) state);
#endif

	mono_threadpool_ms_enqueue_async_result (domain, ares);
	return ares;
}
1284
/*
 * mono_threadpool_ms_finish:
 *
 * EndInvoke side of an async delegate call: mark the async result as
 * end-invoked (storing an InvalidOperationException in *exc if EndInvoke was
 * already called), block until the call completes, then return its result
 * and fill *out_args / *exc from the finished call.
 */
MonoObject *
mono_threadpool_ms_finish (MonoAsyncResult *ares, MonoArray **out_args, MonoObject **exc)
{
	MonoAsyncCall *ac;

	g_assert (exc);
	g_assert (out_args);

	*exc = NULL;
	*out_args = NULL;

	/* check if already finished */
	mono_monitor_enter ((MonoObject*) ares);

	if (ares->endinvoke_called) {
		*exc = (MonoObject*) mono_get_exception_invalid_operation (NULL);
		mono_monitor_exit ((MonoObject*) ares);
		return NULL;
	}

	/* NOTE(review): MONO_OBJECT_SETREF is intended for object-reference
	 * fields; endinvoke_called looks like a boolean flag — confirm whether a
	 * plain store would be the appropriate idiom here. */
	MONO_OBJECT_SETREF (ares, endinvoke_called, 1);

	/* wait until we are really finished */
	if (ares->completed) {
		mono_monitor_exit ((MonoObject *) ares);
	} else {
		gpointer wait_event;
		if (ares->handle) {
			wait_event = mono_wait_handle_get_handle ((MonoWaitHandle*) ares->handle);
		} else {
			/* publish the event (under the monitor) so the completing
			 * thread can signal it */
			wait_event = CreateEvent (NULL, TRUE, FALSE, NULL);
			g_assert(wait_event);
			MONO_OBJECT_SETREF (ares, handle, (MonoObject*) mono_wait_handle_new (mono_object_domain (ares), wait_event));
		}
		mono_monitor_exit ((MonoObject*) ares);
		WaitForSingleObjectEx (wait_event, INFINITE, TRUE);
	}

	ac = (MonoAsyncCall*) ares->object_data;
	g_assert (ac);

	*exc = ac->msg->exc; /* FIXME: GC add write barrier */
	*out_args = ac->out_args;
	return ac->res;
}
1330
1331 gboolean
1332 mono_threadpool_ms_remove_domain_jobs (MonoDomain *domain, int timeout)
1333 {
1334         gboolean res = TRUE;
1335         guint32 start;
1336         gpointer sem;
1337
1338         g_assert (domain);
1339         g_assert (timeout >= -1);
1340
1341         if (timeout != -1)
1342                 start = mono_msec_ticks ();
1343
1344 #ifndef DISABLE_SOCKETS
1345         mono_threadpool_ms_io_remove_domain_jobs (domain);
1346         if (timeout != -1) {
1347                 timeout -= mono_msec_ticks () - start;
1348                 if (timeout < 0)
1349                         return FALSE;
1350         }
1351 #endif
1352         /*
1353          * There might be some threads out that could be about to execute stuff from the given domain.
1354          * We avoid that by setting up a semaphore to be pulsed by the thread that reaches zero.
1355          */
1356         sem = domain->cleanup_semaphore = CreateSemaphore (NULL, 0, 1, NULL);
1357
1358         /*
1359          * The memory barrier here is required to have global ordering between assigning to cleanup_semaphone
1360          * and reading threadpool_jobs. Otherwise this thread could read a stale version of threadpool_jobs
1361          * and wait forever.
1362          */
1363         mono_memory_write_barrier ();
1364
1365         while (domain->threadpool_jobs) {
1366                 WaitForSingleObject (sem, timeout);
1367                 if (timeout != -1) {
1368                         timeout -= mono_msec_ticks () - start;
1369                         if (timeout <= 0) {
1370                                 res = FALSE;
1371                                 break;
1372                         }
1373                 }
1374         }
1375
1376         domain->cleanup_semaphore = NULL;
1377         CloseHandle (sem);
1378
1379         return res;
1380 }
1381
/*
 * mono_threadpool_ms_suspend:
 *
 * Pause threadpool rebalancing: the monitor thread skips its work while the
 * suspended flag is set.
 */
void
mono_threadpool_ms_suspend (void)
{
	threadpool->suspended = TRUE;
}
1387
/*
 * mono_threadpool_ms_resume:
 *
 * Resume threadpool rebalancing after mono_threadpool_ms_suspend ().
 */
void
mono_threadpool_ms_resume (void)
{
	threadpool->suspended = FALSE;
}
1393
/*
 * ves_icall_System_Threading_MonoRuntimeWorkItem_ExecuteWorkItem:
 *
 * Icall: execute the async invocation held by the work item, re-raising in
 * the current thread any exception the invoked delegate produced.
 */
void
ves_icall_System_Threading_MonoRuntimeWorkItem_ExecuteWorkItem (MonoRuntimeWorkItem *rwi)
{
	MonoAsyncResult *ares;
	MonoObject *exc = NULL;

	g_assert (rwi);
	ares = rwi->async_result;
	g_assert (ares);

	mono_async_result_invoke (ares, &exc);
	if (exc)
		mono_raise_exception ((MonoException*) exc);
}
1408
/*
 * ves_icall_System_Threading_Microsoft_ThreadPool_GetAvailableThreadsNative:
 *
 * Icall backing ThreadPool.GetAvailableThreads (). Silently ignores NULL
 * output pointers.
 *
 * NOTE(review): this returns the configured maximum limits, identical to
 * GetMaxThreadsNative; the .NET contract describes "available" as the
 * maximum minus the currently active threads — confirm whether the active
 * worker count should be subtracted here.
 */
void
ves_icall_System_Threading_Microsoft_ThreadPool_GetAvailableThreadsNative (gint *worker_threads, gint *completion_port_threads)
{
	if (!worker_threads || !completion_port_threads)
		return;

	ensure_initialized (NULL);

	*worker_threads = threadpool->limit_worker_max;
	*completion_port_threads = threadpool->limit_io_max;
}
1420
1421 void
1422 ves_icall_System_Threading_Microsoft_ThreadPool_GetMinThreadsNative (gint *worker_threads, gint *completion_port_threads)
1423 {
1424         if (!worker_threads || !completion_port_threads)
1425                 return;
1426
1427         ensure_initialized (NULL);
1428
1429         *worker_threads = threadpool->limit_worker_min;
1430         *completion_port_threads = threadpool->limit_io_min;
1431 }
1432
1433 void
1434 ves_icall_System_Threading_Microsoft_ThreadPool_GetMaxThreadsNative (gint *worker_threads, gint *completion_port_threads)
1435 {
1436         if (!worker_threads || !completion_port_threads)
1437                 return;
1438
1439         ensure_initialized (NULL);
1440
1441         *worker_threads = threadpool->limit_worker_max;
1442         *completion_port_threads = threadpool->limit_io_max;
1443 }
1444
1445 gboolean
1446 ves_icall_System_Threading_Microsoft_ThreadPool_SetMinThreadsNative (gint worker_threads, gint completion_port_threads)
1447 {
1448         ensure_initialized (NULL);
1449
1450         if (worker_threads <= 0 || worker_threads > threadpool->limit_worker_max)
1451                 return FALSE;
1452         if (completion_port_threads <= 0 || completion_port_threads > threadpool->limit_io_max)
1453                 return FALSE;
1454
1455         threadpool->limit_worker_max = worker_threads;
1456         threadpool->limit_io_max = completion_port_threads;
1457
1458         return TRUE;
1459 }
1460
1461 gboolean
1462 ves_icall_System_Threading_Microsoft_ThreadPool_SetMaxThreadsNative (gint worker_threads, gint completion_port_threads)
1463 {
1464         gint cpu_count = mono_cpu_count ();
1465
1466         ensure_initialized (NULL);
1467
1468         if (worker_threads < threadpool->limit_worker_min || worker_threads < cpu_count)
1469                 return FALSE;
1470         if (completion_port_threads < threadpool->limit_io_min || completion_port_threads < cpu_count)
1471                 return FALSE;
1472
1473         threadpool->limit_worker_max = worker_threads;
1474         threadpool->limit_io_max = completion_port_threads;
1475
1476         return TRUE;
1477 }
1478
/*
 * ves_icall_System_Threading_Microsoft_ThreadPool_InitializeVMTp:
 *
 * Icall: make sure the native threadpool is initialized; the
 * enable_worker_tracking pointer is forwarded to ensure_initialized ().
 */
void
ves_icall_System_Threading_Microsoft_ThreadPool_InitializeVMTp (gboolean *enable_worker_tracking)
{
	ensure_initialized (enable_worker_tracking);
}
1484
/*
 * ves_icall_System_Threading_Microsoft_ThreadPool_NotifyWorkItemComplete:
 *
 * Icall called by a worker after finishing a work item. Records the
 * completion for the hill-climbing heuristic (running an adjustment round
 * when due) and returns whether the worker should keep running: FALSE during
 * shutdown or domain unload, or when more threads are active than the
 * current max_working allows.
 */
gboolean
ves_icall_System_Threading_Microsoft_ThreadPool_NotifyWorkItemComplete (void)
{
	ThreadPoolCounter counter;

	if (mono_domain_is_unloading (mono_domain_get ()) || mono_runtime_is_shutting_down ())
		return FALSE;

	heuristic_notify_work_completed ();

	if (heuristic_should_adjust ())
		heuristic_adjust ();

	counter = COUNTER_READ ();
	return counter._.active <= counter._.max_working;
}
1501
/*
 * ves_icall_System_Threading_Microsoft_ThreadPool_NotifyWorkItemProgressNative:
 *
 * Icall: record work-item progress for the hill-climbing heuristic and run
 * an adjustment round when one is due.
 */
void
ves_icall_System_Threading_Microsoft_ThreadPool_NotifyWorkItemProgressNative (void)
{
	heuristic_notify_work_completed ();

	if (heuristic_should_adjust ())
		heuristic_adjust ();
}
1510
/*
 * ves_icall_System_Threading_Microsoft_ThreadPool_ReportThreadStatus:
 *
 * Icall: worker-tracking status report. Not implemented yet; always raises
 * NotImplementedException.
 */
void
ves_icall_System_Threading_Microsoft_ThreadPool_ReportThreadStatus (gboolean is_working)
{
	// TODO
	mono_raise_exception (mono_get_exception_not_implemented (NULL));
}
1517
/*
 * ves_icall_System_Threading_Microsoft_ThreadPool_RequestWorkerThread:
 *
 * Icall: request a worker thread to process the current domain's pending
 * work; returns the result of worker_request ().
 */
gboolean
ves_icall_System_Threading_Microsoft_ThreadPool_RequestWorkerThread (void)
{
	return worker_request (mono_domain_get ());
}
1523
/*
 * ves_icall_System_Threading_Microsoft_ThreadPool_PostQueuedCompletionStatus:
 *
 * Icall: I/O completion-port posting is not supported; always raises
 * NotImplementedException.
 */
gboolean G_GNUC_UNUSED
ves_icall_System_Threading_Microsoft_ThreadPool_PostQueuedCompletionStatus (MonoNativeOverlapped *native_overlapped)
{
	/* This copy the behavior of the current Mono implementation */
	mono_raise_exception (mono_get_exception_not_implemented (NULL));
	return FALSE;
}
1531
/*
 * ves_icall_System_Threading_Microsoft_ThreadPool_BindIOCompletionCallbackNative:
 *
 * Icall: binding a handle to the I/O completion port is a no-op here;
 * reports success unconditionally.
 */
gboolean G_GNUC_UNUSED
ves_icall_System_Threading_Microsoft_ThreadPool_BindIOCompletionCallbackNative (gpointer file_handle)
{
	/* This copy the behavior of the current Mono implementation */
	return TRUE;
}
1538
/*
 * ves_icall_System_Threading_Microsoft_ThreadPool_IsThreadPoolHosted:
 *
 * Icall: Mono's threadpool is never externally hosted; always FALSE.
 */
gboolean G_GNUC_UNUSED
ves_icall_System_Threading_Microsoft_ThreadPool_IsThreadPoolHosted (void)
{
	return FALSE;
}