93ce98b6914d74f63fc068433c518a77db779744
[cacao.git] / threads / threadio.c
1 /*
2  * threadCalls.c
3  * Support for threaded ops which may block (read, write, connect, etc.).
4  *
5  * Copyright (c) 1996 T. J. Wilkinson & Associates, London, UK.
6  *
7  * See the file "license.terms" for information on usage and redistribution
8  * of this file, and for a DISCLAIMER OF ALL WARRANTIES.
9  *
10  * Written by Tim Wilkinson <tim@tjwassoc.demon.co.uk>, 1996.
11  */
12
13 #include "config.h"
14
15 #include <sys/types.h>
16 #include <sys/time.h>
17 #include <sys/socket.h>
18 #include <fcntl.h>
19 #include <errno.h>
20 #include <assert.h>
21 #include <unistd.h>
22
23 #include "thread.h"
24
25 #define TH_READ         0
26 #define TH_WRITE        1
27 #define TH_ACCEPT       TH_READ
28 #define TH_CONNECT      TH_WRITE
29
30 static int maxFd;
31 static fd_set readsPending;
32 static fd_set writesPending;
33 static thread* readQ[FD_SETSIZE];
34 static thread* writeQ[FD_SETSIZE];
35
36 void blockOnFile(int, int);
37 void waitOnEvents(void);
38
39 extern thread* currentThread;
40
41 /* These are undefined because we do not yet support async I/O */
42 #undef  F_SETOWN
43 #undef  FIOSETOWN
44 #undef  O_ASYNC
45 #undef  FIOASYNC
46
47 /*
48  * Create a threaded file descriptor.
49  */
50 int
51 threadedFileDescriptor(int fd)
52 {
53 #if !defined(BLOCKING_CALLS)
54     int r;
55 #if defined(HAVE_IOCTL) && defined(FIOASYNC)
56     int on = 1;
57 #endif
58     int pid;
59
60     /* Make non-blocking */
61 #if defined(HAVE_FCNTL) && defined(O_NONBLOCK)
62     r = fcntl(fd, F_GETFL, 0);
63     r = fcntl(fd, F_SETFL, r|O_NONBLOCK);
64 #elif defined(HAVE_IOCTL) && defined(FIONBIO)
65     r = ioctl(fd, FIONBIO, &on);
66 #else
67     r = 0;
68 #endif
69     if (r < 0)
70     {
71         return (r);
72     }
73
74     /* Allow socket to signal this process when new data is available */
75     pid = getpid();
76 #if defined(HAVE_FCNTL) && defined(F_SETOWN)
77     r = fcntl(fd, F_SETOWN, pid);
78 #elif defined(HAVE_IOCTL) && defined(FIOSETOWN)
79     r = ioctl(fd, FIOSETOWN, &pid);
80 #else
81     r = 0;
82 #endif
83     if (r < 0)
84     {
85         return (r);
86     }
87
88 #if defined(HAVE_FCNTL) && defined(O_ASYNC)
89     r = fcntl(fd, F_GETFL, 0);
90     r = fcntl(fd, F_SETFL, r|O_ASYNC);
91 #elif defined(HAVE_IOCTL) && defined(FIOASYNC)
92     r = ioctl(fd, FIOASYNC, &on);
93 #else
94     r = 0;
95 #endif
96     if (r < 0)
97     {
98         return (r);
99     }
100 #endif
101     return (fd);
102 }
103
104 void clear_thread_flags(void)
105 {
106 #if !defined(BLOCKING_CALLS)
107 #if defined(HAVE_FCNTL) && defined(O_NONBLOCK)
108     int fl, fd;
109
110     fd = fileno(stdin);
111     fl = fcntl(fd, F_GETFL, 0);
112     fl = fcntl(fd, F_SETFL, fl & (~O_NONBLOCK));
113
114     fd = fileno(stdout);
115     fl = fcntl(fd, F_GETFL, 0);
116     fl = fcntl(fd, F_SETFL, fl & (~O_NONBLOCK));
117
118     fd = fileno(stderr);
119     fl = fcntl(fd, F_GETFL, 0);
120     fl = fcntl(fd, F_SETFL, fl & (~O_NONBLOCK));
121
122 #elif defined(HAVE_IOCTL) && defined(FIONBIO)
123     int fl, fd;
124
125     fl = 0;
126     fd = fileno(stdin);
127     (void) ioctl(fd, FIONBIO, &fl);
128
129     fd = fileno(stdout);
130     (void) ioctl(fd, FIONBIO, &fl);
131
132     fd = fileno(stderr);
133     (void) ioctl(fd, FIONBIO, &fl);
134 #endif
135 #endif
136
137     fflush (stdout);
138     fflush (stderr);
139 }
140
141
142 /*
143  * Threaded create socket.
144  */
145 int
146 threadedSocket(int af, int type, int proto)
147 {
148     int fd;
149
150     fd = socket(af, type, proto);
151     return (threadedFileDescriptor(fd));
152 }
153
154 /*
155  * Threaded file open.
156  */
157 int
158 threadedOpen(char* path, int flags, int mode)
159 {
160     int fd;
161
162     fd = open(path, flags, mode);
163     return (threadedFileDescriptor(fd));
164 }
165
166 /*
167  * Threaded socket connect.
168  */
169 int
170 threadedConnect(int fd, struct sockaddr* addr, int len)
171 {
172     int r;
173
174     r = connect(fd, addr, len);
175 #if !defined(BLOCKING_CALLS)
176     if ((r < 0)
177         && (errno == EINPROGRESS || errno == EALREADY
178             || errno == EWOULDBLOCK)) {
179         blockOnFile(fd, TH_CONNECT);
180         r = 0; /* Assume it's okay when we get released */
181     }
182 #endif
183
184     return (r);
185 }
186
187 /*
188  * Threaded socket accept.
189  */
190 int
191 threadedAccept(int fd, struct sockaddr* addr, int* len)
192 {
193     int r;
194
195     for (;;)
196     {
197 #if defined(BLOCKING_CALLS)
198         blockOnFile(fd, TH_ACCEPT);
199 #endif
200         r = accept(fd, addr, (int*)len);
201         if (r >= 0
202             || !(errno == EINPROGRESS || errno == EALREADY
203                  || errno == EWOULDBLOCK))
204         {
205             break;
206         }
207 #if !defined(BLOCKING_CALLS)
208         blockOnFile(fd, TH_ACCEPT);
209 #endif
210     }
211     return (threadedFileDescriptor(r));
212 }
213
214 /*
215  * Read but only if we can.
216  */
217 int
218 threadedRead(int fd, char* buf, int len)
219 {
220     int r;
221
222     DBG(   printf("threadedRead\n");          )
223
224 #if defined(BLOCKING_CALLS)
225     blockOnFile(fd, TH_READ);
226 #endif
227     for (;;)
228     {
229         r = read(fd, buf, len);
230         if (r < 0
231             && (errno == EAGAIN || errno == EWOULDBLOCK
232                 || errno == EINTR))
233         {
234             blockOnFile(fd, TH_READ);
235             continue;
236         }
237         return (r);
238     }
239 }
240
241 /*
242  * Write but only if we can.
243  */
244 int
245 threadedWrite(int fd, char* buf, int len)
246 {
247     int r;
248     char* ptr;
249
250     ptr = buf;
251     r = 1;
252
253     DBG(    printf("threadedWrite %dbytes\n",len);      )
254
255     while (len > 0 && r > 0)
256     {
257 #if defined(BLOCKING_CALLS)
258         blockOnFile(fd, TH_WRITE);
259 #endif
260         r = write(fd, ptr, len);
261         if (r < 0
262             && (errno == EAGAIN || errno == EWOULDBLOCK
263                 || errno == EINTR))
264         {
265 #if !defined(BLOCKING_CALLS)
266             blockOnFile(fd, TH_WRITE);
267 #endif
268             r = 1;
269         }
270         else
271         {
272             ptr += r;
273             len -= r;
274         }
275     }
276     return (ptr - buf);
277 }
278
279 /*
280  * Receive, but only if we can.
281  */
282 int
283 threadedRecvfrom (int fd, void *buf, size_t len, int flags, struct sockaddr *addr, int *addrlen)
284 {
285     int r;
286
287     DBG(   printf("threadedRecvfrom\n");          )
288
289 #if defined(BLOCKING_CALLS)
290     blockOnFile(fd, TH_READ);
291 #endif
292     for (;;)
293     {
294         r = recvfrom(fd, buf, len, flags, addr, addrlen);
295         if (r < 0
296             && (errno == EAGAIN || errno == EWOULDBLOCK
297                 || errno == EINTR))
298         {
299             blockOnFile(fd, TH_READ);
300             continue;
301         }
302         return (r);
303     }
304 }
305
306 /*
307  * Send, but only if we can.
308  */
309 int
310 threadedSendto (int fd, void *buf, size_t len, int flags, struct sockaddr *addr, int addrlen)
311 {
312     int r;
313
314     DBG(   printf("threadedSendto\n");          )
315
316 #if defined(BLOCKING_CALLS)
317     blockOnFile(fd, TH_WRITE);
318 #endif
319     for (;;)
320     {
321         r = sendto(fd, buf, len, flags, addr, addrlen);
322         if (r < 0
323             && (errno == EAGAIN || errno == EWOULDBLOCK
324                 || errno == EINTR))
325         {
326             blockOnFile(fd, TH_WRITE);
327             continue;
328         }
329         return (r);
330     }
331 }
332
333 /*
334  * An attempt to access a file would block, so suspend the thread until
335  * it will happen.
336  */
337 void
338 blockOnFile(int fd, int op)
339 {
340 DBG(    printf("blockOnFile()\n");                                      )
341
342     intsDisable();
343
344     if (fd > maxFd)
345     {
346         maxFd = fd;
347     }
348
349     if (op == TH_READ)
350     {
351         FD_SET(fd, &readsPending);
352         suspendOnQThread(currentThread, &readQ[fd]);
353         FD_CLR(fd, &readsPending);
354     }
355     else
356     {
357         FD_SET(fd, &writesPending);
358         suspendOnQThread(currentThread, &writeQ[fd]);
359         FD_CLR(fd, &writesPending);
360     }
361
362     intsRestore();
363 }
364
365 /*
366  * Check if some file descriptor or other event to become ready.
367  * Block if required (but make sure we can still take timer interrupts).
368  */
369 void
370 checkEvents(bool block)
371 {
372     int r;
373     fd_set rd;
374     fd_set wr;
375     thread* tid;
376     thread* ntid;
377     int i;
378     s8 time = -1;
379     struct timeval tv;
380     struct timeval *timeout;
381
382     assert(blockInts > 0);
383
384     DBG( printf("checkEvents block:%d\n", block); )
385
386     if (sleepThreads != 0)
387     {
388         time = currentTime();
389         while (sleepThreads != 0 && time >= CONTEXT(sleepThreads).time)
390         {
391             tid = sleepThreads;
392             sleepThreads = sleepThreads->next;
393             tid->next = 0;
394
395             iresumeThread(tid);
396         }
397     }
398
399     if (block)
400     {
401         if (sleepThreads != 0)
402         {
403             s8 wait_time = CONTEXT(sleepThreads).time - time;
404
405             tv.tv_sec = wait_time / 1000;
406             tv.tv_usec = (wait_time % 1000) * 1000;
407             timeout = &tv;
408         }
409         else
410             timeout = 0;
411     }
412     else
413     {
414         tv.tv_sec = 0;
415         tv.tv_usec = 0;
416         timeout = &tv;
417     }
418
419 #if defined(FD_COPY)
420     FD_COPY(&readsPending, &rd);
421     FD_COPY(&writesPending, &wr);
422 #else
423     memcpy(&rd, &readsPending, sizeof(rd));
424     memcpy(&wr, &writesPending, sizeof(wr));
425 #endif
426
427     r = select(maxFd+1, &rd, &wr, 0, timeout);
428
429     /* We must be holding off interrupts before we start playing with
430      * the read and write queues.  This should be already done but a
431      * quick check never hurt anyone.
432      */
433     assert(blockInts > 0);
434
435     DBG( printf("Select returns %d\n", r); )
436
437     /* Some threads may have finished sleeping.
438      */
439     if (block && sleepThreads != 0)
440     {
441         time = currentTime();
442         while (sleepThreads != 0 && time >= CONTEXT(sleepThreads).time)
443         {
444             tid = sleepThreads;
445             sleepThreads = sleepThreads->next;
446             tid->next = 0;
447
448             iresumeThread(tid);
449         }
450     }
451
452     for (i = 0; r > 0 && i <= maxFd; i++)
453     {
454         if (readQ[i] != 0 && FD_ISSET(i, &rd))
455         {
456             for (tid = readQ[i]; tid != 0; tid = ntid)
457             {
458                 ntid = tid->next;
459                 iresumeThread(tid);
460             }
461             readQ[i] = 0;
462             r--;
463         }
464         if (writeQ[i] != 0 && FD_ISSET(i, &wr))
465         {
466             for (tid = writeQ[i]; tid != 0; tid = ntid)
467             {
468                 ntid = tid->next;
469                 iresumeThread(tid);
470             }
471             writeQ[i] = 0;
472             r--;
473         }
474     }
475 }