* Written by Tim Wilkinson <tim@tjwassoc.demon.co.uk>, 1996.
*/
-#define DBG(s)
-
-#include "sysdep/defines.h"
+#include <stdio.h>
#include <sys/types.h>
#include <sys/time.h>
#include <sys/socket.h>
#include <assert.h>
#include <unistd.h>
+#include "config.h"
#include "thread.h"
-/*
- * We only need this stuff is we are using the internal thread system.
- */
-#if defined(USE_INTERNAL_THREADS)
#define TH_READ 0
#define TH_WRITE 1
static fd_set writesPending;
static thread* readQ[FD_SETSIZE];
static thread* writeQ[FD_SETSIZE];
-static struct timeval tm = { 0, 0 };
void blockOnFile(int, int);
void waitOnEvents(void);
{
#if !defined(BLOCKING_CALLS)
int r;
+#if defined(HAVE_IOCTL) && defined(FIOASYNC)
int on = 1;
+#endif
int pid;
/* Make non-blocking */
void clear_thread_flags(void)
{
#if !defined(BLOCKING_CALLS)
- int fl, fd;
-
#if defined(HAVE_FCNTL) && defined(O_NONBLOCK)
- fd = fileno(stdin);
+ int fl, fd;
+
+ fd = fileno(stdin);
fl = fcntl(fd, F_GETFL, 0);
fl = fcntl(fd, F_SETFL, fl & (~O_NONBLOCK));
- fd = fileno(stdout);
+ fd = fileno(stdout);
fl = fcntl(fd, F_GETFL, 0);
fl = fcntl(fd, F_SETFL, fl & (~O_NONBLOCK));
- fd = fileno(stderr);
+ fd = fileno(stderr);
fl = fcntl(fd, F_GETFL, 0);
fl = fcntl(fd, F_SETFL, fl & (~O_NONBLOCK));
+
#elif defined(HAVE_IOCTL) && defined(FIONBIO)
- fl = 0;
- fd = fileno(stdin);
+ int fl, fd;
+
+ fl = 0;
+ fd = fileno(stdin);
(void) ioctl(fd, FIONBIO, &fl);
- fd = fileno(stdout);
+ fd = fileno(stdout);
(void) ioctl(fd, FIONBIO, &fl);
- fd = fileno(stderr);
+ fd = fileno(stderr);
(void) ioctl(fd, FIONBIO, &fl);
#endif
-
#endif
+
+ fflush(stdout);
+ fflush(stderr);
}
threadedSocket(int af, int type, int proto)
{
int fd;
- int r;
- int on = 1;
- int pid;
fd = socket(af, type, proto);
return (threadedFileDescriptor(fd));
threadedOpen(char* path, int flags, int mode)
{
int fd;
- int r;
- int on = 1;
- int pid;
fd = open(path, flags, mode);
return (threadedFileDescriptor(fd));
threadedAccept(int fd, struct sockaddr* addr, int* len)
{
int r;
- int on = 1;
for (;;)
{
#if defined(BLOCKING_CALLS)
blockOnFile(fd, TH_ACCEPT);
#endif
- r = accept(fd, addr, len);
+ r = accept(fd, addr, (int*)len);
if (r >= 0
|| !(errno == EINPROGRESS || errno == EALREADY
|| errno == EWOULDBLOCK))
return (ptr - buf);
}
+/*
+ * Receive, but only if we can.
+ */
+int
+threadedRecvfrom (int fd, void *buf, size_t len, int flags, struct sockaddr *addr, int *addrlen)
+{
+ int r;
+
+ DBG( printf("threadedRecvfrom\n"); )
+
+#if defined(BLOCKING_CALLS)
+ blockOnFile(fd, TH_READ);
+#endif
+ for (;;)
+ {
+ r = recvfrom(fd, buf, len, flags, addr, addrlen);
+ if (r < 0
+ && (errno == EAGAIN || errno == EWOULDBLOCK
+ || errno == EINTR))
+ {
+ blockOnFile(fd, TH_READ);
+ continue;
+ }
+ return (r);
+ }
+}
+
+/*
+ * Send, but only if we can.
+ */
+int
+threadedSendto (int fd, void *buf, size_t len, int flags, struct sockaddr *addr, int addrlen)
+{
+ int r;
+
+ DBG( printf("threadedSendto\n"); )
+
+#if defined(BLOCKING_CALLS)
+ blockOnFile(fd, TH_WRITE);
+#endif
+ for (;;)
+ {
+ r = sendto(fd, buf, len, flags, addr, addrlen);
+ if (r < 0
+ && (errno == EAGAIN || errno == EWOULDBLOCK
+ || errno == EINTR))
+ {
+ blockOnFile(fd, TH_WRITE);
+ continue;
+ }
+ return (r);
+ }
+}
+
/*
* An attempt to access a file would block, so suspend the thread until
* it will happen.
{
maxFd = fd;
}
+
if (op == TH_READ)
{
FD_SET(fd, &readsPending);
thread* tid;
thread* ntid;
int i;
- int b;
+ s8 time = -1;
+ struct timeval tv;
+ struct timeval *timeout;
+
+ assert(blockInts > 0);
+
+ DBG( printf("checkEvents block:%d\n", block); )
-DBG( printf("checkEvents block:%d\n", block); )
+ if (sleepThreads != 0)
+ {
+ time = currentTime();
+ while (sleepThreads != 0 && time >= CONTEXT(sleepThreads).time)
+ {
+ tid = sleepThreads;
+ sleepThreads = sleepThreads->next;
+ tid->next = 0;
+
+ iresumeThread(tid);
+ }
+ }
+
+ if (block)
+ {
+ if (sleepThreads != 0)
+ {
+ s8 wait_time = CONTEXT(sleepThreads).time - time;
+
+ tv.tv_sec = wait_time / 1000;
+ tv.tv_usec = (wait_time % 1000) * 1000;
+ timeout = &tv;
+ }
+ else
+ timeout = 0;
+ }
+ else
+ {
+ tv.tv_sec = 0;
+ tv.tv_usec = 0;
+ timeout = &tv;
+ }
#if defined(FD_COPY)
FD_COPY(&readsPending, &rd);
memcpy(&wr, &writesPending, sizeof(wr));
#endif
- /*
- * If select() is called with indefinite wait, we have to make sure
- * we can get interrupted by timer events.
- */
- if (block == true)
- {
- b = blockInts;
- blockInts = 0;
- r = select(maxFd+1, &rd, &wr, 0, 0);
- blockInts = b;
- }
- else
- {
- r = select(maxFd+1, &rd, &wr, 0, &tm);
- }
+ r = select(maxFd+1, &rd, &wr, 0, timeout);
/* We must be holding off interrupts before we start playing with
* the read and write queues. This should be already done but a
*/
assert(blockInts > 0);
-DBG( printf("Select returns %d\n", r); )
+ DBG( printf("Select returns %d\n", r); )
+
+ /* Some threads may have finished sleeping.
+ */
+ if (block && sleepThreads != 0)
+ {
+ time = currentTime();
+ while (sleepThreads != 0 && time >= CONTEXT(sleepThreads).time)
+ {
+ tid = sleepThreads;
+ sleepThreads = sleepThreads->next;
+ tid->next = 0;
+
+ iresumeThread(tid);
+ }
+ }
for (i = 0; r > 0 && i <= maxFd; i++)
{
}
}
}
-#endif