Initial revision
[cacao.git] / src / threads / green / threadio.c
1 /*
2  * threadCalls.c
3  * Support for threaded ops which may block (read, write, connect, etc.).
4  *
5  * Copyright (c) 1996 T. J. Wilkinson & Associates, London, UK.
6  *
7  * See the file "license.terms" for information on usage and redistribution
8  * of this file, and for a DISCLAIMER OF ALL WARRANTIES.
9  *
10  * Written by Tim Wilkinson <tim@tjwassoc.demon.co.uk>, 1996.
11  */
12
13 #define DBG(s)                    
14
15 #include "sysdep/defines.h"
16
17 #include <sys/types.h>
18 #include <sys/time.h>
19 #include <sys/socket.h>
20 #include <fcntl.h>
21 #include <errno.h>
22 #include <assert.h>
23 #include <unistd.h>
24
25 #include "thread.h"
26
27 /*
28  * We only need this stuff is we are using the internal thread system.
29  */
30 #if defined(USE_INTERNAL_THREADS)
31
32 #define TH_READ         0
33 #define TH_WRITE        1
34 #define TH_ACCEPT       TH_READ
35 #define TH_CONNECT      TH_WRITE
36
37 static int maxFd;
38 static fd_set readsPending;
39 static fd_set writesPending;
40 static thread* readQ[FD_SETSIZE];
41 static thread* writeQ[FD_SETSIZE];
42 static struct timeval tm = { 0, 0 };
43
44 void blockOnFile(int, int);
45 void waitOnEvents(void);
46
47 extern thread* currentThread;
48
49 /* These are undefined because we do not yet support async I/O */
50 #undef  F_SETOWN
51 #undef  FIOSETOWN
52 #undef  O_ASYNC
53 #undef  FIOASYNC
54
55 /*
56  * Create a threaded file descriptor.
57  */
58 int
59 threadedFileDescriptor(int fd)
60 {
61 #if !defined(BLOCKING_CALLS)
62     int r;
63     int on = 1;
64     int pid;
65
66     /* Make non-blocking */
67 #if defined(HAVE_FCNTL) && defined(O_NONBLOCK)
68     r = fcntl(fd, F_GETFL, 0);
69     r = fcntl(fd, F_SETFL, r|O_NONBLOCK);
70 #elif defined(HAVE_IOCTL) && defined(FIONBIO)
71     r = ioctl(fd, FIONBIO, &on);
72 #else
73     r = 0;
74 #endif
75     if (r < 0)
76     {
77         return (r);
78     }
79
80     /* Allow socket to signal this process when new data is available */
81     pid = getpid();
82 #if defined(HAVE_FCNTL) && defined(F_SETOWN)
83     r = fcntl(fd, F_SETOWN, pid);
84 #elif defined(HAVE_IOCTL) && defined(FIOSETOWN)
85     r = ioctl(fd, FIOSETOWN, &pid);
86 #else
87     r = 0;
88 #endif
89     if (r < 0)
90     {
91         return (r);
92     }
93
94 #if defined(HAVE_FCNTL) && defined(O_ASYNC)
95     r = fcntl(fd, F_GETFL, 0);
96     r = fcntl(fd, F_SETFL, r|O_ASYNC);
97 #elif defined(HAVE_IOCTL) && defined(FIOASYNC)
98     r = ioctl(fd, FIOASYNC, &on);
99 #else
100     r = 0;
101 #endif
102     if (r < 0)
103     {
104         return (r);
105     }
106 #endif
107     return (fd);
108 }
109
110 /*
111  * Threaded create socket.
112  */
113 int
114 threadedSocket(int af, int type, int proto)
115 {
116     int fd;
117     int r;
118     int on = 1;
119     int pid;
120
121     fd = socket(af, type, proto);
122     return (threadedFileDescriptor(fd));
123 }
124
125 /*
126  * Threaded file open.
127  */
128 int
129 threadedOpen(char* path, int flags, int mode)
130 {
131     int fd;
132     int r;
133     int on = 1;
134     int pid;
135
136     fd = open(path, flags, mode);
137     return (threadedFileDescriptor(fd));
138 }
139
140 /*
141  * Threaded socket connect.
142  */
143 int
144 threadedConnect(int fd, struct sockaddr* addr, int len)
145 {
146     int r;
147
148     r = connect(fd, addr, len);
149 #if !defined(BLOCKING_CALLS)
150     if ((r < 0)
151         && (errno == EINPROGRESS || errno == EALREADY
152             || errno == EWOULDBLOCK)) {
153         blockOnFile(fd, TH_CONNECT);
154         r = 0; /* Assume it's okay when we get released */
155     }
156 #endif
157
158     return (r);
159 }
160
161 /*
162  * Threaded socket accept.
163  */
164 int
165 threadedAccept(int fd, struct sockaddr* addr, int* len)
166 {
167     int r;
168     int on = 1;
169
170     for (;;)
171     {
172 #if defined(BLOCKING_CALLS)
173         blockOnFile(fd, TH_ACCEPT);
174 #endif
175         r = accept(fd, addr, len);
176         if (r >= 0
177             || !(errno == EINPROGRESS || errno == EALREADY
178                  || errno == EWOULDBLOCK))
179         {
180             break;
181         }
182 #if !defined(BLOCKING_CALLS)
183         blockOnFile(fd, TH_ACCEPT);
184 #endif
185     }
186     return (threadedFileDescriptor(r));
187 }
188
189 /*
190  * Read but only if we can.
191  */
192 int
193 threadedRead(int fd, char* buf, int len)
194 {
195     int r;
196
197     DBG(   printf("threadedRead\n");          )
198
199 #if defined(BLOCKING_CALLS)
200     blockOnFile(fd, TH_READ);
201 #endif
202     for (;;)
203     {
204         r = read(fd, buf, len);
205         if (r < 0
206             && (errno == EAGAIN || errno == EWOULDBLOCK
207                 || errno == EINTR))
208         {
209             blockOnFile(fd, TH_READ);
210             continue;
211         }
212         return (r);
213     }
214 }
215
216 /*
217  * Write but only if we can.
218  */
219 int
220 threadedWrite(int fd, char* buf, int len)
221 {
222     int r;
223     char* ptr;
224
225     ptr = buf;
226     r = 1;
227
228     DBG(    printf("threadedWrite %dbytes\n",len);      )
229
230     while (len > 0 && r > 0)
231     {
232 #if defined(BLOCKING_CALLS)
233         blockOnFile(fd, TH_WRITE);
234 #endif
235         r = write(fd, ptr, len);
236         if (r < 0
237             && (errno == EAGAIN || errno == EWOULDBLOCK
238                 || errno == EINTR))
239         {
240 #if !defined(BLOCKING_CALLS)
241             blockOnFile(fd, TH_WRITE);
242 #endif
243             r = 1;
244         }
245         else
246         {
247             ptr += r;
248             len -= r;
249         }
250     }
251     return (ptr - buf);
252 }
253
254 /*
255  * An attempt to access a file would block, so suspend the thread until
256  * it will happen.
257  */
258 void
259 blockOnFile(int fd, int op)
260 {
261 DBG(    printf("blockOnFile()\n");                                      )
262
263     intsDisable();
264
265     if (fd > maxFd)
266     {
267         maxFd = fd;
268     }
269     if (op == TH_READ)
270     {
271         FD_SET(fd, &readsPending);
272         suspendOnQThread(currentThread, &readQ[fd]);
273         FD_CLR(fd, &readsPending);
274     }
275     else
276     {
277         FD_SET(fd, &writesPending);
278         suspendOnQThread(currentThread, &writeQ[fd]);
279         FD_CLR(fd, &writesPending);
280     }
281
282     intsRestore();
283 }
284
285 /*
286  * Check if some file descriptor or other event to become ready.
287  * Block if required (but make sure we can still take timer interrupts).
288  */
289 void
290 checkEvents(bool block)
291 {
292     int r;
293     fd_set rd;
294     fd_set wr;
295     thread* tid;
296     thread* ntid;
297     int i;
298     int b;
299
300 DBG(    printf("checkEvents block:%d\n", block);                        )
301
302 #if defined(FD_COPY)
303     FD_COPY(&readsPending, &rd);
304     FD_COPY(&writesPending, &wr);
305 #else
306     memcpy(&rd, &readsPending, sizeof(rd));
307     memcpy(&wr, &writesPending, sizeof(wr));
308 #endif
309
310     /* 
311      * If select() is called with indefinite wait, we have to make sure
312      * we can get interrupted by timer events. 
313      */
314     if (block == true)
315     {
316         b = blockInts;
317         blockInts = 0;
318         r = select(maxFd+1, &rd, &wr, 0, 0);
319         blockInts = b;
320     }
321     else
322     {
323         r = select(maxFd+1, &rd, &wr, 0, &tm);
324     }
325
326     /* We must be holding off interrupts before we start playing with
327      * the read and write queues.  This should be already done but a
328      * quick check never hurt anyone.
329      */
330     assert(blockInts > 0);
331
332 DBG(    printf("Select returns %d\n", r);                               )
333
334     for (i = 0; r > 0 && i <= maxFd; i++)
335     {
336         if (readQ[i] != 0 && FD_ISSET(i, &rd))
337         {
338             for (tid = readQ[i]; tid != 0; tid = ntid)
339             {
340                 ntid = tid->next;
341                 iresumeThread(tid);
342             }
343             readQ[i] = 0;
344             r--;
345         }
346         if (writeQ[i] != 0 && FD_ISSET(i, &wr))
347         {
348             for (tid = writeQ[i]; tid != 0; tid = ntid)
349             {
350                 ntid = tid->next;
351                 iresumeThread(tid);
352             }
353             writeQ[i] = 0;
354             r--;
355         }
356     }
357 }
358 #endif