#include <config.h>
#include <glib.h>
-#include <mono/metadata/domain-internals.h>
#include <mono/metadata/profiler-private.h>
-#include <mono/metadata/tabledefs.h>
#include <mono/metadata/threads.h>
#include <mono/metadata/threads-types.h>
#include <mono/metadata/threadpool-internals.h>
#include <mono/metadata/exception.h>
-#include <mono/metadata/file-io.h>
-#include <mono/metadata/monitor.h>
#include <mono/metadata/mono-mlist.h>
-#include <mono/metadata/marshal.h>
#include <mono/metadata/mono-perfcounters.h>
#include <mono/metadata/socket-io.h>
#include <mono/metadata/mono-cq.h>
#include <mono/metadata/mono-wsq.h>
#include <mono/io-layer/io-layer.h>
-#include <mono/metadata/gc-internal.h>
#include <mono/utils/mono-time.h>
#include <mono/utils/mono-proclib.h>
#include <mono/utils/mono-semaphore.h>
#define SPIN_UNLOCK(i) i = 0
-#define EPOLL_DEBUG(...)
-#define EPOLL_DEBUG_STMT(...)
-#define TP_DEBUG(...)
-#define TP_DEBUG_STMT(...)
-
/* DEBUG: prints tp data every 2s */
#undef DEBUG
-/*
-#define EPOLL_DEBUG(...) g_message(__VA_ARGS__)
-#define EPOLL_DEBUG_STMT(...) do { __VA_ARGS__ } while (0)
-#define TP_DEBUG(...) g_message(__VA_ARGS__)
-#define TP_DEBUG_STMT(...) do { __VA_ARGS__ } while (0)
-*/
-
-/* map of CounterSample.cs */
-struct _MonoCounterSample {
- gint64 rawValue;
- gint64 baseValue;
- gint64 counterFrequency;
- gint64 systemFrequency;
- gint64 timeStamp;
- gint64 timeStamp100nSec;
- gint64 counterTimeStamp;
- int counterType;
-};
-
/* mono_thread_pool_init called */
static volatile int tp_inited;
+enum {
+ POLL_BACKEND,
+ EPOLL_BACKEND
+};
+
typedef struct {
CRITICAL_SECTION io_lock; /* access to sock_to_state */
int inited; // 0 -> not initialized , 1->initializing, 2->initialized, 3->cleaned up
- int pipe [2];
MonoGHashTable *sock_to_state;
- HANDLE new_sem; /* access to newpfd and write side of the pipe */
- mono_pollfd *newpfd;
- gboolean epoll_disabled;
-#ifdef HAVE_EPOLL
- int epollfd;
-#endif
+ gint event_system;
+ gpointer event_data;
+ void (*modify) (gpointer event_data, int fd, int operation, int events, gboolean is_new);
+ void (*wait) (gpointer sock_data);
+ void (*shutdown) (gpointer event_data);
} SocketIOData;
static SocketIOData socket_io_data;
volatile gint64 time_sum;
volatile gint n_sum;
gint64 averages [2];
- /**/
- //TP_DEBUG_ONLY (gint nodes_created);
- //TP_DEBUG_ONLY (gint nodes_reused);
gboolean is_io;
} ThreadPool;
static void threadpool_kill_idle_threads (ThreadPool *tp);
static gboolean threadpool_start_thread (ThreadPool *tp);
static void monitor_thread (gpointer data);
+static void socket_io_cleanup (SocketIOData *data);
+static MonoObject *get_io_event (MonoMList **list, gint event);
+static int get_events_from_list (MonoMList *list);
+static int get_event_from_state (MonoSocketAsyncResult *state);
static MonoClass *async_call_klass;
static MonoClass *socket_async_call_klass;
static MonoThreadPoolItemFunc tp_item_end_func;
static gpointer tp_item_user_data;
-#define INIT_POLLFD(a, b, c) {(a)->fd = b; (a)->events = c; (a)->revents = 0;}
enum {
AIO_OP_FIRST,
AIO_OP_ACCEPT = 0,
AIO_OP_LAST
};
+#include <mono/metadata/tpool-poll.c>
+#ifdef HAVE_EPOLL
+#include <mono/metadata/tpool-epoll.c>
+#endif
/*
* Functions to check whenever a class is given system class. We need to cache things in MonoDomain since some of the
* assemblies can be unloaded.
return;
}
data->inited = 3;
-
-#ifdef HOST_WIN32
- closesocket (data->pipe [0]);
- closesocket (data->pipe [1]);
-#else
- if (data->pipe [0] > -1)
- close (data->pipe [0]);
- if (data->pipe [1] > -1)
- close (data->pipe [1]);
-#endif
- data->pipe [0] = -1;
- data->pipe [1] = -1;
- if (data->new_sem)
- CloseHandle (data->new_sem);
- data->new_sem = NULL;
- mono_g_hash_table_destroy (data->sock_to_state);
- data->sock_to_state = NULL;
- g_free (data->newpfd);
- data->newpfd = NULL;
-#ifdef HAVE_EPOLL
- if (FALSE == data->epoll_disabled)
- close (data->epollfd);
-#endif
+ data->shutdown (data->event_data);
LeaveCriticalSection (&data->io_lock);
}
return FALSE;
}
-#ifdef HAVE_EPOLL
static MonoObject *
get_io_event (MonoMList **list, gint event)
{
return state;
}
-#endif
-
-static MonoMList *
-process_io_event (MonoMList *list, int event)
-{
- MonoSocketAsyncResult *state;
- MonoMList *oldlist;
-
- oldlist = list;
- state = NULL;
- while (list) {
- state = (MonoSocketAsyncResult *) mono_mlist_get_data (list);
- if (get_event_from_state (state) == event)
- break;
-
- list = mono_mlist_next (list);
- }
-
- if (list != NULL) {
- oldlist = mono_mlist_remove_item (oldlist, list);
- EPOLL_DEBUG ("Dispatching event %d on socket %p", event, state->handle);
- threadpool_append_job (&async_io_tp, (MonoObject *) state);
- }
-
- return oldlist;
-}
-
-static int
-mark_bad_fds (mono_pollfd *pfds, int nfds)
-{
- int i, ret;
- mono_pollfd *pfd;
- int count = 0;
-
- for (i = 0; i < nfds; i++) {
- pfd = &pfds [i];
- if (pfd->fd == -1)
- continue;
-
- ret = mono_poll (pfd, 1, 0);
- if (ret == -1 && errno == EBADF) {
- pfd->revents |= MONO_POLLNVAL;
- count++;
- } else if (ret == 1) {
- count++;
- }
- }
-
- return count;
-}
-
-static void
-socket_io_poll_main (gpointer p)
-{
-#if MONO_SMALL_CONFIG
-#define INITIAL_POLLFD_SIZE 128
-#else
-#define INITIAL_POLLFD_SIZE 1024
-#endif
-#define POLL_ERRORS (MONO_POLLERR | MONO_POLLHUP | MONO_POLLNVAL)
- SocketIOData *data = p;
- mono_pollfd *pfds;
- gint maxfd = 1;
- gint allocated;
- gint i;
- MonoInternalThread *thread;
-
- thread = mono_thread_internal_current ();
-
- allocated = INITIAL_POLLFD_SIZE;
- pfds = g_new0 (mono_pollfd, allocated);
- INIT_POLLFD (pfds, data->pipe [0], MONO_POLLIN);
- for (i = 1; i < allocated; i++)
- INIT_POLLFD (&pfds [i], -1, 0);
-
- while (1) {
- int nsock = 0;
- mono_pollfd *pfd;
- char one [1];
- MonoMList *list;
-
- do {
- if (nsock == -1) {
- if (THREAD_WANTS_A_BREAK (thread))
- mono_thread_interruption_checkpoint ();
- }
-
- nsock = mono_poll (pfds, maxfd, -1);
- } while (nsock == -1 && errno == EINTR);
-
- /*
- * Apart from EINTR, we only check EBADF, for the rest:
- * EINVAL: mono_poll() 'protects' us from descriptor
- * numbers above the limit if using select() by marking
- * then as MONO_POLLERR. If a system poll() is being
- * used, the number of descriptor we're passing will not
- * be over sysconf(_SC_OPEN_MAX), as the error would have
- * happened when opening.
- *
- * EFAULT: we own the memory pointed by pfds.
- * ENOMEM: we're doomed anyway
- *
- */
-
- if (nsock == -1 && errno == EBADF) {
- pfds->revents = 0; /* Just in case... */
- nsock = mark_bad_fds (pfds, maxfd);
- }
-
- if ((pfds->revents & POLL_ERRORS) != 0) {
- /* We're supposed to die now, as the pipe has been closed */
- g_free (pfds);
- socket_io_cleanup (data);
- return;
- }
-
- /* Got a new socket */
- if ((pfds->revents & MONO_POLLIN) != 0) {
- int nread;
-
- for (i = 1; i < allocated; i++) {
- pfd = &pfds [i];
- if (pfd->fd == -1 || data->newpfd == NULL ||
- pfd->fd == data->newpfd->fd)
- break;
- }
-
- if (i == allocated) {
- mono_pollfd *oldfd;
-
- oldfd = pfds;
- i = allocated;
- allocated = allocated * 2;
- pfds = g_renew (mono_pollfd, oldfd, allocated);
- g_free (oldfd);
- for (; i < allocated; i++)
- INIT_POLLFD (&pfds [i], -1, 0);
- }
-#ifndef HOST_WIN32
- nread = read (data->pipe [0], one, 1);
-#else
- nread = recv ((SOCKET) data->pipe [0], one, 1, 0);
-#endif
- if (nread <= 0) {
- g_free (pfds);
- return; /* we're closed */
- }
-
- INIT_POLLFD (&pfds [i], data->newpfd->fd, data->newpfd->events);
- ReleaseSemaphore (data->new_sem, 1, NULL);
- if (i >= maxfd)
- maxfd = i + 1;
- nsock--;
- }
-
- if (nsock == 0)
- continue;
-
- EnterCriticalSection (&data->io_lock);
- if (data->inited == 3) {
- g_free (pfds);
- LeaveCriticalSection (&data->io_lock);
- return; /* cleanup called */
- }
-
- for (i = 1; i < maxfd && nsock > 0; i++) {
- pfd = &pfds [i];
- if (pfd->fd == -1 || pfd->revents == 0)
- continue;
-
- nsock--;
- list = mono_g_hash_table_lookup (data->sock_to_state, GINT_TO_POINTER (pfd->fd));
- if (list != NULL && (pfd->revents & (MONO_POLLIN | POLL_ERRORS)) != 0) {
- list = process_io_event (list, MONO_POLLIN);
- }
-
- if (list != NULL && (pfd->revents & (MONO_POLLOUT | POLL_ERRORS)) != 0) {
- list = process_io_event (list, MONO_POLLOUT);
- }
-
- if (list != NULL) {
- mono_g_hash_table_replace (data->sock_to_state, GINT_TO_POINTER (pfd->fd), list);
- pfd->events = get_events_from_list (list);
- } else {
- mono_g_hash_table_remove (data->sock_to_state, GINT_TO_POINTER (pfd->fd));
- pfd->fd = -1;
- if (i == maxfd - 1)
- maxfd--;
- }
- }
- LeaveCriticalSection (&data->io_lock);
- }
-}
-
-#ifdef HAVE_EPOLL
-#define EPOLL_ERRORS (EPOLLERR | EPOLLHUP)
-#define EPOLL_NEVENTS 128
-static void
-socket_io_epoll_main (gpointer p)
-{
- SocketIOData *data;
- int epollfd;
- MonoInternalThread *thread;
- struct epoll_event *events, *evt;
- int ready = 0, i;
- gpointer async_results [EPOLL_NEVENTS * 2]; // * 2 because each loop can add up to 2 results here
- gint nresults;
-
- data = p;
- epollfd = data->epollfd;
- thread = mono_thread_internal_current ();
- events = g_new0 (struct epoll_event, EPOLL_NEVENTS);
-
- while (1) {
- do {
- if (ready == -1) {
- if (THREAD_WANTS_A_BREAK (thread))
- mono_thread_interruption_checkpoint ();
- }
- EPOLL_DEBUG ("epoll_wait init");
- ready = epoll_wait (epollfd, events, EPOLL_NEVENTS, -1);
- EPOLL_DEBUG_STMT(
- int err = errno;
- EPOLL_DEBUG ("epoll_wait end with %d ready sockets (%d %s).", ready, err, (ready == -1) ? g_strerror (err) : "");
- errno = err;
- );
- } while (ready == -1 && errno == EINTR);
-
- if (ready == -1) {
- int err = errno;
- g_free (events);
- if (err != EBADF)
- g_warning ("epoll_wait: %d %s", err, g_strerror (err));
-
- close (epollfd);
- return;
- }
-
- EnterCriticalSection (&data->io_lock);
- if (data->inited == 3) {
- g_free (events);
- close (epollfd);
- LeaveCriticalSection (&data->io_lock);
- return; /* cleanup called */
- }
-
- nresults = 0;
- for (i = 0; i < ready; i++) {
- int fd;
- MonoMList *list;
- MonoObject *ares;
-
- evt = &events [i];
- fd = evt->data.fd;
- list = mono_g_hash_table_lookup (data->sock_to_state, GINT_TO_POINTER (fd));
- EPOLL_DEBUG ("Event %d on %d list length: %d", evt->events, fd, mono_mlist_length (list));
- if (list != NULL && (evt->events & (EPOLLIN | EPOLL_ERRORS)) != 0) {
- ares = get_io_event (&list, MONO_POLLIN);
- if (ares != NULL)
- async_results [nresults++] = ares;
- }
-
- if (list != NULL && (evt->events & (EPOLLOUT | EPOLL_ERRORS)) != 0) {
- ares = get_io_event (&list, MONO_POLLOUT);
- if (ares != NULL)
- async_results [nresults++] = ares;
- }
-
- if (list != NULL) {
- mono_g_hash_table_replace (data->sock_to_state, GINT_TO_POINTER (fd), list);
- evt->events = get_events_from_list (list);
- EPOLL_DEBUG ("MOD %d to %d", fd, evt->events);
- if (epoll_ctl (epollfd, EPOLL_CTL_MOD, fd, evt)) {
- if (epoll_ctl (epollfd, EPOLL_CTL_ADD, fd, evt) == -1) {
- EPOLL_DEBUG_STMT (
- int err = errno;
- EPOLL_DEBUG ("epoll_ctl(MOD): %d %s fd: %d events: %d", err, g_strerror (err), fd, evt->events);
- errno = err;
- );
- }
- }
- } else {
- mono_g_hash_table_remove (data->sock_to_state, GINT_TO_POINTER (fd));
- EPOLL_DEBUG ("DEL %d", fd);
- epoll_ctl (epollfd, EPOLL_CTL_DEL, fd, evt);
- }
- }
- LeaveCriticalSection (&data->io_lock);
- threadpool_append_jobs (&async_io_tp, (MonoObject **) async_results, nresults);
- memset (async_results, 0, sizeof (gpointer) * nresults);
- }
-}
-#undef EPOLL_NEVENTS
-#endif
/*
* select/poll wake up when a socket is closed, but epoll just removes
void
mono_thread_pool_remove_socket (int sock)
{
- MonoMList *list, *next;
+ MonoMList *list;
MonoSocketAsyncResult *state;
+ MonoObject *ares;
if (socket_io_data.inited == 0)
return;
else if (state->operation == AIO_OP_SEND)
state->operation = AIO_OP_SEND_JUST_CALLBACK;
- next = mono_mlist_remove_item (list, list);
- list = process_io_event (list, MONO_POLLIN);
- if (list)
- process_io_event (list, MONO_POLLOUT);
-
- list = next;
+ ares = get_io_event (&list, MONO_POLLIN);
+ threadpool_append_job (&async_io_tp, ares);
+ if (list) {
+ ares = get_io_event (&list, MONO_POLLOUT);
+ threadpool_append_job (&async_io_tp, ares);
+ }
}
}
-#ifdef HOST_WIN32
static void
-connect_hack (gpointer x)
+init_event_system (SocketIOData *data)
{
- struct sockaddr_in *addr = (struct sockaddr_in *) x;
- int count = 0;
-
- while (connect ((SOCKET) socket_io_data.pipe [1], (SOCKADDR *) addr, sizeof (struct sockaddr_in))) {
- Sleep (500);
- if (++count > 3) {
- g_warning ("Error initializing async. sockets %d.", WSAGetLastError ());
- g_assert (WSAGetLastError ());
- }
- }
-}
+#ifdef HAVE_EPOLL
+ if (data->event_system == EPOLL_BACKEND)
+ data->event_data = tp_epoll_init (data);
#endif
+ if (data->event_system == POLL_BACKEND)
+ data->event_data = tp_poll_init (data);
+}
static void
socket_io_init (SocketIOData *data)
{
-#ifdef HOST_WIN32
- struct sockaddr_in server;
- struct sockaddr_in client;
- SOCKET srv;
- int len;
-#endif
int inited;
guint32 stack_size;
}
EnterCriticalSection (&data->io_lock);
-
+ data->sock_to_state = mono_g_hash_table_new_type (g_direct_hash, g_direct_equal, MONO_HASH_VALUE_GC);
#ifdef HAVE_EPOLL
- data->epoll_disabled = (g_getenv ("MONO_DISABLE_AIO") != NULL);
- if (FALSE == data->epoll_disabled) {
- data->epollfd = epoll_create (256);
- data->epoll_disabled = (data->epollfd == -1);
- if (data->epoll_disabled && g_getenv ("MONO_DEBUG"))
- g_message ("epoll_create() failed. Using plain poll().");
- } else {
- data->epollfd = -1;
- }
+ data->event_system = EPOLL_BACKEND;
#else
- data->epoll_disabled = TRUE;
-#endif
-
-#ifndef HOST_WIN32
- if (data->epoll_disabled) {
- if (pipe (data->pipe) != 0) {
- int err = errno;
- perror ("mono");
- g_assert (err);
- }
- } else {
- data->pipe [0] = -1;
- data->pipe [1] = -1;
- }
-#else
- srv = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
- g_assert (srv != INVALID_SOCKET);
- data->pipe [1] = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
- g_assert (data->pipe [1] != INVALID_SOCKET);
-
- server.sin_family = AF_INET;
- server.sin_addr.s_addr = inet_addr ("127.0.0.1");
- server.sin_port = 0;
- if (bind (srv, (SOCKADDR *) &server, sizeof (server))) {
- g_print ("%d\n", WSAGetLastError ());
- g_assert (1 != 0);
- }
-
- len = sizeof (server);
- getsockname (srv, (SOCKADDR *) &server, &len);
- listen (srv, 1);
- mono_thread_create (mono_get_root_domain (), connect_hack, &server);
- len = sizeof (server);
- data->pipe [0] = accept (srv, (SOCKADDR *) &client, &len);
- g_assert (data->pipe [0] != INVALID_SOCKET);
- closesocket (srv);
+ data->event_system = POLL_BACKEND;
#endif
- data->sock_to_state = mono_g_hash_table_new_type (g_direct_hash, g_direct_equal, MONO_HASH_VALUE_GC);
-
- if (data->epoll_disabled) {
- data->new_sem = CreateSemaphore (NULL, 1, 1, NULL);
- g_assert (data->new_sem != NULL);
- }
+ if (g_getenv ("MONO_DISABLE_AIO") != NULL)
+ data->event_system = POLL_BACKEND;
+ init_event_system (data);
stack_size = mono_threads_get_default_stacksize ();
- /* 128k stacks could lead to problems on 64 bit systems with large pagesizes+altstack */
mono_threads_set_default_stacksize (128 * (sizeof (gpointer) / 4) * 1024);
- if (data->epoll_disabled) {
- mono_thread_create_internal (mono_get_root_domain (), socket_io_poll_main, data, TRUE);
- }
-#ifdef HAVE_EPOLL
- else {
- mono_thread_create_internal (mono_get_root_domain (), socket_io_epoll_main, data, TRUE);
- }
-#endif
+ mono_thread_create_internal (mono_get_root_domain (), data->wait, data, TRUE);
mono_threads_set_default_stacksize (stack_size);
LeaveCriticalSection (&data->io_lock);
data->inited = 2;
}
static void
-socket_io_add_poll (MonoSocketAsyncResult *state)
+socket_io_add (MonoAsyncResult *ares, MonoSocketAsyncResult *state)
{
- int events;
- char msg [1];
MonoMList *list;
SocketIOData *data = &socket_io_data;
- int w;
+ int fd;
+ gboolean is_new;
+ int ievt;
+ socket_io_init (&socket_io_data);
if (mono_runtime_is_shutting_down () || data->inited == 3 || data->sock_to_state == NULL)
return;
-
-#if defined(PLATFORM_MACOSX) || defined(PLATFORM_BSD) || defined(HOST_WIN32) || defined(PLATFORM_SOLARIS)
- /* select() for connect() does not work well on the Mac. Bug #75436. */
- /* Bug #77637 for the BSD 6 case */
- /* Bug #78888 for the Windows case */
- if (state->operation == AIO_OP_CONNECT && state->blocking == TRUE) {
- //FIXME: increment number of threads while this one is waiting?
- threadpool_append_job (&async_io_tp, (MonoObject *) state);
- return;
- }
-#endif
- WaitForSingleObject (data->new_sem, INFINITE);
- if (data->newpfd == NULL)
- data->newpfd = g_new0 (mono_pollfd, 1);
-
- EnterCriticalSection (&data->io_lock);
- if (data->sock_to_state == NULL) {
- LeaveCriticalSection (&data->io_lock);
+ if (async_tp.pool_status == 2)
return;
- }
- /* FIXME: 64 bit issue: handle can be a pointer on windows? */
- list = mono_g_hash_table_lookup (data->sock_to_state, GINT_TO_POINTER (state->handle));
- if (list == NULL) {
- list = mono_mlist_alloc ((MonoObject*)state);
- } else {
- list = mono_mlist_append (list, (MonoObject*)state);
- }
-
- events = get_events_from_list (list);
- INIT_POLLFD (data->newpfd, GPOINTER_TO_INT (state->handle), events);
- mono_g_hash_table_replace (data->sock_to_state, GINT_TO_POINTER (state->handle), list);
- LeaveCriticalSection (&data->io_lock);
- *msg = (char) state->operation;
-#ifndef HOST_WIN32
- w = write (data->pipe [1], msg, 1);
- w = w;
-#else
- send ((SOCKET) data->pipe [1], msg, 1, 0);
-#endif
-}
-
-#ifdef HAVE_EPOLL
-static gboolean
-socket_io_add_epoll (MonoSocketAsyncResult *state)
-{
- MonoMList *list;
- SocketIOData *data = &socket_io_data;
- struct epoll_event event;
- int epoll_op, ievt;
- int fd;
-
- if (mono_runtime_is_shutting_down () || data->inited == 3 || data->sock_to_state == NULL)
- return TRUE;
+ MONO_OBJECT_SETREF (state, ares, ares);
- memset (&event, 0, sizeof (struct epoll_event));
fd = GPOINTER_TO_INT (state->handle);
EnterCriticalSection (&data->io_lock);
if (data->sock_to_state == NULL) {
LeaveCriticalSection (&data->io_lock);
- return TRUE;
+ return;
}
list = mono_g_hash_table_lookup (data->sock_to_state, GINT_TO_POINTER (fd));
if (list == NULL) {
list = mono_mlist_alloc ((MonoObject*)state);
- epoll_op = EPOLL_CTL_ADD;
+ is_new = TRUE;
} else {
list = mono_mlist_append (list, (MonoObject*)state);
- epoll_op = EPOLL_CTL_MOD;
+ is_new = FALSE;
}
- ievt = get_events_from_list (list);
- if ((ievt & MONO_POLLIN) != 0)
- event.events |= EPOLLIN;
- if ((ievt & MONO_POLLOUT) != 0)
- event.events |= EPOLLOUT;
-
mono_g_hash_table_replace (data->sock_to_state, state->handle, list);
- event.data.fd = fd;
- EPOLL_DEBUG ("%s %d with %d", epoll_op == EPOLL_CTL_ADD ? "ADD" : "MOD", fd, event.events);
- if (epoll_ctl (data->epollfd, epoll_op, fd, &event) == -1) {
- int err = errno;
- if (epoll_op == EPOLL_CTL_ADD && err == EEXIST) {
- epoll_op = EPOLL_CTL_MOD;
- if (epoll_ctl (data->epollfd, epoll_op, fd, &event) == -1) {
- g_message ("epoll_ctl(MOD): %d %s", err, g_strerror (err));
- }
- }
- }
+ ievt = get_events_from_list (list);
LeaveCriticalSection (&data->io_lock);
-
- return TRUE;
-}
-#endif
-
-static void
-socket_io_add (MonoAsyncResult *ares, MonoSocketAsyncResult *state)
-{
- if (async_tp.pool_status == 2 || mono_runtime_is_shutting_down ())
- return;
-
- socket_io_init (&socket_io_data);
-
- MONO_OBJECT_SETREF (state, ares, ares);
-#ifdef HAVE_EPOLL
- if (socket_io_data.epoll_disabled == FALSE) {
- if (socket_io_add_epoll (state))
- return;
- }
-#endif
- socket_io_add_poll (state);
+ data->modify (data->event_data, fd, state->operation, ievt, is_new);
}
#ifndef DISABLE_SOCKETS
{
if (mono_runtime_is_shutting_down ())
return FALSE;
- TP_DEBUG ("Dequeue");
mono_cq_dequeue (tp->queue, (MonoObject **) data);
if (!tp->is_io && !*data)
try_steal (data, FALSE);
ar = NULL;
data = NULL;
must_die = should_i_die (tp);
- TP_DEBUG ("Trying to get a job");
if (!must_die && (tp->is_io || !mono_wsq_local_pop (&data)))
dequeue_or_steal (tp, &data);
- TP_DEBUG ("Done trying to get a job %p", data);
n_naps = 0;
while (!must_die && !data && n_naps < 4) {
gboolean res;
- TP_DEBUG ("Waiting");
InterlockedIncrement (&tp->waiting);
#if defined(__OpenBSD__)
while ((res = mono_sem_wait (&tp->new_job, TRUE)) == -1) {// && errno == EINTR) {
if (THREAD_WANTS_A_BREAK (thread))
mono_thread_interruption_checkpoint ();
}
- TP_DEBUG ("Done waiting");
InterlockedDecrement (&tp->waiting);
if (mono_runtime_is_shutting_down ())
break;
break;
if (down || InterlockedCompareExchange (&tp->nthreads, nt - 1, nt) == nt) {
mono_perfcounter_update_value (tp->pc_nthreads, TRUE, -1);
- TP_DEBUG ("DIE");
if (!tp->is_io) {
remove_wsq (wsq);
}
--- /dev/null
+struct _tp_epoll_data {
+ int epollfd;
+};
+
+typedef struct _tp_epoll_data tp_epoll_data;
+static void tp_epoll_modify (gpointer event_data, int fd, int operation, int events, gboolean is_new);
+static void tp_epoll_shutdown (gpointer event_data);
+static void tp_epoll_wait (gpointer event_data);
+
+static gpointer
+tp_epoll_init (SocketIOData *data)
+{
+ tp_epoll_data *result;
+
+ result = g_new0 (tp_epoll_data, 1);
+#ifdef EPOLL_CLOEXEC
+ result->epollfd = epoll_create1 (EPOLL_CLOEXEC);
+#else
+ result->epollfd = epoll_create (256); /* The number does not really matter */
+ fcntl (result->epollfd, F_SETFD, FD_CLOEXEC);
+#endif
+ if (result->epollfd == -1)
+ return NULL;
+
+ data->shutdown = tp_epoll_shutdown;
+ data->modify = tp_epoll_modify;
+ data->wait = tp_epoll_wait;
+ return result;
+}
+
+static void
+tp_epoll_modify (gpointer event_data, int fd, int operation, int events, gboolean is_new)
+{
+ tp_epoll_data *data = event_data;
+ struct epoll_event evt;
+ int epoll_op;
+
+ evt.data.fd = fd;
+ if ((events & MONO_POLLIN) != 0)
+ evt.events |= EPOLLIN;
+ if ((events & MONO_POLLOUT) != 0)
+ evt.events |= EPOLLOUT;
+
+ epoll_op = (is_new) ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
+ if (epoll_ctl (data->epollfd, epoll_op, fd, &evt) == -1) {
+ int err = errno;
+ if (epoll_op == EPOLL_CTL_ADD && err == EEXIST) {
+ epoll_op = EPOLL_CTL_MOD;
+ if (epoll_ctl (data->epollfd, epoll_op, fd, &evt) == -1) {
+ g_message ("epoll_ctl(MOD): %d %s", err, g_strerror (err));
+ }
+ }
+ }
+}
+
+static void
+tp_epoll_shutdown (gpointer event_data)
+{
+ tp_epoll_data *data = event_data;
+
+ close (data->epollfd);
+ data->epollfd = -1;
+}
+
+#define EPOLL_ERRORS (EPOLLERR | EPOLLHUP)
+#define EPOLL_NEVENTS 128
+static void
+tp_epoll_wait (gpointer p)
+{
+ SocketIOData *socket_io_data;
+ int epollfd;
+ MonoInternalThread *thread;
+ struct epoll_event *events, *evt;
+ int ready = 0, i;
+ gpointer async_results [EPOLL_NEVENTS * 2]; // * 2 because each loop can add up to 2 results here
+ gint nresults;
+ tp_epoll_data *data;
+
+ socket_io_data = p;
+ data = socket_io_data->event_data;
+ epollfd = data->epollfd;
+ thread = mono_thread_internal_current ();
+ events = g_new0 (struct epoll_event, EPOLL_NEVENTS);
+
+ printf ("epoll_wait\n");
+ while (1) {
+ do {
+ if (ready == -1) {
+ if (THREAD_WANTS_A_BREAK (thread))
+ mono_thread_interruption_checkpoint ();
+ }
+ ready = epoll_wait (epollfd, events, EPOLL_NEVENTS, -1);
+ } while (ready == -1 && errno == EINTR);
+
+ if (ready == -1) {
+ int err = errno;
+ g_free (events);
+ if (err != EBADF)
+ g_warning ("epoll_wait: %d %s", err, g_strerror (err));
+
+ close (epollfd);
+ return;
+ }
+
+ EnterCriticalSection (&socket_io_data->io_lock);
+ if (socket_io_data->inited == 3) {
+ g_free (events);
+ close (epollfd);
+ LeaveCriticalSection (&socket_io_data->io_lock);
+ return; /* cleanup called */
+ }
+
+ nresults = 0;
+ for (i = 0; i < ready; i++) {
+ int fd;
+ MonoMList *list;
+ MonoObject *ares;
+
+ evt = &events [i];
+ fd = evt->data.fd;
+ list = mono_g_hash_table_lookup (socket_io_data->sock_to_state, GINT_TO_POINTER (fd));
+ if (list != NULL && (evt->events & (EPOLLIN | EPOLL_ERRORS)) != 0) {
+ ares = get_io_event (&list, MONO_POLLIN);
+ if (ares != NULL)
+ async_results [nresults++] = ares;
+ }
+
+ if (list != NULL && (evt->events & (EPOLLOUT | EPOLL_ERRORS)) != 0) {
+ ares = get_io_event (&list, MONO_POLLOUT);
+ if (ares != NULL)
+ async_results [nresults++] = ares;
+ }
+
+ if (list != NULL) {
+ mono_g_hash_table_replace (socket_io_data->sock_to_state, GINT_TO_POINTER (fd), list);
+ evt->events = get_events_from_list (list);
+ if (epoll_ctl (epollfd, EPOLL_CTL_MOD, fd, evt)) {
+ epoll_ctl (epollfd, EPOLL_CTL_ADD, fd, evt); /* ignoring error here */
+ }
+ } else {
+ mono_g_hash_table_remove (socket_io_data->sock_to_state, GINT_TO_POINTER (fd));
+ epoll_ctl (epollfd, EPOLL_CTL_DEL, fd, evt);
+ }
+ }
+ LeaveCriticalSection (&socket_io_data->io_lock);
+ threadpool_append_jobs (&async_io_tp, (MonoObject **) async_results, nresults);
+ memset (async_results, 0, sizeof (gpointer) * nresults);
+ }
+}
+#undef EPOLL_NEVENTS
--- /dev/null
+#define INIT_POLLFD(a, b, c) {(a)->fd = b; (a)->events = c; (a)->revents = 0;}
+struct _tp_poll_data {
+ int pipe [2];
+ MonoSemType new_sem;
+ mono_pollfd newpfd;
+};
+
+typedef struct _tp_poll_data tp_poll_data;
+
+static void tp_poll_shutdown (gpointer event_data);
+static void tp_poll_modify (gpointer event_data, int fd, int operation, int events, gboolean is_new);
+static void tp_poll_wait (gpointer p);
+
+#ifdef HOST_WIN32
+static void
+connect_hack (gpointer x)
+{
+ struct sockaddr_in *addr = (struct sockaddr_in *) x;
+ int count = 0;
+
+ while (connect ((SOCKET) socket_io_data.pipe [1], (SOCKADDR *) addr, sizeof (struct sockaddr_in))) {
+ Sleep (500);
+ if (++count > 3) {
+ g_warning ("Error initializing async. sockets %d.", WSAGetLastError ());
+ g_assert (WSAGetLastError ());
+ }
+ }
+}
+#endif
+
+static gpointer
+tp_poll_init (SocketIOData *data)
+{
+ tp_poll_data *result;
+#ifdef HOST_WIN32
+ struct sockaddr_in server;
+ struct sockaddr_in client;
+ SOCKET srv;
+ int len;
+#endif
+
+ result = g_new0 (tp_poll_data, 1);
+#ifndef HOST_WIN32
+ if (pipe (result->pipe) != 0) {
+ int err = errno;
+ perror ("mono");
+ g_assert (err);
+ }
+#else
+ srv = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
+ g_assert (srv != INVALID_SOCKET);
+ result->pipe [1] = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
+ g_assert (result->pipe [1] != INVALID_SOCKET);
+
+ server.sin_family = AF_INET;
+ server.sin_addr.s_addr = inet_addr ("127.0.0.1");
+ server.sin_port = 0;
+ if (bind (srv, (SOCKADDR *) &server, sizeof (server))) {
+ g_print ("%d\n", WSAGetLastError ());
+ g_assert (1 != 0);
+ }
+
+ len = sizeof (server);
+ getsockname (srv, (SOCKADDR *) &server, &len);
+ listen (srv, 1);
+ mono_thread_create (mono_get_root_domain (), connect_hack, &server);
+ len = sizeof (server);
+ result->pipe [0] = accept (srv, (SOCKADDR *) &client, &len);
+ g_assert (result->pipe [0] != INVALID_SOCKET);
+ closesocket (srv);
+#endif
+ MONO_SEM_INIT (&result->new_sem, 1);
+ data->shutdown = tp_poll_shutdown;
+ data->modify = tp_poll_modify;
+ data->wait = tp_poll_wait;
+ return result;
+}
+
+static void
+tp_poll_modify (gpointer event_data, int fd, int operation, int events, gboolean is_new)
+{
+ tp_poll_data *data = event_data;
+ char msg [1];
+ int w;
+
+ MONO_SEM_WAIT (&data->new_sem);
+ INIT_POLLFD (&data->newpfd, GPOINTER_TO_INT (fd), events);
+ *msg = (char) operation;
+#ifndef HOST_WIN32
+ w = write (data->pipe [1], msg, 1);
+#else
+ send ((SOCKET) data->pipe [1], msg, 1, 0);
+#endif
+}
+
+static void
+tp_poll_shutdown (gpointer event_data)
+{
+ tp_poll_data *data = event_data;
+
+#ifdef HOST_WIN32
+ closesocket (data->pipe [0]);
+ closesocket (data->pipe [1]);
+#else
+ if (data->pipe [0] > -1)
+ close (data->pipe [0]);
+ if (data->pipe [1] > -1)
+ close (data->pipe [1]);
+#endif
+ data->pipe [0] = -1;
+ data->pipe [1] = -1;
+ MONO_SEM_DESTROY (&data->new_sem);
+}
+
+static int
+mark_bad_fds (mono_pollfd *pfds, int nfds)
+{
+ int i, ret;
+ mono_pollfd *pfd;
+ int count = 0;
+
+ for (i = 0; i < nfds; i++) {
+ pfd = &pfds [i];
+ if (pfd->fd == -1)
+ continue;
+
+ ret = mono_poll (pfd, 1, 0);
+ if (ret == -1 && errno == EBADF) {
+ pfd->revents |= MONO_POLLNVAL;
+ count++;
+ } else if (ret == 1) {
+ count++;
+ }
+ }
+
+ return count;
+}
+
+static void
+tp_poll_wait (gpointer p)
+{
+#if MONO_SMALL_CONFIG
+#define INITIAL_POLLFD_SIZE 128
+#else
+#define INITIAL_POLLFD_SIZE 1024
+#endif
+#define POLL_ERRORS (MONO_POLLERR | MONO_POLLHUP | MONO_POLLNVAL)
+ mono_pollfd *pfds;
+ gint maxfd = 1;
+ gint allocated;
+ gint i;
+ MonoInternalThread *thread;
+ tp_poll_data *data;
+ SocketIOData *socket_io_data = p;
+ gpointer *async_results;
+ gint nresults;
+
+ thread = mono_thread_internal_current ();
+
+ data = socket_io_data->event_data;
+ allocated = INITIAL_POLLFD_SIZE;
+ pfds = g_new0 (mono_pollfd, allocated);
+ async_results = g_new0 (gpointer, allocated * 2);
+ INIT_POLLFD (pfds, data->pipe [0], MONO_POLLIN);
+ for (i = 1; i < allocated; i++)
+ INIT_POLLFD (&pfds [i], -1, 0);
+
+ printf ("poll_wait\n");
+ while (1) {
+ int nsock = 0;
+ mono_pollfd *pfd;
+ char one [1];
+ MonoMList *list;
+ MonoObject *ares;
+
+ do {
+ if (nsock == -1) {
+ if (THREAD_WANTS_A_BREAK (thread))
+ mono_thread_interruption_checkpoint ();
+ }
+
+ nsock = mono_poll (pfds, maxfd, -1);
+ } while (nsock == -1 && errno == EINTR);
+
+ /*
+ * Apart from EINTR, we only check EBADF, for the rest:
+ * EINVAL: mono_poll() 'protects' us from descriptor
+ * numbers above the limit if using select() by marking
+ * then as MONO_POLLERR. If a system poll() is being
+ * used, the number of descriptor we're passing will not
+ * be over sysconf(_SC_OPEN_MAX), as the error would have
+ * happened when opening.
+ *
+ * EFAULT: we own the memory pointed by pfds.
+ * ENOMEM: we're doomed anyway
+ *
+ */
+
+ if (nsock == -1 && errno == EBADF) {
+ pfds->revents = 0; /* Just in case... */
+ nsock = mark_bad_fds (pfds, maxfd);
+ }
+
+ if ((pfds->revents & POLL_ERRORS) != 0) {
+ /* We're supposed to die now, as the pipe has been closed */
+ g_free (pfds);
+ g_free (async_results);
+ socket_io_cleanup (socket_io_data);
+ return;
+ }
+
+ /* Got a new socket */
+ if ((pfds->revents & MONO_POLLIN) != 0) {
+ int nread;
+
+ for (i = 1; i < allocated; i++) {
+ pfd = &pfds [i];
+ if (pfd->fd == -1 || pfd->fd == data->newpfd.fd)
+ break;
+ }
+
+ if (i == allocated) {
+ mono_pollfd *oldfd;
+
+ oldfd = pfds;
+ i = allocated;
+ allocated = allocated * 2;
+ pfds = g_renew (mono_pollfd, oldfd, allocated);
+ g_free (oldfd);
+ for (; i < allocated; i++)
+ INIT_POLLFD (&pfds [i], -1, 0);
+ async_results = g_renew (gpointer, async_results, allocated * 2);
+ }
+#ifndef HOST_WIN32
+ nread = read (data->pipe [0], one, 1);
+#else
+ nread = recv ((SOCKET) data->pipe [0], one, 1, 0);
+#endif
+ if (nread <= 0) {
+ g_free (pfds);
+ g_free (async_results);
+ return; /* we're closed */
+ }
+
+ INIT_POLLFD (&pfds [i], data->newpfd.fd, data->newpfd.events);
+ memset (&data->newpfd, 0, sizeof (mono_pollfd));
+ MONO_SEM_POST (&data->new_sem);
+ if (i >= maxfd)
+ maxfd = i + 1;
+ nsock--;
+ }
+
+ if (nsock == 0)
+ continue;
+
+ EnterCriticalSection (&socket_io_data->io_lock);
+ if (socket_io_data->inited == 3) {
+ g_free (pfds);
+ g_free (async_results);
+ LeaveCriticalSection (&socket_io_data->io_lock);
+ return; /* cleanup called */
+ }
+
+ nresults = 0;
+ for (i = 1; i < maxfd && nsock > 0; i++) {
+ pfd = &pfds [i];
+ if (pfd->fd == -1 || pfd->revents == 0)
+ continue;
+
+ nsock--;
+ list = mono_g_hash_table_lookup (socket_io_data->sock_to_state, GINT_TO_POINTER (pfd->fd));
+ if (list != NULL && (pfd->revents & (MONO_POLLIN | POLL_ERRORS)) != 0) {
+ ares = get_io_event (&list, MONO_POLLIN);
+ if (ares != NULL)
+ async_results [nresults++] = ares;
+ }
+
+ if (list != NULL && (pfd->revents & (MONO_POLLOUT | POLL_ERRORS)) != 0) {
+ ares = get_io_event (&list, MONO_POLLOUT);
+ if (ares != NULL)
+ async_results [nresults++] = ares;
+ }
+
+ if (list != NULL) {
+ mono_g_hash_table_replace (socket_io_data->sock_to_state, GINT_TO_POINTER (pfd->fd), list);
+ pfd->events = get_events_from_list (list);
+ } else {
+ mono_g_hash_table_remove (socket_io_data->sock_to_state, GINT_TO_POINTER (pfd->fd));
+ pfd->fd = -1;
+ if (i == maxfd - 1)
+ maxfd--;
+ }
+ }
+ LeaveCriticalSection (&socket_io_data->io_lock);
+ threadpool_append_jobs (&async_io_tp, (MonoObject **) async_results, nresults);
+ memset (async_results, 0, sizeof (gpointer) * nresults);
+ }
+}
+