[threadpool] Reduce logging verbosity level. Fixes #58829
[mono.git] / mono / metadata / threadpool-worker-default.c
1 /**
2  * \file
3  * native threadpool worker
4  *
5  * Author:
6  *      Ludovic Henry (ludovic.henry@xamarin.com)
7  *
8  * Licensed under the MIT license. See LICENSE file in the project root for full license information.
9  */
10
11 #include <stdlib.h>
12 #define _USE_MATH_DEFINES // needed by MSVC to define math constants
13 #include <math.h>
14 #include <config.h>
15 #include <glib.h>
16
17 #include <mono/metadata/class-internals.h>
18 #include <mono/metadata/exception.h>
19 #include <mono/metadata/gc-internals.h>
20 #include <mono/metadata/object.h>
21 #include <mono/metadata/object-internals.h>
22 #include <mono/metadata/threadpool.h>
23 #include <mono/metadata/threadpool-worker.h>
24 #include <mono/metadata/threadpool-io.h>
25 #include <mono/metadata/w32event.h>
26 #include <mono/utils/atomic.h>
27 #include <mono/utils/mono-compiler.h>
28 #include <mono/utils/mono-complex.h>
29 #include <mono/utils/mono-logger.h>
30 #include <mono/utils/mono-logger-internals.h>
31 #include <mono/utils/mono-proclib.h>
32 #include <mono/utils/mono-threads.h>
33 #include <mono/utils/mono-time.h>
34 #include <mono/utils/mono-rand.h>
35 #include <mono/utils/refcount.h>
36 #include <mono/utils/w32api.h>
37
38 #define CPU_USAGE_LOW 80
39 #define CPU_USAGE_HIGH 95
40
41 #define MONITOR_INTERVAL 500 // ms
42 #define MONITOR_MINIMAL_LIFETIME 60 * 1000 // ms
43
44 #define WORKER_CREATION_MAX_PER_SEC 10
45
46 /* The exponent to apply to the gain. 1.0 means to use linear gain,
47  * higher values will enhance large moves and damp small ones.
48  * default: 2.0 */
49 #define HILL_CLIMBING_GAIN_EXPONENT 2.0
50
51 /* The 'cost' of a thread. 0 means drive for increased throughput regardless
52  * of thread count, higher values bias more against higher thread counts.
53  * default: 0.15 */
54 #define HILL_CLIMBING_BIAS 0.15
55
56 #define HILL_CLIMBING_WAVE_PERIOD 4
57 #define HILL_CLIMBING_MAX_WAVE_MAGNITUDE 20
58 #define HILL_CLIMBING_WAVE_MAGNITUDE_MULTIPLIER 1.0
59 #define HILL_CLIMBING_WAVE_HISTORY_SIZE 8
60 #define HILL_CLIMBING_TARGET_SIGNAL_TO_NOISE_RATIO 3.0
61 #define HILL_CLIMBING_MAX_CHANGE_PER_SECOND 4
62 #define HILL_CLIMBING_MAX_CHANGE_PER_SAMPLE 20
63 #define HILL_CLIMBING_SAMPLE_INTERVAL_LOW 10
64 #define HILL_CLIMBING_SAMPLE_INTERVAL_HIGH 200
65 #define HILL_CLIMBING_ERROR_SMOOTHING_FACTOR 0.01
66 #define HILL_CLIMBING_MAX_SAMPLE_ERROR_PERCENT 0.15
67
68 typedef enum {
69         TRANSITION_WARMUP,
70         TRANSITION_INITIALIZING,
71         TRANSITION_RANDOM_MOVE,
72         TRANSITION_CLIMBING_MOVE,
73         TRANSITION_CHANGE_POINT,
74         TRANSITION_STABILIZING,
75         TRANSITION_STARVATION,
76         TRANSITION_THREAD_TIMED_OUT,
77         TRANSITION_UNDEFINED,
78 } ThreadPoolHeuristicStateTransition;
79
80 typedef struct {
81         gint32 wave_period;
82         gint32 samples_to_measure;
83         gdouble target_throughput_ratio;
84         gdouble target_signal_to_noise_ratio;
85         gdouble max_change_per_second;
86         gdouble max_change_per_sample;
87         gint32 max_thread_wave_magnitude;
88         gint32 sample_interval_low;
89         gdouble thread_magnitude_multiplier;
90         gint32 sample_interval_high;
91         gdouble throughput_error_smoothing_factor;
92         gdouble gain_exponent;
93         gdouble max_sample_error;
94
95         gdouble current_control_setting;
96         gint64 total_samples;
97         gint16 last_thread_count;
98         gdouble elapsed_since_last_change;
99         gdouble completions_since_last_change;
100
101         gdouble average_throughput_noise;
102
103         gdouble *samples;
104         gdouble *thread_counts;
105
106         guint32 current_sample_interval;
107         gpointer random_interval_generator;
108
109         gint32 accumulated_completion_count;
110         gdouble accumulated_sample_duration;
111 } ThreadPoolHillClimbing;
112
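/* All four counters are packed into a single 64-bit word so that they can be
 * read and updated together with one atomic 64-bit compare-and-swap (see
 * COUNTER_READ and COUNTER_ATOMIC below). The 64-byte alignment on GCC
 * presumably keeps the counter on its own cache line to limit false sharing. */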
113 typedef union {
114         struct {
115                 gint16 max_working; /* determined by heuristic */
116                 gint16 starting; /* starting, but not yet in worker_thread */
117                 gint16 working; /* executing worker_thread */
118                 gint16 parked; /* parked */
119         } _;
120         gint64 as_gint64;
121 } ThreadPoolWorkerCounter
122 #ifdef __GNUC__
123 __attribute__((aligned(64)))
124 #endif
125 ;
126
127 typedef struct {
128         MonoRefCount ref;
129
130         MonoThreadPoolWorkerCallback callback;
131
132         ThreadPoolWorkerCounter counters;
133
134         MonoCoopMutex parked_threads_lock;
135         gint32 parked_threads_count;
136         MonoCoopCond parked_threads_cond;
137
138         volatile gint32 work_items_count;
139
140         guint32 worker_creation_current_second;
141         guint32 worker_creation_current_count;
142         MonoCoopMutex worker_creation_lock;
143
144         gint32 heuristic_completions;
145         gint64 heuristic_sample_start;
146         gint64 heuristic_last_dequeue; // ms
147         gint64 heuristic_last_adjustment; // ms
148         gint64 heuristic_adjustment_interval; // ms
149         ThreadPoolHillClimbing heuristic_hill_climbing;
150         MonoCoopMutex heuristic_lock;
151
152         gint32 limit_worker_min;
153         gint32 limit_worker_max;
154
155         MonoCpuUsageState *cpu_usage_state;
156         gint32 cpu_usage;
157
158         /* suspended by the debugger */
159         gboolean suspended;
160
161         gint32 monitor_status;
162 } ThreadPoolWorker;
163
164 enum {
165         MONITOR_STATUS_REQUESTED,
166         MONITOR_STATUS_WAITING_FOR_REQUEST,
167         MONITOR_STATUS_NOT_RUNNING,
168 };
169
170 static ThreadPoolWorker worker;
171
172 #define COUNTER_CHECK(counter) \
173         do { \
174                 g_assert (counter._.max_working > 0); \
175                 g_assert (counter._.starting >= 0); \
176                 g_assert (counter._.working >= 0); \
177         } while (0)
178
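/* COUNTER_ATOMIC implements a lock-free read-modify-write on the packed counter:
 * it snapshots the current value, runs the caller-supplied block on a local copy,
 * sanity-checks the result, and retries the whole sequence until the 64-bit CAS
 * succeeds. A typical use, taken from worker_park below:
 *
 *     ThreadPoolWorkerCounter counter;
 *     COUNTER_ATOMIC (counter, {
 *             counter._.working --;
 *             counter._.parked ++;
 *     });
 */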
179 #define COUNTER_ATOMIC(var,block) \
180         do { \
181                 ThreadPoolWorkerCounter __old; \
182                 do { \
183                         __old = COUNTER_READ (); \
184                         (var) = __old; \
185                         { block; } \
186                         COUNTER_CHECK (var); \
187                 } while (InterlockedCompareExchange64 (&worker.counters.as_gint64, (var).as_gint64, __old.as_gint64) != __old.as_gint64); \
188         } while (0)
189
190 static inline ThreadPoolWorkerCounter
191 COUNTER_READ (void)
192 {
193         ThreadPoolWorkerCounter counter;
194         counter.as_gint64 = InterlockedRead64 (&worker.counters.as_gint64);
195         return counter;
196 }
197
198 static gpointer
199 rand_create (void)
200 {
201         mono_rand_open ();
202         return mono_rand_init (NULL, 0);
203 }
204
205 static guint32
206 rand_next (gpointer *handle, guint32 min, guint32 max)
207 {
208         MonoError error;
209         guint32 val;
210         mono_rand_try_get_uint32 (handle, &val, min, max, &error);
211         // FIXME handle error
212         mono_error_assert_ok (&error);
213         return val;
214 }
215
216 static void
217 destroy (gpointer data)
218 {
219         mono_coop_mutex_destroy (&worker.parked_threads_lock);
220         mono_coop_cond_destroy (&worker.parked_threads_cond);
221
222         mono_coop_mutex_destroy (&worker.worker_creation_lock);
223
224         mono_coop_mutex_destroy (&worker.heuristic_lock);
225
226         g_free (worker.cpu_usage_state);
227 }
228
229 void
230 mono_threadpool_worker_init (MonoThreadPoolWorkerCallback callback)
231 {
232         ThreadPoolHillClimbing *hc;
233         const char *threads_per_cpu_env;
234         gint threads_per_cpu;
235         gint threads_count;
236
237         mono_refcount_init (&worker, destroy);
238
239         worker.callback = callback;
240
241         mono_coop_mutex_init (&worker.parked_threads_lock);
242         worker.parked_threads_count = 0;
243         mono_coop_cond_init (&worker.parked_threads_cond);
244
245         worker.worker_creation_current_second = -1;
246         mono_coop_mutex_init (&worker.worker_creation_lock);
247
248         worker.heuristic_adjustment_interval = 10;
249         mono_coop_mutex_init (&worker.heuristic_lock);
250
251         mono_rand_open ();
252
253         hc = &worker.heuristic_hill_climbing;
254
255         hc->wave_period = HILL_CLIMBING_WAVE_PERIOD;
256         hc->max_thread_wave_magnitude = HILL_CLIMBING_MAX_WAVE_MAGNITUDE;
257         hc->thread_magnitude_multiplier = (gdouble) HILL_CLIMBING_WAVE_MAGNITUDE_MULTIPLIER;
258         hc->samples_to_measure = hc->wave_period * HILL_CLIMBING_WAVE_HISTORY_SIZE;
259         hc->target_throughput_ratio = (gdouble) HILL_CLIMBING_BIAS;
260         hc->target_signal_to_noise_ratio = (gdouble) HILL_CLIMBING_TARGET_SIGNAL_TO_NOISE_RATIO;
261         hc->max_change_per_second = (gdouble) HILL_CLIMBING_MAX_CHANGE_PER_SECOND;
262         hc->max_change_per_sample = (gdouble) HILL_CLIMBING_MAX_CHANGE_PER_SAMPLE;
263         hc->sample_interval_low = HILL_CLIMBING_SAMPLE_INTERVAL_LOW;
264         hc->sample_interval_high = HILL_CLIMBING_SAMPLE_INTERVAL_HIGH;
265         hc->throughput_error_smoothing_factor = (gdouble) HILL_CLIMBING_ERROR_SMOOTHING_FACTOR;
266         hc->gain_exponent = (gdouble) HILL_CLIMBING_GAIN_EXPONENT;
267         hc->max_sample_error = (gdouble) HILL_CLIMBING_MAX_SAMPLE_ERROR_PERCENT;
268         hc->current_control_setting = 0;
269         hc->total_samples = 0;
270         hc->last_thread_count = 0;
271         hc->average_throughput_noise = 0;
272         hc->elapsed_since_last_change = 0;
273         hc->accumulated_completion_count = 0;
274         hc->accumulated_sample_duration = 0;
275         hc->samples = g_new0 (gdouble, hc->samples_to_measure);
276         hc->thread_counts = g_new0 (gdouble, hc->samples_to_measure);
277         hc->random_interval_generator = rand_create ();
278         hc->current_sample_interval = rand_next (&hc->random_interval_generator, hc->sample_interval_low, hc->sample_interval_high);
279
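	/* The minimum number of workers defaults to one per CPU; MONO_THREADS_PER_CPU
	 * can raise it (clamped to [1, 50]). The maximum is 100x the minimum, except on
	 * Android/iOS where it is capped at around 200 threads (or at threads_count,
	 * whichever is larger). */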
280         if (!(threads_per_cpu_env = g_getenv ("MONO_THREADS_PER_CPU")))
281                 threads_per_cpu = 1;
282         else
283                 threads_per_cpu = CLAMP (atoi (threads_per_cpu_env), 1, 50);
284
285         threads_count = mono_cpu_count () * threads_per_cpu;
286
287         worker.limit_worker_min = threads_count;
288
289 #if defined (HOST_ANDROID) || defined (HOST_IOS)
290         worker.limit_worker_max = CLAMP (threads_count * 100, MIN (threads_count, 200), MAX (threads_count, 200));
291 #else
292         worker.limit_worker_max = threads_count * 100;
293 #endif
294
295         worker.counters._.max_working = worker.limit_worker_min;
296
297         worker.cpu_usage_state = g_new0 (MonoCpuUsageState, 1);
298
299         worker.suspended = FALSE;
300
301         worker.monitor_status = MONITOR_STATUS_NOT_RUNNING;
302 }
303
304 void
305 mono_threadpool_worker_cleanup (void)
306 {
307         mono_refcount_dec (&worker);
308 }
309
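/* work_items_count tracks the number of outstanding work requests. Both push and
 * try_pop update it with a CAS loop so no lock is needed; work_item_try_pop
 * returns FALSE (without decrementing) when the count is already zero. */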
310 static void
311 work_item_push (void)
312 {
313         gint32 old, new;
314
315         do {
316                 old = InterlockedRead (&worker.work_items_count);
317                 g_assert (old >= 0);
318
319                 new = old + 1;
320         } while (InterlockedCompareExchange (&worker.work_items_count, new, old) != old);
321 }
322
323 static gboolean
324 work_item_try_pop (void)
325 {
326         gint32 old, new;
327
328         do {
329                 old = InterlockedRead (&worker.work_items_count);
330                 g_assert (old >= 0);
331
332                 if (old == 0)
333                         return FALSE;
334
335                 new = old - 1;
336         } while (InterlockedCompareExchange (&worker.work_items_count, new, old) != old);
337
338         return TRUE;
339 }
340
341 static gint32
342 work_item_count (void)
343 {
344         return InterlockedRead (&worker.work_items_count);
345 }
346
347 static void worker_request (void);
348
349 void
350 mono_threadpool_worker_request (void)
351 {
352         if (!mono_refcount_tryinc (&worker))
353                 return;
354
355         work_item_push ();
356
357         worker_request ();
358
359         mono_refcount_dec (&worker);
360 }
361
362 static void
363 worker_wait_interrupt (gpointer unused)
364 {
365         /* If the runtime is not shutting down, we are not using this mechanism to wake up an unparked thread, and if the
366          * runtime is shutting down, then we need to wake up ALL the threads.
367          * It might be a bit wasteful, but I witnessed a shutdown hang where the main thread would abort and then wait for all
368          * background threads to exit (see mono_thread_manage). This would go wrong because not all threadpool threads would
369          * be unparked. They would eventually get unstuck because of the timeout, but that would delay shutdown by 5-60s. */
370         if (!mono_runtime_is_shutting_down ())
371                 return;
372
373         if (!mono_refcount_tryinc (&worker))
374                 return;
375
376         mono_coop_mutex_lock (&worker.parked_threads_lock);
377         mono_coop_cond_broadcast (&worker.parked_threads_cond);
378         mono_coop_mutex_unlock (&worker.parked_threads_lock);
379
380         mono_refcount_dec (&worker);
381 }
382
383 /* return TRUE if timeout, FALSE otherwise (worker unpark or interrupt) */
384 static gboolean
385 worker_park (void)
386 {
387         gboolean timeout = FALSE;
388         gboolean interrupted = FALSE;
389
390         mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] worker parking",
391                 GUINT_TO_POINTER (MONO_NATIVE_THREAD_ID_TO_UINT (mono_native_thread_id_get ())));
392
393         mono_coop_mutex_lock (&worker.parked_threads_lock);
394
395         if (!mono_runtime_is_shutting_down ()) {
396                 static gpointer rand_handle = NULL;
397                 MonoInternalThread *thread;
398                 ThreadPoolWorkerCounter counter;
399
400                 if (!rand_handle)
401                         rand_handle = rand_create ();
402                 g_assert (rand_handle);
403
404                 thread = mono_thread_internal_current ();
405                 g_assert (thread);
406
407                 COUNTER_ATOMIC (counter, {
408                         counter._.working --;
409                         counter._.parked ++;
410                 });
411
412                 worker.parked_threads_count += 1;
413
414                 mono_thread_info_install_interrupt (worker_wait_interrupt, NULL, &interrupted);
415                 if (interrupted)
416                         goto done;
417
418                 if (mono_coop_cond_timedwait (&worker.parked_threads_cond, &worker.parked_threads_lock, rand_next (&rand_handle, 5 * 1000, 60 * 1000)) != 0)
419                         timeout = TRUE;
420
421                 mono_thread_info_uninstall_interrupt (&interrupted);
422
423 done:
424                 worker.parked_threads_count -= 1;
425
426                 COUNTER_ATOMIC (counter, {
427                         counter._.working ++;
428                         counter._.parked --;
429                 });
430         }
431
432         mono_coop_mutex_unlock (&worker.parked_threads_lock);
433
434         mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] worker unparking, timeout? %s interrupted? %s",
435                 GUINT_TO_POINTER (MONO_NATIVE_THREAD_ID_TO_UINT (mono_native_thread_id_get ())), timeout ? "yes" : "no", interrupted ? "yes" : "no");
436
437         return timeout;
438 }
439
440 static gboolean
441 worker_try_unpark (void)
442 {
443         gboolean res = FALSE;
444
445         mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try unpark worker",
446                 GUINT_TO_POINTER (MONO_NATIVE_THREAD_ID_TO_UINT (mono_native_thread_id_get ())));
447
448         mono_coop_mutex_lock (&worker.parked_threads_lock);
449         if (worker.parked_threads_count > 0) {
450                 mono_coop_cond_signal (&worker.parked_threads_cond);
451                 res = TRUE;
452         }
453         mono_coop_mutex_unlock (&worker.parked_threads_lock);
454
455         mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try unpark worker, success? %s",
456                 GUINT_TO_POINTER (MONO_NATIVE_THREAD_ID_TO_UINT (mono_native_thread_id_get ())), res ? "yes" : "no");
457
458         return res;
459 }
460
461 static gsize WINAPI
462 worker_thread (gpointer unused)
463 {
464         MonoInternalThread *thread;
465         ThreadPoolWorkerCounter counter;
466
467         mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] worker starting",
468                 GUINT_TO_POINTER (MONO_NATIVE_THREAD_ID_TO_UINT (mono_native_thread_id_get ())));
469
470         if (!mono_refcount_tryinc (&worker))
471                 return 0;
472
473         COUNTER_ATOMIC (counter, {
474                 counter._.starting --;
475                 counter._.working ++;
476         });
477
478         thread = mono_thread_internal_current ();
479         g_assert (thread);
480
481         while (!mono_runtime_is_shutting_down ()) {
482                 if (mono_thread_interruption_checkpoint ())
483                         continue;
484
485                 if (!work_item_try_pop ()) {
486                         gboolean timeout;
487
488                         timeout = worker_park ();
489                         if (timeout)
490                                 break;
491
492                         continue;
493                 }
494
495                 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] worker executing",
496                         GUINT_TO_POINTER (MONO_NATIVE_THREAD_ID_TO_UINT (mono_native_thread_id_get ())));
497
498                 worker.callback ();
499         }
500
501         COUNTER_ATOMIC (counter, {
502                 counter._.working --;
503         });
504
505         mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] worker finishing",
506                 GUINT_TO_POINTER (MONO_NATIVE_THREAD_ID_TO_UINT (mono_native_thread_id_get ())));
507
508         mono_refcount_dec (&worker);
509
510         return 0;
511 }
512
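/* Worker creation is throttled: at most WORKER_CREATION_MAX_PER_SEC threads are
 * started per wall-clock second (the current second is derived from the 100ns
 * tick counter), and no thread is started once counter._.working has reached
 * counter._.max_working. Both checks happen under worker_creation_lock. */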
513 static gboolean
514 worker_try_create (void)
515 {
516         MonoError error;
517         MonoInternalThread *thread;
518         gint64 current_ticks;
519         gint32 now;
520         ThreadPoolWorkerCounter counter;
521
522         if (mono_runtime_is_shutting_down ())
523                 return FALSE;
524
525         mono_coop_mutex_lock (&worker.worker_creation_lock);
526
527         mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker",
528                 GUINT_TO_POINTER (MONO_NATIVE_THREAD_ID_TO_UINT (mono_native_thread_id_get ())));
529
530         current_ticks = mono_100ns_ticks ();
531         if (0 == current_ticks) {
532                 g_warning ("failed to get 100ns ticks");
533         } else {
534                 now = current_ticks / (10 * 1000 * 1000);
535                 if (worker.worker_creation_current_second != now) {
536                         worker.worker_creation_current_second = now;
537                         worker.worker_creation_current_count = 0;
538                 } else {
539                         g_assert (worker.worker_creation_current_count <= WORKER_CREATION_MAX_PER_SEC);
540                         if (worker.worker_creation_current_count == WORKER_CREATION_MAX_PER_SEC) {
541                                 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker, failed: maximum number of workers created per second reached, current count = %d",
542                                         GUINT_TO_POINTER (MONO_NATIVE_THREAD_ID_TO_UINT (mono_native_thread_id_get ())), worker.worker_creation_current_count);
543                                 mono_coop_mutex_unlock (&worker.worker_creation_lock);
544                                 return FALSE;
545                         }
546                 }
547         }
548
549         COUNTER_ATOMIC (counter, {
550                 if (counter._.working >= counter._.max_working) {
551                         mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker, failed: maximum number of working threads reached",
552                                 GUINT_TO_POINTER (MONO_NATIVE_THREAD_ID_TO_UINT (mono_native_thread_id_get ())));
553                         mono_coop_mutex_unlock (&worker.worker_creation_lock);
554                         return FALSE;
555                 }
556                 counter._.starting ++;
557         });
558
559         thread = mono_thread_create_internal (mono_get_root_domain (), worker_thread, NULL, MONO_THREAD_CREATE_FLAGS_THREADPOOL, &error);
560         if (!thread) {
561                 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker, failed: could not create thread due to %s",
562                         GUINT_TO_POINTER (MONO_NATIVE_THREAD_ID_TO_UINT (mono_native_thread_id_get ())), mono_error_get_message (&error));
563                 mono_error_cleanup (&error);
564
565                 COUNTER_ATOMIC (counter, {
566                         counter._.starting --;
567                 });
568
569                 mono_coop_mutex_unlock (&worker.worker_creation_lock);
570
571                 return FALSE;
572         }
573
574         worker.worker_creation_current_count += 1;
575
576         mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] try create worker, created %p, now = %d count = %d",
577                 GUINT_TO_POINTER (MONO_NATIVE_THREAD_ID_TO_UINT (mono_native_thread_id_get ())), (gpointer) thread->tid, now, worker.worker_creation_current_count);
578
579         mono_coop_mutex_unlock (&worker.worker_creation_lock);
580         return TRUE;
581 }
582
583 static void monitor_ensure_running (void);
584
585 static void
586 worker_request (void)
587 {
588         if (worker.suspended)
589                 return;
590
591         monitor_ensure_running ();
592
593         if (worker_try_unpark ()) {
594                 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, unparked",
595                         GUINT_TO_POINTER (MONO_NATIVE_THREAD_ID_TO_UINT (mono_native_thread_id_get ())));
596                 return;
597         }
598
599         if (worker_try_create ()) {
600                 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, created",
601                         GUINT_TO_POINTER (MONO_NATIVE_THREAD_ID_TO_UINT (mono_native_thread_id_get ())));
602                 return;
603         }
604
605         mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, failed",
606                 GUINT_TO_POINTER (MONO_NATIVE_THREAD_ID_TO_UINT (mono_native_thread_id_get ())));
607 }
608
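/* The monitor thread keeps running while there are queued work items, and for at
 * least MONITOR_MINIMAL_LIFETIME (60 s) after it last had a reason to run; the
 * "* 1000 * 10" below converts that value from milliseconds to 100ns ticks. */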
609 static gboolean
610 monitor_should_keep_running (void)
611 {
612         static gint64 last_should_keep_running = -1;
613
614         g_assert (worker.monitor_status == MONITOR_STATUS_WAITING_FOR_REQUEST || worker.monitor_status == MONITOR_STATUS_REQUESTED);
615
616         if (InterlockedExchange (&worker.monitor_status, MONITOR_STATUS_WAITING_FOR_REQUEST) == MONITOR_STATUS_WAITING_FOR_REQUEST) {
617                 gboolean should_keep_running = TRUE, force_should_keep_running = FALSE;
618
619                 if (mono_runtime_is_shutting_down ()) {
620                         should_keep_running = FALSE;
621                 } else {
622                         if (work_item_count () == 0)
623                                 should_keep_running = FALSE;
624
625                         if (!should_keep_running) {
626                                 if (last_should_keep_running == -1 || mono_100ns_ticks () - last_should_keep_running < MONITOR_MINIMAL_LIFETIME * 1000 * 10) {
627                                         should_keep_running = force_should_keep_running = TRUE;
628                                 }
629                         }
630                 }
631
632                 if (should_keep_running) {
633                         if (last_should_keep_running == -1 || !force_should_keep_running)
634                                 last_should_keep_running = mono_100ns_ticks ();
635                 } else {
636                         last_should_keep_running = -1;
637                         if (InterlockedCompareExchange (&worker.monitor_status, MONITOR_STATUS_NOT_RUNNING, MONITOR_STATUS_WAITING_FOR_REQUEST) == MONITOR_STATUS_WAITING_FOR_REQUEST)
638                                 return FALSE;
639                 }
640         }
641
642         g_assert (worker.monitor_status == MONITOR_STATUS_WAITING_FOR_REQUEST || worker.monitor_status == MONITOR_STATUS_REQUESTED);
643
644         return TRUE;
645 }
646
647 static gboolean
648 monitor_sufficient_delay_since_last_dequeue (void)
649 {
650         gint64 threshold;
651
652         if (worker.cpu_usage < CPU_USAGE_LOW) {
653                 threshold = MONITOR_INTERVAL;
654         } else {
655                 ThreadPoolWorkerCounter counter;
656                 counter = COUNTER_READ ();
657                 threshold = counter._.max_working * MONITOR_INTERVAL * 2;
658         }
659
660         return mono_msec_ticks () >= worker.heuristic_last_dequeue + threshold;
661 }
662
663 static void hill_climbing_force_change (gint16 new_thread_count, ThreadPoolHeuristicStateTransition transition);
664
665 static gsize WINAPI
666 monitor_thread (gpointer unused)
667 {
668         MonoInternalThread *internal;
669         guint i;
670
671         if (!mono_refcount_tryinc (&worker))
672                 return 0;
673
674         internal = mono_thread_internal_current ();
675         g_assert (internal);
676
677         mono_cpu_usage (worker.cpu_usage_state);
678
679         // printf ("monitor_thread: start\n");
680
681         mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, started",
682                 GUINT_TO_POINTER (MONO_NATIVE_THREAD_ID_TO_UINT (mono_native_thread_id_get ())));
683
684         do {
685                 ThreadPoolWorkerCounter counter;
686                 gboolean limit_worker_max_reached;
687                 gint32 interval_left = MONITOR_INTERVAL;
688                 gint32 awake = 0; /* number of spurious wakeups we tolerate before doing a round of rebalancing */
689
690                 g_assert (worker.monitor_status != MONITOR_STATUS_NOT_RUNNING);
691
692                 // counter = COUNTER_READ ();
693                 // printf ("monitor_thread: starting = %d working = %d parked = %d max_working = %d\n",
694                 //      counter._.starting, counter._.working, counter._.parked, counter._.max_working);
695
696                 do {
697                         gint64 ts;
698                         gboolean alerted = FALSE;
699
700                         if (mono_runtime_is_shutting_down ())
701                                 break;
702
703                         ts = mono_msec_ticks ();
704                         if (mono_thread_info_sleep (interval_left, &alerted) == 0)
705                                 break;
706                         interval_left -= mono_msec_ticks () - ts;
707
708                         mono_thread_interruption_checkpoint ();
709                 } while (interval_left > 0 && ++awake < 10);
710
711                 if (mono_runtime_is_shutting_down ())
712                         continue;
713
714                 if (worker.suspended)
715                         continue;
716
717                 if (work_item_count () == 0)
718                         continue;
719
720                 worker.cpu_usage = mono_cpu_usage (worker.cpu_usage_state);
721
722                 if (!monitor_sufficient_delay_since_last_dequeue ())
723                         continue;
724
725                 limit_worker_max_reached = FALSE;
726
727                 COUNTER_ATOMIC (counter, {
728                         if (counter._.max_working >= worker.limit_worker_max) {
729                                 limit_worker_max_reached = TRUE;
730                                 break;
731                         }
732                         counter._.max_working ++;
733                 });
734
735                 if (limit_worker_max_reached)
736                         continue;
737
738                 hill_climbing_force_change (counter._.max_working, TRANSITION_STARVATION);
739
740                 for (i = 0; i < 5; ++i) {
741                         if (mono_runtime_is_shutting_down ())
742                                 break;
743
744                         if (worker_try_unpark ()) {
745                                 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, unparked",
746                                         GUINT_TO_POINTER (MONO_NATIVE_THREAD_ID_TO_UINT (mono_native_thread_id_get ())));
747                                 break;
748                         }
749
750                         if (worker_try_create ()) {
751                                 mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, created",
752                                         GUINT_TO_POINTER (MONO_NATIVE_THREAD_ID_TO_UINT (mono_native_thread_id_get ())));
753                                 break;
754                         }
755                 }
756         } while (monitor_should_keep_running ());
757
758         // printf ("monitor_thread: stop\n");
759
760         mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] monitor thread, finished",
761                 GUINT_TO_POINTER (MONO_NATIVE_THREAD_ID_TO_UINT (mono_native_thread_id_get ())));
762
763         mono_refcount_dec (&worker);
764         return 0;
765 }
766
767 static void
768 monitor_ensure_running (void)
769 {
770         MonoError error;
771         for (;;) {
772                 switch (worker.monitor_status) {
773                 case MONITOR_STATUS_REQUESTED:
774                         // printf ("monitor_thread: requested\n");
775                         return;
776                 case MONITOR_STATUS_WAITING_FOR_REQUEST:
777                         // printf ("monitor_thread: waiting for request\n");
778                         InterlockedCompareExchange (&worker.monitor_status, MONITOR_STATUS_REQUESTED, MONITOR_STATUS_WAITING_FOR_REQUEST);
779                         break;
780                 case MONITOR_STATUS_NOT_RUNNING:
781                         // printf ("monitor_thread: not running\n");
782                         if (mono_runtime_is_shutting_down ())
783                                 return;
784                         if (InterlockedCompareExchange (&worker.monitor_status, MONITOR_STATUS_REQUESTED, MONITOR_STATUS_NOT_RUNNING) == MONITOR_STATUS_NOT_RUNNING) {
785                                 // printf ("monitor_thread: creating\n");
786                                 if (!mono_thread_create_internal (mono_get_root_domain (), monitor_thread, NULL, MONO_THREAD_CREATE_FLAGS_THREADPOOL | MONO_THREAD_CREATE_FLAGS_SMALL_STACK, &error)) {
787                                         // printf ("monitor_thread: creating failed\n");
788                                         worker.monitor_status = MONITOR_STATUS_NOT_RUNNING;
789                                         mono_error_cleanup (&error);
790                                         mono_refcount_dec (&worker);
791                                 }
792                                 return;
793                         }
794                         break;
795                 default: g_assert_not_reached ();
796                 }
797         }
798 }
799
800 static void
801 hill_climbing_change_thread_count (gint16 new_thread_count, ThreadPoolHeuristicStateTransition transition)
802 {
803         ThreadPoolHillClimbing *hc;
804
805         hc = &worker.heuristic_hill_climbing;
806
807         mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] hill climbing, change max number of threads %d",
808                 GUINT_TO_POINTER (MONO_NATIVE_THREAD_ID_TO_UINT (mono_native_thread_id_get ())), new_thread_count);
809
810         hc->last_thread_count = new_thread_count;
811         hc->current_sample_interval = rand_next (&hc->random_interval_generator, hc->sample_interval_low, hc->sample_interval_high);
812         hc->elapsed_since_last_change = 0;
813         hc->completions_since_last_change = 0;
814 }
815
816 static void
817 hill_climbing_force_change (gint16 new_thread_count, ThreadPoolHeuristicStateTransition transition)
818 {
819         ThreadPoolHillClimbing *hc;
820
821         hc = &worker.heuristic_hill_climbing;
822
823         if (new_thread_count != hc->last_thread_count) {
824                 hc->current_control_setting += new_thread_count - hc->last_thread_count;
825                 hill_climbing_change_thread_count (new_thread_count, transition);
826         }
827 }
828
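/* Extracts the component of the sample history at the given period using what is
 * essentially the Goertzel algorithm (a single-bin DFT): the recurrence
 * q0 = coeff * q1 - q2 + sample is run over the window, and the resulting complex
 * amplitude is normalized by the number of samples. */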
829 static double_complex
830 hill_climbing_get_wave_component (gdouble *samples, guint sample_count, gdouble period)
831 {
832         ThreadPoolHillClimbing *hc;
833         gdouble w, cosine, sine, coeff, q0, q1, q2;
834         guint i;
835
836         g_assert (sample_count >= period);
837         g_assert (period >= 2);
838
839         hc = &worker.heuristic_hill_climbing;
840
841         w = 2.0 * M_PI / period;
842         cosine = cos (w);
843         sine = sin (w);
844         coeff = 2.0 * cosine;
845         q0 = q1 = q2 = 0;
846
847         for (i = 0; i < sample_count; ++i) {
848                 q0 = coeff * q1 - q2 + samples [(hc->total_samples - sample_count + i) % hc->samples_to_measure];
849                 q2 = q1;
850                 q1 = q0;
851         }
852
853         return mono_double_complex_scalar_div (mono_double_complex_make (q1 - q2 * cosine, (q2 * sine)), ((gdouble)sample_count));
854 }
855
856 static gint16
857 hill_climbing_update (gint16 current_thread_count, guint32 sample_duration, gint32 completions, gint64 *adjustment_interval)
858 {
859         ThreadPoolHillClimbing *hc;
860         ThreadPoolHeuristicStateTransition transition;
861         gdouble throughput;
862         gdouble throughput_error_estimate;
863         gdouble confidence;
864         gdouble move;
865         gdouble gain;
866         gint sample_index;
867         gint sample_count;
868         gint new_thread_wave_magnitude;
869         gint new_thread_count;
870         double_complex thread_wave_component;
871         double_complex throughput_wave_component;
872         double_complex ratio;
873
874         g_assert (adjustment_interval);
875
876         hc = &worker.heuristic_hill_climbing;
877
878         /* If someone changed the thread count without telling us, update our records accordingly. */
879         if (current_thread_count != hc->last_thread_count)
880                 hill_climbing_force_change (current_thread_count, TRANSITION_INITIALIZING);
881
882         /* Update the cumulative stats for this thread count */
883         hc->elapsed_since_last_change += sample_duration;
884         hc->completions_since_last_change += completions;
885
886         /* Add in any data we've already collected about this sample */
887         sample_duration += hc->accumulated_sample_duration;
888         completions += hc->accumulated_completion_count;
889
890         /* We need to make sure we're collecting reasonably accurate data. Since we're just counting the end
891          * of each work item, we are going to be missing some data about what really happened during the
892          * sample interval. The count produced by each thread includes an initial work item that may have
893          * started well before the start of the interval, and each thread may have been running some new
894          * work item for some time before the end of the interval, which did not yet get counted. So
895          * our count is going to be off by +/- threadCount work items.
896          *
897          * The exception is that the thread that reported to us last time definitely wasn't running any work
898          * at that time, and the thread that's reporting now definitely isn't running a work item now. So
899          * we really only need to consider threadCount-1 threads.
900          *
901          * Thus the percent error in our count is +/- (threadCount-1)/numCompletions.
902          *
903          * We cannot rely on the frequency-domain analysis we'll be doing later to filter out this error, because
904          * of the way it accumulates over time. If this sample is off by, say, 33% in the negative direction,
905          * then the next one likely will be too. The one after that will include the sum of the completions
906          * we missed in the previous samples, and so will be 33% positive. So every three samples we'll have
907          * two "low" samples and one "high" sample. This will appear as periodic variation right in the frequency
908          * range we're targeting, which will not be filtered by the frequency-domain translation. */
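	/* For example, with the default max_sample_error of 0.15, a sample taken with
	 * 4 threads and 15 completions has an error bound of (4 - 1) / 15 = 0.2, so it
	 * is accumulated and the caller is asked to come back in 10 ms. */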
909         if (hc->total_samples > 0 && ((current_thread_count - 1.0) / completions) >= hc->max_sample_error) {
910                 /* Not accurate enough yet. Let's accumulate the data so
911                  * far, and tell the ThreadPoolWorker to collect a little more. */
912                 hc->accumulated_sample_duration = sample_duration;
913                 hc->accumulated_completion_count = completions;
914                 *adjustment_interval = 10;
915                 return current_thread_count;
916         }
917
918         /* We've got enough data for our sample; reset our accumulators for next time. */
919         hc->accumulated_sample_duration = 0;
920         hc->accumulated_completion_count = 0;
921
922         /* Add the current thread count and throughput sample to our history. */
923         throughput = ((gdouble) completions) / sample_duration;
924
925         sample_index = hc->total_samples % hc->samples_to_measure;
926         hc->samples [sample_index] = throughput;
927         hc->thread_counts [sample_index] = current_thread_count;
928         hc->total_samples ++;
929
930         /* Set up defaults for our metrics. */
931         thread_wave_component = mono_double_complex_make(0, 0);
932         throughput_wave_component = mono_double_complex_make(0, 0);
933         throughput_error_estimate = 0;
934         ratio = mono_double_complex_make(0, 0);
935         confidence = 0;
936
937         transition = TRANSITION_WARMUP;
938
939         /* How many samples will we use? It must be at least the three wave periods we're looking for, and it must also
940          * be a whole multiple of the primary wave's period; otherwise the frequency we're looking for will fall between
941          * two frequency bands in the Fourier analysis, and we won't be able to measure it accurately. */
942         sample_count = ((gint) MIN (hc->total_samples - 1, hc->samples_to_measure) / hc->wave_period) * hc->wave_period;
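	/* For example, with the default wave_period of 4 and samples_to_measure of 32:
	 * after 30 samples, MIN (29, 32) = 29, and 29 / 4 * 4 gives sample_count = 28. */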
943
944         if (sample_count > hc->wave_period) {
945                 guint i;
946                 gdouble average_throughput;
947                 gdouble average_thread_count;
948                 gdouble sample_sum = 0;
949                 gdouble thread_sum = 0;
950
951                 /* Average the throughput and thread count samples, so we can scale the wave magnitudes later. */
952                 for (i = 0; i < sample_count; ++i) {
953                         guint j = (hc->total_samples - sample_count + i) % hc->samples_to_measure;
954                         sample_sum += hc->samples [j];
955                         thread_sum += hc->thread_counts [j];
956                 }
957
958                 average_throughput = sample_sum / sample_count;
959                 average_thread_count = thread_sum / sample_count;
960
961                 if (average_throughput > 0 && average_thread_count > 0) {
962                         gdouble noise_for_confidence, adjacent_period_1, adjacent_period_2;
963
964                         /* Calculate the periods of the adjacent frequency bands we'll be using to
965                          * measure noise levels. We want the two adjacent Fourier frequency bands. */
966                         adjacent_period_1 = sample_count / (((gdouble) sample_count) / ((gdouble) hc->wave_period) + 1);
967                         adjacent_period_2 = sample_count / (((gdouble) sample_count) / ((gdouble) hc->wave_period) - 1);
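			/* For example, with sample_count = 28 and wave_period = 4 there are 7 wave
			 * periods in the window, so the adjacent bands have periods 28 / 8 = 3.5
			 * and 28 / 6 ~= 4.67. */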
968
969                         /* Get the three different frequency components of the throughput (scaled by average
970                          * throughput). Our "error" estimate (the amount of noise that might be present in the
971                          * frequency band we're really interested in) is the larger of the two adjacent bands. */
972                         throughput_wave_component = mono_double_complex_scalar_div (hill_climbing_get_wave_component (hc->samples, sample_count, hc->wave_period), average_throughput);
973                         throughput_error_estimate = cabs (mono_double_complex_scalar_div (hill_climbing_get_wave_component (hc->samples, sample_count, adjacent_period_1), average_throughput));
974
975                         if (adjacent_period_2 <= sample_count) {
976                                 throughput_error_estimate = MAX (throughput_error_estimate, cabs (mono_double_complex_scalar_div (hill_climbing_get_wave_component (
977                                         hc->samples, sample_count, adjacent_period_2), average_throughput)));
978                         }
979
980                         /* Do the same for the thread counts, so we have something to compare to. We don't
981                          * measure thread count noise, because there is none; these are exact measurements. */
982                         thread_wave_component = mono_double_complex_scalar_div (hill_climbing_get_wave_component (hc->thread_counts, sample_count, hc->wave_period), average_thread_count);
983
984                         /* Update our moving average of the throughput noise. We'll use this
985                          * later as feedback to determine the new size of the thread wave. */
986                         if (hc->average_throughput_noise == 0) {
987                                 hc->average_throughput_noise = throughput_error_estimate;
988                         } else {
989                                 hc->average_throughput_noise = (hc->throughput_error_smoothing_factor * throughput_error_estimate)
990                                         + ((1.0 - hc->throughput_error_smoothing_factor) * hc->average_throughput_noise);
991                         }
992
993                         if (cabs (thread_wave_component) > 0) {
994                                 /* Adjust the throughput wave so it's centered around the target wave,
995                                  * and then calculate the adjusted throughput/thread ratio. */
996                                 ratio = mono_double_complex_div (mono_double_complex_sub (throughput_wave_component, mono_double_complex_scalar_mul(thread_wave_component, hc->target_throughput_ratio)), thread_wave_component);
997                                 transition = TRANSITION_CLIMBING_MOVE;
998                         } else {
999                                 ratio = mono_double_complex_make (0, 0);
1000                                 transition = TRANSITION_STABILIZING;
1001                         }
1002
1003                         noise_for_confidence = MAX (hc->average_throughput_noise, throughput_error_estimate);
1004                         if (noise_for_confidence > 0) {
1005                                 confidence = cabs (thread_wave_component) / noise_for_confidence / hc->target_signal_to_noise_ratio;
1006                         } else {
1007                                 /* there is no noise! */
1008                                 confidence = 1.0;
1009                         }
1010                 }
1011         }
1012
1013         /* We use just the real part of the complex ratio we just calculated. If the throughput signal
1014          * is exactly in phase with the thread signal, this will be the same as taking the magnitude of
1015          * the complex move and moving that far up. If they're 180 degrees out of phase, we'll move
1016          * backward (because this indicates that our changes are having the opposite of the intended effect).
1017          * If they're 90 degrees out of phase, we won't move at all, because we can't tell whether we're
1018          * having a negative or positive effect on throughput. */
1019         move = creal (ratio);
1020         move = CLAMP (move, -1.0, 1.0);
1021
1022         /* Apply our confidence multiplier. */
1023         move *= CLAMP (confidence, -1.0, 1.0);
1024
1025         /* Now apply non-linear gain, such that values around zero are attenuated, while higher values
1026          * are enhanced. This allows us to move quickly if we're far away from the target, but more slowly
1027          * if we're getting close, giving us rapid ramp-up without wild oscillations around the target. */
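	/* For example, with the default gain_exponent of 2.0 a confidence-weighted move
	 * of 0.5 is scaled to 0.25 * gain, while a full move of +/-1.0 keeps the whole
	 * gain, where gain = max_change_per_second * sample_duration. */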
1028         gain = hc->max_change_per_second * sample_duration;
1029         move = pow (fabs (move), hc->gain_exponent) * (move >= 0.0 ? 1 : -1) * gain;
1030         move = MIN (move, hc->max_change_per_sample);
1031
1032         /* If the result was positive, and CPU is > 95%, refuse the move. */
1033         if (move > 0.0 && worker.cpu_usage > CPU_USAGE_HIGH)
1034                 move = 0.0;
1035
1036         /* Apply the move to our control setting. */
1037         hc->current_control_setting += move;
1038
1039         /* Calculate the new thread wave magnitude, which is based on the moving average we've been keeping of the
1040          * throughput error.  This average starts at zero, so we'll start with a nice safe little wave at first. */
1041         new_thread_wave_magnitude = (gint)(0.5 + (hc->current_control_setting * hc->average_throughput_noise
1042                 * hc->target_signal_to_noise_ratio * hc->thread_magnitude_multiplier * 2.0));
1043         new_thread_wave_magnitude = CLAMP (new_thread_wave_magnitude, 1, hc->max_thread_wave_magnitude);
1044
1045         /* Make sure our control setting is within the ThreadPoolWorker's limits. */
1046         hc->current_control_setting = CLAMP (hc->current_control_setting, worker.limit_worker_min, worker.limit_worker_max - new_thread_wave_magnitude);
1047
1048         /* Calculate the new thread count (control setting + square wave). */
1049         new_thread_count = (gint)(hc->current_control_setting + new_thread_wave_magnitude * ((hc->total_samples / (hc->wave_period / 2)) % 2));
1050
1051         /* Make sure the new thread count doesn't exceed the ThreadPoolWorker's limits. */
1052         new_thread_count = CLAMP (new_thread_count, worker.limit_worker_min, worker.limit_worker_max);
1053
1054         if (new_thread_count != current_thread_count)
1055                 hill_climbing_change_thread_count (new_thread_count, transition);
1056
1057         if (creal (ratio) < 0.0 && new_thread_count == worker.limit_worker_min)
1058                 *adjustment_interval = (gint)(0.5 + hc->current_sample_interval * (10.0 * MAX (-1.0 * creal (ratio), 1.0)));
1059         else
1060                 *adjustment_interval = hc->current_sample_interval;
1061
1062         return new_thread_count;
1063 }
1064
1065 static gboolean
1066 heuristic_should_adjust (void)
1067 {
1068         if (worker.heuristic_last_dequeue > worker.heuristic_last_adjustment + worker.heuristic_adjustment_interval) {
1069                 ThreadPoolWorkerCounter counter;
1070                 counter = COUNTER_READ ();
1071                 if (counter._.working <= counter._.max_working)
1072                         return TRUE;
1073         }
1074
1075         return FALSE;
1076 }
1077
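/* Runs one hill climbing step. The trylock means a completing worker never blocks
 * here: if another thread is already adjusting, the call simply returns. An
 * adjustment is only applied once at least half of heuristic_adjustment_interval
 * has elapsed since the last sample; the completion count and sample duration are
 * then fed to hill_climbing_update and the result becomes the new max_working. */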
1078 static void
1079 heuristic_adjust (void)
1080 {
1081         if (mono_coop_mutex_trylock (&worker.heuristic_lock) == 0) {
1082                 gint32 completions = InterlockedExchange (&worker.heuristic_completions, 0);
1083                 gint64 sample_end = mono_msec_ticks ();
1084                 gint64 sample_duration = sample_end - worker.heuristic_sample_start;
1085
1086                 if (sample_duration >= worker.heuristic_adjustment_interval / 2) {
1087                         ThreadPoolWorkerCounter counter;
1088                         gint16 new_thread_count;
1089
1090                         counter = COUNTER_READ ();
1091                         new_thread_count = hill_climbing_update (counter._.max_working, sample_duration, completions, &worker.heuristic_adjustment_interval);
1092
1093                         COUNTER_ATOMIC (counter, {
1094                                 counter._.max_working = new_thread_count;
1095                         });
1096
1097                         if (new_thread_count > counter._.max_working)
1098                                 worker_request ();
1099
1100                         worker.heuristic_sample_start = sample_end;
1101                         worker.heuristic_last_adjustment = mono_msec_ticks ();
1102                 }
1103
1104                 mono_coop_mutex_unlock (&worker.heuristic_lock);
1105         }
1106 }
1107
1108 static void
1109 heuristic_notify_work_completed (void)
1110 {
1111         InterlockedIncrement (&worker.heuristic_completions);
1112         worker.heuristic_last_dequeue = mono_msec_ticks ();
1113
1114         if (heuristic_should_adjust ())
1115                 heuristic_adjust ();
1116 }
1117
1118 gboolean
1119 mono_threadpool_worker_notify_completed (void)
1120 {
1121         ThreadPoolWorkerCounter counter;
1122
1123         heuristic_notify_work_completed ();
1124
1125         counter = COUNTER_READ ();
1126         return counter._.working <= counter._.max_working;
1127 }
1128
1129 gint32
1130 mono_threadpool_worker_get_min (void)
1131 {
1132         gint32 ret;
1133
1134         if (!mono_refcount_tryinc (&worker))
1135                 return 0;
1136
1137         ret = worker.limit_worker_min;
1138
1139         mono_refcount_dec (&worker);
1140         return ret;
1141 }
1142
1143 gboolean
1144 mono_threadpool_worker_set_min (gint32 value)
1145 {
1146         if (value <= 0 || value > worker.limit_worker_max)
1147                 return FALSE;
1148
1149         if (!mono_refcount_tryinc (&worker))
1150                 return FALSE;
1151
1152         worker.limit_worker_min = value;
1153
1154         mono_refcount_dec (&worker);
1155         return TRUE;
1156 }
1157
1158 gint32
1159 mono_threadpool_worker_get_max (void)
1160 {
1161         gint32 ret;
1162
1163         if (!mono_refcount_tryinc (&worker))
1164                 return 0;
1165
1166         ret = worker.limit_worker_max;
1167
1168         mono_refcount_dec (&worker);
1169         return ret;
1170 }
1171
1172 gboolean
1173 mono_threadpool_worker_set_max (gint32 value)
1174 {
1175         gint32 cpu_count;
1176
1177         cpu_count = mono_cpu_count ();
1178         if (value < worker.limit_worker_min || value < cpu_count)
1179                 return FALSE;
1180
1181         if (!mono_refcount_tryinc (&worker))
1182                 return FALSE;
1183
1184         worker.limit_worker_max = value;
1185
1186         mono_refcount_dec (&worker);
1187         return TRUE;
1188 }
1189
1190 void
1191 mono_threadpool_worker_set_suspended (gboolean suspended)
1192 {
1193         if (!mono_refcount_tryinc (&worker))
1194                 return;
1195
1196         worker.suspended = suspended;
1197         if (!suspended)
1198                 worker_request ();
1199
1200         mono_refcount_dec (&worker);
1201 }