Added tests for Task.WhenAll w/ empty list
[mono.git] / mono / metadata / tpool-poll.c
1 /*
2  * tpool-poll.c: poll related stuff
3  *
4  * Authors:
5  *   Dietmar Maurer (dietmar@ximian.com)
6  *   Gonzalo Paniagua Javier (gonzalo@ximian.com)
7  *
8  * Copyright 2001-2003 Ximian, Inc (http://www.ximian.com)
9  * Copyright 2004-2011 Novell, Inc (http://www.novell.com)
10  */
11
12 #define INIT_POLLFD(a, b, c) {(a)->fd = b; (a)->events = c; (a)->revents = 0;}
13 struct _tp_poll_data {
14         int pipe [2];
15         MonoSemType new_sem;
16         mono_pollfd newpfd;
17 };
18
19 typedef struct _tp_poll_data tp_poll_data;
20
21 static void tp_poll_shutdown (gpointer event_data);
22 static void tp_poll_modify (gpointer event_data, int fd, int operation, int events, gboolean is_new);
23 static void tp_poll_wait (gpointer p);
24
25 static gpointer
26 tp_poll_init (SocketIOData *data)
27 {
28         tp_poll_data *result;
29 #ifdef HOST_WIN32
30         struct sockaddr_in client;
31         struct sockaddr_in server;
32         SOCKET srv;
33         int len;
34 #endif
35
36         result = g_new0 (tp_poll_data, 1);
37 #ifndef HOST_WIN32
38         if (pipe (result->pipe) != 0) {
39                 int err = errno;
40                 perror ("mono");
41                 g_assert (err);
42         }
43 #else
44         srv = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
45         g_assert (srv != INVALID_SOCKET);
46         result->pipe [1] = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
47         g_assert (result->pipe [1] != INVALID_SOCKET);
48
49         server.sin_family = AF_INET;
50         server.sin_addr.s_addr = inet_addr ("127.0.0.1");
51         server.sin_port = 0;
52         if (bind (srv, (SOCKADDR *) &server, sizeof (struct sockaddr_in))) {
53                 g_print ("%d\n", WSAGetLastError ());
54                 g_assert (1 != 0);
55         }
56
57         len = sizeof (server);
58         getsockname (srv, (SOCKADDR *) &server, &len);
59         listen (srv, 1);
60         if (connect ((SOCKET) result->pipe [1], (SOCKADDR *) &server, sizeof (server)) == SOCKET_ERROR) {
61                 g_print ("%d\n", WSAGetLastError ());
62                 g_assert (1 != 0);
63         }
64         len = sizeof (client);
65         result->pipe [0] = accept (srv, (SOCKADDR *) &client, &len);
66         g_assert (result->pipe [0] != INVALID_SOCKET);
67         closesocket (srv);
68 #endif
69         MONO_SEM_INIT (&result->new_sem, 1);
70         data->shutdown = tp_poll_shutdown;
71         data->modify = tp_poll_modify;
72         data->wait = tp_poll_wait;
73         return result;
74 }
75
76 static void
77 tp_poll_modify (gpointer event_data, int fd, int operation, int events, gboolean is_new)
78 {
79         tp_poll_data *data = event_data;
80         char msg [1];
81         int unused;
82
83         MONO_SEM_WAIT (&data->new_sem);
84         INIT_POLLFD (&data->newpfd, GPOINTER_TO_INT (fd), events);
85         *msg = (char) operation;
86 #ifndef HOST_WIN32
87         unused = write (data->pipe [1], msg, 1);
88 #else
89         unused = send ((SOCKET) data->pipe [1], msg, 1, 0);
90 #endif
91 }
92
93 static void
94 tp_poll_shutdown (gpointer event_data)
95 {
96         tp_poll_data *data = event_data;
97
98 #ifdef HOST_WIN32
99         closesocket (data->pipe [0]);
100         closesocket (data->pipe [1]);
101 #else
102         if (data->pipe [0] > -1)
103                 close (data->pipe [0]);
104         if (data->pipe [1] > -1)
105                 close (data->pipe [1]);
106 #endif
107         data->pipe [0] = -1;
108         data->pipe [1] = -1;
109         MONO_SEM_DESTROY (&data->new_sem);
110 }
111
112 static int
113 mark_bad_fds (mono_pollfd *pfds, int nfds)
114 {
115         int i, ret;
116         mono_pollfd *pfd;
117         int count = 0;
118
119         for (i = 0; i < nfds; i++) {
120                 pfd = &pfds [i];
121                 if (pfd->fd == -1)
122                         continue;
123
124                 ret = mono_poll (pfd, 1, 0);
125                 if (ret == -1 && errno == EBADF) {
126                         pfd->revents |= MONO_POLLNVAL;
127                         count++;
128                 } else if (ret == 1) {
129                         count++;
130                 }
131         }
132
133         return count;
134 }
135
136 static void
137 tp_poll_wait (gpointer p)
138 {
139 #if MONO_SMALL_CONFIG
140 #define INITIAL_POLLFD_SIZE     128
141 #else
142 #define INITIAL_POLLFD_SIZE     1024
143 #endif
144 #define POLL_ERRORS (MONO_POLLERR | MONO_POLLHUP | MONO_POLLNVAL)
145
146 #ifdef DISABLE_SOCKETS
147 #define socket_io_cleanup(x)
148 #endif
149         mono_pollfd *pfds;
150         gint maxfd = 1;
151         gint allocated;
152         gint i;
153         MonoInternalThread *thread;
154         tp_poll_data *data;
155         SocketIOData *socket_io_data = p;
156         MonoPtrArray async_results;
157         gint nresults;
158
159         thread = mono_thread_internal_current ();
160
161         data = socket_io_data->event_data;
162         allocated = INITIAL_POLLFD_SIZE;
163         pfds = g_new0 (mono_pollfd, allocated);
164         mono_ptr_array_init (async_results, allocated * 2);
165         INIT_POLLFD (pfds, data->pipe [0], MONO_POLLIN);
166         for (i = 1; i < allocated; i++)
167                 INIT_POLLFD (&pfds [i], -1, 0);
168
169         while (1) {
170                 int nsock = 0;
171                 mono_pollfd *pfd;
172                 char one [1];
173                 MonoMList *list;
174                 MonoObject *ares;
175
176                 mono_gc_set_skip_thread (TRUE);
177
178                 do {
179                         if (nsock == -1) {
180                                 if (THREAD_WANTS_A_BREAK (thread))
181                                         mono_thread_interruption_checkpoint ();
182                         }
183
184                         nsock = mono_poll (pfds, maxfd, -1);
185                 } while (nsock == -1 && errno == EINTR);
186
187                 mono_gc_set_skip_thread (FALSE);
188
189                 /* 
190                  * Apart from EINTR, we only check EBADF, for the rest:
191                  *  EINVAL: mono_poll() 'protects' us from descriptor
192                  *      numbers above the limit if using select() by marking
193                  *      then as MONO_POLLERR.  If a system poll() is being
194                  *      used, the number of descriptor we're passing will not
195                  *      be over sysconf(_SC_OPEN_MAX), as the error would have
196                  *      happened when opening.
197                  *
198                  *  EFAULT: we own the memory pointed by pfds.
199                  *  ENOMEM: we're doomed anyway
200                  *
201                  */
202
203                 if (nsock == -1 && errno == EBADF) {
204                         pfds->revents = 0; /* Just in case... */
205                         nsock = mark_bad_fds (pfds, maxfd);
206                 }
207
208                 if ((pfds->revents & POLL_ERRORS) != 0) {
209                         /* We're supposed to die now, as the pipe has been closed */
210                         g_free (pfds);
211                         mono_ptr_array_destroy (async_results);
212                         socket_io_cleanup (socket_io_data);
213                         return;
214                 }
215
216                 /* Got a new socket */
217                 if ((pfds->revents & MONO_POLLIN) != 0) {
218                         int nread;
219                         gboolean found = FALSE;
220
221                         for (i = 1; i < allocated; i++) {
222                                 pfd = &pfds [i];
223                                 if (pfd->fd == data->newpfd.fd) {
224                                         found = TRUE;
225                                         break;
226                                 }
227                         }
228
229                         if (!found) {
230                                 for (i = 1; i < allocated; i++) {
231                                         pfd = &pfds [i];
232                                         if (pfd->fd == -1)
233                                                 break;
234                                 }
235                         }
236
237                         if (i == allocated) {
238                                 mono_pollfd *oldfd;
239
240                                 oldfd = pfds;
241                                 i = allocated;
242                                 allocated = allocated * 2;
243                                 pfds = g_renew (mono_pollfd, oldfd, allocated);
244                                 g_free (oldfd);
245                                 for (; i < allocated; i++)
246                                         INIT_POLLFD (&pfds [i], -1, 0);
247                                 //async_results = g_renew (gpointer, async_results, allocated * 2);
248                         }
249 #ifndef HOST_WIN32
250                         nread = read (data->pipe [0], one, 1);
251 #else
252                         nread = recv ((SOCKET) data->pipe [0], one, 1, 0);
253 #endif
254                         if (nread <= 0) {
255                                 g_free (pfds);
256                                 mono_ptr_array_destroy (async_results);
257                                 return; /* we're closed */
258                         }
259
260                         INIT_POLLFD (&pfds [i], data->newpfd.fd, data->newpfd.events);
261                         memset (&data->newpfd, 0, sizeof (mono_pollfd));
262                         MONO_SEM_POST (&data->new_sem);
263                         if (i >= maxfd)
264                                 maxfd = i + 1;
265                         nsock--;
266                 }
267
268                 if (nsock == 0)
269                         continue;
270
271                 EnterCriticalSection (&socket_io_data->io_lock);
272                 if (socket_io_data->inited == 3) {
273                         g_free (pfds);
274                         mono_ptr_array_destroy (async_results);
275                         LeaveCriticalSection (&socket_io_data->io_lock);
276                         return; /* cleanup called */
277                 }
278
279                 nresults = 0;
280                 mono_ptr_array_clear (async_results);
281
282                 for (i = 1; i < maxfd && nsock > 0; i++) {
283                         pfd = &pfds [i];
284                         if (pfd->fd == -1 || pfd->revents == 0)
285                                 continue;
286
287                         nsock--;
288                         list = mono_g_hash_table_lookup (socket_io_data->sock_to_state, GINT_TO_POINTER (pfd->fd));
289                         if (list != NULL && (pfd->revents & (MONO_POLLIN | POLL_ERRORS)) != 0) {
290                                 ares = get_io_event (&list, MONO_POLLIN);
291                                 if (ares != NULL) {
292                                         mono_ptr_array_append (async_results, ares);
293                                         ++nresults;
294                                 }
295                         }
296
297                         if (list != NULL && (pfd->revents & (MONO_POLLOUT | POLL_ERRORS)) != 0) {
298                                 ares = get_io_event (&list, MONO_POLLOUT);
299                                 if (ares != NULL) {
300                                         mono_ptr_array_append (async_results, ares);
301                                         ++nresults;
302                                 }
303                         }
304
305                         if (list != NULL) {
306                                 mono_g_hash_table_replace (socket_io_data->sock_to_state, GINT_TO_POINTER (pfd->fd), list);
307                                 pfd->events = get_events_from_list (list);
308                         } else {
309                                 mono_g_hash_table_remove (socket_io_data->sock_to_state, GINT_TO_POINTER (pfd->fd));
310                                 pfd->fd = -1;
311                                 if (i == maxfd - 1)
312                                         maxfd--;
313                         }
314                 }
315                 LeaveCriticalSection (&socket_io_data->io_lock);
316                 threadpool_append_jobs (&async_io_tp, (MonoObject **) async_results.data, nresults);
317                 mono_ptr_array_clear (async_results);
318         }
319 }
320