- added gnu header
[cacao.git] / threads / threadio.c
1 /*
2  * threadCalls.c
3  * Support for threaded ops which may block (read, write, connect, etc.).
4  *
5  * Copyright (c) 1996 T. J. Wilkinson & Associates, London, UK.
6  *
7  * See the file "license.terms" for information on usage and redistribution
8  * of this file, and for a DISCLAIMER OF ALL WARRANTIES.
9  *
10  * Written by Tim Wilkinson <tim@tjwassoc.demon.co.uk>, 1996.
11  */
12
13
14 #include <stdio.h>
15 #include <sys/types.h>
16 #include <sys/time.h>
17 #include <sys/socket.h>
18 #include <fcntl.h>
19 #include <errno.h>
20 #include <assert.h>
21 #include <unistd.h>
22
23 #include "config.h"
24 #include "thread.h"
25
26
27 #define TH_READ         0
28 #define TH_WRITE        1
29 #define TH_ACCEPT       TH_READ
30 #define TH_CONNECT      TH_WRITE
31
32 static int maxFd;
33 static fd_set readsPending;
34 static fd_set writesPending;
35 static thread* readQ[FD_SETSIZE];
36 static thread* writeQ[FD_SETSIZE];
37
38 void blockOnFile(int, int);
39 void waitOnEvents(void);
40
41 extern thread* currentThread;
42
43 /* These are undefined because we do not yet support async I/O */
44 #undef  F_SETOWN
45 #undef  FIOSETOWN
46 #undef  O_ASYNC
47 #undef  FIOASYNC
48
49 /*
50  * Create a threaded file descriptor.
51  */
52 int
53 threadedFileDescriptor(int fd)
54 {
55 #if !defined(BLOCKING_CALLS)
56     int r;
57 #if defined(HAVE_IOCTL) && defined(FIOASYNC)
58     int on = 1;
59 #endif
60     int pid;
61
62     /* Make non-blocking */
63 #if defined(HAVE_FCNTL) && defined(O_NONBLOCK)
64     r = fcntl(fd, F_GETFL, 0);
65     r = fcntl(fd, F_SETFL, r|O_NONBLOCK);
66 #elif defined(HAVE_IOCTL) && defined(FIONBIO)
67     r = ioctl(fd, FIONBIO, &on);
68 #else
69     r = 0;
70 #endif
71     if (r < 0)
72     {
73         return (r);
74     }
75
76     /* Allow socket to signal this process when new data is available */
77     pid = getpid();
78 #if defined(HAVE_FCNTL) && defined(F_SETOWN)
79     r = fcntl(fd, F_SETOWN, pid);
80 #elif defined(HAVE_IOCTL) && defined(FIOSETOWN)
81     r = ioctl(fd, FIOSETOWN, &pid);
82 #else
83     r = 0;
84 #endif
85     if (r < 0)
86     {
87         return (r);
88     }
89
90 #if defined(HAVE_FCNTL) && defined(O_ASYNC)
91     r = fcntl(fd, F_GETFL, 0);
92     r = fcntl(fd, F_SETFL, r|O_ASYNC);
93 #elif defined(HAVE_IOCTL) && defined(FIOASYNC)
94     r = ioctl(fd, FIOASYNC, &on);
95 #else
96     r = 0;
97 #endif
98     if (r < 0)
99     {
100         return (r);
101     }
102 #endif
103     return (fd);
104 }
105
106 void clear_thread_flags(void)
107 {
108 #if !defined(BLOCKING_CALLS)
109 #if defined(HAVE_FCNTL) && defined(O_NONBLOCK)
110     int fl, fd;
111
112     fd = fileno(stdin);
113     fl = fcntl(fd, F_GETFL, 0);
114     fl = fcntl(fd, F_SETFL, fl & (~O_NONBLOCK));
115
116     fd = fileno(stdout);
117     fl = fcntl(fd, F_GETFL, 0);
118     fl = fcntl(fd, F_SETFL, fl & (~O_NONBLOCK));
119
120     fd = fileno(stderr);
121     fl = fcntl(fd, F_GETFL, 0);
122     fl = fcntl(fd, F_SETFL, fl & (~O_NONBLOCK));
123
124 #elif defined(HAVE_IOCTL) && defined(FIONBIO)
125     int fl, fd;
126
127     fl = 0;
128     fd = fileno(stdin);
129     (void) ioctl(fd, FIONBIO, &fl);
130
131     fd = fileno(stdout);
132     (void) ioctl(fd, FIONBIO, &fl);
133
134     fd = fileno(stderr);
135     (void) ioctl(fd, FIONBIO, &fl);
136 #endif
137 #endif
138
139     fflush(stdout);
140     fflush(stderr);
141 }
142
143
144 /*
145  * Threaded create socket.
146  */
147 int
148 threadedSocket(int af, int type, int proto)
149 {
150     int fd;
151
152     fd = socket(af, type, proto);
153     return (threadedFileDescriptor(fd));
154 }
155
156 /*
157  * Threaded file open.
158  */
159 int
160 threadedOpen(char* path, int flags, int mode)
161 {
162     int fd;
163
164     fd = open(path, flags, mode);
165     return (threadedFileDescriptor(fd));
166 }
167
168 /*
169  * Threaded socket connect.
170  */
171 int
172 threadedConnect(int fd, struct sockaddr* addr, int len)
173 {
174     int r;
175
176     r = connect(fd, addr, len);
177 #if !defined(BLOCKING_CALLS)
178     if ((r < 0)
179         && (errno == EINPROGRESS || errno == EALREADY
180             || errno == EWOULDBLOCK)) {
181         blockOnFile(fd, TH_CONNECT);
182         r = 0; /* Assume it's okay when we get released */
183     }
184 #endif
185
186     return (r);
187 }
188
189 /*
190  * Threaded socket accept.
191  */
192 int
193 threadedAccept(int fd, struct sockaddr* addr, int* len)
194 {
195     int r;
196
197     for (;;)
198     {
199 #if defined(BLOCKING_CALLS)
200         blockOnFile(fd, TH_ACCEPT);
201 #endif
202         r = accept(fd, addr, (int*)len);
203         if (r >= 0
204             || !(errno == EINPROGRESS || errno == EALREADY
205                  || errno == EWOULDBLOCK))
206         {
207             break;
208         }
209 #if !defined(BLOCKING_CALLS)
210         blockOnFile(fd, TH_ACCEPT);
211 #endif
212     }
213     return (threadedFileDescriptor(r));
214 }
215
216 /*
217  * Read but only if we can.
218  */
219 int
220 threadedRead(int fd, char* buf, int len)
221 {
222     int r;
223
224     DBG(   printf("threadedRead\n");          )
225
226 #if defined(BLOCKING_CALLS)
227     blockOnFile(fd, TH_READ);
228 #endif
229     for (;;)
230     {
231         r = read(fd, buf, len);
232         if (r < 0
233             && (errno == EAGAIN || errno == EWOULDBLOCK
234                 || errno == EINTR))
235         {
236             blockOnFile(fd, TH_READ);
237             continue;
238         }
239         return (r);
240     }
241 }
242
243 /*
244  * Write but only if we can.
245  */
246 int
247 threadedWrite(int fd, char* buf, int len)
248 {
249     int r;
250     char* ptr;
251
252     ptr = buf;
253     r = 1;
254
255     DBG(    printf("threadedWrite %dbytes\n",len);      )
256
257     while (len > 0 && r > 0)
258     {
259 #if defined(BLOCKING_CALLS)
260         blockOnFile(fd, TH_WRITE);
261 #endif
262         r = write(fd, ptr, len);
263         if (r < 0
264             && (errno == EAGAIN || errno == EWOULDBLOCK
265                 || errno == EINTR))
266         {
267 #if !defined(BLOCKING_CALLS)
268             blockOnFile(fd, TH_WRITE);
269 #endif
270             r = 1;
271         }
272         else
273         {
274             ptr += r;
275             len -= r;
276         }
277     }
278     return (ptr - buf);
279 }
280
281 /*
282  * Receive, but only if we can.
283  */
284 int
285 threadedRecvfrom (int fd, void *buf, size_t len, int flags, struct sockaddr *addr, int *addrlen)
286 {
287     int r;
288
289     DBG(   printf("threadedRecvfrom\n");          )
290
291 #if defined(BLOCKING_CALLS)
292     blockOnFile(fd, TH_READ);
293 #endif
294     for (;;)
295     {
296         r = recvfrom(fd, buf, len, flags, addr, addrlen);
297         if (r < 0
298             && (errno == EAGAIN || errno == EWOULDBLOCK
299                 || errno == EINTR))
300         {
301             blockOnFile(fd, TH_READ);
302             continue;
303         }
304         return (r);
305     }
306 }
307
308 /*
309  * Send, but only if we can.
310  */
311 int
312 threadedSendto (int fd, void *buf, size_t len, int flags, struct sockaddr *addr, int addrlen)
313 {
314     int r;
315
316     DBG(   printf("threadedSendto\n");          )
317
318 #if defined(BLOCKING_CALLS)
319     blockOnFile(fd, TH_WRITE);
320 #endif
321     for (;;)
322     {
323         r = sendto(fd, buf, len, flags, addr, addrlen);
324         if (r < 0
325             && (errno == EAGAIN || errno == EWOULDBLOCK
326                 || errno == EINTR))
327         {
328             blockOnFile(fd, TH_WRITE);
329             continue;
330         }
331         return (r);
332     }
333 }
334
335 /*
336  * An attempt to access a file would block, so suspend the thread until
337  * it will happen.
338  */
339 void
340 blockOnFile(int fd, int op)
341 {
342 DBG(    printf("blockOnFile()\n");                                      )
343
344     intsDisable();
345
346     if (fd > maxFd)
347     {
348         maxFd = fd;
349     }
350
351     if (op == TH_READ)
352     {
353         FD_SET(fd, &readsPending);
354         suspendOnQThread(currentThread, &readQ[fd]);
355         FD_CLR(fd, &readsPending);
356     }
357     else
358     {
359         FD_SET(fd, &writesPending);
360         suspendOnQThread(currentThread, &writeQ[fd]);
361         FD_CLR(fd, &writesPending);
362     }
363
364     intsRestore();
365 }
366
367 /*
368  * Check if some file descriptor or other event to become ready.
369  * Block if required (but make sure we can still take timer interrupts).
370  */
371 void
372 checkEvents(bool block)
373 {
374     int r;
375     fd_set rd;
376     fd_set wr;
377     thread* tid;
378     thread* ntid;
379     int i;
380     s8 time = -1;
381     struct timeval tv;
382     struct timeval *timeout;
383
384     assert(blockInts > 0);
385
386     DBG( printf("checkEvents block:%d\n", block); )
387
388     if (sleepThreads != 0)
389     {
390         time = currentTime();
391         while (sleepThreads != 0 && time >= CONTEXT(sleepThreads).time)
392         {
393             tid = sleepThreads;
394             sleepThreads = sleepThreads->next;
395             tid->next = 0;
396
397             iresumeThread(tid);
398         }
399     }
400
401     if (block)
402     {
403         if (sleepThreads != 0)
404         {
405             s8 wait_time = CONTEXT(sleepThreads).time - time;
406
407             tv.tv_sec = wait_time / 1000;
408             tv.tv_usec = (wait_time % 1000) * 1000;
409             timeout = &tv;
410         }
411         else
412             timeout = 0;
413     }
414     else
415     {
416         tv.tv_sec = 0;
417         tv.tv_usec = 0;
418         timeout = &tv;
419     }
420
421 #if defined(FD_COPY)
422     FD_COPY(&readsPending, &rd);
423     FD_COPY(&writesPending, &wr);
424 #else
425     memcpy(&rd, &readsPending, sizeof(rd));
426     memcpy(&wr, &writesPending, sizeof(wr));
427 #endif
428
429     r = select(maxFd+1, &rd, &wr, 0, timeout);
430
431     /* We must be holding off interrupts before we start playing with
432      * the read and write queues.  This should be already done but a
433      * quick check never hurt anyone.
434      */
435     assert(blockInts > 0);
436
437     DBG( printf("Select returns %d\n", r); )
438
439     /* Some threads may have finished sleeping.
440      */
441     if (block && sleepThreads != 0)
442     {
443         time = currentTime();
444         while (sleepThreads != 0 && time >= CONTEXT(sleepThreads).time)
445         {
446             tid = sleepThreads;
447             sleepThreads = sleepThreads->next;
448             tid->next = 0;
449
450             iresumeThread(tid);
451         }
452     }
453
454     for (i = 0; r > 0 && i <= maxFd; i++)
455     {
456         if (readQ[i] != 0 && FD_ISSET(i, &rd))
457         {
458             for (tid = readQ[i]; tid != 0; tid = ntid)
459             {
460                 ntid = tid->next;
461                 iresumeThread(tid);
462             }
463             readQ[i] = 0;
464             r--;
465         }
466         if (writeQ[i] != 0 && FD_ISSET(i, &wr))
467         {
468             for (tid = writeQ[i]; tid != 0; tid = ntid)
469             {
470                 ntid = tid->next;
471                 iresumeThread(tid);
472             }
473             writeQ[i] = 0;
474             r--;
475         }
476     }
477 }