[threadpool] Split the socket code in separate files.
[mono.git] / mono / metadata / tpool-epoll.c
diff --git a/mono/metadata/tpool-epoll.c b/mono/metadata/tpool-epoll.c
new file mode 100644 (file)
index 0000000..12431f0
--- /dev/null
@@ -0,0 +1,150 @@
+struct _tp_epoll_data {
+       int epollfd;
+};
+
+typedef struct _tp_epoll_data tp_epoll_data;
+static void tp_epoll_modify (gpointer event_data, int fd, int operation, int events, gboolean is_new);
+static void tp_epoll_shutdown (gpointer event_data);
+static void tp_epoll_wait (gpointer event_data);
+
+static gpointer
+tp_epoll_init (SocketIOData *data)
+{
+       tp_epoll_data *result;
+
+       result = g_new0 (tp_epoll_data, 1);
+#ifdef EPOLL_CLOEXEC
+       result->epollfd = epoll_create1 (EPOLL_CLOEXEC);
+#else
+       result->epollfd = epoll_create (256); /* The number does not really matter */
+       fcntl (result->epollfd, F_SETFD, FD_CLOEXEC);
+#endif
+       if (result->epollfd == -1)
+               return NULL;
+
+       data->shutdown = tp_epoll_shutdown;
+       data->modify = tp_epoll_modify;
+       data->wait = tp_epoll_wait;
+       return result;
+}
+
+static void
+tp_epoll_modify (gpointer event_data, int fd, int operation, int events, gboolean is_new)
+{
+       tp_epoll_data *data = event_data;
+       struct epoll_event evt;
+       int epoll_op;
+
+       evt.data.fd = fd;
+       if ((events & MONO_POLLIN) != 0)
+               evt.events |= EPOLLIN;
+       if ((events & MONO_POLLOUT) != 0)
+               evt.events |= EPOLLOUT;
+
+       epoll_op = (is_new) ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
+       if (epoll_ctl (data->epollfd, epoll_op, fd, &evt) == -1) {
+               int err = errno;
+               if (epoll_op == EPOLL_CTL_ADD && err == EEXIST) {
+                       epoll_op = EPOLL_CTL_MOD;
+                       if (epoll_ctl (data->epollfd, epoll_op, fd, &evt) == -1) {
+                               g_message ("epoll_ctl(MOD): %d %s", err, g_strerror (err));
+                       }
+               }
+       }
+}
+
+static void
+tp_epoll_shutdown (gpointer event_data)
+{
+       tp_epoll_data *data = event_data;
+
+       close (data->epollfd);
+       data->epollfd = -1;
+}
+
+#define EPOLL_ERRORS (EPOLLERR | EPOLLHUP)
+#define EPOLL_NEVENTS  128
+static void
+tp_epoll_wait (gpointer p)
+{
+       SocketIOData *socket_io_data;
+       int epollfd;
+       MonoInternalThread *thread;
+       struct epoll_event *events, *evt;
+       int ready = 0, i;
+       gpointer async_results [EPOLL_NEVENTS * 2]; // * 2 because each loop can add up to 2 results here
+       gint nresults;
+       tp_epoll_data *data;
+
+       socket_io_data = p;
+       data = socket_io_data->event_data;
+       epollfd = data->epollfd;
+       thread = mono_thread_internal_current ();
+       events = g_new0 (struct epoll_event, EPOLL_NEVENTS);
+
+       printf ("epoll_wait\n");
+       while (1) {
+               do {
+                       if (ready == -1) {
+                               if (THREAD_WANTS_A_BREAK (thread))
+                                       mono_thread_interruption_checkpoint ();
+                       }
+                       ready = epoll_wait (epollfd, events, EPOLL_NEVENTS, -1);
+               } while (ready == -1 && errno == EINTR);
+
+               if (ready == -1) {
+                       int err = errno;
+                       g_free (events);
+                       if (err != EBADF)
+                               g_warning ("epoll_wait: %d %s", err, g_strerror (err));
+
+                       close (epollfd);
+                       return;
+               }
+
+               EnterCriticalSection (&socket_io_data->io_lock);
+               if (socket_io_data->inited == 3) {
+                       g_free (events);
+                       close (epollfd);
+                       LeaveCriticalSection (&socket_io_data->io_lock);
+                       return; /* cleanup called */
+               }
+
+               nresults = 0;
+               for (i = 0; i < ready; i++) {
+                       int fd;
+                       MonoMList *list;
+                       MonoObject *ares;
+
+                       evt = &events [i];
+                       fd = evt->data.fd;
+                       list = mono_g_hash_table_lookup (socket_io_data->sock_to_state, GINT_TO_POINTER (fd));
+                       if (list != NULL && (evt->events & (EPOLLIN | EPOLL_ERRORS)) != 0) {
+                               ares = get_io_event (&list, MONO_POLLIN);
+                               if (ares != NULL)
+                                       async_results [nresults++] = ares;
+                       }
+
+                       if (list != NULL && (evt->events & (EPOLLOUT | EPOLL_ERRORS)) != 0) {
+                               ares = get_io_event (&list, MONO_POLLOUT);
+                               if (ares != NULL)
+                                       async_results [nresults++] = ares;
+                       }
+
+                       if (list != NULL) {
+                               mono_g_hash_table_replace (socket_io_data->sock_to_state, GINT_TO_POINTER (fd), list);
+                               evt->events = get_events_from_list (list);
+                               if (epoll_ctl (epollfd, EPOLL_CTL_MOD, fd, evt)) {
+                                       epoll_ctl (epollfd, EPOLL_CTL_ADD, fd, evt); /* ignoring error here */
+                               }
+                       } else {
+                               mono_g_hash_table_remove (socket_io_data->sock_to_state, GINT_TO_POINTER (fd));
+                               epoll_ctl (epollfd, EPOLL_CTL_DEL, fd, evt);
+                       }
+               }
+               LeaveCriticalSection (&socket_io_data->io_lock);
+               threadpool_append_jobs (&async_io_tp, (MonoObject **) async_results, nresults);
+               memset (async_results, 0, sizeof (gpointer) * nresults);
+       }
+}
+#undef EPOLL_NEVENTS