[threadpool] Add support for kqueue asynch. IO.
authorGonzalo Paniagua Javier <gonzalo.mono@gmail.com>
Sat, 19 Feb 2011 06:20:18 +0000 (01:20 -0500)
committerGonzalo Paniagua Javier <gonzalo.mono@gmail.com>
Sat, 19 Feb 2011 06:30:34 +0000 (01:30 -0500)
configure.in
mono/metadata/Makefile.am
mono/metadata/threadpool.c
mono/metadata/tpool-kqueue.c [new file with mode: 0644]

index 0b45466f88a44b091f04faf53b4f0a639078997d..572c37bacc7eaa1e3bf25f797374f5386e5addaa 100644 (file)
@@ -1547,6 +1547,12 @@ if test x$target_win32 = xno; then
                AC_DEFINE(HAVE_EPOLL, 1, [epoll supported])
        fi
 
+       havekqueue=no
+        AC_CHECK_FUNCS(kqueue, , AC_MSG_CHECKING(for kqueue in sys/event.h)
+                AC_TRY_LINK([#include <sys/event.h>], 
+                [ kqueue(); ], 
+                AC_DEFINE(HAVE_KQUEUE, 1, [Have kqueue]) AC_MSG_RESULT(yes),
+                AC_MSG_RESULT(no)))
        dnl ******************************
        dnl *** Checks for SIOCGIFCONF ***
        dnl ******************************
index 61855e37e0ad76db415a97a2b61f1e0af2fa3c19..11c656d43ae79e2df5e002e6e80ea22c3d53650c 100644 (file)
@@ -291,5 +291,5 @@ endif
 endif
 
 EXTRA_DIST = make-bundle.pl sample-bundle $(win32_sources) $(unix_sources) $(null_sources) $(sgen_sources) runtime.h \
-               tpool-poll.c tpool-epoll.c
+               tpool-poll.c tpool-epoll.c tpool-kqueue.c
 
index 984d2a08ae56be1706077a1837ed3f8feef9d222..0ca4889825800f75f5fa54929e6227d50dff50ba 100644 (file)
 #ifdef HAVE_EPOLL
 #include <sys/epoll.h>
 #endif
+#ifdef HAVE_KQUEUE
+#include <sys/event.h>
+#endif
+
 
 #ifndef DISABLE_SOCKETS
 #include "mono/io-layer/socket-wrappers.h"
@@ -69,7 +73,8 @@ static volatile int tp_inited;
 
 enum {
        POLL_BACKEND,
-       EPOLL_BACKEND
+       EPOLL_BACKEND,
+       KQUEUE_BACKEND
 };
 
 typedef struct {
@@ -180,6 +185,8 @@ enum {
 #include <mono/metadata/tpool-poll.c>
 #ifdef HAVE_EPOLL
 #include <mono/metadata/tpool-epoll.c>
+#elif defined(HAVE_KQUEUE)
+#include <mono/metadata/tpool-kqueue.c>
 #endif
 /*
  * Functions to check whenever a class is given system class. We need to cache things in MonoDomain since some of the
@@ -473,6 +480,9 @@ init_event_system (SocketIOData *data)
 #ifdef HAVE_EPOLL
        if (data->event_system == EPOLL_BACKEND)
                data->event_data = tp_epoll_init (data);
+#elif defined(HAVE_KQUEUE)
+       if (data->event_system == KQUEUE_BACKEND)
+               data->event_data = tp_kqueue_init (data);
 #endif
        if (data->event_system == POLL_BACKEND)
                data->event_data = tp_poll_init (data);
@@ -500,6 +510,8 @@ socket_io_init (SocketIOData *data)
        data->sock_to_state = mono_g_hash_table_new_type (g_direct_hash, g_direct_equal, MONO_HASH_VALUE_GC);
 #ifdef HAVE_EPOLL
        data->event_system = EPOLL_BACKEND;
+#elif defined(HAVE_KQUEUE)
+       data->event_system = KQUEUE_BACKEND;
 #else
        data->event_system = POLL_BACKEND;
 #endif
diff --git a/mono/metadata/tpool-kqueue.c b/mono/metadata/tpool-kqueue.c
new file mode 100644 (file)
index 0000000..1550cef
--- /dev/null
@@ -0,0 +1,158 @@
+/*
+ * tpool-kqueue.c: kqueue related stuff
+ *
+ * Authors:
+ *   Gonzalo Paniagua Javier (gonzalo@ximian.com)
+ *
+ * Copyright 2011 Novell, Inc (http://www.novell.com)
+ */
+
+struct _tp_kqueue_data {
+       int fd;
+};
+
+typedef struct _tp_kqueue_data tp_kqueue_data;
+static void tp_kqueue_modify (gpointer event_data, int fd, int operation, int events, gboolean is_new);
+static void tp_kqueue_shutdown (gpointer event_data);
+static void tp_kqueue_wait (gpointer event_data);
+
+static gpointer
+tp_kqueue_init (SocketIOData *data)
+{
+       tp_kqueue_data *result;
+
+       result = g_new0 (tp_kqueue_data, 1);
+       result->fd = kqueue ();
+       if (result->fd == -1)
+               return NULL;
+
+       data->shutdown = tp_kqueue_shutdown;
+       data->modify = tp_kqueue_modify;
+       data->wait = tp_kqueue_wait;
+       return result;
+}
+
+static void
+kevent_change (int kfd, struct kevent *evt, const char *error_str)
+{
+       if (kevent (kfd, evt, 1, NULL, 0, NULL) == -1) {
+               int err = errno;        
+               g_message ("kqueue(%s): %d %s", error_str, err, g_strerror (err));
+       }
+}
+
+static void
+tp_kqueue_modify (gpointer event_data, int fd, int operation, int events, gboolean is_new)
+{
+       tp_kqueue_data *data = event_data;
+       struct kevent evt;
+
+       if ((events & MONO_POLLIN) != 0) {
+               EV_SET (&evt, fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_ONESHOT, 0, 0, 0);
+               kevent_change (data->fd, &evt, "ADD read");
+       }
+
+       if ((events & MONO_POLLOUT) != 0) {
+               EV_SET (&evt, fd, EVFILT_WRITE, EV_ADD | EV_ENABLE | EV_ONESHOT, 0, 0, 0);
+               kevent_change (data->fd, &evt, "ADD write");
+       }
+}
+
+static void
+tp_kqueue_shutdown (gpointer event_data)
+{
+       tp_kqueue_data *data = event_data;
+
+       close (data->fd);
+       g_free (data);
+}
+
+#define KQUEUE_NEVENTS 128
+static void
+tp_kqueue_wait (gpointer p)
+{
+       SocketIOData *socket_io_data;
+       int kfd;
+       MonoInternalThread *thread;
+       struct kevent *events, *evt;
+       int ready = 0, i;
+       gpointer async_results [KQUEUE_NEVENTS * 2]; // * 2 because each loop can add up to 2 results here
+       gint nresults;
+       tp_kqueue_data *data;
+
+       socket_io_data = p;
+       data = socket_io_data->event_data;
+       kfd = data->fd;
+       thread = mono_thread_internal_current ();
+       events = g_new0 (struct kevent, KQUEUE_NEVENTS);
+
+       while (1) {
+               do {
+                       if (ready == -1) {
+                               if (THREAD_WANTS_A_BREAK (thread))
+                                       mono_thread_interruption_checkpoint ();
+                       }
+                       ready = kevent (kfd, NULL, 0, events, KQUEUE_NEVENTS, NULL);
+               } while (ready == -1 && errno == EINTR);
+
+               if (ready == -1) {
+                       int err = errno;
+                       g_free (events);
+                       if (err != EBADF)
+                               g_warning ("kevent wait: %d %s", err, g_strerror (err));
+
+                       return;
+               }
+
+               EnterCriticalSection (&socket_io_data->io_lock);
+               if (socket_io_data->inited == 3) {
+                       g_free (events);
+                       LeaveCriticalSection (&socket_io_data->io_lock);
+                       return; /* cleanup called */
+               }
+
+               nresults = 0;
+               for (i = 0; i < ready; i++) {
+                       int fd;
+                       MonoMList *list;
+                       MonoObject *ares;
+
+                       evt = &events [i];
+                       fd = evt->ident;
+                       list = mono_g_hash_table_lookup (socket_io_data->sock_to_state, GINT_TO_POINTER (fd));
+                       if (list != NULL && (evt->filter == EVFILT_READ || (evt->flags & EV_ERROR) != 0)) {
+                               ares = get_io_event (&list, MONO_POLLIN);
+                               if (ares != NULL)
+                                       async_results [nresults++] = ares;
+                       }
+                       if (list != NULL && (evt->filter == EVFILT_WRITE || (evt->flags & EV_ERROR) != 0)) {
+                               ares = get_io_event (&list, MONO_POLLOUT);
+                               if (ares != NULL)
+                                       async_results [nresults++] = ares;
+                       }
+
+                       if (list != NULL) {
+                               int p;
+
+                               mono_g_hash_table_replace (socket_io_data->sock_to_state, GINT_TO_POINTER (fd), list);
+                               p = get_events_from_list (list);
+                               if (evt->filter == EVFILT_READ && (p & MONO_POLLIN) != 0) {
+                                       EV_SET (evt, fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_ONESHOT, 0, 0, 0);
+                                       kevent_change (kfd, evt, "READD read");
+                               }
+
+                               if (evt->filter == EVFILT_WRITE && (p & MONO_POLLOUT) != 0) {
+                                       EV_SET (evt, fd, EVFILT_WRITE, EV_ADD | EV_ENABLE | EV_ONESHOT, 0, 0, 0);
+                                       kevent_change (kfd, evt, "READD write");
+                               }
+                       } else {
+                               mono_g_hash_table_remove (socket_io_data->sock_to_state, GINT_TO_POINTER (fd));
+                       }
+               }
+               LeaveCriticalSection (&socket_io_data->io_lock);
+               threadpool_append_jobs (&async_io_tp, (MonoObject **) async_results, nresults);
+               memset (async_results, 0, sizeof (gpointer) * nresults);
+       }
+}
+#undef KQUEUE_NEVENTS
+