3 * Support for threaded ops which may block (read, write, connect, etc.).
5 * Copyright (c) 1996 T. J. Wilkinson & Associates, London, UK.
7 * See the file "license.terms" for information on usage and redistribution
8 * of this file, and for a DISCLAIMER OF ALL WARRANTIES.
10 * Written by Tim Wilkinson <tim@tjwassoc.demon.co.uk>, 1996.
15 #include <sys/types.h>
17 #include <sys/socket.h>
/* TH_ACCEPT and TH_CONNECT are aliases for the read and write operation
 * codes, so accept and connect waits share the read and write queues. */
29 #define TH_ACCEPT TH_READ
30 #define TH_CONNECT TH_WRITE
/* Descriptor sets copied and handed to select() by checkEvents();
 * blockOnFile() sets a descriptor's bit while a thread waits on it. */
33 static fd_set readsPending;
34 static fd_set writesPending;
/* Queues of suspended threads, indexed by file descriptor. */
35 static thread* readQ[FD_SETSIZE];
36 static thread* writeQ[FD_SETSIZE];
/* Forward declarations for routines used before their definitions. */
38 void blockOnFile(int, int);
39 void waitOnEvents(void);
/* Thread passed to suspendOnQThread() by blockOnFile(); defined elsewhere. */
41 extern thread* currentThread;
43 /* These are undefined because we do not yet support async I/O */
50 * Create a threaded file descriptor.
/*
 * Prepare descriptor fd for use by the thread system.  Unless
 * BLOCKING_CALLS is defined, the visible steps are: make the descriptor
 * non-blocking, set its owner to pid, and enable async notification.
 * Each step uses fcntl() when the build has it and the needed flag,
 * otherwise the equivalent ioctl().  Callers in this file return this
 * function's result directly.
 */
53 threadedFileDescriptor(int fd)
55 #if !defined(BLOCKING_CALLS)
57 #if defined(HAVE_IOCTL) && defined(FIOASYNC)
62 /* Make non-blocking */
63 #if defined(HAVE_FCNTL) && defined(O_NONBLOCK)
/* NOTE(review): the F_GETFL result is OR-ed straight into F_SETFL; no
 * check for a -1 return is visible between these two calls - confirm. */
64 r = fcntl(fd, F_GETFL, 0);
65 r = fcntl(fd, F_SETFL, r|O_NONBLOCK);
66 #elif defined(HAVE_IOCTL) && defined(FIONBIO)
67 r = ioctl(fd, FIONBIO, &on);
76 /* Allow socket to signal this process when new data is available */
78 #if defined(HAVE_FCNTL) && defined(F_SETOWN)
79 r = fcntl(fd, F_SETOWN, pid);
80 #elif defined(HAVE_IOCTL) && defined(FIOSETOWN)
81 r = ioctl(fd, FIOSETOWN, &pid);
/* Turn on async notification for the descriptor (O_ASYNC or FIOASYNC). */
90 #if defined(HAVE_FCNTL) && defined(O_ASYNC)
91 r = fcntl(fd, F_GETFL, 0);
92 r = fcntl(fd, F_SETFL, r|O_ASYNC);
93 #elif defined(HAVE_IOCTL) && defined(FIOASYNC)
94 r = ioctl(fd, FIOASYNC, &on);
/*
 * Undo the non-blocking setting on three descriptors.  Compiled to do
 * work only when BLOCKING_CALLS is not defined.  The fcntl() variant
 * clears O_NONBLOCK from the current flags; the ioctl() variant passes
 * fl to FIONBIO.
 * NOTE(review): the descriptor numbers and the value of fl in the ioctl
 * branch are set on lines not shown here - presumably the standard
 * streams and 0; confirm.
 */
106 void clear_thread_flags(void)
108 #if !defined(BLOCKING_CALLS)
109 #if defined(HAVE_FCNTL) && defined(O_NONBLOCK)
/* Read-modify-write of the file status flags, once per descriptor. */
113 fl = fcntl(fd, F_GETFL, 0);
114 fl = fcntl(fd, F_SETFL, fl & (~O_NONBLOCK));
117 fl = fcntl(fd, F_GETFL, 0);
118 fl = fcntl(fd, F_SETFL, fl & (~O_NONBLOCK));
121 fl = fcntl(fd, F_GETFL, 0);
122 fl = fcntl(fd, F_SETFL, fl & (~O_NONBLOCK));
124 #elif defined(HAVE_IOCTL) && defined(FIONBIO)
/* Results are deliberately discarded via the (void) casts. */
129 (void) ioctl(fd, FIONBIO, &fl);
132 (void) ioctl(fd, FIONBIO, &fl);
135 (void) ioctl(fd, FIONBIO, &fl);
145 * Threaded create socket.
/*
 * Create a socket with socket(af, type, proto) and pass the new
 * descriptor through threadedFileDescriptor(), returning its result.
 * NOTE(review): the socket() result is forwarded without a failure
 * check - confirm threadedFileDescriptor() tolerates a negative fd.
 */
148 threadedSocket(int af, int type, int proto)
152 fd = socket(af, type, proto);
153 return (threadedFileDescriptor(fd));
157 * Threaded file open.
/*
 * Open path with open(path, flags, mode) and pass the new descriptor
 * through threadedFileDescriptor(), returning its result.
 * NOTE(review): the open() result is forwarded without a failure
 * check - confirm threadedFileDescriptor() tolerates a negative fd.
 */
160 threadedOpen(char* path, int flags, int mode)
164 fd = open(path, flags, mode);
165 return (threadedFileDescriptor(fd));
169 * Threaded socket connect.
/*
 * Connect socket fd to addr (len bytes).  Unless BLOCKING_CALLS is
 * defined, a connect() that reports EINPROGRESS, EALREADY or
 * EWOULDBLOCK suspends the calling thread on TH_CONNECT.
 */
172 threadedConnect(int fd, struct sockaddr* addr, int len)
176 r = connect(fd, addr, len);
177 #if !defined(BLOCKING_CALLS)
/* Errno values that mean the connect is still in progress. */
179 && (errno == EINPROGRESS || errno == EALREADY
180 || errno == EWOULDBLOCK)) {
181 blockOnFile(fd, TH_CONNECT);
/* NOTE(review): success is reported once the thread is released; no
 * check of the actual connect outcome is visible here - confirm. */
182 r = 0; /* Assume it's okay when we get released */
190 * Threaded socket accept.
/*
 * Accept a connection on listening socket fd, filling addr and len as
 * accept() does, and return the new descriptor after passing it through
 * threadedFileDescriptor().
 * With BLOCKING_CALLS the thread waits on TH_ACCEPT before calling
 * accept(); without it the thread waits after an attempt.
 * NOTE(review): the enclosing retry construct is on lines not shown
 * here - presumably a loop; confirm.
 */
193 threadedAccept(int fd, struct sockaddr* addr, int* len)
199 #if defined(BLOCKING_CALLS)
200 blockOnFile(fd, TH_ACCEPT);
202 r = accept(fd, addr, (int*)len);
/* Any errno other than these three is not treated as "would block". */
204 || !(errno == EINPROGRESS || errno == EALREADY
205 || errno == EWOULDBLOCK))
209 #if !defined(BLOCKING_CALLS)
210 blockOnFile(fd, TH_ACCEPT);
213 return (threadedFileDescriptor(r));
217 * Read but only if we can.
/*
 * Read up to len bytes from fd into buf using read().
 * With BLOCKING_CALLS the thread first waits on TH_READ.  The later
 * blockOnFile() call is tied to an EAGAIN / EWOULDBLOCK test on errno.
 * NOTE(review): the retry construct and the return statement are on
 * lines not shown here - confirm.
 */
220 threadedRead(int fd, char* buf, int len)
224 DBG( printf("threadedRead\n"); )
226 #if defined(BLOCKING_CALLS)
227 blockOnFile(fd, TH_READ);
231 r = read(fd, buf, len);
/* Errno values that mean no data is available yet. */
233 && (errno == EAGAIN || errno == EWOULDBLOCK
236 blockOnFile(fd, TH_READ);
244 * Write but only if we can.
/*
 * Write len bytes from buf to fd, issuing write() repeatedly from the
 * cursor ptr while bytes remain and the previous result was positive.
 * With BLOCKING_CALLS the thread waits on TH_WRITE before each write();
 * without it the thread waits after the EAGAIN / EWOULDBLOCK test.
 * NOTE(review): the updates to ptr and len and the return value are on
 * lines not shown here - confirm.
 */
247 threadedWrite(int fd, char* buf, int len)
255 DBG( printf("threadedWrite %dbytes\n",len); )
/* Stops when everything is written or a write() result is <= 0. */
257 while (len > 0 && r > 0)
259 #if defined(BLOCKING_CALLS)
260 blockOnFile(fd, TH_WRITE);
262 r = write(fd, ptr, len);
/* Errno values that mean the descriptor cannot take more data yet. */
264 && (errno == EAGAIN || errno == EWOULDBLOCK
267 #if !defined(BLOCKING_CALLS)
268 blockOnFile(fd, TH_WRITE);
282 * Receive, but only if we can.
/*
 * Receive into buf from socket fd; all arguments are forwarded
 * unchanged to recvfrom().
 * With BLOCKING_CALLS the thread first waits on TH_READ.  The later
 * blockOnFile() call is tied to an EAGAIN / EWOULDBLOCK test on errno.
 * NOTE(review): the retry construct and the return statement are on
 * lines not shown here - confirm.
 */
285 threadedRecvfrom (int fd, void *buf, size_t len, int flags, struct sockaddr *addr, int *addrlen)
289 DBG( printf("threadedRecvfrom\n"); )
291 #if defined(BLOCKING_CALLS)
292 blockOnFile(fd, TH_READ);
296 r = recvfrom(fd, buf, len, flags, addr, addrlen);
/* Errno values that mean no datagram is available yet. */
298 && (errno == EAGAIN || errno == EWOULDBLOCK
301 blockOnFile(fd, TH_READ);
309 * Send, but only if we can.
/*
 * Send buf on socket fd; all arguments are forwarded unchanged to
 * sendto().
 * With BLOCKING_CALLS the thread first waits on TH_WRITE.  The later
 * blockOnFile() call is tied to an EAGAIN / EWOULDBLOCK test on errno.
 * NOTE(review): the retry construct and the return statement are on
 * lines not shown here - confirm.
 */
312 threadedSendto (int fd, void *buf, size_t len, int flags, struct sockaddr *addr, int addrlen)
316 DBG( printf("threadedSendto\n"); )
318 #if defined(BLOCKING_CALLS)
319 blockOnFile(fd, TH_WRITE);
323 r = sendto(fd, buf, len, flags, addr, addrlen);
/* Errno values that mean the socket cannot take more data yet. */
325 && (errno == EAGAIN || errno == EWOULDBLOCK
328 blockOnFile(fd, TH_WRITE);
336 * An attempt to access a file would block, so suspend the thread until
/*
 * Suspend the current thread until descriptor fd is released for
 * operation op.  The descriptor's bit is set in the matching pending
 * set for the duration of the suspension and cleared afterwards.
 * NOTE(review): fd is used as an index into readQ / writeQ and as an
 * FD_SET argument; no bounds check against FD_SETSIZE is visible here,
 * and the test that selects read versus write is on lines not shown -
 * confirm both.
 */
340 blockOnFile(int fd, int op)
342 DBG( printf("blockOnFile()\n"); )
/* Read side: register interest, sleep on the fd's read queue, then
 * withdraw interest once the thread has resumed. */
353 FD_SET(fd, &readsPending);
354 suspendOnQThread(currentThread, &readQ[fd]);
355 FD_CLR(fd, &readsPending);
/* Write side: same sequence against the write set and write queue. */
359 FD_SET(fd, &writesPending);
360 suspendOnQThread(currentThread, &writeQ[fd]);
361 FD_CLR(fd, &writesPending);
368 * Check whether some file descriptor or other event has become ready.
369 * Block if required (but make sure we can still take timer interrupts).
372 checkEvents(bool block)
382 struct timeval *timeout;
384 assert(blockInts > 0);
386 DBG( printf("checkEvents block:%d\n", block); )
388 if (sleepThreads != 0)
390 time = currentTime();
391 while (sleepThreads != 0 && time >= CONTEXT(sleepThreads).time)
394 sleepThreads = sleepThreads->next;
403 if (sleepThreads != 0)
405 s8 wait_time = CONTEXT(sleepThreads).time - time;
407 tv.tv_sec = wait_time / 1000;
408 tv.tv_usec = (wait_time % 1000) * 1000;
422 FD_COPY(&readsPending, &rd);
423 FD_COPY(&writesPending, &wr);
425 memcpy(&rd, &readsPending, sizeof(rd));
426 memcpy(&wr, &writesPending, sizeof(wr));
429 r = select(maxFd+1, &rd, &wr, 0, timeout);
431 /* We must be holding off interrupts before we start playing with
432 * the read and write queues. This should be already done but a
433 * quick check never hurt anyone.
435 assert(blockInts > 0);
437 DBG( printf("Select returns %d\n", r); )
439 /* Some threads may have finished sleeping.
441 if (block && sleepThreads != 0)
443 time = currentTime();
444 while (sleepThreads != 0 && time >= CONTEXT(sleepThreads).time)
447 sleepThreads = sleepThreads->next;
454 for (i = 0; r > 0 && i <= maxFd; i++)
456 if (readQ[i] != 0 && FD_ISSET(i, &rd))
458 for (tid = readQ[i]; tid != 0; tid = ntid)
466 if (writeQ[i] != 0 && FD_ISSET(i, &wr))
468 for (tid = writeQ[i]; tid != 0; tid = ntid)