Merge pull request #1691 from esdrubal/exitevent
[mono.git] / mono / metadata / threadpool-ms.c
1 /*
2  * threadpool-ms.c: Microsoft threadpool runtime support
3  *
4  * Author:
5  *      Ludovic Henry (ludovic.henry@xamarin.com)
6  *
7  * Copyright 2015 Xamarin, Inc (http://www.xamarin.com)
8  */
9
10 //
11 // Copyright (c) Microsoft. All rights reserved.
12 // Licensed under the MIT license. See LICENSE file in the project root for full license information.
13 //
14 // Files:
15 //  - src/vm/comthreadpool.cpp
16 //  - src/vm/win32threadpoolcpp
17 //  - src/vm/threadpoolrequest.cpp
18 //  - src/vm/hillclimbing.cpp
19 //
20 // Ported from C++ to C and adjusted to Mono runtime
21
22 #include <stdlib.h>
23 #include <math.h>
24 #include <config.h>
25 #include <glib.h>
26
27 #if !defined (HAVE_COMPLEX_H)
28 #include <../../support/libm/complex.h>
29 #else
30 #include <complex.h>
31 #endif
32
33 #include <mono/metadata/class-internals.h>
34 #include <mono/metadata/exception.h>
35 #include <mono/metadata/gc-internal.h>
36 #include <mono/metadata/object.h>
37 #include <mono/metadata/object-internals.h>
38 #include <mono/metadata/threadpool-ms.h>
39 #include <mono/metadata/threadpool-ms-io.h>
40 #include <mono/metadata/threadpool-internals.h>
41 #include <mono/utils/atomic.h>
42 #include <mono/utils/mono-compiler.h>
43 #include <mono/utils/mono-proclib.h>
44 #include <mono/utils/mono-threads.h>
45 #include <mono/utils/mono-time.h>
46 #include <mono/utils/mono-rand.h>
47
48 #define CPU_USAGE_LOW 80
49 #define CPU_USAGE_HIGH 95
50
51 #define MONITOR_INTERVAL 100 // ms
52
53 /* The exponent to apply to the gain. 1.0 means to use linear gain,
54  * higher values will enhance large moves and damp small ones.
55  * default: 2.0 */
56 #define HILL_CLIMBING_GAIN_EXPONENT 2.0
57
58 /* The 'cost' of a thread. 0 means drive for increased throughput regardless
59  * of thread count, higher values bias more against higher thread counts.
60  * default: 0.15 */
61 #define HILL_CLIMBING_BIAS 0.15
62
63 #define HILL_CLIMBING_WAVE_PERIOD 4
64 #define HILL_CLIMBING_MAX_WAVE_MAGNITUDE 20
65 #define HILL_CLIMBING_WAVE_MAGNITUDE_MULTIPLIER 1.0
66 #define HILL_CLIMBING_WAVE_HISTORY_SIZE 8
67 #define HILL_CLIMBING_TARGET_SIGNAL_TO_NOISE_RATIO 3.0
68 #define HILL_CLIMBING_MAX_CHANGE_PER_SECOND 4
69 #define HILL_CLIMBING_MAX_CHANGE_PER_SAMPLE 20
70 #define HILL_CLIMBING_SAMPLE_INTERVAL_LOW 10
71 #define HILL_CLIMBING_SAMPLE_INTERVAL_HIGH 200
72 #define HILL_CLIMBING_ERROR_SMOOTHING_FACTOR 0.01
73 #define HILL_CLIMBING_MAX_SAMPLE_ERROR_PERCENT 0.15
74
/* Keep in sync with System.Threading.RuntimeWorkItem */
/* Native mirror of the managed RuntimeWorkItem object: field order and
 * types must match the managed layout exactly, so never reorder fields. */
struct _MonoRuntimeWorkItem {
	MonoObject object;           /* managed object header */
	MonoAsyncResult *async_result; /* async result carried by this work item */
};
80
/* Four 16-bit worker counters packed into a single 64-bit word so a full
 * snapshot can be read and updated atomically (see COUNTER_READ /
 * COUNTER_ATOMIC below). */
typedef union {
	struct {
		gint16 max_working; /* determined by heuristic */
		gint16 active; /* working or waiting on thread_work_sem; warm threads */
		gint16 working; /* currently executing a work item */
		gint16 parked;  /* blocked in worker_park () */
	} _;
	gint64 as_gint64; /* whole-word view used for interlocked ops */
} ThreadPoolCounter;
90
/* Per-AppDomain threadpool bookkeeping: tracks how many work requests
 * the domain has queued that no worker has picked up yet. */
typedef struct {
	MonoDomain *domain;          /* the domain this entry belongs to */
	gint32 outstanding_request;  /* pending, not-yet-dispatched requests */
} ThreadPoolDomain;
95
/* State for the hill-climbing heuristic that tunes max_working (ported
 * from CoreCLR src/vm/hillclimbing.cpp). The configuration fields are
 * filled from the HILL_CLIMBING_* constants in ensure_initialized (). */
typedef struct {
	/* configuration (constant after init) */
	gint32 wave_period;
	gint32 samples_to_measure;
	gdouble target_throughput_ratio;
	gdouble target_signal_to_noise_ratio;
	gdouble max_change_per_second;
	gdouble max_change_per_sample;
	gint32 max_thread_wave_magnitude;
	gint32 sample_interval_low;
	gdouble thread_magnitude_multiplier;
	gint32 sample_interval_high;
	gdouble throughput_error_smoothing_factor;
	gdouble gain_exponent;
	gdouble max_sample_error;

	/* mutable heuristic state */
	gdouble current_control_setting;
	gint64 total_samples;
	gint16 last_thread_count;
	gdouble elapsed_since_last_change;
	gdouble completions_since_last_change;

	gdouble average_throughput_noise;

	/* circular sample buffers, samples_to_measure entries each */
	gdouble *samples;
	gdouble *thread_counts;

	guint32 current_sample_interval;
	gpointer random_interval_generator; /* handle from rand_create () */

	gint32 accumulated_completion_count;
	gdouble accumulated_sample_duration;
} ThreadPoolHillClimbing;
128
/* The process-wide threadpool instance (see the `threadpool` global).
 * Each list below is guarded by the mutex declared next to it. */
typedef struct {
	ThreadPoolCounter counters; /* packed worker counters, interlocked access */

	GPtrArray *domains; // ThreadPoolDomain* []
	mono_mutex_t domains_lock;  /* recursive; see ensure_initialized () */

	GPtrArray *working_threads; // MonoInternalThread* []
	mono_mutex_t working_threads_lock;

	GPtrArray *parked_threads; // mono_cond_t* []
	mono_mutex_t parked_threads_lock;

	/* sampling state for the hill-climbing heuristic */
	gint32 heuristic_completions;
	guint32 heuristic_sample_start;
	guint32 heuristic_last_dequeue; // ms
	guint32 heuristic_last_adjustment; // ms
	guint32 heuristic_adjustment_interval; // ms
	ThreadPoolHillClimbing heuristic_hill_climbing;
	mono_mutex_t heuristic_lock;

	/* user-configurable worker/IO thread limits */
	gint32 limit_worker_min;
	gint32 limit_worker_max;
	gint32 limit_io_min;
	gint32 limit_io_max;

	MonoCpuUsageState *cpu_usage_state;
	gint32 cpu_usage; /* last sampled CPU usage percentage */

	/* suspended by the debugger */
	gboolean suspended;
} ThreadPool;
160
/* Reasons the hill-climbing heuristic changed (or kept) the thread count;
 * passed to the heuristic when forcing a thread-count change. */
typedef enum {
	TRANSITION_WARMUP,
	TRANSITION_INITIALIZING,
	TRANSITION_RANDOM_MOVE,
	TRANSITION_CLIMBING_MOVE,
	TRANSITION_CHANGE_POINT,
	TRANSITION_STABILIZING,
	TRANSITION_STARVATION,
	TRANSITION_THREAD_TIMED_OUT,
	TRANSITION_UNDEFINED,
} ThreadPoolHeuristicStateTransition;
172
/* Lifecycle states of the monitor thread, stored in `monitor_status`
 * and driven with interlocked exchanges (see monitor_should_keep_running). */
enum {
	MONITOR_STATUS_REQUESTED,           /* new work arrived; keep running */
	MONITOR_STATUS_WAITING_FOR_REQUEST, /* idle; will exit if nothing shows up */
	MONITOR_STATUS_NOT_RUNNING,         /* no monitor thread alive */
};
178
/* Pool lifecycle status (STATUS_* constants, declared elsewhere), driven
 * with interlocked compare-exchanges in ensure_initialized/cleanedup. */
static gint32 status = STATUS_NOT_INITIALIZED;
/* Monitor thread state; one of the MONITOR_STATUS_* values above. */
static gint32 monitor_status = MONITOR_STATUS_NOT_RUNNING;

/* The single process-wide pool, allocated by ensure_initialized (). */
static ThreadPool* threadpool;

/* Assert the basic invariants of a counter snapshot. */
#define COUNTER_CHECK(counter) \
	do { \
		g_assert (counter._.max_working > 0); \
		g_assert (counter._.active >= 0); \
	} while (0)

/* Atomically snapshot all four counters via one 64-bit read; the cast of
 * the gint64 result to the union relies on a GCC-style extension. */
#define COUNTER_READ() ((ThreadPoolCounter) InterlockedRead64 (&threadpool->counters.as_gint64))

/* Read-modify-CAS loop: `block` mutates `var` and may execute several
 * times until the compare-exchange succeeds, so it must have no side
 * effects besides updating `var`. NOTE(review): a `break` or `return`
 * inside `block` leaves the loop WITHOUT publishing the update — several
 * callers rely on that to bail out with the counters unchanged. */
#define COUNTER_ATOMIC(var,block) \
	do { \
		ThreadPoolCounter __old; \
		do { \
			g_assert (threadpool); \
			(var) = __old = COUNTER_READ (); \
			{ block; } \
			COUNTER_CHECK (var); \
		} while (InterlockedCompareExchange64 (&threadpool->counters.as_gint64, (var).as_gint64, __old.as_gint64) != __old.as_gint64); \
	} while (0)

/* Single-shot variant: attempts the compare-exchange exactly once and
 * reports success in `res` instead of retrying. */
#define COUNTER_TRY_ATOMIC(res,var,block) \
	do { \
		ThreadPoolCounter __old; \
		do { \
			g_assert (threadpool); \
			(var) = __old = COUNTER_READ (); \
			(res) = FALSE; \
			{ block; } \
			COUNTER_CHECK (var); \
			(res) = InterlockedCompareExchange64 (&threadpool->counters.as_gint64, (var).as_gint64, __old.as_gint64) == __old.as_gint64; \
		} while (0); \
	} while (0)
215
/* Create a PRNG handle for the heuristic's random sample intervals.
 * Opens the global rand subsystem and returns a generator handle to be
 * released with rand_free (). */
static gpointer
rand_create (void)
{
	mono_rand_open ();
	return mono_rand_init (NULL, 0);
}
222
223 static guint32
224 rand_next (gpointer *handle, guint32 min, guint32 max)
225 {
226         guint32 val;
227         if (!mono_rand_try_get_uint32 (handle, &val, min, max)) {
228                 // FIXME handle error
229                 g_assert_not_reached ();
230         }
231         return val;
232 }
233
/* Release a PRNG handle obtained from rand_create (). */
static void
rand_free (gpointer handle)
{
	mono_rand_close (handle);
}
239
/* Lazily initialize the process-wide threadpool exactly once.
 * Concurrent callers race on the interlocked STATUS_* state machine:
 * exactly one thread performs the setup, the rest spin-yield until
 * `status` reaches STATUS_INITIALIZED.
 * `enable_worker_tracking`, when non-NULL, is always set to FALSE
 * (worker tracking is not implemented yet). */
static void
ensure_initialized (MonoBoolean *enable_worker_tracking)
{
	ThreadPoolHillClimbing *hc;
	const char *threads_per_cpu_env;
	gint threads_per_cpu;
	gint threads_count;

	if (enable_worker_tracking) {
		// TODO implement some kind of switch to have the possibily to use it
		*enable_worker_tracking = FALSE;
	}

	if (status >= STATUS_INITIALIZED)
		return;
	/* Race to become the initializing thread; losers wait for the winner. */
	if (status == STATUS_INITIALIZING || InterlockedCompareExchange (&status, STATUS_INITIALIZING, STATUS_NOT_INITIALIZED) != STATUS_NOT_INITIALIZED) {
		while (status == STATUS_INITIALIZING)
			mono_thread_info_yield ();
		g_assert (status >= STATUS_INITIALIZED);
		return;
	}

	g_assert (!threadpool);
	threadpool = g_new0 (ThreadPool, 1);
	g_assert (threadpool);

	threadpool->domains = g_ptr_array_new ();
	/* recursive: domain_get_or_create () calls domain_add () while holding it */
	mono_mutex_init_recursive (&threadpool->domains_lock);

	threadpool->parked_threads = g_ptr_array_new ();
	mono_mutex_init (&threadpool->parked_threads_lock);

	threadpool->working_threads = g_ptr_array_new ();
	mono_mutex_init (&threadpool->working_threads_lock);

	threadpool->heuristic_adjustment_interval = 10;
	mono_mutex_init (&threadpool->heuristic_lock);

	/* NOTE(review): rand_create () below also calls mono_rand_open ();
	 * presumably the open is idempotent/refcounted — verify. */
	mono_rand_open ();

	/* Seed the hill-climbing heuristic with its compile-time defaults. */
	hc = &threadpool->heuristic_hill_climbing;

	hc->wave_period = HILL_CLIMBING_WAVE_PERIOD;
	hc->max_thread_wave_magnitude = HILL_CLIMBING_MAX_WAVE_MAGNITUDE;
	hc->thread_magnitude_multiplier = (gdouble) HILL_CLIMBING_WAVE_MAGNITUDE_MULTIPLIER;
	hc->samples_to_measure = hc->wave_period * HILL_CLIMBING_WAVE_HISTORY_SIZE;
	hc->target_throughput_ratio = (gdouble) HILL_CLIMBING_BIAS;
	hc->target_signal_to_noise_ratio = (gdouble) HILL_CLIMBING_TARGET_SIGNAL_TO_NOISE_RATIO;
	hc->max_change_per_second = (gdouble) HILL_CLIMBING_MAX_CHANGE_PER_SECOND;
	hc->max_change_per_sample = (gdouble) HILL_CLIMBING_MAX_CHANGE_PER_SAMPLE;
	hc->sample_interval_low = HILL_CLIMBING_SAMPLE_INTERVAL_LOW;
	hc->sample_interval_high = HILL_CLIMBING_SAMPLE_INTERVAL_HIGH;
	hc->throughput_error_smoothing_factor = (gdouble) HILL_CLIMBING_ERROR_SMOOTHING_FACTOR;
	hc->gain_exponent = (gdouble) HILL_CLIMBING_GAIN_EXPONENT;
	hc->max_sample_error = (gdouble) HILL_CLIMBING_MAX_SAMPLE_ERROR_PERCENT;
	hc->current_control_setting = 0;
	hc->total_samples = 0;
	hc->last_thread_count = 0;
	hc->average_throughput_noise = 0;
	hc->elapsed_since_last_change = 0;
	hc->accumulated_completion_count = 0;
	hc->accumulated_sample_duration = 0;
	hc->samples = g_new0 (gdouble, hc->samples_to_measure);
	hc->thread_counts = g_new0 (gdouble, hc->samples_to_measure);
	hc->random_interval_generator = rand_create ();
	hc->current_sample_interval = rand_next (&hc->random_interval_generator, hc->sample_interval_low, hc->sample_interval_high);

	/* MONO_THREADS_PER_CPU overrides the default of 1, clamped to [1,50]. */
	if (!(threads_per_cpu_env = g_getenv ("MONO_THREADS_PER_CPU")))
		threads_per_cpu = 1;
	else
		threads_per_cpu = CLAMP (atoi (threads_per_cpu_env), 1, 50);

	threads_count = mono_cpu_count () * threads_per_cpu;

	threadpool->limit_worker_min = threadpool->limit_io_min = threads_count;
	threadpool->limit_worker_max = threadpool->limit_io_max = threads_count * 100;

	threadpool->counters._.max_working = threadpool->limit_worker_min;

	threadpool->cpu_usage_state = g_new0 (MonoCpuUsageState, 1);

	threadpool->suspended = FALSE;

	/* Publish: waiters spinning on STATUS_INITIALIZING may now proceed. */
	status = STATUS_INITIALIZED;
}
325
/* Tear down the threadpool at runtime shutdown. Mirrors the interlocked
 * state machine of ensure_initialized (): exactly one thread performs the
 * cleanup, everyone else waits until `status` is STATUS_CLEANED_UP.
 * Must only run while the runtime is shutting down. */
static void
ensure_cleanedup (void)
{
	/* never initialized: jump straight to cleaned-up */
	if (status == STATUS_NOT_INITIALIZED && InterlockedCompareExchange (&status, STATUS_CLEANED_UP, STATUS_NOT_INITIALIZED) == STATUS_NOT_INITIALIZED)
		return;
	if (status == STATUS_INITIALIZING) {
		while (status == STATUS_INITIALIZING)
			mono_thread_info_yield ();
	}
	if (status == STATUS_CLEANED_UP)
		return;
	/* lost the race to clean up: wait for the winner to finish */
	if (status == STATUS_CLEANING_UP || InterlockedCompareExchange (&status, STATUS_CLEANING_UP, STATUS_INITIALIZED) != STATUS_INITIALIZED) {
		while (status == STATUS_CLEANING_UP)
			mono_thread_info_yield ();
		g_assert (status == STATUS_CLEANED_UP);
		return;
	}

	/* we make the assumption along the code that we are
	 * cleaning up only if the runtime is shutting down */
	g_assert (mono_runtime_is_shutting_down ());

	/* Unpark all worker threads */
	mono_mutex_lock (&threadpool->parked_threads_lock);
	for (;;) {
		guint i;
		ThreadPoolCounter counter = COUNTER_READ ();
		if (counter._.active == 0 && counter._.parked == 0)
			break;
		if (counter._.active == 1) {
			MonoInternalThread *thread = mono_thread_internal_current ();
			if (thread->threadpool_thread) {
				/* if there is only one active thread
				 * left and it's the current one */
				break;
			}
		}
		/* signal every parked worker, then drop the lock briefly so they
		 * can wake up, remove themselves and exit */
		for (i = 0; i < threadpool->parked_threads->len; ++i) {
			mono_cond_t *cond = (mono_cond_t*) g_ptr_array_index (threadpool->parked_threads, i);
			mono_cond_signal (cond);
		}
		mono_mutex_unlock (&threadpool->parked_threads_lock);
		usleep (1000);
		mono_mutex_lock (&threadpool->parked_threads_lock);
	}
	mono_mutex_unlock (&threadpool->parked_threads_lock);

	/* wait for the monitor thread to notice the shutdown and exit */
	while (monitor_status != MONITOR_STATUS_NOT_RUNNING)
		usleep (1000);

	g_ptr_array_free (threadpool->domains, TRUE);
	mono_mutex_destroy (&threadpool->domains_lock);

	g_ptr_array_free (threadpool->parked_threads, TRUE);
	mono_mutex_destroy (&threadpool->parked_threads_lock);

	g_ptr_array_free (threadpool->working_threads, TRUE);
	mono_mutex_destroy (&threadpool->working_threads_lock);

	mono_mutex_destroy (&threadpool->heuristic_lock);
	g_free (threadpool->heuristic_hill_climbing.samples);
	g_free (threadpool->heuristic_hill_climbing.thread_counts);
	rand_free (threadpool->heuristic_hill_climbing.random_interval_generator);

	g_free (threadpool->cpu_usage_state);

	g_assert (threadpool);
	g_free (threadpool);
	threadpool = NULL;
	g_assert (!threadpool);

	status = STATUS_CLEANED_UP;
}
399
/* Queue `work_item` on the managed threadpool of `domain` by invoking
 * System.Threading.ThreadPool.UnsafeQueueCustomWorkItem (work_item, FALSE).
 * If `domain` differs from the current one, the call is made after
 * switching into `domain` (with an appdomain ref held) and the original
 * domain is restored afterwards.
 * The static class/method caches are racy but idempotent: concurrent
 * initializers resolve the same values. */
void
mono_threadpool_ms_enqueue_work_item (MonoDomain *domain, MonoObject *work_item)
{
	static MonoClass *threadpool_class = NULL;
	static MonoMethod *unsafe_queue_custom_work_item_method = NULL;
	MonoDomain *current_domain;
	MonoBoolean f;
	gpointer args [2];

	g_assert (work_item);

	if (!threadpool_class)
		threadpool_class = mono_class_from_name (mono_defaults.corlib, "System.Threading", "ThreadPool");
	g_assert (threadpool_class);

	if (!unsafe_queue_custom_work_item_method)
		unsafe_queue_custom_work_item_method = mono_class_get_method_from_name (threadpool_class, "UnsafeQueueCustomWorkItem", 2);
	g_assert (unsafe_queue_custom_work_item_method);

	/* second argument: forceGlobal = FALSE */
	f = FALSE;

	args [0] = (gpointer) work_item;
	args [1] = (gpointer) &f;

	current_domain = mono_domain_get ();
	if (current_domain == domain) {
		mono_runtime_invoke (unsafe_queue_custom_work_item_method, NULL, args, NULL);
	} else {
		mono_thread_push_appdomain_ref (domain);
		if (mono_domain_set (domain, FALSE)) {
			mono_runtime_invoke (unsafe_queue_custom_work_item_method, NULL, args, NULL);
			mono_domain_set (current_domain, TRUE);
		}
		mono_thread_pop_appdomain_ref ();
	}
}
436
437 void
438 mono_threadpool_ms_enqueue_async_result (MonoDomain *domain, MonoAsyncResult *ares)
439 {
440         static MonoClass *runtime_work_item_class = NULL;
441         MonoRuntimeWorkItem *rwi;
442
443         g_assert (ares);
444
445         if (!runtime_work_item_class)
446                 runtime_work_item_class = mono_class_from_name (mono_defaults.corlib, "System.Threading", "MonoRuntimeWorkItem");
447         g_assert (runtime_work_item_class);
448
449         rwi = (MonoRuntimeWorkItem*) mono_object_new (domain, runtime_work_item_class);
450         MONO_OBJECT_SETREF (rwi, async_result, ares);
451
452         mono_threadpool_ms_enqueue_work_item (domain, (MonoObject*) rwi);
453 }
454
455 static void
456 domain_add (ThreadPoolDomain *tpdomain)
457 {
458         guint i, len;
459
460         g_assert (tpdomain);
461
462         mono_mutex_lock (&threadpool->domains_lock);
463         len = threadpool->domains->len;
464         for (i = 0; i < len; ++i) {
465                 if (g_ptr_array_index (threadpool->domains, i) == tpdomain)
466                         break;
467         }
468         if (i == len)
469                 g_ptr_array_add (threadpool->domains, tpdomain);
470         mono_mutex_unlock (&threadpool->domains_lock);
471 }
472
473 static gboolean
474 domain_remove (ThreadPoolDomain *tpdomain)
475 {
476         gboolean res;
477
478         g_assert (tpdomain);
479
480         mono_mutex_lock (&threadpool->domains_lock);
481         res = g_ptr_array_remove (threadpool->domains, tpdomain);
482         mono_mutex_unlock (&threadpool->domains_lock);
483
484         return res;
485 }
486
487 static ThreadPoolDomain *
488 domain_get_or_create (MonoDomain *domain)
489 {
490         ThreadPoolDomain *tpdomain = NULL;
491         guint i;
492
493         g_assert (domain);
494
495         mono_mutex_lock (&threadpool->domains_lock);
496         for (i = 0; i < threadpool->domains->len; ++i) {
497                 ThreadPoolDomain *tmp = g_ptr_array_index (threadpool->domains, i);
498                 if (tmp->domain == domain) {
499                         tpdomain = tmp;
500                         break;
501                 }
502         }
503         if (!tpdomain) {
504                 tpdomain = g_new0 (ThreadPoolDomain, 1);
505                 tpdomain->domain = domain;
506                 domain_add (tpdomain);
507         }
508         mono_mutex_unlock (&threadpool->domains_lock);
509         return tpdomain;
510 }
511
512 static gboolean
513 domain_any_has_request ()
514 {
515         gboolean res = FALSE;
516         guint i;
517
518         mono_mutex_lock (&threadpool->domains_lock);
519         for (i = 0; i < threadpool->domains->len; ++i) {
520                 ThreadPoolDomain *tmp = g_ptr_array_index (threadpool->domains, i);
521                 if (tmp->outstanding_request > 0) {
522                         res = TRUE;
523                         break;
524                 }
525         }
526         mono_mutex_unlock (&threadpool->domains_lock);
527         return res;
528 }
529
530 static ThreadPoolDomain *
531 domain_get_next (ThreadPoolDomain *current)
532 {
533         ThreadPoolDomain *tpdomain = NULL;
534         guint len;
535
536         mono_mutex_lock (&threadpool->domains_lock);
537         len = threadpool->domains->len;
538         if (len > 0) {
539                 guint i, current_idx = -1;
540                 if (current) {
541                         for (i = 0; i < len; ++i) {
542                                 if (current == g_ptr_array_index (threadpool->domains, i)) {
543                                         current_idx = i;
544                                         break;
545                                 }
546                         }
547                         g_assert (current_idx >= 0);
548                 }
549                 for (i = current_idx + 1; i < len + current_idx + 1; ++i) {
550                         ThreadPoolDomain *tmp = g_ptr_array_index (threadpool->domains, i % len);
551                         if (tmp->outstanding_request > 0) {
552                                 tpdomain = tmp;
553                                 tpdomain->outstanding_request --;
554                                 g_assert (tpdomain->outstanding_request >= 0);
555                                 break;
556                         }
557                 }
558         }
559         mono_mutex_unlock (&threadpool->domains_lock);
560         return tpdomain;
561 }
562
/* Park the calling worker: publish a condition variable in
 * parked_threads and block until worker_try_unpark () or shutdown
 * signals it.
 * NOTE(review): mono_cond_wait is not wrapped in a predicate loop, so a
 * spurious wakeup unparks the worker early; callers appear to tolerate
 * this by re-checking for work — confirm this is intentional. */
static void
worker_park (void)
{
	mono_cond_t cond;
	mono_cond_init (&cond, NULL);

	mono_mutex_lock (&threadpool->parked_threads_lock);
	g_ptr_array_add (threadpool->parked_threads, &cond);
	mono_cond_wait (&cond, &threadpool->parked_threads_lock);
	g_ptr_array_remove (threadpool->parked_threads, &cond);
	mono_mutex_unlock (&threadpool->parked_threads_lock);

	mono_cond_destroy (&cond);
}
577
578 static gboolean
579 worker_try_unpark (void)
580 {
581         gboolean res = FALSE;
582         guint len;
583
584         mono_mutex_lock (&threadpool->parked_threads_lock);
585         len = threadpool->parked_threads->len;
586         if (len > 0) {
587                 mono_cond_t *cond = (mono_cond_t*) g_ptr_array_index (threadpool->parked_threads, len - 1);
588                 mono_cond_signal (cond);
589                 res = TRUE;
590         }
591         mono_mutex_unlock (&threadpool->parked_threads_lock);
592         return res;
593 }
594
/* Main loop of a pool worker. `data` is the ThreadPoolDomain the worker
 * was spawned for. Repeatedly: invokes the managed
 * _ThreadPoolWaitCallback.PerformWaitCallback in the domain, then picks
 * the next domain with pending requests (round-robin), parking itself
 * when the pool has more active threads than max_working. Exits when no
 * work is found after several retries or the runtime shuts down.
 * Locking: domains_lock is held around the loop body except while the
 * callback runs and while parked. */
static void
worker_thread (gpointer data)
{
	static MonoClass *threadpool_wait_callback_class = NULL;
	static MonoMethod *perform_wait_callback_method = NULL;
	MonoInternalThread *thread;
	ThreadPoolDomain *tpdomain;
	ThreadPoolCounter counter;
	gboolean retire = FALSE; /* managed side asked this worker to back off */

	g_assert (status >= STATUS_INITIALIZED);

	tpdomain = data;
	g_assert (tpdomain);
	g_assert (tpdomain->domain);

	/* bail out early: undo the active++ done by worker_request () */
	if (mono_runtime_is_shutting_down () || mono_domain_is_unloading (tpdomain->domain)) {
		COUNTER_ATOMIC (counter, { counter._.active --; });
		return;
	}

	/* racy-but-idempotent lazy caches, same pattern as the enqueue paths */
	if (!threadpool_wait_callback_class)
		threadpool_wait_callback_class = mono_class_from_name (mono_defaults.corlib, "System.Threading.Microsoft", "_ThreadPoolWaitCallback");
	g_assert (threadpool_wait_callback_class);

	if (!perform_wait_callback_method)
		perform_wait_callback_method = mono_class_get_method_from_name (threadpool_wait_callback_class, "PerformWaitCallback", 0);
	g_assert (perform_wait_callback_method);

	g_assert (threadpool);

	thread = mono_thread_internal_current ();
	g_assert (thread);

	mono_mutex_lock (&threadpool->domains_lock);

	do {
		guint i, c;

		g_assert (tpdomain);
		g_assert (tpdomain->domain);

		tpdomain->domain->threadpool_jobs ++;

		mono_mutex_unlock (&threadpool->domains_lock);

		mono_mutex_lock (&threadpool->working_threads_lock);
		g_ptr_array_add (threadpool->working_threads, thread);
		mono_mutex_unlock (&threadpool->working_threads_lock);

		COUNTER_ATOMIC (counter, { counter._.working ++; });

		mono_thread_push_appdomain_ref (tpdomain->domain);
		if (mono_domain_set (tpdomain->domain, FALSE)) {
			MonoObject *exc = NULL;
			MonoObject *res = mono_runtime_invoke (perform_wait_callback_method, NULL, NULL, &exc);
			if (exc)
				mono_internal_thread_unhandled_exception (exc);
			else if (res && *(MonoBoolean*) mono_object_unbox (res) == FALSE)
				/* managed callback returned false: this worker should retire */
				retire = TRUE;

			/* restore background status the callback may have cleared */
			mono_thread_clr_state (thread , ~ThreadState_Background);
			if (!mono_thread_test_state (thread , ThreadState_Background))
				ves_icall_System_Threading_Thread_SetState (thread, ThreadState_Background);
		}
		mono_thread_pop_appdomain_ref ();

		COUNTER_ATOMIC (counter, { counter._.working --; });

		mono_mutex_lock (&threadpool->working_threads_lock);
		g_ptr_array_remove_fast (threadpool->working_threads, thread);
		mono_mutex_unlock (&threadpool->working_threads_lock);

		mono_mutex_lock (&threadpool->domains_lock);

		tpdomain->domain->threadpool_jobs --;
		g_assert (tpdomain->domain->threadpool_jobs >= 0);

		/* last job of an unloading domain: tear down its pool entry and
		 * wake up the unloader waiting on the cleanup semaphore */
		if (tpdomain->domain->threadpool_jobs == 0 && mono_domain_is_unloading (tpdomain->domain)) {
			gboolean removed = domain_remove (tpdomain);
			g_assert (removed);
			if (tpdomain->domain->cleanup_semaphore)
				ReleaseSemaphore (tpdomain->domain->cleanup_semaphore, 1, NULL);
			g_free (tpdomain);
			tpdomain = NULL;
		}

		/* look for more work, parking between attempts; up to 5 tries */
		for (i = 0, c = 5; i < c; ++i) {
			if (mono_runtime_is_shutting_down ())
				break;

			if (!retire) {
				tpdomain = domain_get_next (tpdomain);
				if (tpdomain)
					break;
			}

			if (i < c - 1) {
				gboolean park = TRUE;

				/* NOTE: the `break` below exits the COUNTER_ATOMIC retry
				 * loop without publishing, leaving counters untouched */
				COUNTER_ATOMIC (counter, {
					if (counter._.active <= counter._.max_working) {
						park = FALSE;
						break;
					}
					counter._.active --;
					counter._.parked ++;
				});

				if (park) {
					mono_mutex_unlock (&threadpool->domains_lock);
					mono_gc_set_skip_thread (TRUE);
					worker_park ();
					mono_gc_set_skip_thread (FALSE);
					mono_mutex_lock (&threadpool->domains_lock);

					COUNTER_ATOMIC (counter, {
						counter._.active ++;
						counter._.parked --;
					});
				}
			}

			retire = FALSE;
		}
	} while (tpdomain && !mono_runtime_is_shutting_down ());

	mono_mutex_unlock (&threadpool->domains_lock);

	/* this worker is done: drop out of the active count */
	COUNTER_ATOMIC (counter, { counter._.active --; });
}
726
/* Spawn a new worker thread running worker_thread () for `tpdomain`.
 * Returns TRUE if the thread was created. */
static gboolean
worker_try_create (ThreadPoolDomain *tpdomain)
{
	g_assert (tpdomain);
	g_assert (tpdomain->domain);

	return mono_thread_create_internal (tpdomain->domain, worker_thread, tpdomain, TRUE, 0) != NULL;
}
735
736 static void monitor_ensure_running (void);
737
/* Record a work request for `domain` and make sure a worker will service
 * it: first try to unpark an existing worker, else spawn a new one if we
 * are below max_working. Returns TRUE when a worker is (or will be)
 * available, FALSE when the pool is suspended, shutting down, or at its
 * thread limit. */
static gboolean
worker_request (MonoDomain *domain)
{
	ThreadPoolDomain *tpdomain;
	ThreadPoolCounter counter;

	g_assert (domain);
	g_assert (threadpool);

	if (mono_runtime_is_shutting_down () || mono_domain_is_unloading (domain))
		return FALSE;

	mono_mutex_lock (&threadpool->domains_lock);
	tpdomain = domain_get_or_create (domain);
	g_assert (tpdomain);
	/* the request is recorded even if no worker can be provided below */
	tpdomain->outstanding_request ++;
	mono_mutex_unlock (&threadpool->domains_lock);

	if (threadpool->suspended)
		return FALSE;

	monitor_ensure_running ();

	if (worker_try_unpark ())
		return TRUE;

	/* NOTE: the `return FALSE` below exits the function from inside the
	 * COUNTER_ATOMIC retry loop, before any counter change is published */
	COUNTER_ATOMIC (counter, {
		if (counter._.active >= counter._.max_working)
			return FALSE;
		counter._.active ++;
	});

	if (worker_try_create (tpdomain))
		return TRUE;

	/* thread creation failed: roll back the active++ above */
	COUNTER_ATOMIC (counter, { counter._.active --; });
	return FALSE;
}
776
/* Decide whether the monitor thread should do another iteration.
 * Moves monitor_status from REQUESTED to WAITING_FOR_REQUEST; if it was
 * already WAITING (no new request since last check) and there is no
 * pending work or the runtime is shutting down, it tries to transition
 * to NOT_RUNNING and returns FALSE — unless a concurrent request slipped
 * in between the two exchanges, in which case it keeps running. */
static gboolean
monitor_should_keep_running (void)
{
	g_assert (monitor_status == MONITOR_STATUS_WAITING_FOR_REQUEST || monitor_status == MONITOR_STATUS_REQUESTED);

	if (InterlockedExchange (&monitor_status, MONITOR_STATUS_WAITING_FOR_REQUEST) == MONITOR_STATUS_WAITING_FOR_REQUEST) {
		if (mono_runtime_is_shutting_down () || !domain_any_has_request ()) {
			if (InterlockedExchange (&monitor_status, MONITOR_STATUS_NOT_RUNNING) == MONITOR_STATUS_WAITING_FOR_REQUEST)
				return FALSE;
		}
	}

	g_assert (monitor_status == MONITOR_STATUS_WAITING_FOR_REQUEST || monitor_status == MONITOR_STATUS_REQUESTED);

	return TRUE;
}
793
794 static gboolean
795 monitor_sufficient_delay_since_last_dequeue (void)
796 {
797         guint32 threshold;
798
799         g_assert (threadpool);
800
801         if (threadpool->cpu_usage < CPU_USAGE_LOW) {
802                 threshold = MONITOR_INTERVAL;
803         } else {
804                 ThreadPoolCounter counter = COUNTER_READ ();
805                 threshold = counter._.max_working * MONITOR_INTERVAL * 2;
806         }
807
808         return mono_msec_ticks () >= threadpool->heuristic_last_dequeue + threshold;
809 }
810
811 static void hill_climbing_force_change (gint16 new_thread_count, ThreadPoolHeuristicStateTransition transition);
812
/* Background monitor loop. Roughly every MONITOR_INTERVAL ms it: detects
 * starvation (all working threads blocked in WaitSleepJoin) and raises the
 * worker limit, refreshes the CPU usage sample, and — when work has not been
 * dequeued for a while — tries to unpark or create worker threads. Runs until
 * monitor_should_keep_running () says to stop. */
static void
monitor_thread (void)
{
	MonoInternalThread *current_thread = mono_thread_internal_current ();
	guint i;

	/* Prime the CPU usage state so the next reading yields a meaningful delta. */
	mono_cpu_usage (threadpool->cpu_usage_state);

	do {
		MonoInternalThread *thread;
		gboolean all_waitsleepjoin = TRUE;
		gint32 interval_left = MONITOR_INTERVAL;
		gint32 awake = 0; /* number of spurious awakes we tolerate before doing a round of rebalancing */

		g_assert (monitor_status != MONITOR_STATUS_NOT_RUNNING);

		/* The GC may ignore this thread while it is only sleeping. */
		mono_gc_set_skip_thread (TRUE);

		/* Sleep for (roughly) MONITOR_INTERVAL ms in total, tolerating up to
		 * 10 early wakeups (SleepEx returns non-zero on an alertable wakeup)
		 * and servicing stop/suspend requests between naps. */
		do {
			guint32 ts;

			if (mono_runtime_is_shutting_down ())
				break;

			ts = mono_msec_ticks ();
			if (SleepEx (interval_left, TRUE) == 0)
				break;
			interval_left -= mono_msec_ticks () - ts;

			if ((current_thread->state & (ThreadState_StopRequested | ThreadState_SuspendRequested)) != 0)
				mono_thread_interruption_checkpoint ();
		} while (interval_left > 0 && ++awake < 10);

		mono_gc_set_skip_thread (FALSE);

		if (threadpool->suspended)
			continue;

		if (mono_runtime_is_shutting_down () || !domain_any_has_request ())
			continue;

		/* Starvation check: is every working thread blocked in WaitSleepJoin? */
		mono_mutex_lock (&threadpool->working_threads_lock);
		for (i = 0; i < threadpool->working_threads->len; ++i) {
			thread = g_ptr_array_index (threadpool->working_threads, i);
			if ((thread->state & ThreadState_WaitSleepJoin) == 0) {
				all_waitsleepjoin = FALSE;
				break;
			}
		}
		mono_mutex_unlock (&threadpool->working_threads_lock);

		if (all_waitsleepjoin) {
			/* Everybody is blocked: bump the worker limit by one to get unstuck. */
			ThreadPoolCounter counter;
			COUNTER_ATOMIC (counter, { counter._.max_working ++; });
			hill_climbing_force_change (counter._.max_working, TRANSITION_STARVATION);
		}

		threadpool->cpu_usage = mono_cpu_usage (threadpool->cpu_usage_state);

		if (monitor_sufficient_delay_since_last_dequeue ()) {
			/* Work has been sitting in the queue: try (up to 5 times) to wake
			 * a parked worker, or reserve a slot and spawn a new one. */
			for (i = 0; i < 5; ++i) {
				ThreadPoolDomain *tpdomain;
				ThreadPoolCounter counter;
				gboolean success;

				if (mono_runtime_is_shutting_down ())
					break;

				if (worker_try_unpark ())
					break;

				/* NOTE(review): this `break` sits inside the COUNTER_TRY_ATOMIC
				 * macro body; depending on the macro's expansion it may only
				 * exit the macro's internal retry loop (possibly leaving
				 * `success` unset) rather than this for loop — confirm against
				 * the macro definition. */
				COUNTER_TRY_ATOMIC (success, counter, {
					if (counter._.active >= counter._.max_working)
						break;
					counter._.active ++;
				});

				if (!success)
					continue;

				tpdomain = domain_get_next (NULL);
				if (tpdomain && worker_try_create (tpdomain))
					break;

				/* Could not create a worker: undo the slot reservation. */
				COUNTER_ATOMIC (counter, { counter._.active --; });
			}
		}
	} while (monitor_should_keep_running ());
}
902
903 static void
904 monitor_ensure_running (void)
905 {
906         for (;;) {
907                 switch (monitor_status) {
908                 case MONITOR_STATUS_REQUESTED:
909                         return;
910                 case MONITOR_STATUS_WAITING_FOR_REQUEST:
911                         InterlockedCompareExchange (&monitor_status, MONITOR_STATUS_REQUESTED, MONITOR_STATUS_WAITING_FOR_REQUEST);
912                         break;
913                 case MONITOR_STATUS_NOT_RUNNING:
914                         if (InterlockedCompareExchange (&monitor_status, MONITOR_STATUS_REQUESTED, MONITOR_STATUS_NOT_RUNNING) == MONITOR_STATUS_NOT_RUNNING) {
915                                 if (!mono_thread_create_internal (mono_get_root_domain (), monitor_thread, NULL, TRUE, SMALL_STACK))
916                                         monitor_status = MONITOR_STATUS_NOT_RUNNING;
917                                 return;
918                         }
919                         break;
920                 default: g_assert_not_reached ();
921                 }
922         }
923 }
924
/* Record that the pool now targets @new_thread_count workers and reset the
 * per-thread-count statistics. The next sample interval is drawn via
 * rand_next from [sample_interval_low, sample_interval_high] — presumably so
 * the measurement wave does not lock onto a periodic load pattern.
 * @transition is unused here; kept for parity with the CoreCLR port. */
static void
hill_climbing_change_thread_count (gint16 new_thread_count, ThreadPoolHeuristicStateTransition transition)
{
	ThreadPoolHillClimbing *hc;

	g_assert (threadpool);

	hc = &threadpool->heuristic_hill_climbing;

	hc->last_thread_count = new_thread_count;
	hc->current_sample_interval = rand_next (&hc->random_interval_generator, hc->sample_interval_low, hc->sample_interval_high);
	hc->elapsed_since_last_change = 0;
	hc->completions_since_last_change = 0;
}
939
940 static void
941 hill_climbing_force_change (gint16 new_thread_count, ThreadPoolHeuristicStateTransition transition)
942 {
943         ThreadPoolHillClimbing *hc;
944
945         g_assert (threadpool);
946
947         hc = &threadpool->heuristic_hill_climbing;
948
949         if (new_thread_count != hc->last_thread_count) {
950                 hc->current_control_setting += new_thread_count - hc->last_thread_count;
951                 hill_climbing_change_thread_count (new_thread_count, transition);
952         }
953 }
954
/*
 * Measure the complex amplitude of @samples at the frequency whose period is
 * @period (in samples), using the Goertzel algorithm (a single-bin discrete
 * Fourier transform). @samples is the circular history buffer of size
 * hc->samples_to_measure; the most recent @sample_count entries are scanned.
 * The result is normalized by the sample count.
 */
static double complex
hill_climbing_get_wave_component (gdouble *samples, guint sample_count, gdouble period)
{
	ThreadPoolHillClimbing *hc;
	gdouble w, cosine, sine, coeff, q0, q1, q2;
	guint i;

	g_assert (threadpool);
	g_assert (sample_count >= period); /* can't measure a wave that doesn't fit in the window */
	g_assert (period >= 2); /* below two samples per cycle the frequency is unmeasurable (Nyquist) */

	hc = &threadpool->heuristic_hill_climbing;

	/* Goertzel coefficients for the target frequency. */
	w = 2.0 * M_PI / period;
	cosine = cos (w);
	sine = sin (w);
	coeff = 2.0 * cosine;
	q0 = q1 = q2 = 0;

	/* Run the Goertzel recurrence over the newest @sample_count entries of
	 * the circular buffer. */
	for (i = 0; i < sample_count; ++i) {
		q0 = coeff * q1 - q2 + samples [(hc->total_samples - sample_count + i) % hc->samples_to_measure];
		q2 = q1;
		q1 = q0;
	}

	/* Combine the filter state into the complex amplitude at the target bin. */
	return ((q1 - q2 * cosine) + (q2 * sine) * I) / ((gdouble) sample_count);
}
982
983 static gint16
984 hill_climbing_update (gint16 current_thread_count, guint32 sample_duration, gint32 completions, guint32 *adjustment_interval)
985 {
986         ThreadPoolHillClimbing *hc;
987         ThreadPoolHeuristicStateTransition transition;
988         gdouble throughput;
989         gdouble throughput_error_estimate;
990         gdouble confidence;
991         gdouble move;
992         gdouble gain;
993         gint sample_index;
994         gint sample_count;
995         gint new_thread_wave_magnitude;
996         gint new_thread_count;
997         double complex thread_wave_component;
998         double complex throughput_wave_component;
999         double complex ratio;
1000
1001         g_assert (threadpool);
1002         g_assert (adjustment_interval);
1003
1004         hc = &threadpool->heuristic_hill_climbing;
1005
1006         /* If someone changed the thread count without telling us, update our records accordingly. */
1007         if (current_thread_count != hc->last_thread_count)
1008                 hill_climbing_force_change (current_thread_count, TRANSITION_INITIALIZING);
1009
1010         /* Update the cumulative stats for this thread count */
1011         hc->elapsed_since_last_change += sample_duration;
1012         hc->completions_since_last_change += completions;
1013
1014         /* Add in any data we've already collected about this sample */
1015         sample_duration += hc->accumulated_sample_duration;
1016         completions += hc->accumulated_completion_count;
1017
1018         /* We need to make sure we're collecting reasonably accurate data. Since we're just counting the end
1019          * of each work item, we are goinng to be missing some data about what really happened during the
1020          * sample interval. The count produced by each thread includes an initial work item that may have
1021          * started well before the start of the interval, and each thread may have been running some new
1022          * work item for some time before the end of the interval, which did not yet get counted. So
1023          * our count is going to be off by +/- threadCount workitems.
1024          *
1025          * The exception is that the thread that reported to us last time definitely wasn't running any work
1026          * at that time, and the thread that's reporting now definitely isn't running a work item now. So
1027          * we really only need to consider threadCount-1 threads.
1028          *
1029          * Thus the percent error in our count is +/- (threadCount-1)/numCompletions.
1030          *
1031          * We cannot rely on the frequency-domain analysis we'll be doing later to filter out this error, because
1032          * of the way it accumulates over time. If this sample is off by, say, 33% in the negative direction,
1033          * then the next one likely will be too. The one after that will include the sum of the completions
1034          * we missed in the previous samples, and so will be 33% positive. So every three samples we'll have
1035          * two "low" samples and one "high" sample. This will appear as periodic variation right in the frequency
1036          * range we're targeting, which will not be filtered by the frequency-domain translation. */
1037         if (hc->total_samples > 0 && ((current_thread_count - 1.0) / completions) >= hc->max_sample_error) {
1038                 /* Not accurate enough yet. Let's accumulate the data so
1039                  * far, and tell the ThreadPool to collect a little more. */
1040                 hc->accumulated_sample_duration = sample_duration;
1041                 hc->accumulated_completion_count = completions;
1042                 *adjustment_interval = 10;
1043                 return current_thread_count;
1044         }
1045
1046         /* We've got enouugh data for our sample; reset our accumulators for next time. */
1047         hc->accumulated_sample_duration = 0;
1048         hc->accumulated_completion_count = 0;
1049
1050         /* Add the current thread count and throughput sample to our history. */
1051         throughput = ((gdouble) completions) / sample_duration;
1052
1053         sample_index = hc->total_samples % hc->samples_to_measure;
1054         hc->samples [sample_index] = throughput;
1055         hc->thread_counts [sample_index] = current_thread_count;
1056         hc->total_samples ++;
1057
1058         /* Set up defaults for our metrics. */
1059         thread_wave_component = 0;
1060         throughput_wave_component = 0;
1061         throughput_error_estimate = 0;
1062         ratio = 0;
1063         confidence = 0;
1064
1065         transition = TRANSITION_WARMUP;
1066
1067         /* How many samples will we use? It must be at least the three wave periods we're looking for, and it must also
1068          * be a whole multiple of the primary wave's period; otherwise the frequency we're looking for will fall between
1069          * two frequency bands in the Fourier analysis, and we won't be able to measure it accurately. */
1070         sample_count = ((gint) MIN (hc->total_samples - 1, hc->samples_to_measure) / hc->wave_period) * hc->wave_period;
1071
1072         if (sample_count > hc->wave_period) {
1073                 guint i;
1074                 gdouble average_throughput;
1075                 gdouble average_thread_count;
1076                 gdouble sample_sum = 0;
1077                 gdouble thread_sum = 0;
1078
1079                 /* Average the throughput and thread count samples, so we can scale the wave magnitudes later. */
1080                 for (i = 0; i < sample_count; ++i) {
1081                         guint j = (hc->total_samples - sample_count + i) % hc->samples_to_measure;
1082                         sample_sum += hc->samples [j];
1083                         thread_sum += hc->thread_counts [j];
1084                 }
1085
1086                 average_throughput = sample_sum / sample_count;
1087                 average_thread_count = thread_sum / sample_count;
1088
1089                 if (average_throughput > 0 && average_thread_count > 0) {
1090                         gdouble noise_for_confidence, adjacent_period_1, adjacent_period_2;
1091
1092                         /* Calculate the periods of the adjacent frequency bands we'll be using to
1093                          * measure noise levels. We want the two adjacent Fourier frequency bands. */
1094                         adjacent_period_1 = sample_count / (((gdouble) sample_count) / ((gdouble) hc->wave_period) + 1);
1095                         adjacent_period_2 = sample_count / (((gdouble) sample_count) / ((gdouble) hc->wave_period) - 1);
1096
1097                         /* Get the the three different frequency components of the throughput (scaled by average
1098                          * throughput). Our "error" estimate (the amount of noise that might be present in the
1099                          * frequency band we're really interested in) is the average of the adjacent bands. */
1100                         throughput_wave_component = hill_climbing_get_wave_component (hc->samples, sample_count, hc->wave_period) / average_throughput;
1101                         throughput_error_estimate = cabs (hill_climbing_get_wave_component (hc->samples, sample_count, adjacent_period_1) / average_throughput);
1102
1103                         if (adjacent_period_2 <= sample_count) {
1104                                 throughput_error_estimate = MAX (throughput_error_estimate, cabs (hill_climbing_get_wave_component (
1105                                         hc->samples, sample_count, adjacent_period_2) / average_throughput));
1106                         }
1107
1108                         /* Do the same for the thread counts, so we have something to compare to. We don't
1109                          * measure thread count noise, because there is none; these are exact measurements. */
1110                         thread_wave_component = hill_climbing_get_wave_component (hc->thread_counts, sample_count, hc->wave_period) / average_thread_count;
1111
1112                         /* Update our moving average of the throughput noise. We'll use this
1113                          * later as feedback to determine the new size of the thread wave. */
1114                         if (hc->average_throughput_noise == 0) {
1115                                 hc->average_throughput_noise = throughput_error_estimate;
1116                         } else {
1117                                 hc->average_throughput_noise = (hc->throughput_error_smoothing_factor * throughput_error_estimate)
1118                                         + ((1.0 + hc->throughput_error_smoothing_factor) * hc->average_throughput_noise);
1119                         }
1120
1121                         if (cabs (thread_wave_component) > 0) {
1122                                 /* Adjust the throughput wave so it's centered around the target wave,
1123                                  * and then calculate the adjusted throughput/thread ratio. */
1124                                 ratio = (throughput_wave_component - (hc->target_throughput_ratio * thread_wave_component)) / thread_wave_component;
1125                                 transition = TRANSITION_CLIMBING_MOVE;
1126                         } else {
1127                                 ratio = 0;
1128                                 transition = TRANSITION_STABILIZING;
1129                         }
1130
1131                         noise_for_confidence = MAX (hc->average_throughput_noise, throughput_error_estimate);
1132                         if (noise_for_confidence > 0) {
1133                                 confidence = cabs (thread_wave_component) / noise_for_confidence / hc->target_signal_to_noise_ratio;
1134                         } else {
1135                                 /* there is no noise! */
1136                                 confidence = 1.0;
1137                         }
1138                 }
1139         }
1140
1141         /* We use just the real part of the complex ratio we just calculated. If the throughput signal
1142          * is exactly in phase with the thread signal, this will be the same as taking the magnitude of
1143          * the complex move and moving that far up. If they're 180 degrees out of phase, we'll move
1144          * backward (because this indicates that our changes are having the opposite of the intended effect).
1145          * If they're 90 degrees out of phase, we won't move at all, because we can't tell wether we're
1146          * having a negative or positive effect on throughput. */
1147         move = creal (ratio);
1148         move = CLAMP (move, -1.0, 1.0);
1149
1150         /* Apply our confidence multiplier. */
1151         move *= CLAMP (confidence, -1.0, 1.0);
1152
1153         /* Now apply non-linear gain, such that values around zero are attenuated, while higher values
1154          * are enhanced. This allows us to move quickly if we're far away from the target, but more slowly
1155         * if we're getting close, giving us rapid ramp-up without wild oscillations around the target. */
1156         gain = hc->max_change_per_second * sample_duration;
1157         move = pow (fabs (move), hc->gain_exponent) * (move >= 0.0 ? 1 : -1) * gain;
1158         move = MIN (move, hc->max_change_per_sample);
1159
1160         /* If the result was positive, and CPU is > 95%, refuse the move. */
1161         if (move > 0.0 && threadpool->cpu_usage > CPU_USAGE_HIGH)
1162                 move = 0.0;
1163
1164         /* Apply the move to our control setting. */
1165         hc->current_control_setting += move;
1166
1167         /* Calculate the new thread wave magnitude, which is based on the moving average we've been keeping of the
1168          * throughput error.  This average starts at zero, so we'll start with a nice safe little wave at first. */
1169         new_thread_wave_magnitude = (gint)(0.5 + (hc->current_control_setting * hc->average_throughput_noise
1170                 * hc->target_signal_to_noise_ratio * hc->thread_magnitude_multiplier * 2.0));
1171         new_thread_wave_magnitude = CLAMP (new_thread_wave_magnitude, 1, hc->max_thread_wave_magnitude);
1172
1173         /* Make sure our control setting is within the ThreadPool's limits. */
1174         hc->current_control_setting = CLAMP (hc->current_control_setting, threadpool->limit_worker_min, threadpool->limit_worker_max - new_thread_wave_magnitude);
1175
1176         /* Calculate the new thread count (control setting + square wave). */
1177         new_thread_count = (gint)(hc->current_control_setting + new_thread_wave_magnitude * ((hc->total_samples / (hc->wave_period / 2)) % 2));
1178
1179         /* Make sure the new thread count doesn't exceed the ThreadPool's limits. */
1180         new_thread_count = CLAMP (new_thread_count, threadpool->limit_worker_min, threadpool->limit_worker_max);
1181
1182         if (new_thread_count != current_thread_count)
1183                 hill_climbing_change_thread_count (new_thread_count, transition);
1184
1185         if (creal (ratio) < 0.0 && new_thread_count == threadpool->limit_worker_min)
1186                 *adjustment_interval = (gint)(0.5 + hc->current_sample_interval * (10.0 * MAX (-1.0 * creal (ratio), 1.0)));
1187         else
1188                 *adjustment_interval = hc->current_sample_interval;
1189
1190         return new_thread_count;
1191 }
1192
/* Record the completion of one work item: bump the completion counter that
 * heuristic_adjust () will consume, and remember the time of the last
 * dequeue, which the monitor uses to detect stalls. */
static void
heuristic_notify_work_completed (void)
{
	g_assert (threadpool);

	InterlockedIncrement (&threadpool->heuristic_completions);
	threadpool->heuristic_last_dequeue = mono_msec_ticks ();
}
1201
1202 static gboolean
1203 heuristic_should_adjust ()
1204 {
1205         g_assert (threadpool);
1206
1207         if (threadpool->heuristic_last_dequeue > threadpool->heuristic_last_adjustment + threadpool->heuristic_adjustment_interval) {
1208                 ThreadPoolCounter counter = COUNTER_READ ();
1209                 if (counter._.active <= counter._.max_working)
1210                         return TRUE;
1211         }
1212
1213         return FALSE;
1214 }
1215
1216 static void
1217 heuristic_adjust ()
1218 {
1219         g_assert (threadpool);
1220
1221         if (mono_mutex_trylock (&threadpool->heuristic_lock) == 0) {
1222                 gint32 completions = InterlockedExchange (&threadpool->heuristic_completions, 0);
1223                 guint32 sample_end = mono_msec_ticks ();
1224                 guint32 sample_duration = sample_end - threadpool->heuristic_sample_start;
1225
1226                 if (sample_duration >= threadpool->heuristic_adjustment_interval / 2) {
1227                         ThreadPoolCounter counter;
1228                         gint16 new_thread_count;
1229
1230                         counter = COUNTER_READ ();
1231                         new_thread_count = hill_climbing_update (counter._.max_working, sample_duration, completions, &threadpool->heuristic_adjustment_interval);
1232
1233                         COUNTER_ATOMIC (counter, { counter._.max_working = new_thread_count; });
1234
1235                         if (new_thread_count > counter._.max_working)
1236                                 worker_request (mono_domain_get ());
1237
1238                         threadpool->heuristic_sample_start = sample_end;
1239                         threadpool->heuristic_last_adjustment = mono_msec_ticks ();
1240                 }
1241
1242                 mono_mutex_unlock (&threadpool->heuristic_lock);
1243         }
1244 }
1245
/* Tear down the threadpool at runtime shutdown: first the I/O threadpool
 * (when sockets support is compiled in), then the worker infrastructure. */
void
mono_threadpool_ms_cleanup (void)
{
#ifndef DISABLE_SOCKETS
	mono_threadpool_ms_io_cleanup ();
#endif
	ensure_cleanedup ();
}
1254
/*
 * Queue an asynchronous delegate invocation on the threadpool (BeginInvoke).
 *
 * @target: the delegate/target object to invoke asynchronously
 * @msg: method-call message describing the invocation
 * @async_callback: optional callback invoked when the call completes
 * @state: user state object, stored on the async call
 *
 * Returns the MonoAsyncResult the caller can wait on and later pass to
 * mono_threadpool_ms_finish. Socket operations are routed to the dedicated
 * I/O threadpool when sockets support is compiled in.
 */
MonoAsyncResult *
mono_threadpool_ms_add (MonoObject *target, MonoMethodMessage *msg, MonoDelegate *async_callback, MonoObject *state)
{
	static MonoClass *async_call_klass = NULL;
	MonoDomain *domain;
	MonoAsyncResult *ares;
	MonoAsyncCall *ac;

	/* Cache the System.MonoAsyncCall class. The lookup is idempotent, so the
	 * unsynchronized lazy initialization is benign if two threads race. */
	if (!async_call_klass)
		async_call_klass = mono_class_from_name (mono_defaults.corlib, "System", "MonoAsyncCall");
	g_assert (async_call_klass);

	ensure_initialized (NULL);

	domain = mono_domain_get ();

	/* Reference fields are stored through MONO_OBJECT_SETREF so the GC write
	 * barriers are applied. */
	ac = (MonoAsyncCall*) mono_object_new (domain, async_call_klass);
	MONO_OBJECT_SETREF (ac, msg, msg);
	MONO_OBJECT_SETREF (ac, state, state);

	if (async_callback) {
		MONO_OBJECT_SETREF (ac, cb_method, mono_get_delegate_invoke (((MonoObject*) async_callback)->vtable->klass));
		MONO_OBJECT_SETREF (ac, cb_target, async_callback);
	}

	ares = mono_async_result_new (domain, NULL, ac->state, NULL, (MonoObject*) ac);
	MONO_OBJECT_SETREF (ares, async_delegate, target);

#ifndef DISABLE_SOCKETS
	/* Socket async results are handled by the I/O threadpool instead. */
	if (mono_threadpool_ms_is_io (target, state))
		return mono_threadpool_ms_io_add (ares, (MonoSocketAsyncResult*) state);
#endif

	mono_threadpool_ms_enqueue_async_result (domain, ares);
	return ares;
}
1291
1292 MonoObject *
1293 mono_threadpool_ms_finish (MonoAsyncResult *ares, MonoArray **out_args, MonoObject **exc)
1294 {
1295         MonoAsyncCall *ac;
1296
1297         g_assert (exc);
1298         g_assert (out_args);
1299
1300         *exc = NULL;
1301         *out_args = NULL;
1302
1303         /* check if already finished */
1304         mono_monitor_enter ((MonoObject*) ares);
1305
1306         if (ares->endinvoke_called) {
1307                 *exc = (MonoObject*) mono_get_exception_invalid_operation (NULL);
1308                 mono_monitor_exit ((MonoObject*) ares);
1309                 return NULL;
1310         }
1311
1312         MONO_OBJECT_SETREF (ares, endinvoke_called, 1);
1313
1314         /* wait until we are really finished */
1315         if (ares->completed) {
1316                 mono_monitor_exit ((MonoObject *) ares);
1317         } else {
1318                 gpointer wait_event;
1319                 if (ares->handle) {
1320                         wait_event = mono_wait_handle_get_handle ((MonoWaitHandle*) ares->handle);
1321                 } else {
1322                         wait_event = CreateEvent (NULL, TRUE, FALSE, NULL);
1323                         g_assert(wait_event);
1324                         MONO_OBJECT_SETREF (ares, handle, (MonoObject*) mono_wait_handle_new (mono_object_domain (ares), wait_event));
1325                 }
1326                 mono_monitor_exit ((MonoObject*) ares);
1327                 MONO_PREPARE_BLOCKING
1328                 WaitForSingleObjectEx (wait_event, INFINITE, TRUE);
1329                 MONO_FINISH_BLOCKING
1330         }
1331
1332         ac = (MonoAsyncCall*) ares->object_data;
1333         g_assert (ac);
1334
1335         *exc = ac->msg->exc; /* FIXME: GC add write barrier */
1336         *out_args = ac->out_args;
1337         return ac->res;
1338 }
1339
1340 gboolean
1341 mono_threadpool_ms_remove_domain_jobs (MonoDomain *domain, int timeout)
1342 {
1343         gboolean res = TRUE;
1344         guint32 start;
1345         gpointer sem;
1346
1347         g_assert (domain);
1348         g_assert (timeout >= -1);
1349
1350         if (timeout != -1)
1351                 start = mono_msec_ticks ();
1352
1353 #ifndef DISABLE_SOCKETS
1354         mono_threadpool_ms_io_remove_domain_jobs (domain);
1355         if (timeout != -1) {
1356                 timeout -= mono_msec_ticks () - start;
1357                 if (timeout < 0)
1358                         return FALSE;
1359         }
1360 #endif
1361         /*
1362          * There might be some threads out that could be about to execute stuff from the given domain.
1363          * We avoid that by setting up a semaphore to be pulsed by the thread that reaches zero.
1364          */
1365         sem = domain->cleanup_semaphore = CreateSemaphore (NULL, 0, 1, NULL);
1366
1367         /*
1368          * The memory barrier here is required to have global ordering between assigning to cleanup_semaphone
1369          * and reading threadpool_jobs. Otherwise this thread could read a stale version of threadpool_jobs
1370          * and wait forever.
1371          */
1372         mono_memory_write_barrier ();
1373
1374         while (domain->threadpool_jobs) {
1375                 MONO_PREPARE_BLOCKING
1376                 WaitForSingleObject (sem, timeout);
1377                 MONO_FINISH_BLOCKING
1378                 if (timeout != -1) {
1379                         timeout -= mono_msec_ticks () - start;
1380                         if (timeout <= 0) {
1381                                 res = FALSE;
1382                                 break;
1383                         }
1384                 }
1385         }
1386
1387         domain->cleanup_semaphore = NULL;
1388         CloseHandle (sem);
1389
1390         return res;
1391 }
1392
/* Pause pool activity: the monitor thread skips its rebalancing work while
 * this flag is set. NOTE(review): dereferences `threadpool` unchecked —
 * assumes the pool is already initialized; confirm callers guarantee that. */
void
mono_threadpool_ms_suspend (void)
{
	threadpool->suspended = TRUE;
}
1398
/* Resume pool activity after mono_threadpool_ms_suspend (). NOTE(review):
 * like suspend, assumes the pool is initialized. */
void
mono_threadpool_ms_resume (void)
{
	threadpool->suspended = FALSE;
}
1404
1405 void
1406 ves_icall_System_Threading_MonoRuntimeWorkItem_ExecuteWorkItem (MonoRuntimeWorkItem *rwi)
1407 {
1408         MonoAsyncResult *ares;
1409         MonoObject *exc = NULL;
1410
1411         g_assert (rwi);
1412         ares = rwi->async_result;
1413         g_assert (ares);
1414
1415         mono_async_result_invoke (ares, &exc);
1416         if (exc)
1417                 mono_raise_exception ((MonoException*) exc);
1418 }
1419
/* Icall for ThreadPool.GetAvailableThreads.
 * NOTE(review): this returns the configured maximums — the same values as
 * GetMaxThreadsNative — without subtracting the currently active thread
 * count; confirm whether "available" should be (max - active) here. */
void
ves_icall_System_Threading_Microsoft_ThreadPool_GetAvailableThreadsNative (gint *worker_threads, gint *completion_port_threads)
{
	if (!worker_threads || !completion_port_threads)
		return;

	ensure_initialized (NULL);

	*worker_threads = threadpool->limit_worker_max;
	*completion_port_threads = threadpool->limit_io_max;
}
1431
1432 void
1433 ves_icall_System_Threading_Microsoft_ThreadPool_GetMinThreadsNative (gint *worker_threads, gint *completion_port_threads)
1434 {
1435         if (!worker_threads || !completion_port_threads)
1436                 return;
1437
1438         ensure_initialized (NULL);
1439
1440         *worker_threads = threadpool->limit_worker_min;
1441         *completion_port_threads = threadpool->limit_io_min;
1442 }
1443
1444 void
1445 ves_icall_System_Threading_Microsoft_ThreadPool_GetMaxThreadsNative (gint *worker_threads, gint *completion_port_threads)
1446 {
1447         if (!worker_threads || !completion_port_threads)
1448                 return;
1449
1450         ensure_initialized (NULL);
1451
1452         *worker_threads = threadpool->limit_worker_max;
1453         *completion_port_threads = threadpool->limit_io_max;
1454 }
1455
1456 MonoBoolean
1457 ves_icall_System_Threading_Microsoft_ThreadPool_SetMinThreadsNative (gint worker_threads, gint completion_port_threads)
1458 {
1459         ensure_initialized (NULL);
1460
1461         if (worker_threads <= 0 || worker_threads > threadpool->limit_worker_max)
1462                 return FALSE;
1463         if (completion_port_threads <= 0 || completion_port_threads > threadpool->limit_io_max)
1464                 return FALSE;
1465
1466         threadpool->limit_worker_max = worker_threads;
1467         threadpool->limit_io_max = completion_port_threads;
1468
1469         return TRUE;
1470 }
1471
1472 MonoBoolean
1473 ves_icall_System_Threading_Microsoft_ThreadPool_SetMaxThreadsNative (gint worker_threads, gint completion_port_threads)
1474 {
1475         gint cpu_count = mono_cpu_count ();
1476
1477         ensure_initialized (NULL);
1478
1479         if (worker_threads < threadpool->limit_worker_min || worker_threads < cpu_count)
1480                 return FALSE;
1481         if (completion_port_threads < threadpool->limit_io_min || completion_port_threads < cpu_count)
1482                 return FALSE;
1483
1484         threadpool->limit_worker_max = worker_threads;
1485         threadpool->limit_io_max = completion_port_threads;
1486
1487         return TRUE;
1488 }
1489
/* Icall backing ThreadPool initialization: make sure the native pool state
 * exists, forwarding the requested worker-tracking flag. */
void
ves_icall_System_Threading_Microsoft_ThreadPool_InitializeVMTp (MonoBoolean *enable_worker_tracking)
{
	ensure_initialized (enable_worker_tracking);
}
1495
1496 MonoBoolean
1497 ves_icall_System_Threading_Microsoft_ThreadPool_NotifyWorkItemComplete (void)
1498 {
1499         ThreadPoolCounter counter;
1500
1501         if (mono_domain_is_unloading (mono_domain_get ()) || mono_runtime_is_shutting_down ())
1502                 return FALSE;
1503
1504         heuristic_notify_work_completed ();
1505
1506         if (heuristic_should_adjust ())
1507                 heuristic_adjust ();
1508
1509         counter = COUNTER_READ ();
1510         return counter._.active <= counter._.max_working;
1511 }
1512
/* Icall: progress notification from a long-running work item. Counts as a
 * completion for the heuristic and triggers an adjustment when one is due. */
void
ves_icall_System_Threading_Microsoft_ThreadPool_NotifyWorkItemProgressNative (void)
{
	heuristic_notify_work_completed ();

	if (heuristic_should_adjust ())
		heuristic_adjust ();
}
1521
/* Icall: not implemented — reporting a thread's working status always raises
 * NotImplementedException. */
void
ves_icall_System_Threading_Microsoft_ThreadPool_ReportThreadStatus (MonoBoolean is_working)
{
	// TODO
	mono_raise_exception (mono_get_exception_not_implemented (NULL));
}
1528
/* Icall: ask the pool to dispatch a worker for the current domain; returns
 * the result of worker_request (). */
MonoBoolean
ves_icall_System_Threading_Microsoft_ThreadPool_RequestWorkerThread (void)
{
	return worker_request (mono_domain_get ());
}
1534
/* Icall: posting to an I/O completion port is not supported; always raises
 * NotImplementedException. */
MonoBoolean G_GNUC_UNUSED
ves_icall_System_Threading_Microsoft_ThreadPool_PostQueuedCompletionStatus (MonoNativeOverlapped *native_overlapped)
{
	/* This copies the behavior of the current Mono implementation */
	mono_raise_exception (mono_get_exception_not_implemented (NULL));
	return FALSE;
}
1542
/* Icall: binding a handle to an I/O completion callback is a no-op that
 * reports success. */
MonoBoolean G_GNUC_UNUSED
ves_icall_System_Threading_Microsoft_ThreadPool_BindIOCompletionCallbackNative (gpointer file_handle)
{
	/* This copies the behavior of the current Mono implementation */
	return TRUE;
}
1549
/* Icall: always FALSE — this threadpool is never externally hosted. */
MonoBoolean G_GNUC_UNUSED
ves_icall_System_Threading_Microsoft_ThreadPool_IsThreadPoolHosted (void)
{
	return FALSE;
}