3 * Support for threaded ops which may block (read, write, connect, etc.).
5 * Copyright (c) 1996 T. J. Wilkinson & Associates, London, UK.
7 * See the file "license.terms" for information on usage and redistribution
8 * of this file, and for a DISCLAIMER OF ALL WARRANTIES.
10 * Written by Tim Wilkinson <tim@tjwassoc.demon.co.uk>, 1996.
15 #include <sys/types.h>
17 #include <sys/socket.h>
25 #include "threads/thread.h"
27 #if !defined(NATIVE_THREADS)
/* Accept is waited on as a read event and connect as a write event. */
31 #define TH_ACCEPT TH_READ
32 #define TH_CONNECT TH_WRITE
/*
 * Descriptor sets naming the fds that currently have a suspended reader or
 * writer (set and cleared in blockOnFile, copied for select in checkEvents),
 * plus one thread queue per descriptor.  The queues have FD_SETSIZE entries
 * and are indexed directly by fd.
 */
35 static fd_set readsPending;
36 static fd_set writesPending;
37 static thread* readQ[FD_SETSIZE];
38 static thread* writeQ[FD_SETSIZE];
/* Prototypes; blockOnFile is called by the wrappers before its definition. */
40 void blockOnFile(int, int);
41 void waitOnEvents(void);
/* Thread that blockOnFile suspends; defined in another file (extern). */
43 extern thread* currentThread;
45 /* These are undefined because we do not yet support async I/O */
52 * Create a threaded file descriptor.
/*
 * threadedFileDescriptor: prepare fd for use by the threaded I/O wrappers.
 *
 * When BLOCKING_CALLS is not defined the visible steps are, in order:
 *   1. make the descriptor non-blocking (O_NONBLOCK via fcntl, else FIONBIO
 *      via ioctl),
 *   2. set the owner that is signalled for the descriptor (F_SETOWN via
 *      fcntl, else FIOSETOWN via ioctl), using pid,
 *   3. enable asynchronous notification (O_ASYNC via fcntl, else FIOASYNC
 *      via ioctl).
 * threadedSocket, threadedOpen and threadedAccept return the result of this
 * function to their callers.
 *
 * NOTE(review): pid is presumably the id of this process -- its assignment
 * is not among the lines shown; confirm.
 * NOTE(review): each F_GETFL result is OR-ed straight into the F_SETFL
 * argument; a failed F_GETFL (-1) would request every flag bit.
 * NOTE(review): callers hand over the raw result of socket, open and accept,
 * so fd may be -1 here -- TODO confirm errno of the failed call survives.
 */
55 threadedFileDescriptor(int fd)
57 #if !defined(BLOCKING_CALLS)
/*
 * NOTE(review): on is also passed to the FIONBIO ioctl below; if it is
 * declared only under this FIOASYNC guard, a platform with FIONBIO but no
 * FIOASYNC would not compile -- TODO confirm.
 */
59 #if defined(HAVE_IOCTL) && defined(FIOASYNC)
64 /* Make non-blocking */
65 #if defined(HAVE_FCNTL) && defined(O_NONBLOCK)
/* Read the current status flags, then write them back with O_NONBLOCK. */
66 r = fcntl(fd, F_GETFL, 0);
67 r = fcntl(fd, F_SETFL, r|O_NONBLOCK);
68 #elif defined(HAVE_IOCTL) && defined(FIONBIO)
69 r = ioctl(fd, FIONBIO, &on);
78 /* Allow socket to signal this process when new data is available */
80 #if defined(HAVE_FCNTL) && defined(F_SETOWN)
81 r = fcntl(fd, F_SETOWN, pid);
82 #elif defined(HAVE_IOCTL) && defined(FIOSETOWN)
83 r = ioctl(fd, FIOSETOWN, &pid);
/* Same read-modify-write of the status flags, this time adding O_ASYNC. */
92 #if defined(HAVE_FCNTL) && defined(O_ASYNC)
93 r = fcntl(fd, F_GETFL, 0);
94 r = fcntl(fd, F_SETFL, r|O_ASYNC);
95 #elif defined(HAVE_IOCTL) && defined(FIOASYNC)
96 r = ioctl(fd, FIOASYNC, &on);
/*
 * clear_thread_flags: switch descriptors back to blocking mode.
 *
 * The visible body is guarded by #if !defined(BLOCKING_CALLS).  With fcntl
 * the same pair of calls runs three times: fetch the status flags, then
 * store them with O_NONBLOCK masked out.  Without fcntl, FIONBIO is issued
 * three times with the address of fl.
 *
 * NOTE(review): the statements that choose fd before each pair are not among
 * the lines shown -- presumably stdin, stdout and stderr; confirm.
 * NOTE(review): fl from F_GETFL is used unchecked; were it -1, F_SETFL would
 * be asked to set every bit except O_NONBLOCK.
 * NOTE(review): the ioctl branch presumably sets fl to 0 first so FIONBIO
 * turns non-blocking mode off -- confirm.
 */
108 void clear_thread_flags(void)
110 #if !defined(BLOCKING_CALLS)
111 #if defined(HAVE_FCNTL) && defined(O_NONBLOCK)
115 fl = fcntl(fd, F_GETFL, 0);
116 fl = fcntl(fd, F_SETFL, fl & (~O_NONBLOCK));
119 fl = fcntl(fd, F_GETFL, 0);
120 fl = fcntl(fd, F_SETFL, fl & (~O_NONBLOCK));
123 fl = fcntl(fd, F_GETFL, 0);
124 fl = fcntl(fd, F_SETFL, fl & (~O_NONBLOCK));
126 #elif defined(HAVE_IOCTL) && defined(FIONBIO)
131 (void) ioctl(fd, FIONBIO, &fl);
134 (void) ioctl(fd, FIONBIO, &fl);
137 (void) ioctl(fd, FIONBIO, &fl);
147 * Threaded create socket.
/*
 * threadedSocket: create a socket with socket(af, type, proto), pass the new
 * descriptor to threadedFileDescriptor and return that result.
 *
 * NOTE(review): the socket() result is forwarded without a check, so a
 * failure (-1) is handed to threadedFileDescriptor.
 */
150 threadedSocket(int af, int type, int proto)
154 fd = socket(af, type, proto);
155 return (threadedFileDescriptor(fd));
159 * Threaded file open.
/*
 * threadedOpen: open path with open(path, flags, mode), pass the new
 * descriptor to threadedFileDescriptor and return that result.
 *
 * NOTE(review): the open() result is forwarded without a check, so a
 * failure (-1) is handed to threadedFileDescriptor.
 */
162 threadedOpen(char* path, int flags, int mode)
166 fd = open(path, flags, mode);
167 return (threadedFileDescriptor(fd));
171 * Threaded socket connect.
/*
 * threadedConnect: connect socket fd to addr, whose size is len.
 *
 * connect() is attempted once.  Unless BLOCKING_CALLS is defined, an errno
 * of EINPROGRESS, EALREADY or EWOULDBLOCK (combined with a condition on a
 * line not shown) suspends the thread in blockOnFile(fd, TH_CONNECT), which
 * is a write wait, and r is then set to 0.
 *
 * NOTE(review): r = 0 presumes the connection succeeded once the thread is
 * released; no query of the pending socket error (SO_ERROR) is visible --
 * TODO confirm a failed asynchronous connect is not reported as success.
 */
174 threadedConnect(int fd, struct sockaddr* addr, int len)
178 r = connect(fd, addr, len);
179 #if !defined(BLOCKING_CALLS)
181 && (errno == EINPROGRESS || errno == EALREADY
182 || errno == EWOULDBLOCK)) {
183 blockOnFile(fd, TH_CONNECT);
184 r = 0; /* Assume it's okay when we get released */
192 * Threaded socket accept.
/*
 * threadedAccept: accept a connection on listening socket fd; addr and len
 * are passed through to accept().
 *
 * With BLOCKING_CALLS the thread waits in blockOnFile(fd, TH_ACCEPT), a read
 * wait, before calling accept.  Without it, blockOnFile is called after the
 * attempt instead.  The errno test (EINPROGRESS, EALREADY, EWOULDBLOCK) is
 * negated here, so it presumably ends a retry loop -- TODO confirm.
 * The accepted descriptor r is passed to threadedFileDescriptor and that
 * result is returned.
 */
195 threadedAccept(int fd, struct sockaddr* addr, int* len)
201 #if defined(BLOCKING_CALLS)
202 blockOnFile(fd, TH_ACCEPT);
204 r = accept(fd, addr, (int*)len);
206 || !(errno == EINPROGRESS || errno == EALREADY
207 || errno == EWOULDBLOCK))
211 #if !defined(BLOCKING_CALLS)
212 blockOnFile(fd, TH_ACCEPT);
215 return (threadedFileDescriptor(r));
219 * Read but only if we can.
/*
 * threadedRead: read up to len bytes from fd into buf with read().
 *
 * With BLOCKING_CALLS the thread first waits for readability in
 * blockOnFile(fd, TH_READ).  The second blockOnFile call belongs to the
 * path where errno is EAGAIN or EWOULDBLOCK; the condition continues on
 * lines not shown, so further errno values may be accepted -- confirm.
 */
222 threadedRead(int fd, char* buf, int len)
/* Debug trace, emitted through the DBG() macro. */
226 DBG( printf("threadedRead\n"); )
228 #if defined(BLOCKING_CALLS)
229 blockOnFile(fd, TH_READ);
233 r = read(fd, buf, len);
235 && (errno == EAGAIN || errno == EWOULDBLOCK
238 blockOnFile(fd, TH_READ);
246 * Write but only if we can.
/*
 * threadedWrite: write len bytes from buf to fd.
 *
 * The loop runs while bytes remain (len > 0) and the previous result r is
 * positive, calling write(fd, ptr, len) on each pass.  With BLOCKING_CALLS
 * the thread waits for writability before every write; without it,
 * blockOnFile(fd, TH_WRITE) is called on the EAGAIN / EWOULDBLOCK path.
 *
 * NOTE(review): ptr presumably starts at buf and advances by the amount
 * written, with len reduced to match -- those lines are not shown; confirm.
 * NOTE(review): r must hold a positive value before the first test of the
 * loop condition -- confirm its initialisation.
 */
249 threadedWrite(int fd, char* buf, int len)
/* Debug trace, emitted through the DBG() macro. */
257 DBG( printf("threadedWrite %dbytes\n",len); )
259 while (len > 0 && r > 0)
261 #if defined(BLOCKING_CALLS)
262 blockOnFile(fd, TH_WRITE);
264 r = write(fd, ptr, len);
266 && (errno == EAGAIN || errno == EWOULDBLOCK
269 #if !defined(BLOCKING_CALLS)
270 blockOnFile(fd, TH_WRITE);
284 * Receive, but only if we can.
/*
 * threadedRecvfrom: receive up to len bytes from socket fd into buf; flags,
 * addr and addrlen are passed unchanged to recvfrom().
 *
 * With BLOCKING_CALLS the thread first waits for readability in
 * blockOnFile(fd, TH_READ).  The second blockOnFile call belongs to the
 * path where errno is EAGAIN or EWOULDBLOCK; the condition continues on
 * lines not shown, so further errno values may be accepted -- confirm.
 */
287 threadedRecvfrom (int fd, void *buf, size_t len, int flags, struct sockaddr *addr, int *addrlen)
/* Debug trace, emitted through the DBG() macro. */
291 DBG( printf("threadedRecvfrom\n"); )
293 #if defined(BLOCKING_CALLS)
294 blockOnFile(fd, TH_READ);
298 r = recvfrom(fd, buf, len, flags, addr, addrlen);
300 && (errno == EAGAIN || errno == EWOULDBLOCK
303 blockOnFile(fd, TH_READ);
311 * Send, but only if we can.
/*
 * threadedSendto: send len bytes from buf on socket fd; flags, addr and
 * addrlen are passed unchanged to sendto().
 *
 * With BLOCKING_CALLS the thread first waits for writability in
 * blockOnFile(fd, TH_WRITE).  The second blockOnFile call belongs to the
 * path where errno is EAGAIN or EWOULDBLOCK; the condition continues on
 * lines not shown, so further errno values may be accepted -- confirm.
 */
314 threadedSendto (int fd, void *buf, size_t len, int flags, struct sockaddr *addr, int addrlen)
/* Debug trace, emitted through the DBG() macro. */
318 DBG( printf("threadedSendto\n"); )
320 #if defined(BLOCKING_CALLS)
321 blockOnFile(fd, TH_WRITE);
325 r = sendto(fd, buf, len, flags, addr, addrlen);
327 && (errno == EAGAIN || errno == EWOULDBLOCK
330 blockOnFile(fd, TH_WRITE);
338 * An attempt to access a file would block, so suspend the thread until the descriptor is ready.
/*
 * blockOnFile: park the current thread on descriptor fd for operation op.
 *
 * Read wait:  fd is added to readsPending, currentThread is suspended on
 * readQ[fd], and fd is removed from readsPending once the call returns.
 * Write wait: the same sequence using writesPending and writeQ[fd].
 * Every call site in this file passes TH_READ or TH_WRITE (TH_ACCEPT and
 * TH_CONNECT are aliases of them); the test that picks the branch is on
 * lines not shown.
 *
 * NOTE(review): fd indexes readQ / writeQ (FD_SETSIZE entries) and is given
 * to FD_SET with no visible range check; an fd at or above FD_SETSIZE would
 * be out of bounds -- TODO confirm a check exists.
 * NOTE(review): FD_CLR runs as soon as this thread resumes, even if other
 * threads are still queued on the same fd -- verify that is intended.
 */
342 blockOnFile(int fd, int op)
/* Debug trace, emitted through the DBG() macro. */
344 DBG( printf("blockOnFile()\n"); )
355 FD_SET(fd, &readsPending);
356 suspendOnQThread(currentThread, &readQ[fd]);
357 FD_CLR(fd, &readsPending);
361 FD_SET(fd, &writesPending);
362 suspendOnQThread(currentThread, &writeQ[fd]);
363 FD_CLR(fd, &writesPending);
370 * Check whether some file descriptor or other event has become ready.
371 * Block if required (but make sure we can still take timer interrupts).
374 checkEvents(bool block)
384 struct timeval *timeout;
386 assert(blockInts > 0);
388 DBG( printf("checkEvents block:%d\n", block); )
390 if (sleepThreads != 0)
392 time = currentTime();
393 while (sleepThreads != 0 && time >= CONTEXT(sleepThreads).time)
396 sleepThreads = sleepThreads->vmThread->next;
397 tid->vmThread->next = 0;
405 if (sleepThreads != 0)
407 s8 wait_time = CONTEXT(sleepThreads).time - time;
409 tv.tv_sec = wait_time / 1000;
410 tv.tv_usec = (wait_time % 1000) * 1000;
424 FD_COPY(&readsPending, &rd);
425 FD_COPY(&writesPending, &wr);
427 memcpy(&rd, &readsPending, sizeof(rd));
428 memcpy(&wr, &writesPending, sizeof(wr));
431 r = select(maxFd+1, &rd, &wr, 0, timeout);
433 /* We must be holding off interrupts before we start playing with
434 * the read and write queues. This should be already done but a
435 * quick check never hurt anyone.
437 assert(blockInts > 0);
439 DBG( printf("Select returns %d\n", r); )
441 /* Some threads may have finished sleeping.
443 if (block && sleepThreads != 0)
445 time = currentTime();
446 while (sleepThreads != 0 && time >= CONTEXT(sleepThreads).time)
449 sleepThreads = sleepThreads->vmThread->next;
450 tid->vmThread->next = 0;
456 for (i = 0; r > 0 && i <= maxFd; i++)
458 if (readQ[i] != 0 && FD_ISSET(i, &rd))
460 for (tid = readQ[i]; tid != 0; tid = ntid)
462 ntid = tid->vmThread->next;
468 if (writeQ[i] != 0 && FD_ISSET(i, &wr))
470 for (tid = writeQ[i]; tid != 0; tid = ntid)
472 ntid = tid->vmThread->next;