Fix infinite loop in DiscoveryMessageSequence equality comparison.
[mono.git] / mono / metadata / tpool-poll.c
1 #define INIT_POLLFD(a, b, c) {(a)->fd = b; (a)->events = c; (a)->revents = 0;}
2 struct _tp_poll_data {
3         int pipe [2];
4         MonoSemType new_sem;
5         mono_pollfd newpfd;
6 };
7
8 typedef struct _tp_poll_data tp_poll_data;
9
10 static void tp_poll_shutdown (gpointer event_data);
11 static void tp_poll_modify (gpointer event_data, int fd, int operation, int events, gboolean is_new);
12 static void tp_poll_wait (gpointer p);
13
14 #ifdef HOST_WIN32
15 static void
16 connect_hack (gpointer x)
17 {
18         struct sockaddr_in *addr = (struct sockaddr_in *) x;
19         int count = 0;
20
21         while (connect ((SOCKET) socket_io_data.pipe [1], (SOCKADDR *) addr, sizeof (struct sockaddr_in))) {
22                 Sleep (500);
23                 if (++count > 3) {
24                         g_warning ("Error initializing async. sockets %d.", WSAGetLastError ());
25                         g_assert (WSAGetLastError ());
26                 }
27         }
28 }
29 #endif
30
31 static gpointer
32 tp_poll_init (SocketIOData *data)
33 {
34         tp_poll_data *result;
35 #ifdef HOST_WIN32
36         struct sockaddr_in server;
37         struct sockaddr_in client;
38         SOCKET srv;
39         int len;
40 #endif
41
42         result = g_new0 (tp_poll_data, 1);
43 #ifndef HOST_WIN32
44         if (pipe (result->pipe) != 0) {
45                 int err = errno;
46                 perror ("mono");
47                 g_assert (err);
48         }
49 #else
50         srv = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
51         g_assert (srv != INVALID_SOCKET);
52         result->pipe [1] = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
53         g_assert (result->pipe [1] != INVALID_SOCKET);
54
55         server.sin_family = AF_INET;
56         server.sin_addr.s_addr = inet_addr ("127.0.0.1");
57         server.sin_port = 0;
58         if (bind (srv, (SOCKADDR *) &server, sizeof (server))) {
59                 g_print ("%d\n", WSAGetLastError ());
60                 g_assert (1 != 0);
61         }
62
63         len = sizeof (server);
64         getsockname (srv, (SOCKADDR *) &server, &len);
65         listen (srv, 1);
66         mono_thread_create (mono_get_root_domain (), connect_hack, &server);
67         len = sizeof (server);
68         result->pipe [0] = accept (srv, (SOCKADDR *) &client, &len);
69         g_assert (result->pipe [0] != INVALID_SOCKET);
70         closesocket (srv);
71 #endif
72         MONO_SEM_INIT (&result->new_sem, 1);
73         data->shutdown = tp_poll_shutdown;
74         data->modify = tp_poll_modify;
75         data->wait = tp_poll_wait;
76         return result;
77 }
78
79 static void
80 tp_poll_modify (gpointer event_data, int fd, int operation, int events, gboolean is_new)
81 {
82         tp_poll_data *data = event_data;
83         char msg [1];
84         int w;
85
86         MONO_SEM_WAIT (&data->new_sem);
87         INIT_POLLFD (&data->newpfd, GPOINTER_TO_INT (fd), events);
88         *msg = (char) operation;
89 #ifndef HOST_WIN32
90         w = write (data->pipe [1], msg, 1);
91 #else
92         send ((SOCKET) data->pipe [1], msg, 1, 0);
93 #endif
94 }
95
96 static void
97 tp_poll_shutdown (gpointer event_data)
98 {
99         tp_poll_data *data = event_data;
100
101 #ifdef HOST_WIN32
102         closesocket (data->pipe [0]);
103         closesocket (data->pipe [1]);
104 #else
105         if (data->pipe [0] > -1)
106                 close (data->pipe [0]);
107         if (data->pipe [1] > -1)
108                 close (data->pipe [1]);
109 #endif
110         data->pipe [0] = -1;
111         data->pipe [1] = -1;
112         MONO_SEM_DESTROY (&data->new_sem);
113 }
114
115 static int
116 mark_bad_fds (mono_pollfd *pfds, int nfds)
117 {
118         int i, ret;
119         mono_pollfd *pfd;
120         int count = 0;
121
122         for (i = 0; i < nfds; i++) {
123                 pfd = &pfds [i];
124                 if (pfd->fd == -1)
125                         continue;
126
127                 ret = mono_poll (pfd, 1, 0);
128                 if (ret == -1 && errno == EBADF) {
129                         pfd->revents |= MONO_POLLNVAL;
130                         count++;
131                 } else if (ret == 1) {
132                         count++;
133                 }
134         }
135
136         return count;
137 }
138
139 static void
140 tp_poll_wait (gpointer p)
141 {
142 #if MONO_SMALL_CONFIG
143 #define INITIAL_POLLFD_SIZE     128
144 #else
145 #define INITIAL_POLLFD_SIZE     1024
146 #endif
147 #define POLL_ERRORS (MONO_POLLERR | MONO_POLLHUP | MONO_POLLNVAL)
148         mono_pollfd *pfds;
149         gint maxfd = 1;
150         gint allocated;
151         gint i;
152         MonoInternalThread *thread;
153         tp_poll_data *data;
154         SocketIOData *socket_io_data = p;
155         gpointer *async_results;
156         gint nresults;
157
158         thread = mono_thread_internal_current ();
159
160         data = socket_io_data->event_data;
161         allocated = INITIAL_POLLFD_SIZE;
162         pfds = g_new0 (mono_pollfd, allocated);
163         async_results = g_new0 (gpointer, allocated * 2);
164         INIT_POLLFD (pfds, data->pipe [0], MONO_POLLIN);
165         for (i = 1; i < allocated; i++)
166                 INIT_POLLFD (&pfds [i], -1, 0);
167
168         printf ("poll_wait\n");
169         while (1) {
170                 int nsock = 0;
171                 mono_pollfd *pfd;
172                 char one [1];
173                 MonoMList *list;
174                 MonoObject *ares;
175
176                 do {
177                         if (nsock == -1) {
178                                 if (THREAD_WANTS_A_BREAK (thread))
179                                         mono_thread_interruption_checkpoint ();
180                         }
181
182                         nsock = mono_poll (pfds, maxfd, -1);
183                 } while (nsock == -1 && errno == EINTR);
184
185                 /* 
186                  * Apart from EINTR, we only check EBADF, for the rest:
187                  *  EINVAL: mono_poll() 'protects' us from descriptor
188                  *      numbers above the limit if using select() by marking
189                  *      then as MONO_POLLERR.  If a system poll() is being
190                  *      used, the number of descriptor we're passing will not
191                  *      be over sysconf(_SC_OPEN_MAX), as the error would have
192                  *      happened when opening.
193                  *
194                  *  EFAULT: we own the memory pointed by pfds.
195                  *  ENOMEM: we're doomed anyway
196                  *
197                  */
198
199                 if (nsock == -1 && errno == EBADF) {
200                         pfds->revents = 0; /* Just in case... */
201                         nsock = mark_bad_fds (pfds, maxfd);
202                 }
203
204                 if ((pfds->revents & POLL_ERRORS) != 0) {
205                         /* We're supposed to die now, as the pipe has been closed */
206                         g_free (pfds);
207                         g_free (async_results);
208                         socket_io_cleanup (socket_io_data);
209                         return;
210                 }
211
212                 /* Got a new socket */
213                 if ((pfds->revents & MONO_POLLIN) != 0) {
214                         int nread;
215
216                         for (i = 1; i < allocated; i++) {
217                                 pfd = &pfds [i];
218                                 if (pfd->fd == -1 || pfd->fd == data->newpfd.fd)
219                                         break;
220                         }
221
222                         if (i == allocated) {
223                                 mono_pollfd *oldfd;
224
225                                 oldfd = pfds;
226                                 i = allocated;
227                                 allocated = allocated * 2;
228                                 pfds = g_renew (mono_pollfd, oldfd, allocated);
229                                 g_free (oldfd);
230                                 for (; i < allocated; i++)
231                                         INIT_POLLFD (&pfds [i], -1, 0);
232                                 async_results = g_renew (gpointer, async_results, allocated * 2);
233                         }
234 #ifndef HOST_WIN32
235                         nread = read (data->pipe [0], one, 1);
236 #else
237                         nread = recv ((SOCKET) data->pipe [0], one, 1, 0);
238 #endif
239                         if (nread <= 0) {
240                                 g_free (pfds);
241                                 g_free (async_results);
242                                 return; /* we're closed */
243                         }
244
245                         INIT_POLLFD (&pfds [i], data->newpfd.fd, data->newpfd.events);
246                         memset (&data->newpfd, 0, sizeof (mono_pollfd));
247                         MONO_SEM_POST (&data->new_sem);
248                         if (i >= maxfd)
249                                 maxfd = i + 1;
250                         nsock--;
251                 }
252
253                 if (nsock == 0)
254                         continue;
255
256                 EnterCriticalSection (&socket_io_data->io_lock);
257                 if (socket_io_data->inited == 3) {
258                         g_free (pfds);
259                         g_free (async_results);
260                         LeaveCriticalSection (&socket_io_data->io_lock);
261                         return; /* cleanup called */
262                 }
263
264                 nresults = 0;
265                 for (i = 1; i < maxfd && nsock > 0; i++) {
266                         pfd = &pfds [i];
267                         if (pfd->fd == -1 || pfd->revents == 0)
268                                 continue;
269
270                         nsock--;
271                         list = mono_g_hash_table_lookup (socket_io_data->sock_to_state, GINT_TO_POINTER (pfd->fd));
272                         if (list != NULL && (pfd->revents & (MONO_POLLIN | POLL_ERRORS)) != 0) {
273                                 ares = get_io_event (&list, MONO_POLLIN);
274                                 if (ares != NULL)
275                                         async_results [nresults++] = ares;
276                         }
277
278                         if (list != NULL && (pfd->revents & (MONO_POLLOUT | POLL_ERRORS)) != 0) {
279                                 ares = get_io_event (&list, MONO_POLLOUT);
280                                 if (ares != NULL)
281                                         async_results [nresults++] = ares;
282                         }
283
284                         if (list != NULL) {
285                                 mono_g_hash_table_replace (socket_io_data->sock_to_state, GINT_TO_POINTER (pfd->fd), list);
286                                 pfd->events = get_events_from_list (list);
287                         } else {
288                                 mono_g_hash_table_remove (socket_io_data->sock_to_state, GINT_TO_POINTER (pfd->fd));
289                                 pfd->fd = -1;
290                                 if (i == maxfd - 1)
291                                         maxfd--;
292                         }
293                 }
294                 LeaveCriticalSection (&socket_io_data->io_lock);
295                 threadpool_append_jobs (&async_io_tp, (MonoObject **) async_results, nresults);
296                 memset (async_results, 0, sizeof (gpointer) * nresults);
297         }
298 }
299