379b49af11d16acc7a3afa01037741209b3d6441
[cacao.git] / src / threads / green / threadio.c
1 /*
2  * threadCalls.c
3  * Support for threaded ops which may block (read, write, connect, etc.).
4  *
5  * Copyright (c) 1996 T. J. Wilkinson & Associates, London, UK.
6  *
7  * See the file "license.terms" for information on usage and redistribution
8  * of this file, and for a DISCLAIMER OF ALL WARRANTIES.
9  *
10  * Written by Tim Wilkinson <tim@tjwassoc.demon.co.uk>, 1996.
11  */
12
13 #define DBG(s)                    
14
15 #include "sysdep/defines.h"
16
17 #include <sys/types.h>
18 #include <sys/time.h>
19 #include <sys/socket.h>
20 #include <fcntl.h>
21 #include <errno.h>
22 #include <assert.h>
23 #include <unistd.h>
24
25 #include "thread.h"
26
27 /*
28  * We only need this stuff is we are using the internal thread system.
29  */
30 #if defined(USE_INTERNAL_THREADS)
31
32 #define TH_READ         0
33 #define TH_WRITE        1
34 #define TH_ACCEPT       TH_READ
35 #define TH_CONNECT      TH_WRITE
36
37 static int maxFd;
38 static fd_set readsPending;
39 static fd_set writesPending;
40 static thread* readQ[FD_SETSIZE];
41 static thread* writeQ[FD_SETSIZE];
42 static struct timeval tm = { 0, 0 };
43
44 void blockOnFile(int, int);
45 void waitOnEvents(void);
46
47 extern thread* currentThread;
48
49 /* These are undefined because we do not yet support async I/O */
50 #undef  F_SETOWN
51 #undef  FIOSETOWN
52 #undef  O_ASYNC
53 #undef  FIOASYNC
54
55 /*
56  * Create a threaded file descriptor.
57  */
58 int
59 threadedFileDescriptor(int fd)
60 {
61 #if !defined(BLOCKING_CALLS)
62     int r;
63     int on = 1;
64     int pid;
65
66     /* Make non-blocking */
67 #if defined(HAVE_FCNTL) && defined(O_NONBLOCK)
68     r = fcntl(fd, F_GETFL, 0);
69     r = fcntl(fd, F_SETFL, r|O_NONBLOCK);
70 #elif defined(HAVE_IOCTL) && defined(FIONBIO)
71     r = ioctl(fd, FIONBIO, &on);
72 #else
73     r = 0;
74 #endif
75     if (r < 0)
76     {
77         return (r);
78     }
79
80     /* Allow socket to signal this process when new data is available */
81     pid = getpid();
82 #if defined(HAVE_FCNTL) && defined(F_SETOWN)
83     r = fcntl(fd, F_SETOWN, pid);
84 #elif defined(HAVE_IOCTL) && defined(FIOSETOWN)
85     r = ioctl(fd, FIOSETOWN, &pid);
86 #else
87     r = 0;
88 #endif
89     if (r < 0)
90     {
91         return (r);
92     }
93
94 #if defined(HAVE_FCNTL) && defined(O_ASYNC)
95     r = fcntl(fd, F_GETFL, 0);
96     r = fcntl(fd, F_SETFL, r|O_ASYNC);
97 #elif defined(HAVE_IOCTL) && defined(FIOASYNC)
98     r = ioctl(fd, FIOASYNC, &on);
99 #else
100     r = 0;
101 #endif
102     if (r < 0)
103     {
104         return (r);
105     }
106 #endif
107     return (fd);
108 }
109
110 void clear_thread_flags(void)
111 {
112 #if !defined(BLOCKING_CALLS)
113         int fl, fd;
114
115 #if defined(HAVE_FCNTL) && defined(O_NONBLOCK)
116         fd = fileno(stdin);
117     fl = fcntl(fd, F_GETFL, 0);
118     fl = fcntl(fd, F_SETFL, fl & (~O_NONBLOCK));
119
120         fd = fileno(stdout);
121     fl = fcntl(fd, F_GETFL, 0);
122     fl = fcntl(fd, F_SETFL, fl & (~O_NONBLOCK));
123
124         fd = fileno(stderr);
125     fl = fcntl(fd, F_GETFL, 0);
126     fl = fcntl(fd, F_SETFL, fl & (~O_NONBLOCK));
127 #elif defined(HAVE_IOCTL) && defined(FIONBIO)
128         fl = 0;
129         fd = fileno(stdin);
130     (void) ioctl(fd, FIONBIO, &fl);
131
132         fd = fileno(stdout);
133     (void) ioctl(fd, FIONBIO, &fl);
134
135         fd = fileno(stderr);
136     (void) ioctl(fd, FIONBIO, &fl);
137 #endif
138
139 #endif
140 }
141
142
143 /*
144  * Threaded create socket.
145  */
146 int
147 threadedSocket(int af, int type, int proto)
148 {
149     int fd;
150     int r;
151     int on = 1;
152     int pid;
153
154     fd = socket(af, type, proto);
155     return (threadedFileDescriptor(fd));
156 }
157
158 /*
159  * Threaded file open.
160  */
161 int
162 threadedOpen(char* path, int flags, int mode)
163 {
164     int fd;
165     int r;
166     int on = 1;
167     int pid;
168
169     fd = open(path, flags, mode);
170     return (threadedFileDescriptor(fd));
171 }
172
173 /*
174  * Threaded socket connect.
175  */
176 int
177 threadedConnect(int fd, struct sockaddr* addr, int len)
178 {
179     int r;
180
181     r = connect(fd, addr, len);
182 #if !defined(BLOCKING_CALLS)
183     if ((r < 0)
184         && (errno == EINPROGRESS || errno == EALREADY
185             || errno == EWOULDBLOCK)) {
186         blockOnFile(fd, TH_CONNECT);
187         r = 0; /* Assume it's okay when we get released */
188     }
189 #endif
190
191     return (r);
192 }
193
194 /*
195  * Threaded socket accept.
196  */
197 int
198 threadedAccept(int fd, struct sockaddr* addr, int* len)
199 {
200     int r;
201     int on = 1;
202
203     for (;;)
204     {
205 #if defined(BLOCKING_CALLS)
206         blockOnFile(fd, TH_ACCEPT);
207 #endif
208         r = accept(fd, addr, len);
209         if (r >= 0
210             || !(errno == EINPROGRESS || errno == EALREADY
211                  || errno == EWOULDBLOCK))
212         {
213             break;
214         }
215 #if !defined(BLOCKING_CALLS)
216         blockOnFile(fd, TH_ACCEPT);
217 #endif
218     }
219     return (threadedFileDescriptor(r));
220 }
221
222 /*
223  * Read but only if we can.
224  */
225 int
226 threadedRead(int fd, char* buf, int len)
227 {
228     int r;
229
230     DBG(   printf("threadedRead\n");          )
231
232 #if defined(BLOCKING_CALLS)
233     blockOnFile(fd, TH_READ);
234 #endif
235     for (;;)
236     {
237         r = read(fd, buf, len);
238         if (r < 0
239             && (errno == EAGAIN || errno == EWOULDBLOCK
240                 || errno == EINTR))
241         {
242             blockOnFile(fd, TH_READ);
243             continue;
244         }
245         return (r);
246     }
247 }
248
249 /*
250  * Write but only if we can.
251  */
252 int
253 threadedWrite(int fd, char* buf, int len)
254 {
255     int r;
256     char* ptr;
257
258     ptr = buf;
259     r = 1;
260
261     DBG(    printf("threadedWrite %dbytes\n",len);      )
262
263     while (len > 0 && r > 0)
264     {
265 #if defined(BLOCKING_CALLS)
266         blockOnFile(fd, TH_WRITE);
267 #endif
268         r = write(fd, ptr, len);
269         if (r < 0
270             && (errno == EAGAIN || errno == EWOULDBLOCK
271                 || errno == EINTR))
272         {
273 #if !defined(BLOCKING_CALLS)
274             blockOnFile(fd, TH_WRITE);
275 #endif
276             r = 1;
277         }
278         else
279         {
280             ptr += r;
281             len -= r;
282         }
283     }
284     return (ptr - buf);
285 }
286
287 /*
288  * An attempt to access a file would block, so suspend the thread until
289  * it will happen.
290  */
291 void
292 blockOnFile(int fd, int op)
293 {
294 DBG(    printf("blockOnFile()\n");                                      )
295
296     intsDisable();
297
298     if (fd > maxFd)
299     {
300         maxFd = fd;
301     }
302     if (op == TH_READ)
303     {
304         FD_SET(fd, &readsPending);
305         suspendOnQThread(currentThread, &readQ[fd]);
306         FD_CLR(fd, &readsPending);
307     }
308     else
309     {
310         FD_SET(fd, &writesPending);
311         suspendOnQThread(currentThread, &writeQ[fd]);
312         FD_CLR(fd, &writesPending);
313     }
314
315     intsRestore();
316 }
317
318 /*
319  * Check if some file descriptor or other event to become ready.
320  * Block if required (but make sure we can still take timer interrupts).
321  */
322 void
323 checkEvents(bool block)
324 {
325     int r;
326     fd_set rd;
327     fd_set wr;
328     thread* tid;
329     thread* ntid;
330     int i;
331     int b;
332
333 DBG(    printf("checkEvents block:%d\n", block);                        )
334
335 #if defined(FD_COPY)
336     FD_COPY(&readsPending, &rd);
337     FD_COPY(&writesPending, &wr);
338 #else
339     memcpy(&rd, &readsPending, sizeof(rd));
340     memcpy(&wr, &writesPending, sizeof(wr));
341 #endif
342
343     /* 
344      * If select() is called with indefinite wait, we have to make sure
345      * we can get interrupted by timer events. 
346      */
347     if (block == true)
348     {
349         b = blockInts;
350         blockInts = 0;
351         r = select(maxFd+1, &rd, &wr, 0, 0);
352         blockInts = b;
353     }
354     else
355     {
356         r = select(maxFd+1, &rd, &wr, 0, &tm);
357     }
358
359     /* We must be holding off interrupts before we start playing with
360      * the read and write queues.  This should be already done but a
361      * quick check never hurt anyone.
362      */
363     assert(blockInts > 0);
364
365 DBG(    printf("Select returns %d\n", r);                               )
366
367     for (i = 0; r > 0 && i <= maxFd; i++)
368     {
369         if (readQ[i] != 0 && FD_ISSET(i, &rd))
370         {
371             for (tid = readQ[i]; tid != 0; tid = ntid)
372             {
373                 ntid = tid->next;
374                 iresumeThread(tid);
375             }
376             readQ[i] = 0;
377             r--;
378         }
379         if (writeQ[i] != 0 && FD_ISSET(i, &wr))
380         {
381             for (tid = writeQ[i]; tid != 0; tid = ntid)
382             {
383                 ntid = tid->next;
384                 iresumeThread(tid);
385             }
386             writeQ[i] = 0;
387             r--;
388         }
389     }
390 }
391 #endif