Added tests for Task.WhenAll w/ empty list
[mono.git] / mono / metadata / tpool-kqueue.c
1 /*
2  * tpool-kqueue.c: kqueue related stuff
3  *
4  * Authors:
5  *   Gonzalo Paniagua Javier (gonzalo@ximian.com)
6  *
7  * Copyright 2011 Novell, Inc (http://www.novell.com)
8  */
9
10 struct _tp_kqueue_data {
11         int fd;
12 };
13
14 typedef struct _tp_kqueue_data tp_kqueue_data;
15 static void tp_kqueue_modify (gpointer event_data, int fd, int operation, int events, gboolean is_new);
16 static void tp_kqueue_shutdown (gpointer event_data);
17 static void tp_kqueue_wait (gpointer event_data);
18
19 static gpointer
20 tp_kqueue_init (SocketIOData *data)
21 {
22         tp_kqueue_data *result;
23
24         result = g_new0 (tp_kqueue_data, 1);
25         result->fd = kqueue ();
26         if (result->fd == -1)
27                 return NULL;
28
29         data->shutdown = tp_kqueue_shutdown;
30         data->modify = tp_kqueue_modify;
31         data->wait = tp_kqueue_wait;
32         return result;
33 }
34
35 static void
36 kevent_change (int kfd, struct kevent *evt, const char *error_str)
37 {
38         if (kevent (kfd, evt, 1, NULL, 0, NULL) == -1) {
39                 int err = errno;
40                 g_message ("kqueue(%s): %d %s", error_str, err, g_strerror (err));
41         }
42 }
43
44 static void
45 tp_kqueue_modify (gpointer event_data, int fd, int operation, int events, gboolean is_new)
46 {
47         tp_kqueue_data *data = event_data;
48         struct kevent evt;
49
50         memset (&evt, 0, sizeof (evt));
51         if ((events & MONO_POLLIN) != 0) {
52                 EV_SET (&evt, fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_ONESHOT, 0, 0, 0);
53                 kevent_change (data->fd, &evt, "ADD read");
54         }
55
56         if ((events & MONO_POLLOUT) != 0) {
57                 EV_SET (&evt, fd, EVFILT_WRITE, EV_ADD | EV_ENABLE | EV_ONESHOT, 0, 0, 0);
58                 kevent_change (data->fd, &evt, "ADD write");
59         }
60 }
61
62 static void
63 tp_kqueue_shutdown (gpointer event_data)
64 {
65         tp_kqueue_data *data = event_data;
66
67         close (data->fd);
68         g_free (data);
69 }
70
71 #define KQUEUE_NEVENTS  128
72 static void
73 tp_kqueue_wait (gpointer p)
74 {
75         SocketIOData *socket_io_data;
76         int kfd;
77         MonoInternalThread *thread;
78         struct kevent *events, *evt;
79         int ready = 0, i;
80         gpointer async_results [KQUEUE_NEVENTS * 2]; // * 2 because each loop can add up to 2 results here
81         gint nresults;
82         tp_kqueue_data *data;
83
84         socket_io_data = p;
85         data = socket_io_data->event_data;
86         kfd = data->fd;
87         thread = mono_thread_internal_current ();
88         events = g_new0 (struct kevent, KQUEUE_NEVENTS);
89
90         while (1) {
91         
92                 mono_gc_set_skip_thread (TRUE);
93
94                 do {
95                         if (ready == -1) {
96                                 if (THREAD_WANTS_A_BREAK (thread))
97                                         mono_thread_interruption_checkpoint ();
98                         }
99                         ready = kevent (kfd, NULL, 0, events, KQUEUE_NEVENTS, NULL);
100                 } while (ready == -1 && errno == EINTR);
101
102                 mono_gc_set_skip_thread (FALSE);
103
104                 if (ready == -1) {
105                         int err = errno;
106                         g_free (events);
107                         if (err != EBADF)
108                                 g_warning ("kevent wait: %d %s", err, g_strerror (err));
109
110                         return;
111                 }
112
113                 EnterCriticalSection (&socket_io_data->io_lock);
114                 if (socket_io_data->inited == 3) {
115                         g_free (events);
116                         LeaveCriticalSection (&socket_io_data->io_lock);
117                         return; /* cleanup called */
118                 }
119
120                 nresults = 0;
121                 for (i = 0; i < ready; i++) {
122                         int fd;
123                         MonoMList *list;
124                         MonoObject *ares;
125
126                         evt = &events [i];
127                         fd = evt->ident;
128                         list = mono_g_hash_table_lookup (socket_io_data->sock_to_state, GINT_TO_POINTER (fd));
129                         if (list != NULL && (evt->filter == EVFILT_READ || (evt->flags & EV_ERROR) != 0)) {
130                                 ares = get_io_event (&list, MONO_POLLIN);
131                                 if (ares != NULL)
132                                         async_results [nresults++] = ares;
133                         }
134                         if (list != NULL && (evt->filter == EVFILT_WRITE || (evt->flags & EV_ERROR) != 0)) {
135                                 ares = get_io_event (&list, MONO_POLLOUT);
136                                 if (ares != NULL)
137                                         async_results [nresults++] = ares;
138                         }
139
140                         if (list != NULL) {
141                                 int p;
142
143                                 mono_g_hash_table_replace (socket_io_data->sock_to_state, GINT_TO_POINTER (fd), list);
144                                 p = get_events_from_list (list);
145                                 if (evt->filter == EVFILT_READ && (p & MONO_POLLIN) != 0) {
146                                         EV_SET (evt, fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_ONESHOT, 0, 0, 0);
147                                         kevent_change (kfd, evt, "READD read");
148                                 }
149
150                                 if (evt->filter == EVFILT_WRITE && (p & MONO_POLLOUT) != 0) {
151                                         EV_SET (evt, fd, EVFILT_WRITE, EV_ADD | EV_ENABLE | EV_ONESHOT, 0, 0, 0);
152                                         kevent_change (kfd, evt, "READD write");
153                                 }
154                         } else {
155                                 mono_g_hash_table_remove (socket_io_data->sock_to_state, GINT_TO_POINTER (fd));
156                         }
157                 }
158                 LeaveCriticalSection (&socket_io_data->io_lock);
159                 threadpool_append_jobs (&async_io_tp, (MonoObject **) async_results, nresults);
160                 mono_gc_bzero (async_results, sizeof (gpointer) * nresults);
161         }
162 }
163 #undef KQUEUE_NEVENTS
164