[sgen] Fix heavy statistics.
[mono.git] / mono / metadata / threadpool-ms.c
1 /*
2  * threadpool-ms.c: Microsoft threadpool runtime support
3  *
4  * Author:
5  *      Ludovic Henry (ludovic.henry@xamarin.com)
6  *
7  * Copyright 2015 Xamarin, Inc (http://www.xamarin.com)
8  */
9
10 //
11 // Copyright (c) Microsoft. All rights reserved.
12 // Licensed under the MIT license. See LICENSE file in the project root for full license information.
13 //
14 // Files:
15 //  - src/vm/comthreadpool.cpp
16 //  - src/vm/win32threadpool.cpp
17 //  - src/vm/threadpoolrequest.cpp
18 //  - src/vm/hillclimbing.cpp
19 //
20 // Ported from C++ to C and adjusted to Mono runtime
21
22 #include <stdlib.h>
23 #include <math.h>
24 #include <config.h>
25 #include <glib.h>
26
27 #if !defined (HAVE_COMPLEX_H)
28 #include <../../support/libm/complex.h>
29 #else
30 #include <complex.h>
31 #endif
32
33 #include <mono/metadata/class-internals.h>
34 #include <mono/metadata/exception.h>
35 #include <mono/metadata/gc-internal.h>
36 #include <mono/metadata/object.h>
37 #include <mono/metadata/object-internals.h>
38 #include <mono/metadata/threadpool-ms.h>
39 #include <mono/metadata/threadpool-ms-io.h>
40 #include <mono/metadata/threadpool-internals.h>
41 #include <mono/utils/atomic.h>
42 #include <mono/utils/mono-compiler.h>
43 #include <mono/utils/mono-proclib.h>
44 #include <mono/utils/mono-threads.h>
45 #include <mono/utils/mono-time.h>
46 #include <mono/utils/mono-rand.h>
47
/* CPU usage thresholds. CPU_USAGE_LOW is compared against
 * threadpool->cpu_usage in monitor_sufficient_delay_since_last_dequeue;
 * presumably both are percentages -- TODO confirm against mono_cpu_usage. */
#define CPU_USAGE_LOW 80
#define CPU_USAGE_HIGH 95

/* Sleep budget of one monitor_thread iteration, and base unit of the
 * "time since last dequeue" threshold. */
#define MONITOR_INTERVAL 100 // ms
52
/*
 * Defaults of the hill climbing heuristic. ensure_initialized copies each of
 * them into the matching field of ThreadPoolHillClimbing.
 */

/* The exponent to apply to the gain. 1.0 means to use linear gain,
 * higher values will enhance large moves and damp small ones.
 * default: 2.0 */
#define HILL_CLIMBING_GAIN_EXPONENT 2.0

/* The 'cost' of a thread. 0 means drive for increased throughput regardless
 * of thread count, higher values bias more against higher thread counts.
 * default: 0.15 */
#define HILL_CLIMBING_BIAS 0.15

/* wave_period; multiplied by HILL_CLIMBING_WAVE_HISTORY_SIZE it gives the
 * length of the sample history (samples_to_measure). */
#define HILL_CLIMBING_WAVE_PERIOD 4
#define HILL_CLIMBING_MAX_WAVE_MAGNITUDE 20
#define HILL_CLIMBING_WAVE_MAGNITUDE_MULTIPLIER 1.0
#define HILL_CLIMBING_WAVE_HISTORY_SIZE 8
#define HILL_CLIMBING_TARGET_SIGNAL_TO_NOISE_RATIO 3.0
#define HILL_CLIMBING_MAX_CHANGE_PER_SECOND 4
#define HILL_CLIMBING_MAX_CHANGE_PER_SAMPLE 20
/* Bounds handed to rand_next when the sample interval is drawn;
 * presumably milliseconds -- TODO confirm. */
#define HILL_CLIMBING_SAMPLE_INTERVAL_LOW 10
#define HILL_CLIMBING_SAMPLE_INTERVAL_HIGH 200
#define HILL_CLIMBING_ERROR_SMOOTHING_FACTOR 0.01
#define HILL_CLIMBING_MAX_SAMPLE_ERROR_PERCENT 0.15
74
75 typedef union {
76         struct {
77                 gint16 max_working; /* determined by heuristic */
78                 gint16 active; /* executing worker_thread */
79                 gint16 working; /* actively executing worker_thread, not parked */
80                 gint16 parked; /* parked */
81         } _;
82         gint64 as_gint64;
83 } ThreadPoolCounter;
84
85 typedef struct {
86         MonoDomain *domain;
87         gint32 outstanding_request;
88 } ThreadPoolDomain;
89
90 typedef MonoInternalThread ThreadPoolWorkingThread;
91 typedef mono_cond_t ThreadPoolParkedThread;
92
93 typedef struct {
94         gint32 wave_period;
95         gint32 samples_to_measure;
96         gdouble target_throughput_ratio;
97         gdouble target_signal_to_noise_ratio;
98         gdouble max_change_per_second;
99         gdouble max_change_per_sample;
100         gint32 max_thread_wave_magnitude;
101         gint32 sample_interval_low;
102         gdouble thread_magnitude_multiplier;
103         gint32 sample_interval_high;
104         gdouble throughput_error_smoothing_factor;
105         gdouble gain_exponent;
106         gdouble max_sample_error;
107
108         gdouble current_control_setting;
109         gint64 total_samples;
110         gint16 last_thread_count;
111         gdouble elapsed_since_last_change;
112         gdouble completions_since_last_change;
113
114         gdouble average_throughput_noise;
115
116         gdouble *samples;
117         gdouble *thread_counts;
118
119         guint32 current_sample_interval;
120         gpointer random_interval_generator;
121
122         gint32 accumulated_completion_count;
123         gdouble accumulated_sample_duration;
124 } ThreadPoolHillClimbing;
125
126 typedef struct {
127         ThreadPoolCounter counters;
128
129         GPtrArray *domains; // ThreadPoolDomain* []
130         mono_mutex_t domains_lock;
131
132         GPtrArray *working_threads; // ThreadPoolWorkingThread* []
133         GPtrArray *parked_threads; // ThreadPoolParkedThread* []
134         mono_mutex_t active_threads_lock; /* protect access to working_threads and parked_threads */
135
136         gint32 heuristic_completions;
137         guint32 heuristic_sample_start;
138         guint32 heuristic_last_dequeue; // ms
139         guint32 heuristic_last_adjustment; // ms
140         guint32 heuristic_adjustment_interval; // ms
141         ThreadPoolHillClimbing heuristic_hill_climbing;
142         mono_mutex_t heuristic_lock;
143
144         gint32 limit_worker_min;
145         gint32 limit_worker_max;
146         gint32 limit_io_min;
147         gint32 limit_io_max;
148
149         MonoCpuUsageState *cpu_usage_state;
150         gint32 cpu_usage;
151
152         /* suspended by the debugger */
153         gboolean suspended;
154 } ThreadPool;
155
/*
 * Tag describing why the heuristic changes state; passed, for instance, to
 * hill_climbing_force_change (monitor_thread uses TRANSITION_STARVATION).
 */
typedef enum {
	TRANSITION_WARMUP,
	TRANSITION_INITIALIZING,
	TRANSITION_RANDOM_MOVE,
	TRANSITION_CLIMBING_MOVE,
	TRANSITION_CHANGE_POINT,
	TRANSITION_STABILIZING,
	TRANSITION_STARVATION,
	TRANSITION_THREAD_TIMED_OUT,
	TRANSITION_UNDEFINED,
} ThreadPoolHeuristicStateTransition;
167
/* Values taken by `monitor_status`. */
enum {
	/* a request was signalled since the monitor last looked */
	MONITOR_STATUS_REQUESTED,
	/* the monitor is running and has seen no new request */
	MONITOR_STATUS_WAITING_FOR_REQUEST,
	/* no monitor thread is running */
	MONITOR_STATUS_NOT_RUNNING,
};
173
174 static gint32 status = STATUS_NOT_INITIALIZED;
175 static gint32 monitor_status = MONITOR_STATUS_NOT_RUNNING;
176
177 static ThreadPool* threadpool;
178
/* Sanity checks on a ThreadPoolCounter value: max_working must be strictly
 * positive, working and active must not be negative. */
#define COUNTER_CHECK(counter) \
	do { \
		g_assert (counter._.max_working > 0); \
		g_assert (counter._.working >= 0); \
		g_assert (counter._.active >= 0); \
	} while (0)
185
/* Snapshot of all four counters, taken with one 64-bit interlocked read. */
#define COUNTER_READ() ((ThreadPoolCounter) InterlockedRead64 (&threadpool->counters.as_gint64))
187
/*
 * Update the counters with a compare-and-swap retry loop: snapshot them into
 * `var`, let `block` modify `var`, check the result with COUNTER_CHECK and
 * try to publish it. The loop repeats until the 64-bit compare-exchange
 * succeeds, so `block` may run more than once. `block` may also leave the
 * enclosing function (worker_try_create returns from inside it), in which
 * case nothing is published.
 */
#define COUNTER_ATOMIC(var,block) \
	do { \
		ThreadPoolCounter __old; \
		do { \
			g_assert (threadpool); \
			(var) = __old = COUNTER_READ (); \
			{ block; } \
			COUNTER_CHECK (var); \
		} while (InterlockedCompareExchange64 (&threadpool->counters.as_gint64, (var).as_gint64, __old.as_gint64) != __old.as_gint64); \
	} while (0)
198
/*
 * Single-attempt variant of COUNTER_ATOMIC: snapshot the counters into `var`,
 * run `block`, check the result and attempt one compare-exchange. `res` is
 * TRUE when the new value was published, FALSE otherwise; there is no retry
 * (the inner loop runs exactly once).
 */
#define COUNTER_TRY_ATOMIC(res,var,block) \
	do { \
		ThreadPoolCounter __old; \
		do { \
			g_assert (threadpool); \
			(var) = __old = COUNTER_READ (); \
			(res) = FALSE; \
			{ block; } \
			COUNTER_CHECK (var); \
			(res) = InterlockedCompareExchange64 (&threadpool->counters.as_gint64, (var).as_gint64, __old.as_gint64) == __old.as_gint64; \
		} while (0); \
	} while (0)
211
212 static gpointer
213 rand_create (void)
214 {
215         mono_rand_open ();
216         return mono_rand_init (NULL, 0);
217 }
218
219 static guint32
220 rand_next (gpointer *handle, guint32 min, guint32 max)
221 {
222         guint32 val;
223         if (!mono_rand_try_get_uint32 (handle, &val, min, max)) {
224                 // FIXME handle error
225                 g_assert_not_reached ();
226         }
227         return val;
228 }
229
230 static void
231 rand_free (gpointer handle)
232 {
233         mono_rand_close (handle);
234 }
235
236 static void
237 ensure_initialized (MonoBoolean *enable_worker_tracking)
238 {
239         ThreadPoolHillClimbing *hc;
240         const char *threads_per_cpu_env;
241         gint threads_per_cpu;
242         gint threads_count;
243
244         if (enable_worker_tracking) {
245                 // TODO implement some kind of switch to have the possibily to use it
246                 *enable_worker_tracking = FALSE;
247         }
248
249         if (status >= STATUS_INITIALIZED)
250                 return;
251         if (status == STATUS_INITIALIZING || InterlockedCompareExchange (&status, STATUS_INITIALIZING, STATUS_NOT_INITIALIZED) != STATUS_NOT_INITIALIZED) {
252                 while (status == STATUS_INITIALIZING)
253                         mono_thread_info_yield ();
254                 g_assert (status >= STATUS_INITIALIZED);
255                 return;
256         }
257
258         g_assert (!threadpool);
259         threadpool = g_new0 (ThreadPool, 1);
260         g_assert (threadpool);
261
262         threadpool->domains = g_ptr_array_new ();
263         mono_mutex_init_recursive (&threadpool->domains_lock);
264
265         threadpool->parked_threads = g_ptr_array_new ();
266         threadpool->working_threads = g_ptr_array_new ();
267         mono_mutex_init (&threadpool->active_threads_lock);
268
269         threadpool->heuristic_adjustment_interval = 10;
270         mono_mutex_init (&threadpool->heuristic_lock);
271
272         mono_rand_open ();
273
274         hc = &threadpool->heuristic_hill_climbing;
275
276         hc->wave_period = HILL_CLIMBING_WAVE_PERIOD;
277         hc->max_thread_wave_magnitude = HILL_CLIMBING_MAX_WAVE_MAGNITUDE;
278         hc->thread_magnitude_multiplier = (gdouble) HILL_CLIMBING_WAVE_MAGNITUDE_MULTIPLIER;
279         hc->samples_to_measure = hc->wave_period * HILL_CLIMBING_WAVE_HISTORY_SIZE;
280         hc->target_throughput_ratio = (gdouble) HILL_CLIMBING_BIAS;
281         hc->target_signal_to_noise_ratio = (gdouble) HILL_CLIMBING_TARGET_SIGNAL_TO_NOISE_RATIO;
282         hc->max_change_per_second = (gdouble) HILL_CLIMBING_MAX_CHANGE_PER_SECOND;
283         hc->max_change_per_sample = (gdouble) HILL_CLIMBING_MAX_CHANGE_PER_SAMPLE;
284         hc->sample_interval_low = HILL_CLIMBING_SAMPLE_INTERVAL_LOW;
285         hc->sample_interval_high = HILL_CLIMBING_SAMPLE_INTERVAL_HIGH;
286         hc->throughput_error_smoothing_factor = (gdouble) HILL_CLIMBING_ERROR_SMOOTHING_FACTOR;
287         hc->gain_exponent = (gdouble) HILL_CLIMBING_GAIN_EXPONENT;
288         hc->max_sample_error = (gdouble) HILL_CLIMBING_MAX_SAMPLE_ERROR_PERCENT;
289         hc->current_control_setting = 0;
290         hc->total_samples = 0;
291         hc->last_thread_count = 0;
292         hc->average_throughput_noise = 0;
293         hc->elapsed_since_last_change = 0;
294         hc->accumulated_completion_count = 0;
295         hc->accumulated_sample_duration = 0;
296         hc->samples = g_new0 (gdouble, hc->samples_to_measure);
297         hc->thread_counts = g_new0 (gdouble, hc->samples_to_measure);
298         hc->random_interval_generator = rand_create ();
299         hc->current_sample_interval = rand_next (&hc->random_interval_generator, hc->sample_interval_low, hc->sample_interval_high);
300
301         if (!(threads_per_cpu_env = g_getenv ("MONO_THREADS_PER_CPU")))
302                 threads_per_cpu = 1;
303         else
304                 threads_per_cpu = CLAMP (atoi (threads_per_cpu_env), 1, 50);
305
306         threads_count = mono_cpu_count () * threads_per_cpu;
307
308         threadpool->limit_worker_min = threadpool->limit_io_min = threads_count;
309         threadpool->limit_worker_max = threadpool->limit_io_max = threads_count * 100;
310
311         threadpool->counters._.max_working = threadpool->limit_worker_min;
312
313         threadpool->cpu_usage_state = g_new0 (MonoCpuUsageState, 1);
314
315         threadpool->suspended = FALSE;
316
317         status = STATUS_INITIALIZED;
318 }
319
320 static void worker_unpark (ThreadPoolParkedThread *thread);
321 static void worker_kill (ThreadPoolWorkingThread *thread);
322
323 static void
324 ensure_cleanedup (void)
325 {
326         guint i;
327
328         if (status == STATUS_NOT_INITIALIZED && InterlockedCompareExchange (&status, STATUS_CLEANED_UP, STATUS_NOT_INITIALIZED) == STATUS_NOT_INITIALIZED)
329                 return;
330         if (status == STATUS_INITIALIZING) {
331                 while (status == STATUS_INITIALIZING)
332                         mono_thread_info_yield ();
333         }
334         if (status == STATUS_CLEANED_UP)
335                 return;
336         if (status == STATUS_CLEANING_UP || InterlockedCompareExchange (&status, STATUS_CLEANING_UP, STATUS_INITIALIZED) != STATUS_INITIALIZED) {
337                 while (status == STATUS_CLEANING_UP)
338                         mono_thread_info_yield ();
339                 g_assert (status == STATUS_CLEANED_UP);
340                 return;
341         }
342
343         /* we make the assumption along the code that we are
344          * cleaning up only if the runtime is shutting down */
345         g_assert (mono_runtime_is_shutting_down ());
346
347         while (monitor_status != MONITOR_STATUS_NOT_RUNNING)
348                 usleep (1000);
349
350         mono_mutex_lock (&threadpool->active_threads_lock);
351
352         /* stop all threadpool->working_threads */
353         for (i = 0; i < threadpool->working_threads->len; ++i)
354                 worker_kill ((ThreadPoolWorkingThread*) g_ptr_array_index (threadpool->working_threads, i));
355
356         /* unpark all threadpool->parked_threads */
357         for (i = 0; i < threadpool->parked_threads->len; ++i)
358                 worker_unpark ((ThreadPoolParkedThread*) g_ptr_array_index (threadpool->parked_threads, i));
359
360         mono_mutex_unlock (&threadpool->active_threads_lock);
361
362         status = STATUS_CLEANED_UP;
363 }
364
365 void
366 mono_threadpool_ms_enqueue_work_item (MonoDomain *domain, MonoObject *work_item)
367 {
368         static MonoClass *threadpool_class = NULL;
369         static MonoMethod *unsafe_queue_custom_work_item_method = NULL;
370         MonoDomain *current_domain;
371         MonoBoolean f;
372         gpointer args [2];
373
374         g_assert (work_item);
375
376         if (!threadpool_class)
377                 threadpool_class = mono_class_from_name (mono_defaults.corlib, "System.Threading", "ThreadPool");
378         g_assert (threadpool_class);
379
380         if (!unsafe_queue_custom_work_item_method)
381                 unsafe_queue_custom_work_item_method = mono_class_get_method_from_name (threadpool_class, "UnsafeQueueCustomWorkItem", 2);
382         g_assert (unsafe_queue_custom_work_item_method);
383
384         f = FALSE;
385
386         args [0] = (gpointer) work_item;
387         args [1] = (gpointer) &f;
388
389         current_domain = mono_domain_get ();
390         if (current_domain == domain) {
391                 mono_runtime_invoke (unsafe_queue_custom_work_item_method, NULL, args, NULL);
392         } else {
393                 mono_thread_push_appdomain_ref (domain);
394                 if (mono_domain_set (domain, FALSE)) {
395                         mono_runtime_invoke (unsafe_queue_custom_work_item_method, NULL, args, NULL);
396                         mono_domain_set (current_domain, TRUE);
397                 }
398                 mono_thread_pop_appdomain_ref ();
399         }
400 }
401
402 static void
403 domain_add (ThreadPoolDomain *tpdomain)
404 {
405         guint i, len;
406
407         g_assert (tpdomain);
408
409         mono_mutex_lock (&threadpool->domains_lock);
410         len = threadpool->domains->len;
411         for (i = 0; i < len; ++i) {
412                 if (g_ptr_array_index (threadpool->domains, i) == tpdomain)
413                         break;
414         }
415         if (i == len)
416                 g_ptr_array_add (threadpool->domains, tpdomain);
417         mono_mutex_unlock (&threadpool->domains_lock);
418 }
419
420 static gboolean
421 domain_remove (ThreadPoolDomain *tpdomain)
422 {
423         gboolean res;
424
425         g_assert (tpdomain);
426
427         mono_mutex_lock (&threadpool->domains_lock);
428         res = g_ptr_array_remove (threadpool->domains, tpdomain);
429         mono_mutex_unlock (&threadpool->domains_lock);
430
431         return res;
432 }
433
434 static ThreadPoolDomain *
435 domain_get (MonoDomain *domain, gboolean create)
436 {
437         ThreadPoolDomain *tpdomain = NULL;
438         guint i;
439
440         g_assert (domain);
441
442         mono_mutex_lock (&threadpool->domains_lock);
443         for (i = 0; i < threadpool->domains->len; ++i) {
444                 ThreadPoolDomain *tmp = g_ptr_array_index (threadpool->domains, i);
445                 if (tmp->domain == domain) {
446                         tpdomain = tmp;
447                         break;
448                 }
449         }
450         if (!tpdomain && create) {
451                 tpdomain = g_new0 (ThreadPoolDomain, 1);
452                 tpdomain->domain = domain;
453                 domain_add (tpdomain);
454         }
455         mono_mutex_unlock (&threadpool->domains_lock);
456         return tpdomain;
457 }
458
459 static void
460 domain_free (ThreadPoolDomain *tpdomain)
461 {
462         g_free (tpdomain);
463 }
464
465 static gboolean
466 domain_any_has_request (void)
467 {
468         gboolean res = FALSE;
469         guint i;
470
471         mono_mutex_lock (&threadpool->domains_lock);
472         for (i = 0; i < threadpool->domains->len; ++i) {
473                 ThreadPoolDomain *tmp = g_ptr_array_index (threadpool->domains, i);
474                 if (tmp->outstanding_request > 0) {
475                         res = TRUE;
476                         break;
477                 }
478         }
479         mono_mutex_unlock (&threadpool->domains_lock);
480         return res;
481 }
482
483 static ThreadPoolDomain *
484 domain_get_next (ThreadPoolDomain *current)
485 {
486         ThreadPoolDomain *tpdomain = NULL;
487         guint len;
488
489         mono_mutex_lock (&threadpool->domains_lock);
490         len = threadpool->domains->len;
491         if (len > 0) {
492                 guint i, current_idx = -1;
493                 if (current) {
494                         for (i = 0; i < len; ++i) {
495                                 if (current == g_ptr_array_index (threadpool->domains, i)) {
496                                         current_idx = i;
497                                         break;
498                                 }
499                         }
500                         g_assert (current_idx >= 0);
501                 }
502                 for (i = current_idx + 1; i < len + current_idx + 1; ++i) {
503                         ThreadPoolDomain *tmp = g_ptr_array_index (threadpool->domains, i % len);
504                         if (tmp->outstanding_request > 0) {
505                                 tpdomain = tmp;
506                                 break;
507                         }
508                 }
509         }
510         mono_mutex_unlock (&threadpool->domains_lock);
511         return tpdomain;
512 }
513
514 static void
515 worker_park (void)
516 {
517         mono_cond_t cond;
518         MonoInternalThread *thread = mono_thread_internal_current ();
519
520         mono_cond_init (&cond, NULL);
521
522         mono_gc_set_skip_thread (TRUE);
523
524         mono_mutex_lock (&threadpool->active_threads_lock);
525
526         if (!mono_runtime_is_shutting_down ()) {
527                 g_ptr_array_add (threadpool->parked_threads, &cond);
528                 g_ptr_array_remove_fast (threadpool->working_threads, thread);
529
530                 mono_cond_wait (&cond, &threadpool->active_threads_lock);
531
532                 g_ptr_array_add (threadpool->working_threads, thread);
533                 g_ptr_array_remove (threadpool->parked_threads, &cond);
534         }
535
536         mono_mutex_unlock (&threadpool->active_threads_lock);
537
538         mono_gc_set_skip_thread (FALSE);
539
540         mono_cond_destroy (&cond);
541 }
542
543 static gboolean
544 worker_try_unpark (void)
545 {
546         gboolean res = FALSE;
547         guint len;
548
549         mono_mutex_lock (&threadpool->active_threads_lock);
550         len = threadpool->parked_threads->len;
551         if (len > 0) {
552                 mono_cond_t *cond = (mono_cond_t*) g_ptr_array_index (threadpool->parked_threads, len - 1);
553                 mono_cond_signal (cond);
554                 res = TRUE;
555         }
556         mono_mutex_unlock (&threadpool->active_threads_lock);
557         return res;
558 }
559
560 static void
561 worker_unpark (ThreadPoolParkedThread *thread)
562 {
563         mono_cond_signal ((mono_cond_t*) thread);
564 }
565
566 static void
567 worker_kill (ThreadPoolWorkingThread *thread)
568 {
569         ThreadPoolCounter counter;
570
571         if (thread == mono_thread_internal_current ())
572                 return;
573
574         mono_thread_internal_stop ((MonoInternalThread*) thread);
575 }
576
577 static void
578 worker_thread (gpointer data)
579 {
580         static MonoClass *threadpool_wait_callback_class = NULL;
581         static MonoMethod *perform_wait_callback_method = NULL;
582         MonoInternalThread *thread;
583         ThreadPoolDomain *tpdomain, *previous_tpdomain;
584         ThreadPoolCounter counter;
585         gboolean retire = FALSE;
586
587         g_assert (status >= STATUS_INITIALIZED);
588
589         if (!threadpool_wait_callback_class)
590                 threadpool_wait_callback_class = mono_class_from_name (mono_defaults.corlib, "System.Threading.Microsoft", "_ThreadPoolWaitCallback");
591         g_assert (threadpool_wait_callback_class);
592
593         if (!perform_wait_callback_method)
594                 perform_wait_callback_method = mono_class_get_method_from_name (threadpool_wait_callback_class, "PerformWaitCallback", 0);
595         g_assert (perform_wait_callback_method);
596
597         g_assert (threadpool);
598
599         thread = mono_thread_internal_current ();
600         g_assert (thread);
601
602         mono_thread_set_name_internal (thread, mono_string_new (mono_domain_get (), "Threadpool worker"), FALSE);
603
604         mono_mutex_lock (&threadpool->active_threads_lock);
605         g_ptr_array_add (threadpool->working_threads, thread);
606         mono_mutex_unlock (&threadpool->active_threads_lock);
607
608         previous_tpdomain = NULL;
609
610         mono_mutex_lock (&threadpool->domains_lock);
611
612         while (!mono_runtime_is_shutting_down ()) {
613                 tpdomain = NULL;
614
615                 if ((thread->state & (ThreadState_StopRequested | ThreadState_SuspendRequested)) != 0) {
616                         mono_mutex_unlock (&threadpool->domains_lock);
617                         mono_thread_interruption_checkpoint ();
618                         mono_mutex_lock (&threadpool->domains_lock);
619                 }
620
621                 if (retire || !(tpdomain = domain_get_next (previous_tpdomain))) {
622                         COUNTER_ATOMIC (counter, {
623                                 counter._.working --;
624                                 counter._.parked ++;
625                         });
626
627                         mono_mutex_unlock (&threadpool->domains_lock);
628                         worker_park ();
629                         mono_mutex_lock (&threadpool->domains_lock);
630
631                         COUNTER_ATOMIC (counter, {
632                                 counter._.working ++;
633                                 counter._.parked --;
634                         });
635
636                         if (retire)
637                                 retire = FALSE;
638
639                         continue;
640                 }
641
642                 tpdomain->outstanding_request --;
643                 g_assert (tpdomain->outstanding_request >= 0);
644
645                 g_assert (tpdomain->domain);
646                 g_assert (tpdomain->domain->threadpool_jobs >= 0);
647                 tpdomain->domain->threadpool_jobs ++;
648
649                 mono_mutex_unlock (&threadpool->domains_lock);
650
651                 mono_thread_push_appdomain_ref (tpdomain->domain);
652                 if (mono_domain_set (tpdomain->domain, FALSE)) {
653                         MonoObject *exc = NULL;
654                         MonoObject *res = mono_runtime_invoke (perform_wait_callback_method, NULL, NULL, &exc);
655                         if (exc)
656                                 mono_internal_thread_unhandled_exception (exc);
657                         else if (res && *(MonoBoolean*) mono_object_unbox (res) == FALSE)
658                                 retire = TRUE;
659
660                         mono_thread_clr_state (thread , ~ThreadState_Background);
661                         if (!mono_thread_test_state (thread , ThreadState_Background))
662                                 ves_icall_System_Threading_Thread_SetState (thread, ThreadState_Background);
663
664                         mono_domain_set (mono_get_root_domain (), TRUE);
665                 }
666                 mono_thread_pop_appdomain_ref ();
667
668                 mono_mutex_lock (&threadpool->domains_lock);
669
670                 tpdomain->domain->threadpool_jobs --;
671                 g_assert (tpdomain->domain->threadpool_jobs >= 0);
672
673                 if (tpdomain->domain->threadpool_jobs == 0 && mono_domain_is_unloading (tpdomain->domain)) {
674                         gboolean removed = domain_remove (tpdomain);
675                         g_assert (removed);
676                         if (tpdomain->domain->cleanup_semaphore)
677                                 ReleaseSemaphore (tpdomain->domain->cleanup_semaphore, 1, NULL);
678                         domain_free (tpdomain);
679                         tpdomain = NULL;
680                 }
681
682                 previous_tpdomain = tpdomain;
683         }
684
685         mono_mutex_unlock (&threadpool->domains_lock);
686
687         mono_mutex_lock (&threadpool->active_threads_lock);
688         g_ptr_array_remove_fast (threadpool->working_threads, thread);
689         mono_mutex_unlock (&threadpool->active_threads_lock);
690
691         COUNTER_ATOMIC (counter, {
692                 counter._.working--;
693                 counter._.active --;
694         });
695 }
696
697 static gboolean
698 worker_try_create (void)
699 {
700         ThreadPoolCounter counter;
701
702         COUNTER_ATOMIC (counter, {
703                 if (counter._.working >= counter._.max_working)
704                         return FALSE;
705                 counter._.working ++;
706                 counter._.active ++;
707         });
708
709         if (mono_thread_create_internal (mono_get_root_domain (), worker_thread, NULL, TRUE, 0) != NULL)
710                 return TRUE;
711
712         COUNTER_ATOMIC (counter, {
713                 counter._.working --;
714                 counter._.active --;
715         });
716
717         return FALSE;
718 }
719
720 static void monitor_ensure_running (void);
721
722 static gboolean
723 worker_request (MonoDomain *domain)
724 {
725         ThreadPoolDomain *tpdomain;
726
727         g_assert (domain);
728         g_assert (threadpool);
729
730         if (mono_runtime_is_shutting_down ())
731                 return FALSE;
732
733         mono_mutex_lock (&threadpool->domains_lock);
734
735         /* synchronize check with worker_thread */
736         if (mono_domain_is_unloading (domain)) {
737                 mono_mutex_unlock (&threadpool->domains_lock);
738                 return FALSE;
739         }
740
741         tpdomain = domain_get (domain, TRUE);
742         g_assert (tpdomain);
743         tpdomain->outstanding_request ++;
744
745         mono_mutex_unlock (&threadpool->domains_lock);
746
747         if (threadpool->suspended)
748                 return FALSE;
749
750         monitor_ensure_running ();
751
752         if (worker_try_unpark ())
753                 return TRUE;
754
755         if (worker_try_create ())
756                 return TRUE;
757
758         return FALSE;
759 }
760
761 static gboolean
762 monitor_should_keep_running (void)
763 {
764         g_assert (monitor_status == MONITOR_STATUS_WAITING_FOR_REQUEST || monitor_status == MONITOR_STATUS_REQUESTED);
765
766         if (InterlockedExchange (&monitor_status, MONITOR_STATUS_WAITING_FOR_REQUEST) == MONITOR_STATUS_WAITING_FOR_REQUEST) {
767                 if (mono_runtime_is_shutting_down () || !domain_any_has_request ()) {
768                         if (InterlockedCompareExchange (&monitor_status, MONITOR_STATUS_NOT_RUNNING, MONITOR_STATUS_WAITING_FOR_REQUEST) == MONITOR_STATUS_WAITING_FOR_REQUEST)
769                                 return FALSE;
770                 }
771         }
772
773         g_assert (monitor_status == MONITOR_STATUS_WAITING_FOR_REQUEST || monitor_status == MONITOR_STATUS_REQUESTED);
774
775         return TRUE;
776 }
777
778 static gboolean
779 monitor_sufficient_delay_since_last_dequeue (void)
780 {
781         guint32 threshold;
782
783         g_assert (threadpool);
784
785         if (threadpool->cpu_usage < CPU_USAGE_LOW) {
786                 threshold = MONITOR_INTERVAL;
787         } else {
788                 ThreadPoolCounter counter = COUNTER_READ ();
789                 threshold = counter._.max_working * MONITOR_INTERVAL * 2;
790         }
791
792         return mono_msec_ticks () >= threadpool->heuristic_last_dequeue + threshold;
793 }
794
795 static void hill_climbing_force_change (gint16 new_thread_count, ThreadPoolHeuristicStateTransition transition);
796
797 static void
798 monitor_thread (void)
799 {
800         MonoInternalThread *current_thread = mono_thread_internal_current ();
801         guint i;
802
803         mono_cpu_usage (threadpool->cpu_usage_state);
804
805         do {
806                 MonoInternalThread *thread;
807                 gboolean all_waitsleepjoin = TRUE;
808                 gint32 interval_left = MONITOR_INTERVAL;
809                 gint32 awake = 0; /* number of spurious awakes we tolerate before doing a round of rebalancing */
810
811                 g_assert (monitor_status != MONITOR_STATUS_NOT_RUNNING);
812
813                 mono_gc_set_skip_thread (TRUE);
814
815                 do {
816                         guint32 ts;
817
818                         if (mono_runtime_is_shutting_down ())
819                                 break;
820
821                         ts = mono_msec_ticks ();
822                         if (SleepEx (interval_left, TRUE) == 0)
823                                 break;
824                         interval_left -= mono_msec_ticks () - ts;
825
826                         mono_gc_set_skip_thread (FALSE);
827                         if ((current_thread->state & (ThreadState_StopRequested | ThreadState_SuspendRequested)) != 0)
828                                 mono_thread_interruption_checkpoint ();
829                         mono_gc_set_skip_thread (TRUE);
830                 } while (interval_left > 0 && ++awake < 10);
831
832                 mono_gc_set_skip_thread (FALSE);
833
834                 if (threadpool->suspended)
835                         continue;
836
837                 if (mono_runtime_is_shutting_down () || !domain_any_has_request ())
838                         continue;
839
840                 mono_mutex_lock (&threadpool->active_threads_lock);
841                 for (i = 0; i < threadpool->working_threads->len; ++i) {
842                         thread = g_ptr_array_index (threadpool->working_threads, i);
843                         if ((thread->state & ThreadState_WaitSleepJoin) == 0) {
844                                 all_waitsleepjoin = FALSE;
845                                 break;
846                         }
847                 }
848                 mono_mutex_unlock (&threadpool->active_threads_lock);
849
850                 if (all_waitsleepjoin) {
851                         ThreadPoolCounter counter;
852                         COUNTER_ATOMIC (counter, { counter._.max_working ++; });
853                         hill_climbing_force_change (counter._.max_working, TRANSITION_STARVATION);
854                 }
855
856                 threadpool->cpu_usage = mono_cpu_usage (threadpool->cpu_usage_state);
857
858                 if (monitor_sufficient_delay_since_last_dequeue ()) {
859                         for (i = 0; i < 5; ++i) {
860                                 if (mono_runtime_is_shutting_down ())
861                                         break;
862
863                                 if (worker_try_unpark ())
864                                         break;
865
866                                 if (worker_try_create ())
867                                         break;
868                         }
869                 }
870         } while (monitor_should_keep_running ());
871 }
872
873 static void
874 monitor_ensure_running (void)
875 {
876         for (;;) {
877                 switch (monitor_status) {
878                 case MONITOR_STATUS_REQUESTED:
879                         return;
880                 case MONITOR_STATUS_WAITING_FOR_REQUEST:
881                         InterlockedCompareExchange (&monitor_status, MONITOR_STATUS_REQUESTED, MONITOR_STATUS_WAITING_FOR_REQUEST);
882                         break;
883                 case MONITOR_STATUS_NOT_RUNNING:
884                         if (mono_runtime_is_shutting_down ())
885                                 return;
886                         if (InterlockedCompareExchange (&monitor_status, MONITOR_STATUS_REQUESTED, MONITOR_STATUS_NOT_RUNNING) == MONITOR_STATUS_NOT_RUNNING) {
887                                 if (!mono_thread_create_internal (mono_get_root_domain (), monitor_thread, NULL, TRUE, SMALL_STACK))
888                                         monitor_status = MONITOR_STATUS_NOT_RUNNING;
889                                 return;
890                         }
891                         break;
892                 default: g_assert_not_reached ();
893                 }
894         }
895 }
896
897 static void
898 hill_climbing_change_thread_count (gint16 new_thread_count, ThreadPoolHeuristicStateTransition transition)
899 {
900         ThreadPoolHillClimbing *hc;
901
902         g_assert (threadpool);
903
904         hc = &threadpool->heuristic_hill_climbing;
905
906         hc->last_thread_count = new_thread_count;
907         hc->current_sample_interval = rand_next (&hc->random_interval_generator, hc->sample_interval_low, hc->sample_interval_high);
908         hc->elapsed_since_last_change = 0;
909         hc->completions_since_last_change = 0;
910 }
911
912 static void
913 hill_climbing_force_change (gint16 new_thread_count, ThreadPoolHeuristicStateTransition transition)
914 {
915         ThreadPoolHillClimbing *hc;
916
917         g_assert (threadpool);
918
919         hc = &threadpool->heuristic_hill_climbing;
920
921         if (new_thread_count != hc->last_thread_count) {
922                 hc->current_control_setting += new_thread_count - hc->last_thread_count;
923                 hill_climbing_change_thread_count (new_thread_count, transition);
924         }
925 }
926
927 static double complex
928 hill_climbing_get_wave_component (gdouble *samples, guint sample_count, gdouble period)
929 {
930         ThreadPoolHillClimbing *hc;
931         gdouble w, cosine, sine, coeff, q0, q1, q2;
932         guint i;
933
934         g_assert (threadpool);
935         g_assert (sample_count >= period);
936         g_assert (period >= 2);
937
938         hc = &threadpool->heuristic_hill_climbing;
939
940         w = 2.0 * M_PI / period;
941         cosine = cos (w);
942         sine = sin (w);
943         coeff = 2.0 * cosine;
944         q0 = q1 = q2 = 0;
945
946         for (i = 0; i < sample_count; ++i) {
947                 q0 = coeff * q1 - q2 + samples [(hc->total_samples - sample_count + i) % hc->samples_to_measure];
948                 q2 = q1;
949                 q1 = q0;
950         }
951
952         return ((q1 - q2 * cosine) + (q2 * sine) * I) / ((gdouble) sample_count);
953 }
954
955 static gint16
956 hill_climbing_update (gint16 current_thread_count, guint32 sample_duration, gint32 completions, guint32 *adjustment_interval)
957 {
958         ThreadPoolHillClimbing *hc;
959         ThreadPoolHeuristicStateTransition transition;
960         gdouble throughput;
961         gdouble throughput_error_estimate;
962         gdouble confidence;
963         gdouble move;
964         gdouble gain;
965         gint sample_index;
966         gint sample_count;
967         gint new_thread_wave_magnitude;
968         gint new_thread_count;
969         double complex thread_wave_component;
970         double complex throughput_wave_component;
971         double complex ratio;
972
973         g_assert (threadpool);
974         g_assert (adjustment_interval);
975
976         hc = &threadpool->heuristic_hill_climbing;
977
978         /* If someone changed the thread count without telling us, update our records accordingly. */
979         if (current_thread_count != hc->last_thread_count)
980                 hill_climbing_force_change (current_thread_count, TRANSITION_INITIALIZING);
981
982         /* Update the cumulative stats for this thread count */
983         hc->elapsed_since_last_change += sample_duration;
984         hc->completions_since_last_change += completions;
985
986         /* Add in any data we've already collected about this sample */
987         sample_duration += hc->accumulated_sample_duration;
988         completions += hc->accumulated_completion_count;
989
990         /* We need to make sure we're collecting reasonably accurate data. Since we're just counting the end
991          * of each work item, we are goinng to be missing some data about what really happened during the
992          * sample interval. The count produced by each thread includes an initial work item that may have
993          * started well before the start of the interval, and each thread may have been running some new
994          * work item for some time before the end of the interval, which did not yet get counted. So
995          * our count is going to be off by +/- threadCount workitems.
996          *
997          * The exception is that the thread that reported to us last time definitely wasn't running any work
998          * at that time, and the thread that's reporting now definitely isn't running a work item now. So
999          * we really only need to consider threadCount-1 threads.
1000          *
1001          * Thus the percent error in our count is +/- (threadCount-1)/numCompletions.
1002          *
1003          * We cannot rely on the frequency-domain analysis we'll be doing later to filter out this error, because
1004          * of the way it accumulates over time. If this sample is off by, say, 33% in the negative direction,
1005          * then the next one likely will be too. The one after that will include the sum of the completions
1006          * we missed in the previous samples, and so will be 33% positive. So every three samples we'll have
1007          * two "low" samples and one "high" sample. This will appear as periodic variation right in the frequency
1008          * range we're targeting, which will not be filtered by the frequency-domain translation. */
1009         if (hc->total_samples > 0 && ((current_thread_count - 1.0) / completions) >= hc->max_sample_error) {
1010                 /* Not accurate enough yet. Let's accumulate the data so
1011                  * far, and tell the ThreadPool to collect a little more. */
1012                 hc->accumulated_sample_duration = sample_duration;
1013                 hc->accumulated_completion_count = completions;
1014                 *adjustment_interval = 10;
1015                 return current_thread_count;
1016         }
1017
1018         /* We've got enouugh data for our sample; reset our accumulators for next time. */
1019         hc->accumulated_sample_duration = 0;
1020         hc->accumulated_completion_count = 0;
1021
1022         /* Add the current thread count and throughput sample to our history. */
1023         throughput = ((gdouble) completions) / sample_duration;
1024
1025         sample_index = hc->total_samples % hc->samples_to_measure;
1026         hc->samples [sample_index] = throughput;
1027         hc->thread_counts [sample_index] = current_thread_count;
1028         hc->total_samples ++;
1029
1030         /* Set up defaults for our metrics. */
1031         thread_wave_component = 0;
1032         throughput_wave_component = 0;
1033         throughput_error_estimate = 0;
1034         ratio = 0;
1035         confidence = 0;
1036
1037         transition = TRANSITION_WARMUP;
1038
1039         /* How many samples will we use? It must be at least the three wave periods we're looking for, and it must also
1040          * be a whole multiple of the primary wave's period; otherwise the frequency we're looking for will fall between
1041          * two frequency bands in the Fourier analysis, and we won't be able to measure it accurately. */
1042         sample_count = ((gint) MIN (hc->total_samples - 1, hc->samples_to_measure) / hc->wave_period) * hc->wave_period;
1043
1044         if (sample_count > hc->wave_period) {
1045                 guint i;
1046                 gdouble average_throughput;
1047                 gdouble average_thread_count;
1048                 gdouble sample_sum = 0;
1049                 gdouble thread_sum = 0;
1050
1051                 /* Average the throughput and thread count samples, so we can scale the wave magnitudes later. */
1052                 for (i = 0; i < sample_count; ++i) {
1053                         guint j = (hc->total_samples - sample_count + i) % hc->samples_to_measure;
1054                         sample_sum += hc->samples [j];
1055                         thread_sum += hc->thread_counts [j];
1056                 }
1057
1058                 average_throughput = sample_sum / sample_count;
1059                 average_thread_count = thread_sum / sample_count;
1060
1061                 if (average_throughput > 0 && average_thread_count > 0) {
1062                         gdouble noise_for_confidence, adjacent_period_1, adjacent_period_2;
1063
1064                         /* Calculate the periods of the adjacent frequency bands we'll be using to
1065                          * measure noise levels. We want the two adjacent Fourier frequency bands. */
1066                         adjacent_period_1 = sample_count / (((gdouble) sample_count) / ((gdouble) hc->wave_period) + 1);
1067                         adjacent_period_2 = sample_count / (((gdouble) sample_count) / ((gdouble) hc->wave_period) - 1);
1068
1069                         /* Get the the three different frequency components of the throughput (scaled by average
1070                          * throughput). Our "error" estimate (the amount of noise that might be present in the
1071                          * frequency band we're really interested in) is the average of the adjacent bands. */
1072                         throughput_wave_component = hill_climbing_get_wave_component (hc->samples, sample_count, hc->wave_period) / average_throughput;
1073                         throughput_error_estimate = cabs (hill_climbing_get_wave_component (hc->samples, sample_count, adjacent_period_1) / average_throughput);
1074
1075                         if (adjacent_period_2 <= sample_count) {
1076                                 throughput_error_estimate = MAX (throughput_error_estimate, cabs (hill_climbing_get_wave_component (
1077                                         hc->samples, sample_count, adjacent_period_2) / average_throughput));
1078                         }
1079
1080                         /* Do the same for the thread counts, so we have something to compare to. We don't
1081                          * measure thread count noise, because there is none; these are exact measurements. */
1082                         thread_wave_component = hill_climbing_get_wave_component (hc->thread_counts, sample_count, hc->wave_period) / average_thread_count;
1083
1084                         /* Update our moving average of the throughput noise. We'll use this
1085                          * later as feedback to determine the new size of the thread wave. */
1086                         if (hc->average_throughput_noise == 0) {
1087                                 hc->average_throughput_noise = throughput_error_estimate;
1088                         } else {
1089                                 hc->average_throughput_noise = (hc->throughput_error_smoothing_factor * throughput_error_estimate)
1090                                         + ((1.0 + hc->throughput_error_smoothing_factor) * hc->average_throughput_noise);
1091                         }
1092
1093                         if (cabs (thread_wave_component) > 0) {
1094                                 /* Adjust the throughput wave so it's centered around the target wave,
1095                                  * and then calculate the adjusted throughput/thread ratio. */
1096                                 ratio = (throughput_wave_component - (hc->target_throughput_ratio * thread_wave_component)) / thread_wave_component;
1097                                 transition = TRANSITION_CLIMBING_MOVE;
1098                         } else {
1099                                 ratio = 0;
1100                                 transition = TRANSITION_STABILIZING;
1101                         }
1102
1103                         noise_for_confidence = MAX (hc->average_throughput_noise, throughput_error_estimate);
1104                         if (noise_for_confidence > 0) {
1105                                 confidence = cabs (thread_wave_component) / noise_for_confidence / hc->target_signal_to_noise_ratio;
1106                         } else {
1107                                 /* there is no noise! */
1108                                 confidence = 1.0;
1109                         }
1110                 }
1111         }
1112
1113         /* We use just the real part of the complex ratio we just calculated. If the throughput signal
1114          * is exactly in phase with the thread signal, this will be the same as taking the magnitude of
1115          * the complex move and moving that far up. If they're 180 degrees out of phase, we'll move
1116          * backward (because this indicates that our changes are having the opposite of the intended effect).
1117          * If they're 90 degrees out of phase, we won't move at all, because we can't tell wether we're
1118          * having a negative or positive effect on throughput. */
1119         move = creal (ratio);
1120         move = CLAMP (move, -1.0, 1.0);
1121
1122         /* Apply our confidence multiplier. */
1123         move *= CLAMP (confidence, -1.0, 1.0);
1124
1125         /* Now apply non-linear gain, such that values around zero are attenuated, while higher values
1126          * are enhanced. This allows us to move quickly if we're far away from the target, but more slowly
1127         * if we're getting close, giving us rapid ramp-up without wild oscillations around the target. */
1128         gain = hc->max_change_per_second * sample_duration;
1129         move = pow (fabs (move), hc->gain_exponent) * (move >= 0.0 ? 1 : -1) * gain;
1130         move = MIN (move, hc->max_change_per_sample);
1131
1132         /* If the result was positive, and CPU is > 95%, refuse the move. */
1133         if (move > 0.0 && threadpool->cpu_usage > CPU_USAGE_HIGH)
1134                 move = 0.0;
1135
1136         /* Apply the move to our control setting. */
1137         hc->current_control_setting += move;
1138
1139         /* Calculate the new thread wave magnitude, which is based on the moving average we've been keeping of the
1140          * throughput error.  This average starts at zero, so we'll start with a nice safe little wave at first. */
1141         new_thread_wave_magnitude = (gint)(0.5 + (hc->current_control_setting * hc->average_throughput_noise
1142                 * hc->target_signal_to_noise_ratio * hc->thread_magnitude_multiplier * 2.0));
1143         new_thread_wave_magnitude = CLAMP (new_thread_wave_magnitude, 1, hc->max_thread_wave_magnitude);
1144
1145         /* Make sure our control setting is within the ThreadPool's limits. */
1146         hc->current_control_setting = CLAMP (hc->current_control_setting, threadpool->limit_worker_min, threadpool->limit_worker_max - new_thread_wave_magnitude);
1147
1148         /* Calculate the new thread count (control setting + square wave). */
1149         new_thread_count = (gint)(hc->current_control_setting + new_thread_wave_magnitude * ((hc->total_samples / (hc->wave_period / 2)) % 2));
1150
1151         /* Make sure the new thread count doesn't exceed the ThreadPool's limits. */
1152         new_thread_count = CLAMP (new_thread_count, threadpool->limit_worker_min, threadpool->limit_worker_max);
1153
1154         if (new_thread_count != current_thread_count)
1155                 hill_climbing_change_thread_count (new_thread_count, transition);
1156
1157         if (creal (ratio) < 0.0 && new_thread_count == threadpool->limit_worker_min)
1158                 *adjustment_interval = (gint)(0.5 + hc->current_sample_interval * (10.0 * MAX (-1.0 * creal (ratio), 1.0)));
1159         else
1160                 *adjustment_interval = hc->current_sample_interval;
1161
1162         return new_thread_count;
1163 }
1164
1165 static void
1166 heuristic_notify_work_completed (void)
1167 {
1168         g_assert (threadpool);
1169
1170         InterlockedIncrement (&threadpool->heuristic_completions);
1171         threadpool->heuristic_last_dequeue = mono_msec_ticks ();
1172 }
1173
1174 static gboolean
1175 heuristic_should_adjust (void)
1176 {
1177         g_assert (threadpool);
1178
1179         if (threadpool->heuristic_last_dequeue > threadpool->heuristic_last_adjustment + threadpool->heuristic_adjustment_interval) {
1180                 ThreadPoolCounter counter = COUNTER_READ ();
1181                 if (counter._.working <= counter._.max_working)
1182                         return TRUE;
1183         }
1184
1185         return FALSE;
1186 }
1187
1188 static void
1189 heuristic_adjust (void)
1190 {
1191         g_assert (threadpool);
1192
1193         if (mono_mutex_trylock (&threadpool->heuristic_lock) == 0) {
1194                 gint32 completions = InterlockedExchange (&threadpool->heuristic_completions, 0);
1195                 guint32 sample_end = mono_msec_ticks ();
1196                 guint32 sample_duration = sample_end - threadpool->heuristic_sample_start;
1197
1198                 if (sample_duration >= threadpool->heuristic_adjustment_interval / 2) {
1199                         ThreadPoolCounter counter;
1200                         gint16 new_thread_count;
1201
1202                         counter = COUNTER_READ ();
1203                         new_thread_count = hill_climbing_update (counter._.max_working, sample_duration, completions, &threadpool->heuristic_adjustment_interval);
1204
1205                         COUNTER_ATOMIC (counter, { counter._.max_working = new_thread_count; });
1206
1207                         if (new_thread_count > counter._.max_working)
1208                                 worker_request (mono_domain_get ());
1209
1210                         threadpool->heuristic_sample_start = sample_end;
1211                         threadpool->heuristic_last_adjustment = mono_msec_ticks ();
1212                 }
1213
1214                 mono_mutex_unlock (&threadpool->heuristic_lock);
1215         }
1216 }
1217
/*
 * mono_threadpool_ms_cleanup:
 *
 *   Cleans up the IO threadpool (unless sockets are disabled at build time)
 * and then the worker threadpool.
 */
void
mono_threadpool_ms_cleanup (void)
{
#ifndef DISABLE_SOCKETS
	mono_threadpool_ms_io_cleanup ();
#endif

	ensure_cleanedup ();
}
1226
1227 MonoAsyncResult *
1228 mono_threadpool_ms_begin_invoke (MonoDomain *domain, MonoObject *target, MonoMethod *method, gpointer *params)
1229 {
1230         static MonoClass *async_call_klass = NULL;
1231         MonoMethodMessage *message;
1232         MonoAsyncResult *async_result;
1233         MonoAsyncCall *async_call;
1234         MonoDelegate *async_callback = NULL;
1235         MonoObject *state = NULL;
1236
1237         if (!async_call_klass)
1238                 async_call_klass = mono_class_from_name (mono_defaults.corlib, "System", "MonoAsyncCall");
1239         g_assert (async_call_klass);
1240
1241         ensure_initialized (NULL);
1242
1243         message = mono_method_call_message_new (method, params, mono_get_delegate_invoke (method->klass), (params != NULL) ? (&async_callback) : NULL, (params != NULL) ? (&state) : NULL);
1244
1245         async_call = (MonoAsyncCall*) mono_object_new (domain, async_call_klass);
1246         MONO_OBJECT_SETREF (async_call, msg, message);
1247         MONO_OBJECT_SETREF (async_call, state, state);
1248
1249         if (async_callback) {
1250                 MONO_OBJECT_SETREF (async_call, cb_method, mono_get_delegate_invoke (((MonoObject*) async_callback)->vtable->klass));
1251                 MONO_OBJECT_SETREF (async_call, cb_target, async_callback);
1252         }
1253
1254         async_result = mono_async_result_new (domain, NULL, async_call->state, NULL, (MonoObject*) async_call);
1255         MONO_OBJECT_SETREF (async_result, async_delegate, target);
1256
1257 #ifndef DISABLE_SOCKETS
1258         if (mono_threadpool_ms_is_io (target, state))
1259                 return mono_threadpool_ms_io_add (async_result, (MonoSocketAsyncResult*) state);
1260 #endif
1261
1262         mono_threadpool_ms_enqueue_work_item (domain, (MonoObject*) async_result);
1263
1264         return async_result;
1265 }
1266
1267 MonoObject *
1268 mono_threadpool_ms_end_invoke (MonoAsyncResult *ares, MonoArray **out_args, MonoObject **exc)
1269 {
1270         MonoAsyncCall *ac;
1271
1272         g_assert (exc);
1273         g_assert (out_args);
1274
1275         *exc = NULL;
1276         *out_args = NULL;
1277
1278         /* check if already finished */
1279         mono_monitor_enter ((MonoObject*) ares);
1280
1281         if (ares->endinvoke_called) {
1282                 *exc = (MonoObject*) mono_get_exception_invalid_operation (NULL);
1283                 mono_monitor_exit ((MonoObject*) ares);
1284                 return NULL;
1285         }
1286
1287         MONO_OBJECT_SETREF (ares, endinvoke_called, 1);
1288
1289         /* wait until we are really finished */
1290         if (ares->completed) {
1291                 mono_monitor_exit ((MonoObject *) ares);
1292         } else {
1293                 gpointer wait_event;
1294                 if (ares->handle) {
1295                         wait_event = mono_wait_handle_get_handle ((MonoWaitHandle*) ares->handle);
1296                 } else {
1297                         wait_event = CreateEvent (NULL, TRUE, FALSE, NULL);
1298                         g_assert(wait_event);
1299                         MONO_OBJECT_SETREF (ares, handle, (MonoObject*) mono_wait_handle_new (mono_object_domain (ares), wait_event));
1300                 }
1301                 mono_monitor_exit ((MonoObject*) ares);
1302                 MONO_PREPARE_BLOCKING
1303                 WaitForSingleObjectEx (wait_event, INFINITE, TRUE);
1304                 MONO_FINISH_BLOCKING
1305         }
1306
1307         ac = (MonoAsyncCall*) ares->object_data;
1308         g_assert (ac);
1309
1310         *exc = ac->msg->exc; /* FIXME: GC add write barrier */
1311         *out_args = ac->out_args;
1312         return ac->res;
1313 }
1314
1315 gboolean
1316 mono_threadpool_ms_remove_domain_jobs (MonoDomain *domain, int timeout)
1317 {
1318         gboolean res = TRUE;
1319         guint32 start;
1320         gpointer sem;
1321
1322         g_assert (domain);
1323         g_assert (timeout >= -1);
1324
1325         g_assert (mono_domain_is_unloading (domain));
1326
1327         if (timeout != -1)
1328                 start = mono_msec_ticks ();
1329
1330 #ifndef DISABLE_SOCKETS
1331         mono_threadpool_ms_io_remove_domain_jobs (domain);
1332         if (timeout != -1) {
1333                 timeout -= mono_msec_ticks () - start;
1334                 if (timeout < 0)
1335                         return FALSE;
1336         }
1337 #endif
1338
1339         /*
1340          * There might be some threads out that could be about to execute stuff from the given domain.
1341          * We avoid that by setting up a semaphore to be pulsed by the thread that reaches zero.
1342          */
1343         sem = domain->cleanup_semaphore = CreateSemaphore (NULL, 0, 1, NULL);
1344
1345         /*
1346          * The memory barrier here is required to have global ordering between assigning to cleanup_semaphone
1347          * and reading threadpool_jobs. Otherwise this thread could read a stale version of threadpool_jobs
1348          * and wait forever.
1349          */
1350         mono_memory_write_barrier ();
1351
1352         while (domain->threadpool_jobs) {
1353                 MONO_PREPARE_BLOCKING
1354                 WaitForSingleObject (sem, timeout);
1355                 MONO_FINISH_BLOCKING
1356                 if (timeout != -1) {
1357                         timeout -= mono_msec_ticks () - start;
1358                         if (timeout <= 0) {
1359                                 res = FALSE;
1360                                 break;
1361                         }
1362                 }
1363         }
1364
1365         domain->cleanup_semaphore = NULL;
1366         CloseHandle (sem);
1367
1368         return res;
1369 }
1370
1371 void
1372 mono_threadpool_ms_suspend (void)
1373 {
1374         threadpool->suspended = TRUE;
1375 }
1376
1377 void
1378 mono_threadpool_ms_resume (void)
1379 {
1380         threadpool->suspended = FALSE;
1381 }
1382
1383 void
1384 ves_icall_System_Threading_Microsoft_ThreadPool_GetAvailableThreadsNative (gint32 *worker_threads, gint32 *completion_port_threads)
1385 {
1386         if (!worker_threads || !completion_port_threads)
1387                 return;
1388
1389         ensure_initialized (NULL);
1390
1391         *worker_threads = threadpool->limit_worker_max;
1392         *completion_port_threads = threadpool->limit_io_max;
1393 }
1394
1395 void
1396 ves_icall_System_Threading_Microsoft_ThreadPool_GetMinThreadsNative (gint32 *worker_threads, gint32 *completion_port_threads)
1397 {
1398         if (!worker_threads || !completion_port_threads)
1399                 return;
1400
1401         ensure_initialized (NULL);
1402
1403         *worker_threads = threadpool->limit_worker_min;
1404         *completion_port_threads = threadpool->limit_io_min;
1405 }
1406
1407 void
1408 ves_icall_System_Threading_Microsoft_ThreadPool_GetMaxThreadsNative (gint32 *worker_threads, gint32 *completion_port_threads)
1409 {
1410         if (!worker_threads || !completion_port_threads)
1411                 return;
1412
1413         ensure_initialized (NULL);
1414
1415         *worker_threads = threadpool->limit_worker_max;
1416         *completion_port_threads = threadpool->limit_io_max;
1417 }
1418
1419 MonoBoolean
1420 ves_icall_System_Threading_Microsoft_ThreadPool_SetMinThreadsNative (gint32 worker_threads, gint32 completion_port_threads)
1421 {
1422         ensure_initialized (NULL);
1423
1424         if (worker_threads <= 0 || worker_threads > threadpool->limit_worker_max)
1425                 return FALSE;
1426         if (completion_port_threads <= 0 || completion_port_threads > threadpool->limit_io_max)
1427                 return FALSE;
1428
1429         threadpool->limit_worker_max = worker_threads;
1430         threadpool->limit_io_max = completion_port_threads;
1431
1432         return TRUE;
1433 }
1434
1435 MonoBoolean
1436 ves_icall_System_Threading_Microsoft_ThreadPool_SetMaxThreadsNative (gint32 worker_threads, gint32 completion_port_threads)
1437 {
1438         gint cpu_count = mono_cpu_count ();
1439
1440         ensure_initialized (NULL);
1441
1442         if (worker_threads < threadpool->limit_worker_min || worker_threads < cpu_count)
1443                 return FALSE;
1444         if (completion_port_threads < threadpool->limit_io_min || completion_port_threads < cpu_count)
1445                 return FALSE;
1446
1447         threadpool->limit_worker_max = worker_threads;
1448         threadpool->limit_io_max = completion_port_threads;
1449
1450         return TRUE;
1451 }
1452
1453 void
1454 ves_icall_System_Threading_Microsoft_ThreadPool_InitializeVMTp (MonoBoolean *enable_worker_tracking)
1455 {
1456         ensure_initialized (enable_worker_tracking);
1457 }
1458
1459 MonoBoolean
1460 ves_icall_System_Threading_Microsoft_ThreadPool_NotifyWorkItemComplete (void)
1461 {
1462         ThreadPoolCounter counter;
1463
1464         if (mono_domain_is_unloading (mono_domain_get ()) || mono_runtime_is_shutting_down ())
1465                 return FALSE;
1466
1467         heuristic_notify_work_completed ();
1468
1469         if (heuristic_should_adjust ())
1470                 heuristic_adjust ();
1471
1472         counter = COUNTER_READ ();
1473         return counter._.working <= counter._.max_working;
1474 }
1475
/*
 * Icall: records a completion for the heuristic and runs the heuristic
 * adjustment if it is due.
 */
void
ves_icall_System_Threading_Microsoft_ThreadPool_NotifyWorkItemProgressNative (void)
{
	heuristic_notify_work_completed ();

	if (!heuristic_should_adjust ())
		return;

	heuristic_adjust ();
}
1484
1485 void
1486 ves_icall_System_Threading_Microsoft_ThreadPool_ReportThreadStatus (MonoBoolean is_working)
1487 {
1488         // TODO
1489         mono_raise_exception (mono_get_exception_not_implemented (NULL));
1490 }
1491
1492 MonoBoolean
1493 ves_icall_System_Threading_Microsoft_ThreadPool_RequestWorkerThread (void)
1494 {
1495         return worker_request (mono_domain_get ());
1496 }
1497
1498 MonoBoolean G_GNUC_UNUSED
1499 ves_icall_System_Threading_Microsoft_ThreadPool_PostQueuedCompletionStatus (MonoNativeOverlapped *native_overlapped)
1500 {
1501         /* This copy the behavior of the current Mono implementation */
1502         mono_raise_exception (mono_get_exception_not_implemented (NULL));
1503         return FALSE;
1504 }
1505
1506 MonoBoolean G_GNUC_UNUSED
1507 ves_icall_System_Threading_Microsoft_ThreadPool_BindIOCompletionCallbackNative (gpointer file_handle)
1508 {
1509         /* This copy the behavior of the current Mono implementation */
1510         return TRUE;
1511 }
1512
1513 MonoBoolean G_GNUC_UNUSED
1514 ves_icall_System_Threading_Microsoft_ThreadPool_IsThreadPoolHosted (void)
1515 {
1516         return FALSE;
1517 }