51887d3bc6b3706c1d0745bce5e15cc150a73ae8
[mono.git] / mono / metadata / tpool-poll.c
1 #define INIT_POLLFD(a, b, c) {(a)->fd = b; (a)->events = c; (a)->revents = 0;}
2 struct _tp_poll_data {
3         int pipe [2];
4         MonoSemType new_sem;
5         mono_pollfd newpfd;
6 };
7
8 typedef struct _tp_poll_data tp_poll_data;
9
10 static void tp_poll_shutdown (gpointer event_data);
11 static void tp_poll_modify (gpointer event_data, int fd, int operation, int events, gboolean is_new);
12 static void tp_poll_wait (gpointer p);
13
14 #ifdef HOST_WIN32
15 static void
16 connect_hack (gpointer x)
17 {
18         struct sockaddr_in *addr = (struct sockaddr_in *) x;
19         tp_poll_data *data = socket_io_data.event_data;
20         int count = 0;
21
22         while (connect ((SOCKET) data->pipe [1], (SOCKADDR *) addr, sizeof (struct sockaddr_in))) {
23                 Sleep (500);
24                 if (++count > 3) {
25                         g_warning ("Error initializing async. sockets %d.", WSAGetLastError ());
26                         g_assert (WSAGetLastError ());
27                 }
28         }
29 }
30 #endif
31
32 static gpointer
33 tp_poll_init (SocketIOData *data)
34 {
35         tp_poll_data *result;
36 #ifdef HOST_WIN32
37         struct sockaddr_in server;
38         struct sockaddr_in client;
39         SOCKET srv;
40         int len;
41 #endif
42
43         result = g_new0 (tp_poll_data, 1);
44 #ifndef HOST_WIN32
45         if (pipe (result->pipe) != 0) {
46                 int err = errno;
47                 perror ("mono");
48                 g_assert (err);
49         }
50 #else
51         srv = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
52         g_assert (srv != INVALID_SOCKET);
53         result->pipe [1] = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
54         g_assert (result->pipe [1] != INVALID_SOCKET);
55
56         server.sin_family = AF_INET;
57         server.sin_addr.s_addr = inet_addr ("127.0.0.1");
58         server.sin_port = 0;
59         if (bind (srv, (SOCKADDR *) &server, sizeof (server))) {
60                 g_print ("%d\n", WSAGetLastError ());
61                 g_assert (1 != 0);
62         }
63
64         len = sizeof (server);
65         getsockname (srv, (SOCKADDR *) &server, &len);
66         listen (srv, 1);
67         mono_thread_create (mono_get_root_domain (), connect_hack, &server);
68         len = sizeof (server);
69         result->pipe [0] = accept (srv, (SOCKADDR *) &client, &len);
70         g_assert (result->pipe [0] != INVALID_SOCKET);
71         closesocket (srv);
72 #endif
73         MONO_SEM_INIT (&result->new_sem, 1);
74         data->shutdown = tp_poll_shutdown;
75         data->modify = tp_poll_modify;
76         data->wait = tp_poll_wait;
77         return result;
78 }
79
80 static void
81 tp_poll_modify (gpointer event_data, int fd, int operation, int events, gboolean is_new)
82 {
83         tp_poll_data *data = event_data;
84         char msg [1];
85         int w;
86
87         MONO_SEM_WAIT (&data->new_sem);
88         INIT_POLLFD (&data->newpfd, GPOINTER_TO_INT (fd), events);
89         *msg = (char) operation;
90 #ifndef HOST_WIN32
91         w = write (data->pipe [1], msg, 1);
92 #else
93         send ((SOCKET) data->pipe [1], msg, 1, 0);
94 #endif
95 }
96
97 static void
98 tp_poll_shutdown (gpointer event_data)
99 {
100         tp_poll_data *data = event_data;
101
102 #ifdef HOST_WIN32
103         closesocket (data->pipe [0]);
104         closesocket (data->pipe [1]);
105 #else
106         if (data->pipe [0] > -1)
107                 close (data->pipe [0]);
108         if (data->pipe [1] > -1)
109                 close (data->pipe [1]);
110 #endif
111         data->pipe [0] = -1;
112         data->pipe [1] = -1;
113         MONO_SEM_DESTROY (&data->new_sem);
114 }
115
116 static int
117 mark_bad_fds (mono_pollfd *pfds, int nfds)
118 {
119         int i, ret;
120         mono_pollfd *pfd;
121         int count = 0;
122
123         for (i = 0; i < nfds; i++) {
124                 pfd = &pfds [i];
125                 if (pfd->fd == -1)
126                         continue;
127
128                 ret = mono_poll (pfd, 1, 0);
129                 if (ret == -1 && errno == EBADF) {
130                         pfd->revents |= MONO_POLLNVAL;
131                         count++;
132                 } else if (ret == 1) {
133                         count++;
134                 }
135         }
136
137         return count;
138 }
139
140 static void
141 tp_poll_wait (gpointer p)
142 {
143 #if MONO_SMALL_CONFIG
144 #define INITIAL_POLLFD_SIZE     128
145 #else
146 #define INITIAL_POLLFD_SIZE     1024
147 #endif
148 #define POLL_ERRORS (MONO_POLLERR | MONO_POLLHUP | MONO_POLLNVAL)
149         mono_pollfd *pfds;
150         gint maxfd = 1;
151         gint allocated;
152         gint i;
153         MonoInternalThread *thread;
154         tp_poll_data *data;
155         SocketIOData *socket_io_data = p;
156         gpointer *async_results;
157         gint nresults;
158
159         thread = mono_thread_internal_current ();
160
161         data = socket_io_data->event_data;
162         allocated = INITIAL_POLLFD_SIZE;
163         pfds = g_new0 (mono_pollfd, allocated);
164         async_results = g_new0 (gpointer, allocated * 2);
165         INIT_POLLFD (pfds, data->pipe [0], MONO_POLLIN);
166         for (i = 1; i < allocated; i++)
167                 INIT_POLLFD (&pfds [i], -1, 0);
168
169         printf ("poll_wait\n");
170         while (1) {
171                 int nsock = 0;
172                 mono_pollfd *pfd;
173                 char one [1];
174                 MonoMList *list;
175                 MonoObject *ares;
176
177                 do {
178                         if (nsock == -1) {
179                                 if (THREAD_WANTS_A_BREAK (thread))
180                                         mono_thread_interruption_checkpoint ();
181                         }
182
183                         nsock = mono_poll (pfds, maxfd, -1);
184                 } while (nsock == -1 && errno == EINTR);
185
186                 /* 
187                  * Apart from EINTR, we only check EBADF, for the rest:
188                  *  EINVAL: mono_poll() 'protects' us from descriptor
189                  *      numbers above the limit if using select() by marking
190                  *      then as MONO_POLLERR.  If a system poll() is being
191                  *      used, the number of descriptor we're passing will not
192                  *      be over sysconf(_SC_OPEN_MAX), as the error would have
193                  *      happened when opening.
194                  *
195                  *  EFAULT: we own the memory pointed by pfds.
196                  *  ENOMEM: we're doomed anyway
197                  *
198                  */
199
200                 if (nsock == -1 && errno == EBADF) {
201                         pfds->revents = 0; /* Just in case... */
202                         nsock = mark_bad_fds (pfds, maxfd);
203                 }
204
205                 if ((pfds->revents & POLL_ERRORS) != 0) {
206                         /* We're supposed to die now, as the pipe has been closed */
207                         g_free (pfds);
208                         g_free (async_results);
209                         socket_io_cleanup (socket_io_data);
210                         return;
211                 }
212
213                 /* Got a new socket */
214                 if ((pfds->revents & MONO_POLLIN) != 0) {
215                         int nread;
216
217                         for (i = 1; i < allocated; i++) {
218                                 pfd = &pfds [i];
219                                 if (pfd->fd == -1 || pfd->fd == data->newpfd.fd)
220                                         break;
221                         }
222
223                         if (i == allocated) {
224                                 mono_pollfd *oldfd;
225
226                                 oldfd = pfds;
227                                 i = allocated;
228                                 allocated = allocated * 2;
229                                 pfds = g_renew (mono_pollfd, oldfd, allocated);
230                                 g_free (oldfd);
231                                 for (; i < allocated; i++)
232                                         INIT_POLLFD (&pfds [i], -1, 0);
233                                 async_results = g_renew (gpointer, async_results, allocated * 2);
234                         }
235 #ifndef HOST_WIN32
236                         nread = read (data->pipe [0], one, 1);
237 #else
238                         nread = recv ((SOCKET) data->pipe [0], one, 1, 0);
239 #endif
240                         if (nread <= 0) {
241                                 g_free (pfds);
242                                 g_free (async_results);
243                                 return; /* we're closed */
244                         }
245
246                         INIT_POLLFD (&pfds [i], data->newpfd.fd, data->newpfd.events);
247                         memset (&data->newpfd, 0, sizeof (mono_pollfd));
248                         MONO_SEM_POST (&data->new_sem);
249                         if (i >= maxfd)
250                                 maxfd = i + 1;
251                         nsock--;
252                 }
253
254                 if (nsock == 0)
255                         continue;
256
257                 EnterCriticalSection (&socket_io_data->io_lock);
258                 if (socket_io_data->inited == 3) {
259                         g_free (pfds);
260                         g_free (async_results);
261                         LeaveCriticalSection (&socket_io_data->io_lock);
262                         return; /* cleanup called */
263                 }
264
265                 nresults = 0;
266                 for (i = 1; i < maxfd && nsock > 0; i++) {
267                         pfd = &pfds [i];
268                         if (pfd->fd == -1 || pfd->revents == 0)
269                                 continue;
270
271                         nsock--;
272                         list = mono_g_hash_table_lookup (socket_io_data->sock_to_state, GINT_TO_POINTER (pfd->fd));
273                         if (list != NULL && (pfd->revents & (MONO_POLLIN | POLL_ERRORS)) != 0) {
274                                 ares = get_io_event (&list, MONO_POLLIN);
275                                 if (ares != NULL)
276                                         async_results [nresults++] = ares;
277                         }
278
279                         if (list != NULL && (pfd->revents & (MONO_POLLOUT | POLL_ERRORS)) != 0) {
280                                 ares = get_io_event (&list, MONO_POLLOUT);
281                                 if (ares != NULL)
282                                         async_results [nresults++] = ares;
283                         }
284
285                         if (list != NULL) {
286                                 mono_g_hash_table_replace (socket_io_data->sock_to_state, GINT_TO_POINTER (pfd->fd), list);
287                                 pfd->events = get_events_from_list (list);
288                         } else {
289                                 mono_g_hash_table_remove (socket_io_data->sock_to_state, GINT_TO_POINTER (pfd->fd));
290                                 pfd->fd = -1;
291                                 if (i == maxfd - 1)
292                                         maxfd--;
293                         }
294                 }
295                 LeaveCriticalSection (&socket_io_data->io_lock);
296                 threadpool_append_jobs (&async_io_tp, (MonoObject **) async_results, nresults);
297                 memset (async_results, 0, sizeof (gpointer) * nresults);
298         }
299 }
300