3 * Support for threaded ops which may block (read, write, connect, etc.).
5 * Copyright (c) 1996 T. J. Wilkinson & Associates, London, UK.
7 * See the file "license.terms" for information on usage and redistribution
8 * of this file, and for a DISCLAIMER OF ALL WARRANTIES.
10 * Written by Tim Wilkinson <tim@tjwassoc.demon.co.uk>, 1996.
15 #include <sys/types.h>
17 #include <sys/socket.h>
/* Accept is waited on as a read event and connect as a write event. */
27 #define TH_ACCEPT TH_READ
28 #define TH_CONNECT TH_WRITE
/*
 * Descriptor sets with one bit per fd; blockOnFile() sets the bit before
 * suspending a thread on that fd and clears it once the thread resumes.
 */
31 static fd_set readsPending;
32 static fd_set writesPending;
/* Per-descriptor wait queues, indexed directly by fd (FD_SETSIZE slots). */
33 static thread* readQ[FD_SETSIZE];
34 static thread* writeQ[FD_SETSIZE];
/* Prototypes; waitOnEvents() is declared here but not defined in the visible lines. */
36 void blockOnFile(int, int);
37 void waitOnEvents(void);
/* Defined elsewhere; presumably the thread that is currently running -- confirm. */
39 extern thread* currentThread;
41 /* These are undefined because we do not yet support async I/O */
48 * Create a threaded file descriptor.
/*
 * Prepare descriptor fd for use by the threaded I/O wrappers in this file.
 * Unless BLOCKING_CALLS is defined, the visible steps are, in order:
 *   1. put fd into non-blocking mode,
 *   2. set the owner of fd to pid,
 *   3. enable asynchronous notification on fd,
 * each through fcntl() when the needed flag exists, otherwise ioctl().
 * NOTE(review): the declarations of r, on and pid and the return statement
 * are on lines not visible here; callers return this function result
 * directly, so it presumably yields the descriptor -- confirm.
 */
51 threadedFileDescriptor(int fd)
53 #if !defined(BLOCKING_CALLS)
55 #if defined(HAVE_IOCTL) && defined(FIOASYNC)
60 /* Make non-blocking */
61 #if defined(HAVE_FCNTL) && defined(O_NONBLOCK)
/*
 * Read-modify-write of the file status flags.
 * NOTE(review): the F_GETFL result is OR-ed straight into F_SETFL; if the
 * first call fails with -1 every flag bit would be requested.
 */
62 r = fcntl(fd, F_GETFL, 0);
63 r = fcntl(fd, F_SETFL, r|O_NONBLOCK);
64 #elif defined(HAVE_IOCTL) && defined(FIONBIO)
/* on is assigned on a line not visible here; presumably non-zero -- confirm. */
65 r = ioctl(fd, FIONBIO, &on);
74 /* Allow socket to signal this process when new data is available */
76 #if defined(HAVE_FCNTL) && defined(F_SETOWN)
/* pid is assigned on a line not visible here; presumably this process id -- confirm. */
77 r = fcntl(fd, F_SETOWN, pid);
78 #elif defined(HAVE_IOCTL) && defined(FIOSETOWN)
79 r = ioctl(fd, FIOSETOWN, &pid);
88 #if defined(HAVE_FCNTL) && defined(O_ASYNC)
/* Same unchecked read-modify-write pattern as above, this time adding O_ASYNC. */
89 r = fcntl(fd, F_GETFL, 0);
90 r = fcntl(fd, F_SETFL, r|O_ASYNC);
91 #elif defined(HAVE_IOCTL) && defined(FIOASYNC)
92 r = ioctl(fd, FIOASYNC, &on);
/*
 * Remove the non-blocking setting from descriptors. Compiled only when
 * BLOCKING_CALLS is not defined. With fcntl() available the O_NONBLOCK bit
 * is masked out of the status flags; otherwise ioctl(FIONBIO) is issued.
 * The same sequence appears three times; the lines that select fd for each
 * repetition are not visible here (presumably the three standard
 * descriptors -- TODO confirm).
 */
104 void clear_thread_flags(void)
106 #if !defined(BLOCKING_CALLS)
107 #if defined(HAVE_FCNTL) && defined(O_NONBLOCK)
/*
 * NOTE(review): each F_GETFL result is used without a failure check, and
 * the F_SETFL result stored back into fl is not examined in these lines.
 */
111 fl = fcntl(fd, F_GETFL, 0);
112 fl = fcntl(fd, F_SETFL, fl & (~O_NONBLOCK));
115 fl = fcntl(fd, F_GETFL, 0);
116 fl = fcntl(fd, F_SETFL, fl & (~O_NONBLOCK));
119 fl = fcntl(fd, F_GETFL, 0);
120 fl = fcntl(fd, F_SETFL, fl & (~O_NONBLOCK));
122 #elif defined(HAVE_IOCTL) && defined(FIONBIO)
/* fl is the FIONBIO argument here; its value is set on a line not visible (presumably 0 -- confirm). */
127 (void) ioctl(fd, FIONBIO, &fl);
130 (void) ioctl(fd, FIONBIO, &fl);
133 (void) ioctl(fd, FIONBIO, &fl);
143 * Threaded create socket.
/*
 * Create a socket with socket(af, type, proto), pass the new descriptor to
 * threadedFileDescriptor() and return whatever that call returns.
 * NOTE(review): the socket() result is forwarded without a failure check,
 * so a -1 descriptor reaches threadedFileDescriptor().
 */
146 threadedSocket(int af, int type, int proto)
150 fd = socket(af, type, proto);
151 return (threadedFileDescriptor(fd));
155 * Threaded file open.
/*
 * Open path with open(path, flags, mode), pass the new descriptor to
 * threadedFileDescriptor() and return whatever that call returns.
 * NOTE(review): the open() result is forwarded without a failure check,
 * so a -1 descriptor reaches threadedFileDescriptor().
 */
158 threadedOpen(char* path, int flags, int mode)
162 fd = open(path, flags, mode);
163 return (threadedFileDescriptor(fd));
167 * Threaded socket connect.
/*
 * Issue connect(fd, addr, len). Unless BLOCKING_CALLS is defined, an errno
 * of EINPROGRESS, EALREADY or EWOULDBLOCK causes the calling thread to be
 * suspended through blockOnFile(fd, TH_CONNECT); r is then set to 0.
 * The first part of the condition and the return are on lines not visible
 * here.
 */
170 threadedConnect(int fd, struct sockaddr* addr, int len)
174 r = connect(fd, addr, len);
175 #if !defined(BLOCKING_CALLS)
177 && (errno == EINPROGRESS || errno == EALREADY
178 || errno == EWOULDBLOCK)) {
179 blockOnFile(fd, TH_CONNECT);
/*
 * NOTE(review): success is assumed once the thread resumes; the visible code
 * does not query the socket error state (e.g. SO_ERROR), so a connection
 * that failed in the background would still yield r == 0.
 */
180 r = 0; /* Assume it's okay when we get released */
188 * Threaded socket accept.
/*
 * Accept a connection on fd. With BLOCKING_CALLS defined the thread waits
 * through blockOnFile(fd, TH_ACCEPT) before calling accept(); without it the
 * wait happens after the accept() attempt. The value r obtained from
 * accept() is passed to threadedFileDescriptor() and that result is returned.
 * The surrounding loop and the first half of the errno test are on lines
 * not visible here (presumably a retry loop that exits unless errno is one
 * of the listed in-progress codes -- confirm).
 */
191 threadedAccept(int fd, struct sockaddr* addr, int* len)
197 #if defined(BLOCKING_CALLS)
198 blockOnFile(fd, TH_ACCEPT);
/* The cast is a no-op here since len is already declared as int*. */
200 r = accept(fd, addr, (int*)len);
202 || !(errno == EINPROGRESS || errno == EALREADY
203 || errno == EWOULDBLOCK))
207 #if !defined(BLOCKING_CALLS)
208 blockOnFile(fd, TH_ACCEPT);
211 return (threadedFileDescriptor(r));
215 * Read but only if we can.
/*
 * Read up to len bytes from fd into buf using read(). With BLOCKING_CALLS
 * defined the thread first waits through blockOnFile(fd, TH_READ). After
 * the read() attempt, an errno of EAGAIN or EWOULDBLOCK leads to another
 * blockOnFile(fd, TH_READ) wait.
 * The loop construct, the rest of the errno test and the return are on
 * lines not visible here.
 */
218 threadedRead(int fd, char* buf, int len)
222 DBG( printf("threadedRead\n"); )
224 #if defined(BLOCKING_CALLS)
225 blockOnFile(fd, TH_READ);
229 r = read(fd, buf, len);
/* The errno test continues on a line not visible here. */
231 && (errno == EAGAIN || errno == EWOULDBLOCK
234 blockOnFile(fd, TH_READ);
242 * Write but only if we can.
/*
 * Write len bytes starting at ptr to fd, looping while bytes remain and the
 * previous result r is positive. With BLOCKING_CALLS defined the thread
 * waits through blockOnFile(fd, TH_WRITE) before each write(); without it
 * the wait happens after a write() that reported EAGAIN or EWOULDBLOCK.
 * The initialisation of ptr and r, the updates to ptr and len, and the
 * return are on lines not visible here (ptr presumably starts at buf --
 * confirm).
 */
245 threadedWrite(int fd, char* buf, int len)
253 DBG( printf("threadedWrite %dbytes\n",len); )
255 while (len > 0 && r > 0)
257 #if defined(BLOCKING_CALLS)
258 blockOnFile(fd, TH_WRITE);
260 r = write(fd, ptr, len);
/* The errno test continues on a line not visible here. */
262 && (errno == EAGAIN || errno == EWOULDBLOCK
265 #if !defined(BLOCKING_CALLS)
266 blockOnFile(fd, TH_WRITE);
280 * Receive, but only if we can.
/*
 * Receive a message on fd with recvfrom(), forwarding buf, len, flags, addr
 * and addrlen unchanged. With BLOCKING_CALLS defined the thread first waits
 * through blockOnFile(fd, TH_READ). After the recvfrom() attempt, an errno
 * of EAGAIN or EWOULDBLOCK leads to another blockOnFile(fd, TH_READ) wait.
 * The loop construct, the rest of the errno test and the return are on
 * lines not visible here.
 */
283 threadedRecvfrom (int fd, void *buf, size_t len, int flags, struct sockaddr *addr, int *addrlen)
287 DBG( printf("threadedRecvfrom\n"); )
289 #if defined(BLOCKING_CALLS)
290 blockOnFile(fd, TH_READ);
294 r = recvfrom(fd, buf, len, flags, addr, addrlen);
/* The errno test continues on a line not visible here. */
296 && (errno == EAGAIN || errno == EWOULDBLOCK
299 blockOnFile(fd, TH_READ);
307 * Send, but only if we can.
/*
 * Send a message on fd with sendto(), forwarding buf, len, flags, addr and
 * addrlen unchanged. With BLOCKING_CALLS defined the thread first waits
 * through blockOnFile(fd, TH_WRITE). After the sendto() attempt, an errno
 * of EAGAIN or EWOULDBLOCK leads to another blockOnFile(fd, TH_WRITE) wait.
 * The loop construct, the rest of the errno test and the return are on
 * lines not visible here.
 */
310 threadedSendto (int fd, void *buf, size_t len, int flags, struct sockaddr *addr, int addrlen)
314 DBG( printf("threadedSendto\n"); )
316 #if defined(BLOCKING_CALLS)
317 blockOnFile(fd, TH_WRITE);
321 r = sendto(fd, buf, len, flags, addr, addrlen);
/* The errno test continues on a line not visible here. */
323 && (errno == EAGAIN || errno == EWOULDBLOCK
326 blockOnFile(fd, TH_WRITE);
334 * An attempt to access a file would block, so suspend the thread until
/*
 * Suspend currentThread until descriptor fd is ready for the operation op.
 * Two parallel sequences are visible, one using readsPending/readQ and one
 * using writesPending/writeQ: set the fd bit in the pending set, suspend the
 * thread on the per-fd queue, then clear the bit once the call returns.
 * The test on op that chooses between them is on lines not visible here
 * (presumably TH_READ versus TH_WRITE -- confirm).
 * NOTE(review): fd is used as an index into arrays of FD_SETSIZE entries; no
 * range check appears in the visible lines.
 */
338 blockOnFile(int fd, int op)
340 DBG( printf("blockOnFile()\n"); )
/* Read side: register interest, sleep on the read queue, deregister. */
351 FD_SET(fd, &readsPending);
352 suspendOnQThread(currentThread, &readQ[fd]);
353 FD_CLR(fd, &readsPending);
/* Write side: register interest, sleep on the write queue, deregister. */
357 FD_SET(fd, &writesPending);
358 suspendOnQThread(currentThread, &writeQ[fd]);
359 FD_CLR(fd, &writesPending);
366 * Check whether some file descriptor or other event has become ready.
367 * Block if required (but make sure we can still take timer interrupts).
370 checkEvents(bool block)
380 struct timeval *timeout;
382 assert(blockInts > 0);
384 DBG( printf("checkEvents block:%d\n", block); )
386 if (sleepThreads != 0)
388 time = currentTime();
389 while (sleepThreads != 0 && time >= CONTEXT(sleepThreads).time)
392 sleepThreads = sleepThreads->next;
401 if (sleepThreads != 0)
403 s8 wait_time = CONTEXT(sleepThreads).time - time;
405 tv.tv_sec = wait_time / 1000;
406 tv.tv_usec = (wait_time % 1000) * 1000;
420 FD_COPY(&readsPending, &rd);
421 FD_COPY(&writesPending, &wr);
423 memcpy(&rd, &readsPending, sizeof(rd));
424 memcpy(&wr, &writesPending, sizeof(wr));
427 r = select(maxFd+1, &rd, &wr, 0, timeout);
429 /* We must be holding off interrupts before we start playing with
430 * the read and write queues. This should be already done but a
431 * quick check never hurt anyone.
433 assert(blockInts > 0);
435 DBG( printf("Select returns %d\n", r); )
437 /* Some threads may have finished sleeping.
439 if (block && sleepThreads != 0)
441 time = currentTime();
442 while (sleepThreads != 0 && time >= CONTEXT(sleepThreads).time)
445 sleepThreads = sleepThreads->next;
452 for (i = 0; r > 0 && i <= maxFd; i++)
454 if (readQ[i] != 0 && FD_ISSET(i, &rd))
456 for (tid = readQ[i]; tid != 0; tid = ntid)
464 if (writeQ[i] != 0 && FD_ISSET(i, &wr))
466 for (tid = writeQ[i]; tid != 0; tid = ntid)