Merge pull request #799 from kebby/master
[mono.git] / mono / metadata / tpool-poll.c
1 /*
2  * tpool-poll.c: poll related stuff
3  *
4  * Authors:
5  *   Dietmar Maurer (dietmar@ximian.com)
6  *   Gonzalo Paniagua Javier (gonzalo@ximian.com)
7  *
8  * Copyright 2001-2003 Ximian, Inc (http://www.ximian.com)
9  * Copyright 2004-2011 Novell, Inc (http://www.novell.com)
10  */
11
12 #define INIT_POLLFD(a, b, c) {(a)->fd = b; (a)->events = c; (a)->revents = 0;}
13 struct _tp_poll_data {
14         int pipe [2];
15         MonoSemType new_sem;
16         mono_pollfd newpfd;
17 };
18
19 typedef struct _tp_poll_data tp_poll_data;
20
21 static void tp_poll_shutdown (gpointer event_data);
22 static void tp_poll_modify (gpointer p, int fd, int operation, int events, gboolean is_new);
23 static void tp_poll_wait (gpointer p);
24
25 static gpointer
26 tp_poll_init (SocketIOData *data)
27 {
28         tp_poll_data *result;
29 #ifdef HOST_WIN32
30         struct sockaddr_in client;
31         struct sockaddr_in server;
32         SOCKET srv;
33         int len;
34 #endif
35
36         result = g_new0 (tp_poll_data, 1);
37 #ifndef HOST_WIN32
38         if (pipe (result->pipe) != 0) {
39                 int err = errno;
40                 perror ("mono");
41                 g_assert (err);
42         }
43 #else
44         srv = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
45         g_assert (srv != INVALID_SOCKET);
46         result->pipe [1] = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
47         g_assert (result->pipe [1] != INVALID_SOCKET);
48
49         server.sin_family = AF_INET;
50         server.sin_addr.s_addr = inet_addr ("127.0.0.1");
51         server.sin_port = 0;
52         if (bind (srv, (SOCKADDR *) &server, sizeof (struct sockaddr_in))) {
53                 g_print ("%d\n", WSAGetLastError ());
54                 g_assert (1 != 0);
55         }
56
57         len = sizeof (server);
58         getsockname (srv, (SOCKADDR *) &server, &len);
59         listen (srv, 1);
60         if (connect ((SOCKET) result->pipe [1], (SOCKADDR *) &server, sizeof (server)) == SOCKET_ERROR) {
61                 g_print ("%d\n", WSAGetLastError ());
62                 g_assert (1 != 0);
63         }
64         len = sizeof (client);
65         result->pipe [0] = accept (srv, (SOCKADDR *) &client, &len);
66         g_assert (result->pipe [0] != INVALID_SOCKET);
67         closesocket (srv);
68 #endif
69         MONO_SEM_INIT (&result->new_sem, 1);
70         data->shutdown = tp_poll_shutdown;
71         data->modify = tp_poll_modify;
72         data->wait = tp_poll_wait;
73         return result;
74 }
75
76 static void
77 tp_poll_modify (gpointer p, int fd, int operation, int events, gboolean is_new)
78 {
79         SocketIOData *socket_io_data;
80         tp_poll_data *data;
81         char msg [1];
82         int unused;
83
84         socket_io_data = p;
85         data = socket_io_data->event_data;
86
87         LeaveCriticalSection (&socket_io_data->io_lock);
88         
89         MONO_SEM_WAIT (&data->new_sem);
90         INIT_POLLFD (&data->newpfd, GPOINTER_TO_INT (fd), events);
91         *msg = (char) operation;
92 #ifndef HOST_WIN32
93         unused = write (data->pipe [1], msg, 1);
94 #else
95         unused = send ((SOCKET) data->pipe [1], msg, 1, 0);
96 #endif
97 }
98
99 static void
100 tp_poll_shutdown (gpointer event_data)
101 {
102         tp_poll_data *data = event_data;
103
104 #ifdef HOST_WIN32
105         closesocket (data->pipe [0]);
106         closesocket (data->pipe [1]);
107 #else
108         if (data->pipe [0] > -1)
109                 close (data->pipe [0]);
110         if (data->pipe [1] > -1)
111                 close (data->pipe [1]);
112 #endif
113         data->pipe [0] = -1;
114         data->pipe [1] = -1;
115         MONO_SEM_DESTROY (&data->new_sem);
116 }
117
118 static int
119 mark_bad_fds (mono_pollfd *pfds, int nfds)
120 {
121         int i, ret;
122         mono_pollfd *pfd;
123         int count = 0;
124
125         for (i = 0; i < nfds; i++) {
126                 pfd = &pfds [i];
127                 if (pfd->fd == -1)
128                         continue;
129
130                 ret = mono_poll (pfd, 1, 0);
131                 if (ret == -1 && errno == EBADF) {
132                         pfd->revents |= MONO_POLLNVAL;
133                         count++;
134                 } else if (ret == 1) {
135                         count++;
136                 }
137         }
138
139         return count;
140 }
141
142 static void
143 tp_poll_wait (gpointer p)
144 {
145 #if MONO_SMALL_CONFIG
146 #define INITIAL_POLLFD_SIZE     128
147 #else
148 #define INITIAL_POLLFD_SIZE     1024
149 #endif
150 #define POLL_ERRORS (MONO_POLLERR | MONO_POLLHUP | MONO_POLLNVAL)
151
152 #ifdef DISABLE_SOCKETS
153 #define socket_io_cleanup(x)
154 #endif
155         mono_pollfd *pfds;
156         gint maxfd = 1;
157         gint allocated;
158         gint i;
159         tp_poll_data *data;
160         SocketIOData *socket_io_data = p;
161         MonoPtrArray async_results;
162         gint nresults;
163
164         data = socket_io_data->event_data;
165         allocated = INITIAL_POLLFD_SIZE;
166         pfds = g_new0 (mono_pollfd, allocated);
167         mono_ptr_array_init (async_results, allocated * 2);
168         INIT_POLLFD (pfds, data->pipe [0], MONO_POLLIN);
169         for (i = 1; i < allocated; i++)
170                 INIT_POLLFD (&pfds [i], -1, 0);
171
172         while (1) {
173                 int nsock = 0;
174                 mono_pollfd *pfd;
175                 char one [1];
176                 MonoMList *list;
177                 MonoObject *ares;
178
179                 mono_gc_set_skip_thread (TRUE);
180
181                 do {
182                         if (nsock == -1) {
183                                 check_for_interruption_critical ();
184                         }
185
186                         nsock = mono_poll (pfds, maxfd, -1);
187                 } while (nsock == -1 && errno == EINTR);
188
189                 mono_gc_set_skip_thread (FALSE);
190
191                 /* 
192                  * Apart from EINTR, we only check EBADF, for the rest:
193                  *  EINVAL: mono_poll() 'protects' us from descriptor
194                  *      numbers above the limit if using select() by marking
195                  *      then as MONO_POLLERR.  If a system poll() is being
196                  *      used, the number of descriptor we're passing will not
197                  *      be over sysconf(_SC_OPEN_MAX), as the error would have
198                  *      happened when opening.
199                  *
200                  *  EFAULT: we own the memory pointed by pfds.
201                  *  ENOMEM: we're doomed anyway
202                  *
203                  */
204
205                 if (nsock == -1 && errno == EBADF) {
206                         pfds->revents = 0; /* Just in case... */
207                         nsock = mark_bad_fds (pfds, maxfd);
208                 }
209
210                 if ((pfds->revents & POLL_ERRORS) != 0) {
211                         /* We're supposed to die now, as the pipe has been closed */
212                         g_free (pfds);
213                         mono_ptr_array_destroy (async_results);
214                         socket_io_cleanup (socket_io_data);
215                         return;
216                 }
217
218                 /* Got a new socket */
219                 if ((pfds->revents & MONO_POLLIN) != 0) {
220                         int nread;
221                         gboolean found = FALSE;
222
223                         for (i = 1; i < allocated; i++) {
224                                 pfd = &pfds [i];
225                                 if (pfd->fd == data->newpfd.fd) {
226                                         found = TRUE;
227                                         break;
228                                 }
229                         }
230
231                         if (!found) {
232                                 for (i = 1; i < allocated; i++) {
233                                         pfd = &pfds [i];
234                                         if (pfd->fd == -1)
235                                                 break;
236                                 }
237                         }
238
239                         if (i == allocated) {
240                                 mono_pollfd *oldfd;
241
242                                 oldfd = pfds;
243                                 i = allocated;
244                                 allocated = allocated * 2;
245                                 pfds = g_renew (mono_pollfd, oldfd, allocated);
246                                 g_free (oldfd);
247                                 for (; i < allocated; i++)
248                                         INIT_POLLFD (&pfds [i], -1, 0);
249                                 //async_results = g_renew (gpointer, async_results, allocated * 2);
250                         }
251 #ifndef HOST_WIN32
252                         nread = read (data->pipe [0], one, 1);
253 #else
254                         nread = recv ((SOCKET) data->pipe [0], one, 1, 0);
255 #endif
256                         if (nread <= 0) {
257                                 g_free (pfds);
258                                 mono_ptr_array_destroy (async_results);
259                                 return; /* we're closed */
260                         }
261
262                         INIT_POLLFD (&pfds [i], data->newpfd.fd, data->newpfd.events);
263                         memset (&data->newpfd, 0, sizeof (mono_pollfd));
264                         MONO_SEM_POST (&data->new_sem);
265                         if (i >= maxfd)
266                                 maxfd = i + 1;
267                         nsock--;
268                 }
269
270                 if (nsock == 0)
271                         continue;
272
273                 EnterCriticalSection (&socket_io_data->io_lock);
274                 if (socket_io_data->inited == 3) {
275                         g_free (pfds);
276                         mono_ptr_array_destroy (async_results);
277                         LeaveCriticalSection (&socket_io_data->io_lock);
278                         return; /* cleanup called */
279                 }
280
281                 nresults = 0;
282                 mono_ptr_array_clear (async_results);
283
284                 for (i = 1; i < maxfd && nsock > 0; i++) {
285                         pfd = &pfds [i];
286                         if (pfd->fd == -1 || pfd->revents == 0)
287                                 continue;
288
289                         nsock--;
290                         list = mono_g_hash_table_lookup (socket_io_data->sock_to_state, GINT_TO_POINTER (pfd->fd));
291                         if (list != NULL && (pfd->revents & (MONO_POLLIN | POLL_ERRORS)) != 0) {
292                                 ares = get_io_event (&list, MONO_POLLIN);
293                                 if (ares != NULL) {
294                                         mono_ptr_array_append (async_results, ares);
295                                         ++nresults;
296                                 }
297                         }
298
299                         if (list != NULL && (pfd->revents & (MONO_POLLOUT | POLL_ERRORS)) != 0) {
300                                 ares = get_io_event (&list, MONO_POLLOUT);
301                                 if (ares != NULL) {
302                                         mono_ptr_array_append (async_results, ares);
303                                         ++nresults;
304                                 }
305                         }
306
307                         if (list != NULL) {
308                                 mono_g_hash_table_replace (socket_io_data->sock_to_state, GINT_TO_POINTER (pfd->fd), list);
309                                 pfd->events = get_events_from_list (list);
310                         } else {
311                                 mono_g_hash_table_remove (socket_io_data->sock_to_state, GINT_TO_POINTER (pfd->fd));
312                                 pfd->fd = -1;
313                                 if (i == maxfd - 1)
314                                         maxfd--;
315                         }
316                 }
317                 LeaveCriticalSection (&socket_io_data->io_lock);
318                 threadpool_append_jobs (&async_io_tp, (MonoObject **) async_results.data, nresults);
319                 mono_ptr_array_clear (async_results);
320         }
321 }
322