Merge pull request #1633 from BrzVlad/fix-w32-pinvoke
[mono.git] / mono / metadata / tpool-poll.c
1 /*
2  * tpool-poll.c: poll related stuff
3  *
4  * Authors:
5  *   Dietmar Maurer (dietmar@ximian.com)
6  *   Gonzalo Paniagua Javier (gonzalo@ximian.com)
7  *
8  * Copyright 2001-2003 Ximian, Inc (http://www.ximian.com)
9  * Copyright 2004-2011 Novell, Inc (http://www.novell.com)
10  */
11 #include <config.h>
12 #include <glib.h>
13 #include <errno.h>
14
15 #include <mono/metadata/mono-ptr-array.h>
16 #include <mono/metadata/threadpool.h>
17 #include <mono/metadata/threadpool-internals.h>
18 #include <mono/utils/mono-semaphore.h>
19 #include <mono/utils/mono-poll.h>
20
21 #define INIT_POLLFD(a, b, c) {(a)->fd = b; (a)->events = c; (a)->revents = 0;}
22 struct _tp_poll_data {
23         int pipe [2];
24         MonoSemType new_sem;
25         mono_pollfd newpfd;
26 };
27
28 typedef struct _tp_poll_data tp_poll_data;
29
30 static void tp_poll_shutdown (gpointer event_data);
31 static void tp_poll_modify (gpointer p, int fd, int operation, int events, gboolean is_new);
32 static void tp_poll_wait (gpointer p);
33
34 gpointer
35 tp_poll_init (SocketIOData *data)
36 {
37         tp_poll_data *result;
38 #ifdef HOST_WIN32
39         struct sockaddr_in client;
40         struct sockaddr_in server;
41         SOCKET srv;
42         int len;
43 #endif
44
45         result = g_new0 (tp_poll_data, 1);
46 #ifndef HOST_WIN32
47         if (pipe (result->pipe) != 0) {
48                 int err = errno;
49                 perror ("mono");
50                 g_assert (err);
51         }
52 #else
53         srv = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
54         g_assert (srv != INVALID_SOCKET);
55         result->pipe [1] = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
56         g_assert (result->pipe [1] != INVALID_SOCKET);
57
58         server.sin_family = AF_INET;
59         server.sin_addr.s_addr = inet_addr ("127.0.0.1");
60         server.sin_port = 0;
61         if (bind (srv, (SOCKADDR *) &server, sizeof (struct sockaddr_in))) {
62                 g_print ("%d\n", WSAGetLastError ());
63                 g_assert (1 != 0);
64         }
65
66         len = sizeof (server);
67         getsockname (srv, (SOCKADDR *) &server, &len);
68         listen (srv, 1);
69         if (connect ((SOCKET) result->pipe [1], (SOCKADDR *) &server, sizeof (server)) == SOCKET_ERROR) {
70                 g_print ("%d\n", WSAGetLastError ());
71                 g_assert (1 != 0);
72         }
73         len = sizeof (client);
74         result->pipe [0] = accept (srv, (SOCKADDR *) &client, &len);
75         g_assert (result->pipe [0] != INVALID_SOCKET);
76         closesocket (srv);
77 #endif
78         MONO_SEM_INIT (&result->new_sem, 1);
79         data->shutdown = tp_poll_shutdown;
80         data->modify = tp_poll_modify;
81         data->wait = tp_poll_wait;
82         return result;
83 }
84
85 static void
86 tp_poll_modify (gpointer p, int fd, int operation, int events, gboolean is_new)
87 {
88         SocketIOData *socket_io_data;
89         tp_poll_data *data;
90         char msg [1];
91         int unused G_GNUC_UNUSED;
92
93         socket_io_data = p;
94         data = socket_io_data->event_data;
95
96         mono_mutex_unlock (&socket_io_data->io_lock);
97         
98         MONO_SEM_WAIT (&data->new_sem);
99         INIT_POLLFD (&data->newpfd, GPOINTER_TO_INT (fd), events);
100         *msg = (char) operation;
101 #ifndef HOST_WIN32
102         unused = write (data->pipe [1], msg, 1);
103 #else
104         unused = send ((SOCKET) data->pipe [1], msg, 1, 0);
105 #endif
106 }
107
108 static void
109 tp_poll_shutdown (gpointer event_data)
110 {
111         tp_poll_data *data = event_data;
112
113 #ifdef HOST_WIN32
114         closesocket (data->pipe [0]);
115         closesocket (data->pipe [1]);
116 #else
117         if (data->pipe [0] > -1)
118                 close (data->pipe [0]);
119         if (data->pipe [1] > -1)
120                 close (data->pipe [1]);
121 #endif
122         data->pipe [0] = -1;
123         data->pipe [1] = -1;
124         MONO_SEM_DESTROY (&data->new_sem);
125 }
126
127 static int
128 mark_bad_fds (mono_pollfd *pfds, int nfds)
129 {
130         int i, ret;
131         mono_pollfd *pfd;
132         int count = 0;
133
134         for (i = 0; i < nfds; i++) {
135                 pfd = &pfds [i];
136                 if (pfd->fd == -1)
137                         continue;
138
139                 ret = mono_poll (pfd, 1, 0);
140                 if (ret == -1 && errno == EBADF) {
141                         pfd->revents |= MONO_POLLNVAL;
142                         count++;
143                 } else if (ret == 1) {
144                         count++;
145                 }
146         }
147
148         return count;
149 }
150
151 static void
152 tp_poll_wait (gpointer p)
153 {
154 #if MONO_SMALL_CONFIG
155 #define INITIAL_POLLFD_SIZE     128
156 #else
157 #define INITIAL_POLLFD_SIZE     1024
158 #endif
159 #define POLL_ERRORS (MONO_POLLERR | MONO_POLLHUP | MONO_POLLNVAL)
160
161 #ifdef DISABLE_SOCKETS
162 #define socket_io_cleanup(x)
163 #endif
164         mono_pollfd *pfds;
165         gint maxfd = 1;
166         gint allocated;
167         gint i;
168         tp_poll_data *data;
169         SocketIOData *socket_io_data = p;
170         MonoPtrArray async_results;
171         gint nresults;
172
173         data = socket_io_data->event_data;
174         allocated = INITIAL_POLLFD_SIZE;
175         pfds = g_new0 (mono_pollfd, allocated);
176         mono_ptr_array_init (async_results, allocated * 2);
177         INIT_POLLFD (pfds, data->pipe [0], MONO_POLLIN);
178         for (i = 1; i < allocated; i++)
179                 INIT_POLLFD (&pfds [i], -1, 0);
180
181         while (1) {
182                 int nsock = 0;
183                 mono_pollfd *pfd;
184                 char one [1];
185                 MonoMList *list;
186                 MonoObject *ares;
187
188                 mono_gc_set_skip_thread (TRUE);
189
190                 do {
191                         if (nsock == -1) {
192                                 check_for_interruption_critical ();
193                         }
194
195                         nsock = mono_poll (pfds, maxfd, -1);
196                 } while (nsock == -1 && errno == EINTR);
197
198                 mono_gc_set_skip_thread (FALSE);
199
200                 /* 
201                  * Apart from EINTR, we only check EBADF, for the rest:
202                  *  EINVAL: mono_poll() 'protects' us from descriptor
203                  *      numbers above the limit if using select() by marking
204                  *      then as MONO_POLLERR.  If a system poll() is being
205                  *      used, the number of descriptor we're passing will not
206                  *      be over sysconf(_SC_OPEN_MAX), as the error would have
207                  *      happened when opening.
208                  *
209                  *  EFAULT: we own the memory pointed by pfds.
210                  *  ENOMEM: we're doomed anyway
211                  *
212                  */
213
214                 if (nsock == -1 && errno == EBADF) {
215                         pfds->revents = 0; /* Just in case... */
216                         nsock = mark_bad_fds (pfds, maxfd);
217                 }
218
219                 if ((pfds->revents & POLL_ERRORS) != 0) {
220                         /* We're supposed to die now, as the pipe has been closed */
221                         g_free (pfds);
222                         mono_ptr_array_destroy (async_results);
223                         socket_io_cleanup (socket_io_data);
224                         return;
225                 }
226
227                 /* Got a new socket */
228                 if ((pfds->revents & MONO_POLLIN) != 0) {
229                         int nread;
230                         gboolean found = FALSE;
231
232                         for (i = 1; i < allocated; i++) {
233                                 pfd = &pfds [i];
234                                 if (pfd->fd == data->newpfd.fd) {
235                                         found = TRUE;
236                                         break;
237                                 }
238                         }
239
240                         if (!found) {
241                                 for (i = 1; i < allocated; i++) {
242                                         pfd = &pfds [i];
243                                         if (pfd->fd == -1)
244                                                 break;
245                                 }
246                         }
247
248                         if (i == allocated) {
249                                 mono_pollfd *oldfd;
250
251                                 oldfd = pfds;
252                                 i = allocated;
253                                 allocated = allocated * 2;
254                                 pfds = g_renew (mono_pollfd, oldfd, allocated);
255                                 g_free (oldfd);
256                                 for (; i < allocated; i++)
257                                         INIT_POLLFD (&pfds [i], -1, 0);
258                                 //async_results = g_renew (gpointer, async_results, allocated * 2);
259                         }
260 #ifndef HOST_WIN32
261                         nread = read (data->pipe [0], one, 1);
262 #else
263                         nread = recv ((SOCKET) data->pipe [0], one, 1, 0);
264 #endif
265                         if (nread <= 0) {
266                                 g_free (pfds);
267                                 mono_ptr_array_destroy (async_results);
268                                 return; /* we're closed */
269                         }
270
271                         INIT_POLLFD (&pfds [i], data->newpfd.fd, data->newpfd.events);
272                         memset (&data->newpfd, 0, sizeof (mono_pollfd));
273                         MONO_SEM_POST (&data->new_sem);
274                         if (i >= maxfd)
275                                 maxfd = i + 1;
276                         nsock--;
277                 }
278
279                 if (nsock == 0)
280                         continue;
281
282                 mono_mutex_lock (&socket_io_data->io_lock);
283                 if (socket_io_data->inited == 3) {
284                         g_free (pfds);
285                         mono_ptr_array_destroy (async_results);
286                         mono_mutex_unlock (&socket_io_data->io_lock);
287                         return; /* cleanup called */
288                 }
289
290                 nresults = 0;
291                 mono_ptr_array_clear (async_results);
292
293                 for (i = 1; i < maxfd && nsock > 0; i++) {
294                         pfd = &pfds [i];
295                         if (pfd->fd == -1 || pfd->revents == 0)
296                                 continue;
297
298                         nsock--;
299                         list = mono_g_hash_table_lookup (socket_io_data->sock_to_state, GINT_TO_POINTER (pfd->fd));
300                         if (list != NULL && (pfd->revents & (MONO_POLLIN | POLL_ERRORS)) != 0) {
301                                 ares = get_io_event (&list, MONO_POLLIN);
302                                 if (ares != NULL) {
303                                         mono_ptr_array_append (async_results, ares);
304                                         ++nresults;
305                                 }
306                         }
307
308                         if (list != NULL && (pfd->revents & (MONO_POLLOUT | POLL_ERRORS)) != 0) {
309                                 ares = get_io_event (&list, MONO_POLLOUT);
310                                 if (ares != NULL) {
311                                         mono_ptr_array_append (async_results, ares);
312                                         ++nresults;
313                                 }
314                         }
315
316                         if (list != NULL) {
317                                 mono_g_hash_table_replace (socket_io_data->sock_to_state, GINT_TO_POINTER (pfd->fd), list);
318                                 pfd->events = get_events_from_list (list);
319                         } else {
320                                 mono_g_hash_table_remove (socket_io_data->sock_to_state, GINT_TO_POINTER (pfd->fd));
321                                 pfd->fd = -1;
322                                 if (i == maxfd - 1)
323                                         maxfd--;
324                         }
325                 }
326                 mono_mutex_unlock (&socket_io_data->io_lock);
327                 threadpool_append_async_io_jobs ((MonoObject **) async_results.data, nresults);
328                 mono_ptr_array_clear (async_results);
329         }
330 }
331