/*
 * INIT_POLLFD: initialise a pollfd-like structure in place.
 *   a - pointer to the structure (evaluated three times, so it must be free
 *       of side effects)
 *   b - value stored in ->fd
 *   c - value stored in ->events
 * ->revents is always reset to 0.
 *
 * The body is wrapped in do { } while (0) so that the expansion plus the
 * caller's trailing ';' forms exactly one statement; a bare { } block would
 * break `if (x) INIT_POLLFD (...); else ...`.
 */
#define INIT_POLLFD(a, b, c) do { (a)->fd = (b); (a)->events = (c); (a)->revents = 0; } while (0)
/*
 * Short type name for struct _tp_poll_data. The struct definition itself is
 * not visible in this excerpt; the code below uses its pipe [], new_sem and
 * newpfd members.
 */
8 typedef struct _tp_poll_data tp_poll_data;
/*
 * Forward declarations of the file-local backend callbacks. tp_poll_init
 * stores them in the shutdown, modify and wait fields of SocketIOData.
 */
10 static void tp_poll_shutdown (gpointer event_data);
11 static void tp_poll_modify (gpointer event_data, int fd, int operation, int events, gboolean is_new);
12 static void tp_poll_wait (gpointer p);
/*
 * connect_hack: thread entry point started by tp_poll_init.
 *
 * NOTE: this is an excerpt; the return type, braces and some statements of
 * this function are not shown, so only the visible statements are described.
 *
 * x is interpreted as a struct sockaddr_in. The function keeps calling
 * connect() on pipe [1] towards that address for as long as connect()
 * reports failure (non-zero return).
 *
 * NOTE(review): the socket is read from socket_io_data.pipe [1], whereas
 * tp_poll_init stores the socket it creates in result->pipe [1] of its
 * tp_poll_data. Confirm both refer to the same socket.
 */
16 connect_hack (gpointer x)
18 struct sockaddr_in *addr = (struct sockaddr_in *) x;
21 while (connect ((SOCKET) socket_io_data.pipe [1], (SOCKADDR *) addr, sizeof (struct sockaddr_in))) {
/*
 * Failure reporting: log the Winsock error code, then assert that it is
 * non-zero. The condition guarding these two lines is not visible here.
 */
24 g_warning ("Error initializing async. sockets %d.", WSAGetLastError ());
25 g_assert (WSAGetLastError ());
/*
 * tp_poll_init: create the poll backend state and register the backend
 * callbacks in `data`.
 *
 * NOTE: this is an excerpt; the return type, braces, some declarations, the
 * preprocessor conditionals and the return statement are not shown, so only
 * the visible statements are described.
 *
 * Visible steps:
 *  - allocate a zero-filled tp_poll_data;
 *  - create the wakeup channel, either with pipe() or with two TCP sockets
 *    connected to each other over 127.0.0.1 (the conditionals selecting the
 *    variant are not shown -- presumably the socket variant is the Windows
 *    one, given the WSA and SOCKET usage; confirm);
 *  - initialise new_sem with the value 1;
 *  - store tp_poll_shutdown, tp_poll_modify and tp_poll_wait in `data`.
 */
32 tp_poll_init (SocketIOData *data)
36 struct sockaddr_in server;
37 struct sockaddr_in client;
42 result = g_new0 (tp_poll_data, 1);
/* pipe() variant: a non-zero return means the pipe could not be created. */
44 if (pipe (result->pipe) != 0) {
/*
 * Socket variant: srv is a listening-side helper socket, pipe [1] is the
 * end that connect_hack connects, pipe [0] is the end returned by accept().
 */
50 srv = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
51 g_assert (srv != INVALID_SOCKET);
52 result->pipe [1] = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
53 g_assert (result->pipe [1] != INVALID_SOCKET);
55 server.sin_family = AF_INET;
56 server.sin_addr.s_addr = inet_addr ("127.0.0.1");
/* On bind failure the Winsock error code is printed. */
58 if (bind (srv, (SOCKADDR *) &server, sizeof (server))) {
59 g_print ("%d\n", WSAGetLastError ());
/*
 * Read the bound address back into `server` -- presumably to learn the port
 * that was assigned; the sin_port assignment is not visible here.
 */
63 len = sizeof (server);
64 getsockname (srv, (SOCKADDR *) &server, &len);
/*
 * connect_hack runs on a separate thread and receives &server, while this
 * thread calls accept() on srv below.
 */
66 mono_thread_create (mono_get_root_domain (), connect_hack, &server);
67 len = sizeof (server);
68 result->pipe [0] = accept (srv, (SOCKADDR *) &client, &len);
69 g_assert (result->pipe [0] != INVALID_SOCKET);
/*
 * tp_poll_modify waits on new_sem before filling newpfd, and tp_poll_wait
 * posts it after copying newpfd out.
 */
72 MONO_SEM_INIT (&result->new_sem, 1);
/* Register this backend's callbacks. */
73 data->shutdown = tp_poll_shutdown;
74 data->modify = tp_poll_modify;
75 data->wait = tp_poll_wait;
/*
 * tp_poll_modify: hand a descriptor and its event mask over to the thread
 * running tp_poll_wait.
 *
 * NOTE: this is an excerpt; the return type, braces, the declarations of
 * `msg` and `w`, and the conditionals around write()/send() are not shown.
 *
 *   event_data - the backend's tp_poll_data
 *   fd         - descriptor stored in newpfd
 *   operation  - sent through the pipe as a single byte
 *   events     - event mask stored in newpfd
 *   is_new     - not referenced in the visible lines
 */
80 tp_poll_modify (gpointer event_data, int fd, int operation, int events, gboolean is_new)
82 tp_poll_data *data = event_data;
/*
 * Wait on new_sem before writing to the shared newpfd slot; tp_poll_wait
 * posts the semaphore once it has copied the slot.
 */
86 MONO_SEM_WAIT (&data->new_sem);
87 INIT_POLLFD (&data->newpfd, GPOINTER_TO_INT (fd), events);
88 *msg = (char) operation;
/*
 * Write one byte to pipe [1]: write() in one variant, send() in the other.
 * tp_poll_wait polls pipe [0] for MONO_POLLIN.
 */
90 w = write (data->pipe [1], msg, 1);
92 send ((SOCKET) data->pipe [1], msg, 1, 0);
/*
 * tp_poll_shutdown: close both ends of the wakeup channel and destroy the
 * semaphore stored in the backend's tp_poll_data.
 *
 * NOTE: this is an excerpt; the return type, braces and the conditionals
 * choosing between the closesocket() and close() variants are not shown.
 * Whether `data` itself is freed cannot be seen from the visible lines.
 */
97 tp_poll_shutdown (gpointer event_data)
99 tp_poll_data *data = event_data;
/* Socket variant: both ends are closed unconditionally. */
102 closesocket (data->pipe [0]);
103 closesocket (data->pipe [1]);
/* Descriptor variant: an end is closed only if its value is greater than -1. */
105 if (data->pipe [0] > -1)
106 close (data->pipe [0]);
107 if (data->pipe [1] > -1)
108 close (data->pipe [1]);
112 MONO_SEM_DESTROY (&data->new_sem);
/*
 * mark_bad_fds: probe the entries of pfds one at a time to find invalid
 * descriptors.
 *
 * NOTE: this is an excerpt; the return type, braces, local declarations, the
 * assignment of `pfd`, the body of the ret == 1 branch and the return
 * statement are not shown. tp_poll_wait assigns the result to nsock.
 *
 *   pfds - array of poll entries
 *   nfds - number of entries to probe
 */
116 mark_bad_fds (mono_pollfd *pfds, int nfds)
122 for (i = 0; i < nfds; i++) {
/* Poll a single entry with a zero timeout, so the call does not block. */
127 ret = mono_poll (pfd, 1, 0);
/* EBADF on the single-entry poll: flag this entry as invalid. */
128 if (ret == -1 && errno == EBADF) {
129 pfd->revents |= MONO_POLLNVAL;
131 } else if (ret == 1) {
/*
 * tp_poll_wait: poll loop of the poll backend. `p` is the SocketIOData whose
 * event_data field holds this backend's tp_poll_data.
 *
 * NOTE: this is an excerpt; the return type, braces, most local
 * declarations, the enclosing loop, the computation of maxfd and several
 * conditionals are not shown, so only the visible statements are described.
 *
 * Layout of pfds: slot 0 watches the read end of the wakeup channel
 * (data->pipe [0]) for MONO_POLLIN; the other slots start with fd == -1 and
 * are filled from data->newpfd as descriptors are handed over by
 * tp_poll_modify.
 *
 * Visible flow:
 *  1. mono_poll () with timeout -1, retried while it fails with EINTR.
 *  2. On EBADF, mark_bad_fds () is used to flag the bad entries.
 *  3. If slot 0 reports an error condition, async_results is freed and
 *     socket_io_cleanup () is called.
 *  4. If slot 0 is readable, a slot is located for data->newpfd (growing
 *     the arrays by doubling when the scan reaches `allocated`), one byte
 *     is read from the wakeup channel, the slot is filled, newpfd is
 *     cleared and new_sem is posted.
 *  5. Under io_lock, pending read/write events for the ready descriptors
 *     are collected into async_results and sock_to_state is updated.
 *  6. The collected results are passed to threadpool_append_jobs () and the
 *     used part of async_results is cleared.
 */
140 tp_poll_wait (gpointer p)
/*
 * Initial number of pollfd slots: 128 when MONO_SMALL_CONFIG is set, 1024
 * otherwise (the #else / #endif lines are not shown).
 */
142 #if MONO_SMALL_CONFIG
143 #define INITIAL_POLLFD_SIZE 128
145 #define INITIAL_POLLFD_SIZE 1024
/* revents bits that are treated as an error condition on a descriptor. */
147 #define POLL_ERRORS (MONO_POLLERR | MONO_POLLHUP | MONO_POLLNVAL)
152 MonoInternalThread *thread;
154 SocketIOData *socket_io_data = p;
155 gpointer *async_results;
158 thread = mono_thread_internal_current ();
160 data = socket_io_data->event_data;
161 allocated = INITIAL_POLLFD_SIZE;
162 pfds = g_new0 (mono_pollfd, allocated);
/*
 * Two result entries per slot -- presumably because one descriptor can
 * yield both a read and a write result in the collection step below.
 */
163 async_results = g_new0 (gpointer, allocated * 2);
/* Slot 0: read end of the wakeup channel. */
164 INIT_POLLFD (pfds, data->pipe [0], MONO_POLLIN);
/* All other slots start out with fd == -1 and no requested events. */
165 for (i = 1; i < allocated; i++)
166 INIT_POLLFD (&pfds [i], -1, 0);
168 printf ("poll_wait\n");
178 if (THREAD_WANTS_A_BREAK (thread))
179 mono_thread_interruption_checkpoint ();
182 nsock = mono_poll (pfds, maxfd, -1);
183 } while (nsock == -1 && errno == EINTR);
186 * Apart from EINTR, we only check EBADF, for the rest:
187 * EINVAL: mono_poll() 'protects' us from descriptor
188 * numbers above the limit if using select() by marking
189 * then as MONO_POLLERR. If a system poll() is being
190 * used, the number of descriptor we're passing will not
191 * be over sysconf(_SC_OPEN_MAX), as the error would have
192 * happened when opening.
194 * EFAULT: we own the memory pointed by pfds.
195 * ENOMEM: we're doomed anyway
/*
 * EBADF: clear slot 0's revents, then let mark_bad_fds () probe the entries
 * individually; its result replaces nsock.
 */
199 if (nsock == -1 && errno == EBADF) {
200 pfds->revents = 0; /* Just in case... */
201 nsock = mark_bad_fds (pfds, maxfd);
204 if ((pfds->revents & POLL_ERRORS) != 0) {
205 /* We're supposed to die now, as the pipe has been closed */
207 g_free (async_results);
208 socket_io_cleanup (socket_io_data);
212 /* Got a new socket */
213 if ((pfds->revents & MONO_POLLIN) != 0) {
/*
 * Scan from slot 1 for a slot that is free (fd == -1) or already holds the
 * descriptor being handed over. `pfd` is assigned in a line not shown.
 */
216 for (i = 1; i < allocated; i++) {
218 if (pfd->fd == -1 || pfd->fd == data->newpfd.fd)
/* The scan ran off the end: no usable slot, so double both arrays. */
222 if (i == allocated) {
227 allocated = allocated * 2;
/* oldfd is assigned in a line not shown -- presumably the previous pfds. */
228 pfds = g_renew (mono_pollfd, oldfd, allocated);
/* Initialise the newly added slots as free. */
230 for (; i < allocated; i++)
231 INIT_POLLFD (&pfds [i], -1, 0);
232 async_results = g_renew (gpointer, async_results, allocated * 2);
/*
 * Consume the byte written by tp_poll_modify: read () in one variant,
 * recv () in the other.
 */
235 nread = read (data->pipe [0], one, 1);
237 nread = recv ((SOCKET) data->pipe [0], one, 1, 0);
241 g_free (async_results);
242 return; /* we're closed */
/*
 * Install the handed-over descriptor in slot i, clear the shared newpfd
 * slot and post new_sem, which tp_poll_modify waits on before its next
 * write to newpfd.
 */
245 INIT_POLLFD (&pfds [i], data->newpfd.fd, data->newpfd.events);
246 memset (&data->newpfd, 0, sizeof (mono_pollfd));
247 MONO_SEM_POST (&data->new_sem);
/* The sock_to_state accesses below happen while io_lock is held. */
256 EnterCriticalSection (&socket_io_data->io_lock);
257 if (socket_io_data->inited == 3) {
259 g_free (async_results);
260 LeaveCriticalSection (&socket_io_data->io_lock);
261 return; /* cleanup called */
/*
 * Walk the descriptor slots (slot 0 is the wakeup channel and is skipped)
 * while i < maxfd and nsock > 0.
 */
265 for (i = 1; i < maxfd && nsock > 0; i++) {
/* Slots that are free or reported no events are tested for here. */
267 if (pfd->fd == -1 || pfd->revents == 0)
271 list = mono_g_hash_table_lookup (socket_io_data->sock_to_state, GINT_TO_POINTER (pfd->fd));
/*
 * POLL_ERRORS is part of both the read and the write test, so an error
 * condition on the descriptor can produce a read event and a write event.
 */
272 if (list != NULL && (pfd->revents & (MONO_POLLIN | POLL_ERRORS)) != 0) {
273 ares = get_io_event (&list, MONO_POLLIN);
275 async_results [nresults++] = ares;
278 if (list != NULL && (pfd->revents & (MONO_POLLOUT | POLL_ERRORS)) != 0) {
279 ares = get_io_event (&list, MONO_POLLOUT);
281 async_results [nresults++] = ares;
/*
 * Either store the updated list back and recompute the slot's event mask
 * from it, or remove the descriptor's entry from sock_to_state. The
 * condition selecting between the two is not shown -- presumably whether
 * `list` is still non-NULL.
 */
285 mono_g_hash_table_replace (socket_io_data->sock_to_state, GINT_TO_POINTER (pfd->fd), list);
286 pfd->events = get_events_from_list (list);
288 mono_g_hash_table_remove (socket_io_data->sock_to_state, GINT_TO_POINTER (pfd->fd));
294 LeaveCriticalSection (&socket_io_data->io_lock);
/*
 * Pass the collected results to async_io_tp after the lock is released,
 * then clear the nresults entries of async_results that were used.
 */
295 threadpool_append_jobs (&async_io_tp, (MonoObject **) async_results, nresults);
296 memset (async_results, 0, sizeof (gpointer) * nresults);