2 * threadpool.c: global thread pool
5 * Dietmar Maurer (dietmar@ximian.com)
6 * Gonzalo Paniagua Javier (gonzalo@ximian.com)
8 * Copyright 2001-2003 Ximian, Inc (http://www.ximian.com)
9 * Copyright 2004-2010 Novell, Inc (http://www.novell.com)
10 * Copyright 2001 Xamarin Inc (http://www.xamarin.com)
16 #include <mono/metadata/profiler-private.h>
17 #include <mono/metadata/threads.h>
18 #include <mono/metadata/threads-types.h>
19 #include <mono/metadata/threadpool-internals.h>
20 #include <mono/metadata/exception.h>
21 #include <mono/metadata/environment.h>
22 #include <mono/metadata/mono-mlist.h>
23 #include <mono/metadata/mono-perfcounters.h>
24 #include <mono/metadata/socket-io.h>
25 #include <mono/metadata/mono-cq.h>
26 #include <mono/metadata/mono-wsq.h>
27 #include <mono/metadata/mono-ptr-array.h>
28 #include <mono/io-layer/io-layer.h>
29 #include <mono/utils/mono-time.h>
30 #include <mono/utils/mono-proclib.h>
31 #include <mono/utils/mono-semaphore.h>
33 #ifdef HAVE_SYS_TIME_H
36 #include <sys/types.h>
42 #ifdef HAVE_SYS_SOCKET_H
43 #include <sys/socket.h>
45 #include <mono/utils/mono-poll.h>
47 #include <sys/epoll.h>
50 #include <sys/event.h>
54 #ifndef DISABLE_SOCKETS
55 #include "mono/io-layer/socket-wrappers.h"
58 #include "threadpool.h"
60 #define THREAD_WANTS_A_BREAK(t) ((t->state & (ThreadState_StopRequested | \
61 ThreadState_SuspendRequested)) != 0)
63 #define SPIN_TRYLOCK(i) (InterlockedCompareExchange (&(i), 1, 0) == 0)
64 #define SPIN_LOCK(i) do { \
65 if (SPIN_TRYLOCK (i)) \
69 #define SPIN_UNLOCK(i) i = 0
70 #define SMALL_STACK (128 * (sizeof (gpointer) / 4) * 1024)
72 /* DEBUG: prints tp data every 2s */
75 /* mono_thread_pool_init called */
76 static volatile int tp_inited;
85 CRITICAL_SECTION io_lock; /* access to sock_to_state */
86 int inited; // 0 -> not initialized , 1->initializing, 2->initialized, 3->cleaned up
87 MonoGHashTable *sock_to_state;
91 void (*modify) (gpointer event_data, int fd, int operation, int events, gboolean is_new);
92 void (*wait) (gpointer sock_data);
93 void (*shutdown) (gpointer event_data);
96 static SocketIOData socket_io_data;
98 /* Keep in sync with the System.MonoAsyncCall class which provides GC tracking */
101 MonoMethodMessage *msg;
102 MonoMethod *cb_method;
103 MonoDelegate *cb_target;
111 MonoCQ *queue; /* GC root */
113 volatile gint waiting; /* threads waiting for a work item */
116 volatile gint pool_status; /* 0 -> not initialized, 1 -> initialized, 2 -> cleaning up */
117 /* min, max, n and busy -> Interlocked */
118 volatile gint min_threads;
119 volatile gint max_threads;
120 volatile gint nthreads;
121 volatile gint busy_threads;
123 void (*async_invoke) (gpointer data);
124 void *pc_nitems; /* Performance counter for total number of items in added */
125 void *pc_nthreads; /* Performance counter for total number of active threads */
127 volatile gint destroy_thread;
128 volatile gint ignore_times; /* Used when there's a thread being created or destroyed */
129 volatile gint sp_lock; /* spin lock used to protect ignore_times */
130 volatile gint64 last_check;
131 volatile gint64 time_sum;
137 static ThreadPool async_tp;
138 static ThreadPool async_io_tp;
140 static void async_invoke_thread (gpointer data);
141 static MonoObject *mono_async_invoke (ThreadPool *tp, MonoAsyncResult *ares);
142 static void threadpool_free_queue (ThreadPool *tp);
143 static void threadpool_append_job (ThreadPool *tp, MonoObject *ar);
144 static void threadpool_append_jobs (ThreadPool *tp, MonoObject **jobs, gint njobs);
145 static void threadpool_init (ThreadPool *tp, int min_threads, int max_threads, void (*async_invoke) (gpointer));
146 static void threadpool_start_idle_threads (ThreadPool *tp);
147 static void threadpool_kill_idle_threads (ThreadPool *tp);
148 static gboolean threadpool_start_thread (ThreadPool *tp);
149 static void monitor_thread (gpointer data);
150 static void socket_io_cleanup (SocketIOData *data);
151 static MonoObject *get_io_event (MonoMList **list, gint event);
152 static int get_events_from_list (MonoMList *list);
153 static int get_event_from_state (MonoSocketAsyncResult *state);
155 static MonoClass *async_call_klass;
156 static MonoClass *socket_async_call_klass;
157 static MonoClass *process_async_call_klass;
159 static GPtrArray *wsqs;
160 CRITICAL_SECTION wsqs_lock;
163 static MonoThreadPoolFunc tp_start_func;
164 static MonoThreadPoolFunc tp_finish_func;
165 static gpointer tp_hooks_user_data;
166 static MonoThreadPoolItemFunc tp_item_begin_func;
167 static MonoThreadPoolItemFunc tp_item_end_func;
168 static gpointer tp_item_user_data;
178 AIO_OP_RECV_JUST_CALLBACK,
179 AIO_OP_SEND_JUST_CALLBACK,
183 AIO_OP_ACCEPTRECEIVE,
184 AIO_OP_RECEIVE_BUFFERS,
189 #include <mono/metadata/tpool-poll.c>
191 #include <mono/metadata/tpool-epoll.c>
192 #elif defined(HAVE_KQUEUE)
193 #include <mono/metadata/tpool-kqueue.c>
196 * Functions to check whenever a class is given system class. We need to cache things in MonoDomain since some of the
197 * assemblies can be unloaded.
201 is_system_type (MonoDomain *domain, MonoClass *klass)
203 if (domain->system_image == NULL)
204 domain->system_image = mono_image_loaded ("System");
206 return klass->image == domain->system_image;
210 is_corlib_type (MonoDomain *domain, MonoClass *klass)
212 return klass->image == mono_defaults.corlib;
216 * Note that we call it is_socket_type() where 'socket' refers to the image
217 * that contains the System.Net.Sockets.Socket type.
218 * For moonlight there is a System.Net.Sockets.Socket class in both System.dll and System.Net.dll.
221 is_socket_type (MonoDomain *domain, MonoClass *klass)
223 static const char *version = NULL;
224 static gboolean moonlight;
226 if (is_system_type (domain, klass))
229 /* If moonlight, check if the type is in System.Net.dll too */
230 if (version == NULL) {
231 version = mono_get_runtime_info ()->framework_version;
232 moonlight = !strcmp (version, "2.1");
238 if (domain->system_net_dll == NULL)
239 domain->system_net_dll = mono_image_loaded ("System.Net");
241 return klass->image == domain->system_net_dll;
244 #define check_type_cached(domain, ASSEMBLY, _class, _namespace, _name, loc) do { \
246 return *loc == _class; \
247 if (is_##ASSEMBLY##_type (domain, _class) && !strcmp (_name, _class->name) && !strcmp (_namespace, _class->name_space)) { \
254 #define check_corlib_type_cached(domain, _class, _namespace, _name, loc) check_type_cached (domain, corlib, _class, _namespace, _name, loc)
256 #define check_socket_type_cached(domain, _class, _namespace, _name, loc) check_type_cached (domain, socket, _class, _namespace, _name, loc)
258 #define check_system_type_cached(domain, _class, _namespace, _name, loc) check_type_cached (domain, system, _class, _namespace, _name, loc)
261 is_corlib_asyncresult (MonoDomain *domain, MonoClass *klass)
263 check_corlib_type_cached (domain, klass, "System.Runtime.Remoting.Messaging", "AsyncResult", &domain->corlib_asyncresult_class);
267 is_socket (MonoDomain *domain, MonoClass *klass)
269 check_socket_type_cached (domain, klass, "System.Net.Sockets", "Socket", &domain->socket_class);
273 is_socketasyncresult (MonoDomain *domain, MonoClass *klass)
275 return (klass->nested_in &&
276 is_socket (domain, klass->nested_in) &&
277 !strcmp (klass->name, "SocketAsyncResult"));
281 is_socketasynccall (MonoDomain *domain, MonoClass *klass)
283 return (klass->nested_in &&
284 is_socket (domain, klass->nested_in) &&
285 !strcmp (klass->name, "SocketAsyncCall"));
289 is_appdomainunloaded_exception (MonoDomain *domain, MonoClass *klass)
291 check_corlib_type_cached (domain, klass, "System", "AppDomainUnloadedException", &domain->ad_unloaded_ex_class);
295 is_sd_process (MonoDomain *domain, MonoClass *klass)
297 check_system_type_cached (domain, klass, "System.Diagnostics", "Process", &domain->process_class);
301 is_sdp_asyncreadhandler (MonoDomain *domain, MonoClass *klass)
304 return (klass->nested_in &&
305 is_sd_process (domain, klass->nested_in) &&
306 !strcmp (klass->name, "AsyncReadHandler"));
310 #ifdef DISABLE_SOCKETS
312 #define socket_io_cleanup(x)
315 get_event_from_state (MonoSocketAsyncResult *state)
317 g_assert_not_reached ();
322 get_events_from_list (MonoMList *list)
330 socket_io_cleanup (SocketIOData *data)
332 EnterCriticalSection (&data->io_lock);
333 if (data->inited != 2) {
334 LeaveCriticalSection (&data->io_lock);
338 data->shutdown (data->event_data);
339 LeaveCriticalSection (&data->io_lock);
343 get_event_from_state (MonoSocketAsyncResult *state)
345 switch (state->operation) {
348 case AIO_OP_RECV_JUST_CALLBACK:
349 case AIO_OP_RECEIVEFROM:
350 case AIO_OP_READPIPE:
351 case AIO_OP_ACCEPTRECEIVE:
352 case AIO_OP_RECEIVE_BUFFERS:
355 case AIO_OP_SEND_JUST_CALLBACK:
358 case AIO_OP_SEND_BUFFERS:
359 case AIO_OP_DISCONNECT:
361 default: /* Should never happen */
362 g_message ("get_event_from_state: unknown value in switch!!!");
368 get_events_from_list (MonoMList *list)
370 MonoSocketAsyncResult *state;
373 while (list && (state = (MonoSocketAsyncResult *)mono_mlist_get_data (list))) {
374 events |= get_event_from_state (state);
375 list = mono_mlist_next (list);
381 #define ICALL_RECV(x) ves_icall_System_Net_Sockets_Socket_Receive_internal (\
382 (SOCKET)(gssize)x->handle, x->buffer, x->offset, x->size,\
383 x->socket_flags, &x->error);
385 #define ICALL_SEND(x) ves_icall_System_Net_Sockets_Socket_Send_internal (\
386 (SOCKET)(gssize)x->handle, x->buffer, x->offset, x->size,\
387 x->socket_flags, &x->error);
389 #endif /* !DISABLE_SOCKETS */
392 threadpool_jobs_inc (MonoObject *obj)
395 InterlockedIncrement (&obj->vtable->domain->threadpool_jobs);
399 threadpool_jobs_dec (MonoObject *obj)
407 domain = obj->vtable->domain;
408 remaining_jobs = InterlockedDecrement (&domain->threadpool_jobs);
409 if (remaining_jobs == 0 && domain->cleanup_semaphore) {
410 ReleaseSemaphore (domain->cleanup_semaphore, 1, NULL);
417 get_io_event (MonoMList **list, gint event)
427 state = mono_mlist_get_data (current);
428 if (get_event_from_state ((MonoSocketAsyncResult *) state) == event)
433 current = mono_mlist_next (current);
438 mono_mlist_set_next (prev, mono_mlist_next (current));
440 *list = mono_mlist_next (*list);
448 * select/poll wake up when a socket is closed, but epoll just removes
449 * the socket from its internal list without notification.
452 mono_thread_pool_remove_socket (int sock)
455 MonoSocketAsyncResult *state;
458 if (socket_io_data.inited == 0)
461 EnterCriticalSection (&socket_io_data.io_lock);
462 if (socket_io_data.sock_to_state == NULL) {
463 LeaveCriticalSection (&socket_io_data.io_lock);
466 list = mono_g_hash_table_lookup (socket_io_data.sock_to_state, GINT_TO_POINTER (sock));
468 mono_g_hash_table_remove (socket_io_data.sock_to_state, GINT_TO_POINTER (sock));
469 LeaveCriticalSection (&socket_io_data.io_lock);
472 state = (MonoSocketAsyncResult *) mono_mlist_get_data (list);
473 if (state->operation == AIO_OP_RECEIVE)
474 state->operation = AIO_OP_RECV_JUST_CALLBACK;
475 else if (state->operation == AIO_OP_SEND)
476 state->operation = AIO_OP_SEND_JUST_CALLBACK;
478 ares = get_io_event (&list, MONO_POLLIN);
479 threadpool_append_job (&async_io_tp, ares);
481 ares = get_io_event (&list, MONO_POLLOUT);
482 threadpool_append_job (&async_io_tp, ares);
488 init_event_system (SocketIOData *data)
491 if (data->event_system == EPOLL_BACKEND) {
492 data->event_data = tp_epoll_init (data);
493 if (data->event_data == NULL) {
494 if (g_getenv ("MONO_DEBUG"))
495 g_message ("Falling back to poll()");
496 data->event_system = POLL_BACKEND;
499 #elif defined(HAVE_KQUEUE)
500 if (data->event_system == KQUEUE_BACKEND)
501 data->event_data = tp_kqueue_init (data);
503 if (data->event_system == POLL_BACKEND)
504 data->event_data = tp_poll_init (data);
508 socket_io_init (SocketIOData *data)
512 if (data->inited >= 2) // 2 -> initialized, 3-> cleaned up
515 inited = InterlockedCompareExchange (&data->inited, 1, 0);
518 if (data->inited >= 2)
524 EnterCriticalSection (&data->io_lock);
525 data->sock_to_state = mono_g_hash_table_new_type (g_direct_hash, g_direct_equal, MONO_HASH_VALUE_GC);
527 data->event_system = EPOLL_BACKEND;
528 #elif defined(HAVE_KQUEUE)
529 data->event_system = KQUEUE_BACKEND;
531 data->event_system = POLL_BACKEND;
533 if (g_getenv ("MONO_DISABLE_AIO") != NULL)
534 data->event_system = POLL_BACKEND;
536 init_event_system (data);
537 mono_thread_create_internal (mono_get_root_domain (), data->wait, data, TRUE, SMALL_STACK);
538 LeaveCriticalSection (&data->io_lock);
540 threadpool_start_thread (&async_io_tp);
544 socket_io_add (MonoAsyncResult *ares, MonoSocketAsyncResult *state)
547 SocketIOData *data = &socket_io_data;
552 socket_io_init (&socket_io_data);
553 if (mono_runtime_is_shutting_down () || data->inited == 3 || data->sock_to_state == NULL)
555 if (async_tp.pool_status == 2)
558 MONO_OBJECT_SETREF (state, ares, ares);
560 fd = GPOINTER_TO_INT (state->handle);
561 EnterCriticalSection (&data->io_lock);
562 if (data->sock_to_state == NULL) {
563 LeaveCriticalSection (&data->io_lock);
566 list = mono_g_hash_table_lookup (data->sock_to_state, GINT_TO_POINTER (fd));
568 list = mono_mlist_alloc ((MonoObject*)state);
571 list = mono_mlist_append (list, (MonoObject*)state);
575 mono_g_hash_table_replace (data->sock_to_state, state->handle, list);
576 ievt = get_events_from_list (list);
577 data->modify (data->event_data, fd, state->operation, ievt, is_new);
578 LeaveCriticalSection (&data->io_lock);
581 #ifndef DISABLE_SOCKETS
583 socket_io_filter (MonoObject *target, MonoObject *state)
586 MonoSocketAsyncResult *sock_res;
590 if (target == NULL || state == NULL)
593 domain = target->vtable->domain;
594 klass = target->vtable->klass;
595 if (socket_async_call_klass == NULL && is_socketasynccall (domain, klass))
596 socket_async_call_klass = klass;
598 if (process_async_call_klass == NULL && is_sdp_asyncreadhandler (domain, klass))
599 process_async_call_klass = klass;
601 if (klass != socket_async_call_klass && klass != process_async_call_klass)
604 sock_res = (MonoSocketAsyncResult *) state;
605 op = sock_res->operation;
606 if (op < AIO_OP_FIRST || op >= AIO_OP_LAST)
611 #endif /* !DISABLE_SOCKETS */
613 /* Returns the exception thrown when invoking, if any */
615 mono_async_invoke (ThreadPool *tp, MonoAsyncResult *ares)
617 ASyncCall *ac = (ASyncCall *)ares->object_data;
618 MonoObject *res, *exc = NULL;
619 MonoArray *out_args = NULL;
620 HANDLE wait_event = NULL;
622 if (ares->execution_context) {
623 /* use captured ExecutionContext (if available) */
624 MONO_OBJECT_SETREF (ares, original_context, mono_thread_get_execution_context ());
625 mono_thread_set_execution_context (ares->execution_context);
627 ares->original_context = NULL;
631 /* Fast path from ThreadPool.*QueueUserWorkItem */
632 void *pa = ares->async_state;
633 res = mono_runtime_delegate_invoke (ares->async_delegate, &pa, &exc);
635 MonoObject *cb_exc = NULL;
638 res = mono_message_invoke (ares->async_delegate, ac->msg, &exc, &out_args);
639 MONO_OBJECT_SETREF (ac, res, res);
640 MONO_OBJECT_SETREF (ac, msg->exc, exc);
641 MONO_OBJECT_SETREF (ac, out_args, out_args);
643 mono_monitor_enter ((MonoObject *) ares);
645 if (ares->handle != NULL)
646 wait_event = mono_wait_handle_get_handle ((MonoWaitHandle *) ares->handle);
647 mono_monitor_exit ((MonoObject *) ares);
648 /* notify listeners */
649 if (wait_event != NULL)
650 SetEvent (wait_event);
652 /* call async callback if cb_method != null*/
653 if (ac != NULL && ac->cb_method) {
656 mono_runtime_invoke (ac->cb_method, ac->cb_target, pa, &cb_exc);
663 /* restore original thread execution context if flow isn't suppressed, i.e. non null */
664 if (ares->original_context) {
665 mono_thread_set_execution_context (ares->original_context);
666 ares->original_context = NULL;
672 threadpool_start_idle_threads (ThreadPool *tp)
677 stack_size = (!tp->is_io) ? 0 : SMALL_STACK;
681 if (n >= tp->min_threads)
683 if (InterlockedCompareExchange (&tp->nthreads, n + 1, n) == n)
686 mono_perfcounter_update_value (tp->pc_nthreads, TRUE, 1);
687 mono_thread_create_internal (mono_get_root_domain (), tp->async_invoke, tp, TRUE, stack_size);
693 threadpool_init (ThreadPool *tp, int min_threads, int max_threads, void (*async_invoke) (gpointer))
695 memset (tp, 0, sizeof (ThreadPool));
696 tp->min_threads = min_threads;
697 tp->max_threads = max_threads;
698 tp->async_invoke = async_invoke;
699 tp->queue = mono_cq_create ();
700 MONO_SEM_INIT (&tp->new_job, 0);
704 init_perf_counter (const char *category, const char *counter)
706 MonoString *category_str;
707 MonoString *counter_str;
713 if (category == NULL || counter == NULL)
715 root = mono_get_root_domain ();
716 category_str = mono_string_new (root, category);
717 counter_str = mono_string_new (root, counter);
718 machine = mono_string_new (root, ".");
719 return mono_perfcounter_get_impl (category_str, counter_str, NULL, machine, &type, &custom);
724 print_pool_info (ThreadPool *tp)
727 // if (tp->tail - tp->head == 0)
730 g_print ("Pool status? %d\n", InterlockedCompareExchange (&tp->pool_status, 0, 0));
731 g_print ("Min. threads: %d\n", InterlockedCompareExchange (&tp->min_threads, 0, 0));
732 g_print ("Max. threads: %d\n", InterlockedCompareExchange (&tp->max_threads, 0, 0));
733 g_print ("nthreads: %d\n", InterlockedCompareExchange (&tp->nthreads, 0, 0));
734 g_print ("busy threads: %d\n", InterlockedCompareExchange (&tp->busy_threads, 0, 0));
735 g_print ("Waiting: %d\n", InterlockedCompareExchange (&tp->waiting, 0, 0));
736 g_print ("Queued: %d\n", (tp->tail - tp->head));
737 if (tp == &async_tp) {
739 EnterCriticalSection (&wsqs_lock);
740 for (i = 0; i < wsqs->len; i++) {
741 g_print ("\tWSQ %d: %d\n", i, mono_wsq_count (g_ptr_array_index (wsqs, i)));
743 LeaveCriticalSection (&wsqs_lock);
745 g_print ("\tSockets: %d\n", mono_g_hash_table_size (socket_io_data.sock_to_state));
747 g_print ("-------------\n");
751 signal_handler (int signo)
756 g_print ("\n-----Non-IO-----\n");
757 print_pool_info (tp);
759 g_print ("\n-----IO-----\n");
760 print_pool_info (tp);
766 monitor_thread (gpointer unused)
768 ThreadPool *pools [2];
769 MonoInternalThread *thread;
774 pools [0] = &async_tp;
775 pools [1] = &async_io_tp;
776 thread = mono_thread_internal_current ();
777 ves_icall_System_Threading_Thread_SetName_internal (thread, mono_string_new (mono_domain_get (), "Threadpool monitor"));
782 ts = mono_msec_ticks ();
783 if (SleepEx (ms, TRUE) == 0)
785 ms -= (mono_msec_ticks () - ts);
786 if (mono_runtime_is_shutting_down ())
788 if (THREAD_WANTS_A_BREAK (thread))
789 mono_thread_interruption_checkpoint ();
792 if (mono_runtime_is_shutting_down ())
795 for (i = 0; i < 2; i++) {
800 need_one = (mono_cq_count (tp->queue) > 0);
801 if (!need_one && !tp->is_io) {
802 EnterCriticalSection (&wsqs_lock);
803 for (i = 0; wsqs != NULL && i < wsqs->len; i++) {
805 wsq = g_ptr_array_index (wsqs, i);
806 if (mono_wsq_count (wsq) != 0) {
811 LeaveCriticalSection (&wsqs_lock);
814 threadpool_start_thread (tp);
820 mono_thread_pool_init ()
822 gint threads_per_cpu = 1;
824 gint cpu_count = mono_cpu_count ();
830 result = InterlockedCompareExchange (&tp_inited, 1, 0);
839 MONO_GC_REGISTER_ROOT_FIXED (socket_io_data.sock_to_state);
840 InitializeCriticalSection (&socket_io_data.io_lock);
841 if (g_getenv ("MONO_THREADS_PER_CPU") != NULL) {
842 threads_per_cpu = atoi (g_getenv ("MONO_THREADS_PER_CPU"));
843 if (threads_per_cpu < 1)
847 thread_count = MIN (cpu_count * threads_per_cpu, 100 * cpu_count);
848 threadpool_init (&async_tp, thread_count, MAX (100 * cpu_count, thread_count), async_invoke_thread);
849 threadpool_init (&async_io_tp, cpu_count * 2, cpu_count * 4, async_invoke_thread);
850 async_io_tp.is_io = TRUE;
852 async_call_klass = mono_class_from_name (mono_defaults.corlib, "System", "MonoAsyncCall");
853 g_assert (async_call_klass);
855 InitializeCriticalSection (&wsqs_lock);
856 wsqs = g_ptr_array_sized_new (MAX (100 * cpu_count, thread_count));
859 async_tp.pc_nitems = init_perf_counter ("Mono Threadpool", "Work Items Added");
860 g_assert (async_tp.pc_nitems);
862 async_io_tp.pc_nitems = init_perf_counter ("Mono Threadpool", "IO Work Items Added");
863 g_assert (async_io_tp.pc_nitems);
865 async_tp.pc_nthreads = init_perf_counter ("Mono Threadpool", "# of Threads");
866 g_assert (async_tp.pc_nthreads);
868 async_io_tp.pc_nthreads = init_perf_counter ("Mono Threadpool", "# of IO Threads");
869 g_assert (async_io_tp.pc_nthreads);
872 signal (SIGALRM, signal_handler);
877 static MonoAsyncResult *
878 create_simple_asyncresult (MonoObject *target, MonoObject *state)
880 MonoDomain *domain = mono_domain_get ();
881 MonoAsyncResult *ares;
883 /* Don't call mono_async_result_new() to avoid capturing the context */
884 ares = (MonoAsyncResult *) mono_object_new (domain, mono_defaults.asyncresult_class);
885 MONO_OBJECT_SETREF (ares, async_delegate, target);
886 MONO_OBJECT_SETREF (ares, async_state, state);
891 icall_append_io_job (MonoObject *target, MonoSocketAsyncResult *state)
893 MonoAsyncResult *ares;
895 ares = create_simple_asyncresult (target, (MonoObject *) state);
896 socket_io_add (ares, state);
900 mono_thread_pool_add (MonoObject *target, MonoMethodMessage *msg, MonoDelegate *async_callback,
903 MonoDomain *domain = mono_domain_get ();
904 MonoAsyncResult *ares;
907 ac = (ASyncCall*)mono_object_new (domain, async_call_klass);
908 MONO_OBJECT_SETREF (ac, msg, msg);
909 MONO_OBJECT_SETREF (ac, state, state);
911 if (async_callback) {
912 ac->cb_method = mono_get_delegate_invoke (((MonoObject *)async_callback)->vtable->klass);
913 MONO_OBJECT_SETREF (ac, cb_target, async_callback);
916 ares = mono_async_result_new (domain, NULL, ac->state, NULL, (MonoObject*)ac);
917 MONO_OBJECT_SETREF (ares, async_delegate, target);
919 #ifndef DISABLE_SOCKETS
920 if (socket_io_filter (target, state)) {
921 socket_io_add (ares, (MonoSocketAsyncResult *) state);
925 threadpool_append_job (&async_tp, (MonoObject *) ares);
930 mono_thread_pool_finish (MonoAsyncResult *ares, MonoArray **out_args, MonoObject **exc)
938 /* check if already finished */
939 mono_monitor_enter ((MonoObject *) ares);
941 if (ares->endinvoke_called) {
942 *exc = (MonoObject *) mono_get_exception_invalid_operation (NULL);
943 mono_monitor_exit ((MonoObject *) ares);
947 ares->endinvoke_called = 1;
948 /* wait until we are really finished */
949 if (!ares->completed) {
950 if (ares->handle == NULL) {
951 wait_event = CreateEvent (NULL, TRUE, FALSE, NULL);
952 g_assert(wait_event != 0);
953 MONO_OBJECT_SETREF (ares, handle, (MonoObject *) mono_wait_handle_new (mono_object_domain (ares), wait_event));
955 wait_event = mono_wait_handle_get_handle ((MonoWaitHandle *) ares->handle);
957 mono_monitor_exit ((MonoObject *) ares);
958 WaitForSingleObjectEx (wait_event, INFINITE, TRUE);
960 mono_monitor_exit ((MonoObject *) ares);
963 ac = (ASyncCall *) ares->object_data;
964 g_assert (ac != NULL);
965 *exc = ac->msg->exc; /* FIXME: GC add write barrier */
966 *out_args = ac->out_args;
972 threadpool_kill_idle_threads (ThreadPool *tp)
976 n = (gint) InterlockedCompareExchange (&tp->max_threads, 0, -1);
979 MONO_SEM_POST (&tp->new_job);
984 mono_thread_pool_cleanup (void)
986 if (InterlockedExchange (&async_io_tp.pool_status, 2) == 1) {
987 socket_io_cleanup (&socket_io_data); /* Empty when DISABLE_SOCKETS is defined */
988 threadpool_kill_idle_threads (&async_io_tp);
991 if (async_io_tp.queue != NULL) {
992 MONO_SEM_DESTROY (&async_io_tp.new_job);
993 threadpool_free_queue (&async_io_tp);
997 if (InterlockedExchange (&async_tp.pool_status, 2) == 1) {
998 threadpool_kill_idle_threads (&async_tp);
999 threadpool_free_queue (&async_tp);
1003 EnterCriticalSection (&wsqs_lock);
1004 mono_wsq_cleanup ();
1006 g_ptr_array_free (wsqs, TRUE);
1008 LeaveCriticalSection (&wsqs_lock);
1009 MONO_SEM_DESTROY (&async_tp.new_job);
1014 threadpool_start_thread (ThreadPool *tp)
1019 stack_size = (!tp->is_io) ? 0 : SMALL_STACK;
1020 while (!mono_runtime_is_shutting_down () && (n = tp->nthreads) < tp->max_threads) {
1021 if (InterlockedCompareExchange (&tp->nthreads, n + 1, n) == n) {
1022 mono_perfcounter_update_value (tp->pc_nthreads, TRUE, 1);
1023 mono_thread_create_internal (mono_get_root_domain (), tp->async_invoke, tp, TRUE, stack_size);
1032 pulse_on_new_job (ThreadPool *tp)
1035 MONO_SEM_POST (&tp->new_job);
1039 icall_append_job (MonoObject *ar)
1041 threadpool_append_jobs (&async_tp, &ar, 1);
1045 threadpool_append_job (ThreadPool *tp, MonoObject *ar)
1047 threadpool_append_jobs (tp, &ar, 1);
1051 threadpool_append_jobs (ThreadPool *tp, MonoObject **jobs, gint njobs)
1053 static int job_counter;
1057 if (mono_runtime_is_shutting_down ())
1060 if (tp->pool_status == 0 && InterlockedCompareExchange (&tp->pool_status, 1, 0) == 0) {
1062 mono_thread_create_internal (mono_get_root_domain (), monitor_thread, NULL, TRUE, SMALL_STACK);
1063 threadpool_start_thread (tp);
1065 /* Create on demand up to min_threads to avoid startup penalty for apps that don't use
1066 * the threadpool that much
1067 * mono_thread_create_internal (mono_get_root_domain (), threadpool_start_idle_threads, tp, TRUE, SMALL_STACK);
1071 for (i = 0; i < njobs; i++) {
1073 if (ar == NULL || mono_domain_is_unloading (ar->vtable->domain))
1074 continue; /* Might happen when cleaning domain jobs */
1075 if (!tp->is_io && (InterlockedIncrement (&job_counter) % 10) == 0) {
1076 MonoAsyncResult *o = (MonoAsyncResult *) ar;
1077 o->add_time = mono_100ns_ticks ();
1079 threadpool_jobs_inc (ar);
1080 mono_perfcounter_update_value (tp->pc_nitems, TRUE, 1);
1081 if (!tp->is_io && mono_wsq_local_push (ar))
1084 mono_cq_enqueue (tp->queue, ar);
1087 for (i = 0; tp->waiting > 0 && i < MIN(njobs, tp->max_threads); i++)
1088 pulse_on_new_job (tp);
1092 threadpool_clear_queue (ThreadPool *tp, MonoDomain *domain)
1098 while (mono_cq_dequeue (tp->queue, &obj)) {
1101 if (obj->vtable->domain != domain)
1102 other = mono_mlist_prepend (other, obj);
1103 threadpool_jobs_dec (obj);
1107 threadpool_append_job (tp, (MonoObject *) mono_mlist_get_data (other));
1108 other = mono_mlist_next (other);
1113 remove_sockstate_for_domain (gpointer key, gpointer value, gpointer user_data)
1115 MonoMList *list = value;
1116 gboolean remove = FALSE;
1118 MonoObject *data = mono_mlist_get_data (list);
1119 if (mono_object_domain (data) == user_data) {
1121 mono_mlist_set_data (list, NULL);
1123 list = mono_mlist_next (list);
1125 //FIXME is there some sort of additional unregistration we need to perform here?
1130 * Clean up the threadpool of all domain jobs.
1131 * Can only be called as part of the domain unloading process as
1132 * it will wait for all jobs to be visible to the interruption code.
1135 mono_thread_pool_remove_domain_jobs (MonoDomain *domain, int timeout)
1139 guint32 start_time = 0;
1141 g_assert (domain->state == MONO_APPDOMAIN_UNLOADING);
1143 threadpool_clear_queue (&async_tp, domain);
1144 threadpool_clear_queue (&async_io_tp, domain);
1146 EnterCriticalSection (&socket_io_data.io_lock);
1147 if (socket_io_data.sock_to_state)
1148 mono_g_hash_table_foreach_remove (socket_io_data.sock_to_state, remove_sockstate_for_domain, domain);
1150 LeaveCriticalSection (&socket_io_data.io_lock);
1153 * There might be some threads out that could be about to execute stuff from the given domain.
1154 * We avoid that by setting up a semaphore to be pulsed by the thread that reaches zero.
1156 sem_handle = CreateSemaphore (NULL, 0, 1, NULL);
1158 domain->cleanup_semaphore = sem_handle;
1160 * The memory barrier here is required to have global ordering between assigning to cleanup_semaphone
1161 * and reading threadpool_jobs.
1162 * Otherwise this thread could read a stale version of threadpool_jobs and wait forever.
1164 mono_memory_write_barrier ();
1166 if (domain->threadpool_jobs && timeout != -1)
1167 start_time = mono_msec_ticks ();
1168 while (domain->threadpool_jobs) {
1169 WaitForSingleObject (sem_handle, timeout);
1170 if (timeout != -1 && (mono_msec_ticks () - start_time) > timeout) {
1176 domain->cleanup_semaphore = NULL;
1177 CloseHandle (sem_handle);
1182 threadpool_free_queue (ThreadPool *tp)
1184 mono_cq_destroy (tp->queue);
1189 mono_thread_pool_is_queue_array (MonoArray *o)
1191 // gpointer obj = o;
1193 // FIXME: need some fix in sgen code.
1203 EnterCriticalSection (&wsqs_lock);
1204 wsq = mono_wsq_create ();
1206 LeaveCriticalSection (&wsqs_lock);
1209 for (i = 0; i < wsqs->len; i++) {
1210 if (g_ptr_array_index (wsqs, i) == NULL) {
1211 wsqs->pdata [i] = wsq;
1212 LeaveCriticalSection (&wsqs_lock);
1216 g_ptr_array_add (wsqs, wsq);
1217 LeaveCriticalSection (&wsqs_lock);
1222 remove_wsq (MonoWSQ *wsq)
1229 EnterCriticalSection (&wsqs_lock);
1231 LeaveCriticalSection (&wsqs_lock);
1234 g_ptr_array_remove_fast (wsqs, wsq);
1237 * Only clean this up when shutting down, any other case will error out
1238 * if we're removing a queue that still has work items.
1240 if (mono_runtime_is_shutting_down ()) {
1241 while (mono_wsq_local_pop (&data)) {
1242 threadpool_jobs_dec (data);
1246 mono_wsq_destroy (wsq);
1247 LeaveCriticalSection (&wsqs_lock);
1251 try_steal (MonoWSQ *local_wsq, gpointer *data, gboolean retry)
1256 if (wsqs == NULL || data == NULL || *data != NULL)
1261 if (mono_runtime_is_shutting_down ())
1264 EnterCriticalSection (&wsqs_lock);
1265 for (i = 0; wsqs != NULL && i < wsqs->len; i++) {
1268 wsq = wsqs->pdata [i];
1269 if (wsq == local_wsq || mono_wsq_count (wsq) == 0)
1271 mono_wsq_try_steal (wsqs->pdata [i], data, ms);
1272 if (*data != NULL) {
1273 LeaveCriticalSection (&wsqs_lock);
1277 LeaveCriticalSection (&wsqs_lock);
1279 } while (retry && ms < 11);
1283 dequeue_or_steal (ThreadPool *tp, gpointer *data, MonoWSQ *local_wsq)
1285 if (mono_runtime_is_shutting_down ())
1287 mono_cq_dequeue (tp->queue, (MonoObject **) data);
1288 if (!tp->is_io && !*data)
1289 try_steal (local_wsq, data, FALSE);
1290 return (*data != NULL);
1294 process_idle_times (ThreadPool *tp, gint64 t)
1298 gboolean compute_avg;
1302 if (tp->ignore_times || t <= 0)
1305 compute_avg = FALSE;
1306 ticks = mono_100ns_ticks ();
1308 SPIN_LOCK (tp->sp_lock);
1309 if (tp->ignore_times) {
1310 SPIN_UNLOCK (tp->sp_lock);
1315 if (tp->last_check == 0)
1316 tp->last_check = ticks;
1317 else if (tp->last_check > 0 && (ticks - tp->last_check) > 5000000) {
1318 tp->ignore_times = 1;
1321 SPIN_UNLOCK (tp->sp_lock);
1326 //printf ("Items: %d Time elapsed: %.3fs\n", tp->n_sum, (ticks - tp->last_check) / 10000.0);
1327 tp->last_check = ticks;
1329 avg = tp->time_sum / tp->n_sum;
1330 if (tp->averages [1] == 0) {
1331 tp->averages [1] = avg;
1333 per1 = ((100 * (ABS (avg - tp->averages [1]))) / tp->averages [1]);
1335 if (avg > tp->averages [1]) {
1336 if (tp->averages [1] < tp->averages [0]) {
1341 } else if (avg < tp->averages [1] && tp->averages [1] < tp->averages [0]) {
1346 min = tp->min_threads;
1348 if ((n - min) < min && tp->busy_threads == n)
1352 if (new_threads != 0) {
1353 printf ("n: %d per1: %lld avg=%lld avg1=%lld avg0=%lld\n", new_threads, per1, avg, tp->averages [1], tp->averages [0]);
1361 tp->averages [0] = tp->averages [1];
1362 tp->averages [1] = avg;
1363 tp->ignore_times = 0;
1365 if (new_threads == -1) {
1366 if (tp->destroy_thread == 0 && InterlockedCompareExchange (&tp->destroy_thread, 1, 0) == 0)
1367 pulse_on_new_job (tp);
1372 should_i_die (ThreadPool *tp)
1374 gboolean result = FALSE;
1375 if (tp->destroy_thread == 1 && InterlockedCompareExchange (&tp->destroy_thread, 0, 1) == 1)
1376 result = (tp->nthreads > tp->min_threads);
1381 async_invoke_thread (gpointer data)
1384 MonoInternalThread *thread;
1395 thread = mono_thread_internal_current ();
1397 mono_profiler_thread_start (thread->tid);
1398 name = (tp->is_io) ? "IO Threadpool worker" : "Threadpool worker";
1399 mono_thread_set_name_internal (thread, mono_string_new (mono_domain_get (), name), FALSE);
1402 tp_start_func (tp_hooks_user_data);
1406 MonoAsyncResult *ar;
1408 gboolean is_io_task;
1413 ar = (MonoAsyncResult *) data;
1415 InterlockedIncrement (&tp->busy_threads);
1416 domain = ((MonoObject *)ar)->vtable->domain;
1417 #ifndef DISABLE_SOCKETS
1418 klass = ((MonoObject *) data)->vtable->klass;
1419 is_io_task = !is_corlib_asyncresult (domain, klass);
1422 MonoSocketAsyncResult *state = (MonoSocketAsyncResult *) data;
1423 is_socket = is_socketasyncresult (domain, klass);
1425 switch (state->operation) {
1426 case AIO_OP_RECEIVE:
1427 state->total = ICALL_RECV (state);
1430 state->total = ICALL_SEND (state);
1435 /* worker threads invokes methods in different domains,
1436 * so we need to set the right domain here */
1439 if (mono_domain_is_unloading (domain) || mono_runtime_is_shutting_down ()) {
1440 threadpool_jobs_dec ((MonoObject *)ar);
1443 InterlockedDecrement (&tp->busy_threads);
1445 mono_thread_push_appdomain_ref (domain);
1446 if (threadpool_jobs_dec ((MonoObject *)ar)) {
1449 mono_thread_pop_appdomain_ref ();
1450 InterlockedDecrement (&tp->busy_threads);
1454 if (mono_domain_set (domain, FALSE)) {
1457 if (tp_item_begin_func)
1458 tp_item_begin_func (tp_item_user_data);
1460 if (!is_io_task && ar->add_time > 0)
1461 process_idle_times (tp, ar->add_time);
1462 exc = mono_async_invoke (tp, ar);
1463 if (tp_item_end_func)
1464 tp_item_end_func (tp_item_user_data);
1466 mono_internal_thread_unhandled_exception (exc);
1467 if (is_socket && tp->is_io) {
1468 MonoSocketAsyncResult *state = (MonoSocketAsyncResult *) data;
1470 if (state->completed && state->callback) {
1471 MonoAsyncResult *cb_ares;
1472 cb_ares = create_simple_asyncresult ((MonoObject *) state->callback,
1473 (MonoObject *) state);
1474 icall_append_job ((MonoObject *) cb_ares);
1477 mono_domain_set (mono_get_root_domain (), TRUE);
1479 mono_thread_pop_appdomain_ref ();
1480 InterlockedDecrement (&tp->busy_threads);
1481 /* If the callee changes the background status, set it back to TRUE */
1482 mono_thread_clr_state (thread , ~ThreadState_Background);
1483 if (!mono_thread_test_state (thread , ThreadState_Background))
1484 ves_icall_System_Threading_Thread_SetState (thread, ThreadState_Background);
1490 must_die = should_i_die (tp);
1491 if (!must_die && (tp->is_io || !mono_wsq_local_pop (&data)))
1492 dequeue_or_steal (tp, &data, wsq);
1495 while (!must_die && !data && n_naps < 4) {
1498 InterlockedIncrement (&tp->waiting);
1500 // Another thread may have added a job into its wsq since the last call to dequeue_or_steal
1501 // Check all the queues again before entering the wait loop
1502 dequeue_or_steal (tp, &data, wsq);
1504 InterlockedDecrement (&tp->waiting);
1508 mono_gc_set_skip_thread (TRUE);
1510 #if defined(__OpenBSD__)
1511 while (mono_cq_count (tp->queue) == 0 && (res = mono_sem_wait (&tp->new_job, TRUE)) == -1) {// && errno == EINTR) {
1513 while (mono_cq_count (tp->queue) == 0 && (res = mono_sem_timedwait (&tp->new_job, 2000, TRUE)) == -1) {// && errno == EINTR) {
1515 if (mono_runtime_is_shutting_down ())
1517 if (THREAD_WANTS_A_BREAK (thread))
1518 mono_thread_interruption_checkpoint ();
1520 InterlockedDecrement (&tp->waiting);
1522 mono_gc_set_skip_thread (FALSE);
1524 if (mono_runtime_is_shutting_down ())
1526 must_die = should_i_die (tp);
1527 dequeue_or_steal (tp, &data, wsq);
1531 if (!data && !tp->is_io && !mono_runtime_is_shutting_down ()) {
1532 mono_wsq_local_pop (&data);
1533 if (data && must_die) {
1534 InterlockedCompareExchange (&tp->destroy_thread, 1, 0);
1535 pulse_on_new_job (tp);
1544 down = mono_runtime_is_shutting_down ();
1545 if (!down && nt <= tp->min_threads)
1547 if (down || InterlockedCompareExchange (&tp->nthreads, nt - 1, nt) == nt) {
1548 mono_perfcounter_update_value (tp->pc_nthreads, TRUE, -1);
1553 mono_profiler_thread_end (thread->tid);
1556 tp_finish_func (tp_hooks_user_data);
1563 g_assert_not_reached ();
1567 ves_icall_System_Threading_ThreadPool_GetAvailableThreads (gint *workerThreads, gint *completionPortThreads)
1569 *workerThreads = async_tp.max_threads - async_tp.busy_threads;
1570 *completionPortThreads = async_io_tp.max_threads - async_io_tp.busy_threads;
1574 ves_icall_System_Threading_ThreadPool_GetMaxThreads (gint *workerThreads, gint *completionPortThreads)
1576 *workerThreads = async_tp.max_threads;
1577 *completionPortThreads = async_io_tp.max_threads;
1581 ves_icall_System_Threading_ThreadPool_GetMinThreads (gint *workerThreads, gint *completionPortThreads)
1583 *workerThreads = async_tp.min_threads;
1584 *completionPortThreads = async_io_tp.min_threads;
1588 ves_icall_System_Threading_ThreadPool_SetMinThreads (gint workerThreads, gint completionPortThreads)
1591 gint max_io_threads;
1593 max_threads = async_tp.max_threads;
1594 if (workerThreads <= 0 || workerThreads > max_threads)
1597 max_io_threads = async_io_tp.max_threads;
1598 if (completionPortThreads <= 0 || completionPortThreads > max_io_threads)
1601 InterlockedExchange (&async_tp.min_threads, workerThreads);
1602 InterlockedExchange (&async_io_tp.min_threads, completionPortThreads);
1603 if (workerThreads > async_tp.nthreads)
1604 mono_thread_create_internal (mono_get_root_domain (), threadpool_start_idle_threads, &async_tp, TRUE, SMALL_STACK);
1605 if (completionPortThreads > async_io_tp.nthreads)
1606 mono_thread_create_internal (mono_get_root_domain (), threadpool_start_idle_threads, &async_io_tp, TRUE, SMALL_STACK);
1611 ves_icall_System_Threading_ThreadPool_SetMaxThreads (gint workerThreads, gint completionPortThreads)
1614 gint min_io_threads;
1617 cpu_count = mono_cpu_count ();
1618 min_threads = async_tp.min_threads;
1619 if (workerThreads < min_threads || workerThreads < cpu_count)
1622 /* We don't really have the concept of completion ports. Do we care here? */
1623 min_io_threads = async_io_tp.min_threads;
1624 if (completionPortThreads < min_io_threads || completionPortThreads < cpu_count)
1627 InterlockedExchange (&async_tp.max_threads, workerThreads);
1628 InterlockedExchange (&async_io_tp.max_threads, completionPortThreads);
1633 * mono_install_threadpool_thread_hooks
1634 * @start_func: the function to be called right after a new threadpool thread is created. Can be NULL.
1635 * @finish_func: the function to be called right before a thredpool thread is exiting. Can be NULL.
1636 * @user_data: argument passed to @start_func and @finish_func.
1638 * @start_fun will be called right after a threadpool thread is created and @finish_func right before a threadpool thread exits.
1639 * The calls will be made from the thread itself.
1642 mono_install_threadpool_thread_hooks (MonoThreadPoolFunc start_func, MonoThreadPoolFunc finish_func, gpointer user_data)
1644 tp_start_func = start_func;
1645 tp_finish_func = finish_func;
1646 tp_hooks_user_data = user_data;
1650 * mono_install_threadpool_item_hooks
1651 * @begin_func: the function to be called before a threadpool work item processing starts.
1652 * @end_func: the function to be called after a threadpool work item is finished.
1653 * @user_data: argument passed to @begin_func and @end_func.
1655 * The calls will be made from the thread itself and from the same AppDomain
1656 * where the work item was executed.
1660 mono_install_threadpool_item_hooks (MonoThreadPoolItemFunc begin_func, MonoThreadPoolItemFunc end_func, gpointer user_data)
1662 tp_item_begin_func = begin_func;
1663 tp_item_end_func = end_func;
1664 tp_item_user_data = user_data;
1668 mono_internal_thread_unhandled_exception (MonoObject* exc)
1670 if (mono_runtime_unhandled_exception_policy_get () == MONO_UNHANDLED_POLICY_CURRENT) {
1674 klass = exc->vtable->klass;
1675 unloaded = is_appdomainunloaded_exception (exc->vtable->domain, klass);
1676 if (!unloaded && klass != mono_defaults.threadabortexception_class) {
1677 mono_unhandled_exception (exc);
1678 if (mono_environment_exitcode_get () == 1)
1681 if (klass == mono_defaults.threadabortexception_class)
1682 mono_thread_internal_reset_abort (mono_thread_internal_current ());