nat/GtkLabelPeer.c has been removed
[cacao.git] / threads / threadio.c
1 /*
2  * threadCalls.c
3  * Support for threaded ops which may block (read, write, connect, etc.).
4  *
5  * Copyright (c) 1996 T. J. Wilkinson & Associates, London, UK.
6  *
7  * See the file "license.terms" for information on usage and redistribution
8  * of this file, and for a DISCLAIMER OF ALL WARRANTIES.
9  *
10  * Written by Tim Wilkinson <tim@tjwassoc.demon.co.uk>, 1996.
11  */
12
13
14 #include <stdio.h>
15 #include <sys/types.h>
16 #include <sys/time.h>
17 #include <sys/socket.h>
18 #include <fcntl.h>
19 #include <errno.h>
20 #include <assert.h>
21 #include <unistd.h>
22 #include <string.h>
23
24 #include "config.h"
25 #include "threads/thread.h"
26
27 #if !defined(NATIVE_THREADS)
28
29 #define TH_READ         0
30 #define TH_WRITE        1
31 #define TH_ACCEPT       TH_READ
32 #define TH_CONNECT      TH_WRITE
33
34 static int maxFd;
35 static fd_set readsPending;
36 static fd_set writesPending;
37 static thread* readQ[FD_SETSIZE];
38 static thread* writeQ[FD_SETSIZE];
39
40 void blockOnFile(int, int);
41 void waitOnEvents(void);
42
43 extern thread* currentThread;
44
45 /* These are undefined because we do not yet support async I/O */
46 #undef  F_SETOWN
47 #undef  FIOSETOWN
48 #undef  O_ASYNC
49 #undef  FIOASYNC
50
51 /*
52  * Create a threaded file descriptor.
53  */
54 int
55 threadedFileDescriptor(int fd)
56 {
57 #if !defined(BLOCKING_CALLS)
58     int r;
59 #if defined(HAVE_IOCTL) && defined(FIOASYNC)
60     int on = 1;
61 #endif
62     int pid;
63
64     /* Make non-blocking */
65 #if defined(HAVE_FCNTL) && defined(O_NONBLOCK)
66     r = fcntl(fd, F_GETFL, 0);
67     r = fcntl(fd, F_SETFL, r|O_NONBLOCK);
68 #elif defined(HAVE_IOCTL) && defined(FIONBIO)
69     r = ioctl(fd, FIONBIO, &on);
70 #else
71     r = 0;
72 #endif
73     if (r < 0)
74     {
75         return (r);
76     }
77
78     /* Allow socket to signal this process when new data is available */
79     pid = getpid();
80 #if defined(HAVE_FCNTL) && defined(F_SETOWN)
81     r = fcntl(fd, F_SETOWN, pid);
82 #elif defined(HAVE_IOCTL) && defined(FIOSETOWN)
83     r = ioctl(fd, FIOSETOWN, &pid);
84 #else
85     r = 0;
86 #endif
87     if (r < 0)
88     {
89         return (r);
90     }
91
92 #if defined(HAVE_FCNTL) && defined(O_ASYNC)
93     r = fcntl(fd, F_GETFL, 0);
94     r = fcntl(fd, F_SETFL, r|O_ASYNC);
95 #elif defined(HAVE_IOCTL) && defined(FIOASYNC)
96     r = ioctl(fd, FIOASYNC, &on);
97 #else
98     r = 0;
99 #endif
100     if (r < 0)
101     {
102         return (r);
103     }
104 #endif
105     return (fd);
106 }
107
108 void clear_thread_flags(void)
109 {
110 #if !defined(BLOCKING_CALLS)
111 #if defined(HAVE_FCNTL) && defined(O_NONBLOCK)
112     int fl, fd;
113
114     fd = fileno(stdin);
115     fl = fcntl(fd, F_GETFL, 0);
116     fl = fcntl(fd, F_SETFL, fl & (~O_NONBLOCK));
117
118     fd = fileno(stdout);
119     fl = fcntl(fd, F_GETFL, 0);
120     fl = fcntl(fd, F_SETFL, fl & (~O_NONBLOCK));
121
122     fd = fileno(stderr);
123     fl = fcntl(fd, F_GETFL, 0);
124     fl = fcntl(fd, F_SETFL, fl & (~O_NONBLOCK));
125
126 #elif defined(HAVE_IOCTL) && defined(FIONBIO)
127     int fl, fd;
128
129     fl = 0;
130     fd = fileno(stdin);
131     (void) ioctl(fd, FIONBIO, &fl);
132
133     fd = fileno(stdout);
134     (void) ioctl(fd, FIONBIO, &fl);
135
136     fd = fileno(stderr);
137     (void) ioctl(fd, FIONBIO, &fl);
138 #endif
139 #endif
140
141     fflush(stdout);
142     fflush(stderr);
143 }
144
145
146 /*
147  * Threaded create socket.
148  */
149 int
150 threadedSocket(int af, int type, int proto)
151 {
152     int fd;
153
154     fd = socket(af, type, proto);
155     return (threadedFileDescriptor(fd));
156 }
157
158 /*
159  * Threaded file open.
160  */
161 int
162 threadedOpen(char* path, int flags, int mode)
163 {
164     int fd;
165
166     fd = open(path, flags, mode);
167     return (threadedFileDescriptor(fd));
168 }
169
170 /*
171  * Threaded socket connect.
172  */
173 int
174 threadedConnect(int fd, struct sockaddr* addr, int len)
175 {
176     int r;
177
178     r = connect(fd, addr, len);
179 #if !defined(BLOCKING_CALLS)
180     if ((r < 0)
181         && (errno == EINPROGRESS || errno == EALREADY
182             || errno == EWOULDBLOCK)) {
183         blockOnFile(fd, TH_CONNECT);
184         r = 0; /* Assume it's okay when we get released */
185     }
186 #endif
187
188     return (r);
189 }
190
191 /*
192  * Threaded socket accept.
193  */
194 int
195 threadedAccept(int fd, struct sockaddr* addr, int* len)
196 {
197     int r;
198
199     for (;;)
200     {
201 #if defined(BLOCKING_CALLS)
202         blockOnFile(fd, TH_ACCEPT);
203 #endif
204         r = accept(fd, addr, (int*)len);
205         if (r >= 0
206             || !(errno == EINPROGRESS || errno == EALREADY
207                  || errno == EWOULDBLOCK))
208         {
209             break;
210         }
211 #if !defined(BLOCKING_CALLS)
212         blockOnFile(fd, TH_ACCEPT);
213 #endif
214     }
215     return (threadedFileDescriptor(r));
216 }
217
218 /*
219  * Read but only if we can.
220  */
221 int
222 threadedRead(int fd, char* buf, int len)
223 {
224     int r;
225
226     DBG(   printf("threadedRead\n");          )
227
228 #if defined(BLOCKING_CALLS)
229     blockOnFile(fd, TH_READ);
230 #endif
231     for (;;)
232     {
233         r = read(fd, buf, len);
234         if (r < 0
235             && (errno == EAGAIN || errno == EWOULDBLOCK
236                 || errno == EINTR))
237         {
238             blockOnFile(fd, TH_READ);
239             continue;
240         }
241         return (r);
242     }
243 }
244
245 /*
246  * Write but only if we can.
247  */
248 int
249 threadedWrite(int fd, char* buf, int len)
250 {
251     int r;
252     char* ptr;
253
254     ptr = buf;
255     r = 1;
256
257     DBG(    printf("threadedWrite %dbytes\n",len);      )
258
259     while (len > 0 && r > 0)
260     {
261 #if defined(BLOCKING_CALLS)
262         blockOnFile(fd, TH_WRITE);
263 #endif
264         r = write(fd, ptr, len);
265         if (r < 0
266             && (errno == EAGAIN || errno == EWOULDBLOCK
267                 || errno == EINTR))
268         {
269 #if !defined(BLOCKING_CALLS)
270             blockOnFile(fd, TH_WRITE);
271 #endif
272             r = 1;
273         }
274         else
275         {
276             ptr += r;
277             len -= r;
278         }
279     }
280     return (ptr - buf);
281 }
282
283 /*
284  * Receive, but only if we can.
285  */
286 int
287 threadedRecvfrom (int fd, void *buf, size_t len, int flags, struct sockaddr *addr, int *addrlen)
288 {
289     int r;
290
291     DBG(   printf("threadedRecvfrom\n");          )
292
293 #if defined(BLOCKING_CALLS)
294     blockOnFile(fd, TH_READ);
295 #endif
296     for (;;)
297     {
298         r = recvfrom(fd, buf, len, flags, addr, addrlen);
299         if (r < 0
300             && (errno == EAGAIN || errno == EWOULDBLOCK
301                 || errno == EINTR))
302         {
303             blockOnFile(fd, TH_READ);
304             continue;
305         }
306         return (r);
307     }
308 }
309
310 /*
311  * Send, but only if we can.
312  */
313 int
314 threadedSendto (int fd, void *buf, size_t len, int flags, struct sockaddr *addr, int addrlen)
315 {
316     int r;
317
318     DBG(   printf("threadedSendto\n");          )
319
320 #if defined(BLOCKING_CALLS)
321     blockOnFile(fd, TH_WRITE);
322 #endif
323     for (;;)
324     {
325         r = sendto(fd, buf, len, flags, addr, addrlen);
326         if (r < 0
327             && (errno == EAGAIN || errno == EWOULDBLOCK
328                 || errno == EINTR))
329         {
330             blockOnFile(fd, TH_WRITE);
331             continue;
332         }
333         return (r);
334     }
335 }
336
337 /*
338  * An attempt to access a file would block, so suspend the thread until
339  * it will happen.
340  */
341 void
342 blockOnFile(int fd, int op)
343 {
344 DBG(    printf("blockOnFile()\n");                                      )
345
346     intsDisable();
347
348     if (fd > maxFd)
349     {
350         maxFd = fd;
351     }
352
353     if (op == TH_READ)
354     {
355         FD_SET(fd, &readsPending);
356         suspendOnQThread(currentThread, &readQ[fd]);
357         FD_CLR(fd, &readsPending);
358     }
359     else
360     {
361         FD_SET(fd, &writesPending);
362         suspendOnQThread(currentThread, &writeQ[fd]);
363         FD_CLR(fd, &writesPending);
364     }
365
366     intsRestore();
367 }
368
369 /*
370  * Check if some file descriptor or other event to become ready.
371  * Block if required (but make sure we can still take timer interrupts).
372  */
373 void
374 checkEvents(bool block)
375 {
376     int r;
377     fd_set rd;
378     fd_set wr;
379     thread* tid;
380     thread* ntid;
381     int i;
382     s8 time = -1;
383     struct timeval tv;
384     struct timeval *timeout;
385
386     assert(blockInts > 0);
387
388     DBG( printf("checkEvents block:%d\n", block); )
389
390     if (sleepThreads != 0)
391     {
392         time = currentTime();
393         while (sleepThreads != 0 && time >= CONTEXT(sleepThreads).time)
394         {
395             tid = sleepThreads;
396             sleepThreads = sleepThreads->vmThread->next;
397             tid->vmThread->next = 0;
398
399             iresumeThread(tid);
400         }
401     }
402
403     if (block)
404     {
405         if (sleepThreads != 0)
406         {
407             s8 wait_time = CONTEXT(sleepThreads).time - time;
408
409             tv.tv_sec = wait_time / 1000;
410             tv.tv_usec = (wait_time % 1000) * 1000;
411             timeout = &tv;
412         }
413         else
414             timeout = 0;
415     }
416     else
417     {
418         tv.tv_sec = 0;
419         tv.tv_usec = 0;
420         timeout = &tv;
421     }
422
423 #if defined(FD_COPY)
424     FD_COPY(&readsPending, &rd);
425     FD_COPY(&writesPending, &wr);
426 #else
427     memcpy(&rd, &readsPending, sizeof(rd));
428     memcpy(&wr, &writesPending, sizeof(wr));
429 #endif
430
431     r = select(maxFd+1, &rd, &wr, 0, timeout);
432
433     /* We must be holding off interrupts before we start playing with
434      * the read and write queues.  This should be already done but a
435      * quick check never hurt anyone.
436      */
437     assert(blockInts > 0);
438
439     DBG( printf("Select returns %d\n", r); )
440
441     /* Some threads may have finished sleeping.
442      */
443     if (block && sleepThreads != 0)
444     {
445         time = currentTime();
446         while (sleepThreads != 0 && time >= CONTEXT(sleepThreads).time)
447         {
448             tid = sleepThreads;
449             sleepThreads = sleepThreads->vmThread->next;
450             tid->vmThread->next = 0;
451
452             iresumeThread(tid);
453         }
454     }
455
456     for (i = 0; r > 0 && i <= maxFd; i++)
457     {
458         if (readQ[i] != 0 && FD_ISSET(i, &rd))
459         {
460             for (tid = readQ[i]; tid != 0; tid = ntid)
461             {
462                 ntid = tid->vmThread->next;
463                 iresumeThread(tid);
464             }
465             readQ[i] = 0;
466             r--;
467         }
468         if (writeQ[i] != 0 && FD_ISSET(i, &wr))
469         {
470             for (tid = writeQ[i]; tid != 0; tid = ntid)
471             {
472                 ntid = tid->vmThread->next;
473                 iresumeThread(tid);
474             }
475             writeQ[i] = 0;
476             r--;
477         }
478     }
479 }
480
481 #endif