2001-11-21 Dick Porter <dick@ximian.com>
[mono.git] / mono / io-layer / wait.c
1 #include <config.h>
2 #include <glib.h>
3 #include <string.h>
4
5 #include "mono/io-layer/wapi.h"
6 #include "wait-private.h"
7 #include "timed-thread.h"
8 #include "handles-private.h"
9 #include "wapi-private.h"
10
11 #define DEBUG
12
13 static pthread_once_t wait_once=PTHREAD_ONCE_INIT;
14
15 static GPtrArray *WaitQueue=NULL;
16
17 static pthread_t wait_monitor_thread_id;
18 static gboolean wait_monitor_thread_running=FALSE;
19 static pthread_mutex_t wait_monitor_mutex=PTHREAD_MUTEX_INITIALIZER;
20 static sem_t wait_monitor_sem;
21
22 static void launch_tidy(guint32 exitcode G_GNUC_UNUSED, gpointer user)
23 {
24         WaitQueueItem *item=(WaitQueueItem *)user;
25         
26 #ifdef DEBUG
27         g_message(G_GNUC_PRETTY_FUNCTION ": Informing monitor thread");
28 #endif
29
30         /* Update queue item */
31         pthread_mutex_lock(&item->mutex);
32         item->update++;
33         pthread_mutex_unlock(&item->mutex);
34         
35         /* Signal monitor */
36         sem_post(&wait_monitor_sem);
37 }
38
39 /* This function is called by the monitor thread to launch handle-type
40  * specific threads to block in particular ways.
41  *
42  * The item mutex is held by the monitor thread when this function is
43  * called.
44  */
45 static void launch_blocker_threads(WaitQueueItem *item)
46 {
47         int i, ret;
48         
49 #ifdef DEBUG
50         g_message(G_GNUC_PRETTY_FUNCTION ": Launching blocker threads");
51 #endif
52
53         for(i=0; i<WAPI_HANDLE_COUNT; i++) {
54                 if(item->handles[i]->len>0) {
55                         WapiHandle *handle;
56
57                         handle=g_ptr_array_index(item->handles[i], 0);
58                         g_assert(handle!=NULL);
59                         g_assert(handle->ops->wait_multiple!=NULL);
60                         
61 #ifdef DEBUG
62                         g_message("Handle type %d active", i);
63 #endif
64                         item->waited[i]=FALSE;
65                         
66                         ret=_wapi_timed_thread_create(
67                                 &item->thread[i], NULL,
68                                 handle->ops->wait_multiple, launch_tidy, item,
69                                 item);
70                         if(ret!=0) {
71                                 g_warning(G_GNUC_PRETTY_FUNCTION
72                                           ": Thread create error: %s",
73                                           strerror(ret));
74                                 return;
75                         }
76                 } else {
77                         /* Pretend to have already waited for the
78                          * thread; it makes life easier for the
79                          * monitor thread.
80                          */
81                         item->waited[i]=TRUE;
82                 }
83         }
84 }
85
86 static gboolean launch_threads_done(WaitQueueItem *item)
87 {
88         int i;
89         
90         for(i=0; i<WAPI_HANDLE_COUNT; i++) {
91                 if(item->waited[i]==FALSE) {
92                         return(FALSE);
93                 }
94         }
95
96         return(TRUE);
97 }
98
99 /* This is the main loop for the monitor thread.  It waits for a
100  * signal to check the wait queue, then takes any action necessary on
101  * any queue items that have indicated readiness.
102  */
103 static void *wait_monitor_thread(void *unused G_GNUC_UNUSED)
104 {
105         guint i;
106         
107         /* Signal that the monitor thread is ready */
108         wait_monitor_thread_running=TRUE;
109         
110         while(TRUE) {
111                 /* Use a semaphore rather than a cond so we dont miss
112                  * any signals
113                  */
114                 sem_wait(&wait_monitor_sem);
115                 
116 #ifdef DEBUG
117                 g_message(G_GNUC_PRETTY_FUNCTION
118                           ": Blocking thread doing stuff");
119 #endif
120                 
121                 /* We've been signalled, so scan the wait queue for
122                  * activity.
123                  */
124                 pthread_mutex_lock(&wait_monitor_mutex);
125                 for(i=0; i<WaitQueue->len; i++) {
126                         WaitQueueItem *item=g_ptr_array_index(WaitQueue, i);
127                         
128                         if(item->update > item->ack) {
129                                 /* Something changed */
130                                 pthread_mutex_lock(&item->mutex);
131                                 item->ack=item->update;
132                                 
133                                 switch(item->state) {
134                                 case WQ_NEW:
135                                         /* Launch a new thread for each type of
136                                          * handle to be waited for here.
137                                          */
138                                         launch_blocker_threads(item);
139                                         
140                                         item->state=WQ_WAITING;
141                                         break;
142                                         
143                                 case WQ_WAITING:
144                                         /* See if we have waited for
145                                          * the last blocker thread.
146                                          */
147                                         if(launch_threads_done(item)) {
148                                                 /* All handles have
149                                                  * been signalled, so
150                                                  * signal the waiting
151                                                  * thread.  Let the
152                                                  * waiting thread
153                                                  * remove this item
154                                                  * from the queue,
155                                                  * because it makes
156                                                  * the logic a lot
157                                                  * easier here.
158                                                  */
159                                                 item->state=WQ_SIGNALLED;
160                                                 sem_post(&item->wait_sem);
161                                         }
162                                         break;
163                                         
164                                 case WQ_SIGNALLED:
165                                         /* This shouldn't happen. Prod
166                                          * the blocking thread again
167                                          * just to make sure.
168                                          */
169                                         g_warning(G_GNUC_PRETTY_FUNCTION
170                                                   ": Prodding blocker again");
171                                         sem_post(&item->wait_sem);
172                                         break;
173                                 }
174                                 
175                                 pthread_mutex_unlock(&item->mutex);
176                         }
177                 }
178
179                 pthread_mutex_unlock(&wait_monitor_mutex);
180         }
181         
182         return(NULL);
183 }
184
185 static void wait_init(void)
186 {
187         int ret;
188         
189 #ifdef DEBUG
190         g_message(G_GNUC_PRETTY_FUNCTION ": Starting monitor thread");
191 #endif
192         
193         WaitQueue=g_ptr_array_new();
194         
195         sem_init(&wait_monitor_sem, 0, 0);
196         
197         /* Launch a thread which manages the wait queue, and deals
198          * with waiting for handles of various types.
199          */
200         ret=pthread_create(&wait_monitor_thread_id, NULL,
201                            wait_monitor_thread, NULL);
202         if(ret!=0) {
203                 g_warning(G_GNUC_PRETTY_FUNCTION
204                           ": Couldn't start handle monitor thread: %s",
205                           strerror(ret));
206         }
207         
208         /* Wait for the monitor thread to get going */
209         while(wait_monitor_thread_running==FALSE) {
210                 pthread_yield();
211         }
212 }
213
214 static WaitQueueItem *wait_item_new(guint32 timeout, gboolean waitall)
215 {
216         WaitQueueItem *new;
217         int i;
218         
219         new=g_new0(WaitQueueItem, 1);
220         
221         pthread_mutex_init(&new->mutex, NULL);
222         sem_init(&new->wait_sem, 0, 0);
223
224         new->update=1;          /* As soon as this item is queued it
225                                  * will need attention.
226                                  */
227         new->state=WQ_NEW;
228         new->timeout=timeout;
229         new->waitall=waitall;
230         new->lowest_signal=MAXIMUM_WAIT_OBJECTS;
231         
232         for(i=0; i<WAPI_HANDLE_COUNT; i++) {
233                 new->handles[i]=g_ptr_array_new();
234                 new->waitindex[i]=g_array_new(FALSE, FALSE, sizeof(guint32));
235         }
236         
237         return(new);
238 }
239
240 /* Adds our queue item to the work queue, and blocks until the monitor
241  * thread thinks it's done the work.  Returns TRUE for done, FALSE for
242  * timed out.  Sets lowest to the index of the first signalled handle
243  * in the list.
244  */
245 static gboolean wait_for_item(WaitQueueItem *item, guint32 *lowest)
246 {
247         gboolean ret;
248         int i;
249         
250         /* Add the wait item to the monitor queue, and signal the
251          * monitor thread */
252         pthread_mutex_lock(&wait_monitor_mutex);
253         g_ptr_array_add(WaitQueue, item);
254         sem_post(&wait_monitor_sem);
255         pthread_mutex_unlock(&wait_monitor_mutex);
256         
257         /* Wait for the item to become ready */
258         sem_wait(&item->wait_sem);
259         
260         pthread_mutex_lock(&item->mutex);
261         
262         /* If waitall is TRUE, then the number signalled in each handle type
263          * must be the length of the handle type array for the wait to be
264          * successful.  Otherwise, any signalled handle is good enough
265          */
266         if(item->waitall==TRUE) {
267                 ret=TRUE;
268                 for(i=0; i<WAPI_HANDLE_COUNT; i++) {
269                         if(item->waitcount[i]!=item->handles[i]->len) {
270                                 ret=FALSE;
271                                 break;
272                         }
273                 }
274         } else {
275                 ret=FALSE;
276                 for(i=0; i<WAPI_HANDLE_COUNT; i++) {
277                         if(item->waitcount[i]>0) {
278                                 ret=TRUE;
279                                 break;
280                         }
281                 }
282         }
283
284         *lowest=item->lowest_signal;
285         
286         pthread_mutex_unlock(&item->mutex);
287
288         return(ret);
289 }
290
291 static gboolean wait_dequeue_item(WaitQueueItem *item)
292 {
293         gboolean ret;
294         
295         g_assert(WaitQueue!=NULL);
296         
297         pthread_mutex_lock(&wait_monitor_mutex);
298         ret=g_ptr_array_remove_fast(WaitQueue, item);
299         pthread_mutex_unlock(&wait_monitor_mutex);
300         
301         return(ret);
302 }
303
304 static void wait_item_destroy(WaitQueueItem *item)
305 {
306         int i;
307         
308         pthread_mutex_destroy(&item->mutex);
309         sem_destroy(&item->wait_sem);
310         
311         for(i=0; i<WAPI_HANDLE_COUNT; i++) {
312                 if(item->thread[i]!=NULL) {
313                         g_free(item->thread[i]);
314                 }
315                 g_ptr_array_free(item->handles[i], FALSE);
316                 g_array_free(item->waitindex[i], FALSE);
317         }
318 }
319
320
321 /**
322  * WaitForSingleObject:
323  * @handle: an object to wait for
324  * @timeout: the maximum time in milliseconds to wait for
325  *
326  * This function returns when either @handle is signalled, or @timeout
327  * ms elapses.  If @timeout is zero, the object's state is tested and
328  * the function returns immediately.  If @timeout is %INFINITE, the
329  * function waits forever.
330  *
331  * Return value: %WAIT_ABANDONED - @handle is a mutex that was not
332  * released by the owning thread when it exited.  Ownership of the
333  * mutex object is granted to the calling thread and the mutex is set
334  * to nonsignalled.  %WAIT_OBJECT_0 - The state of @handle is
335  * signalled.  %WAIT_TIMEOUT - The @timeout interval elapsed and
336  * @handle's state is still not signalled.  %WAIT_FAILED - an error
337  * occurred.
338  */
339 guint32 WaitForSingleObject(WapiHandle *handle, guint32 timeout)
340 {
341         gboolean wait;
342         
343         if(handle->ops->wait==NULL) {
344                 return(WAIT_FAILED);
345         }
346
347         if(timeout==0) {
348                 /* Just poll the object */
349 #ifdef DEBUG
350                 g_message(G_GNUC_PRETTY_FUNCTION ": Polling");
351 #endif
352
353                 if(handle->signalled==TRUE) {
354                         return(WAIT_OBJECT_0);
355                 } else {
356                         return(WAIT_TIMEOUT);
357                 }
358         }
359         
360         wait=handle->ops->wait(handle, timeout);
361         if(wait==TRUE) {
362                 /* Object signalled before timeout expired */
363 #ifdef DEBUG
364                 g_message(G_GNUC_PRETTY_FUNCTION ": Object %p signalled",
365                           handle);
366 #endif
367                 return(WAIT_OBJECT_0);
368         } else {
369 #ifdef DEBUG
370                 g_message(G_GNUC_PRETTY_FUNCTION ": Object %p wait timed out",
371                           handle);
372 #endif
373                 return(WAIT_TIMEOUT);
374         }
375 }
376
377 /**
378  * WaitForMultipleObjects:
379  * @numobjects: The number of objects in @handles. The maximum allowed
380  * is %MAXIMUM_WAIT_OBJECTS.
381  * @handles: An array of object handles.  Duplicates are not allowed.
382  * @waitall: If %TRUE, this function waits until all of the handles
383  * are signalled.  If %FALSE, this function returns when any object is
384  * signalled.
385  * @timeout: The maximum time in milliseconds to wait for.
386  * 
387  * This function returns when either one or more of @handles is
388  * signalled, or @timeout ms elapses.  If @timeout is zero, the state
389  * of each item of @handles is tested and the function returns
390  * immediately.  If @timeout is %INFINITE, the function waits forever.
391  *
392  * Return value: %WAIT_OBJECT_0 to %WAIT_OBJECT_0 + @numobjects - 1 -
393  * if @waitall is %TRUE, indicates that all objects are signalled.  If
394  * @waitall is %FALSE, the return value minus %WAIT_OBJECT_0 indicates
395  * the first index into @handles of the objects that are signalled.
396  * %WAIT_ABANDONED_0 to %WAIT_ABANDONED_0 + @numobjects - 1 - if
397  * @waitall is %TRUE, indicates that all objects are signalled, and at
398  * least one object is an abandoned mutex object (See
399  * WaitForSingleObject() for a description of abandoned mutexes.)  If
400  * @waitall is %FALSE, the return value minus %WAIT_ABANDONED_0
401  * indicates the first index into @handles of an abandoned mutex.
402  * %WAIT_TIMEOUT - The @timeout interval elapsed and no objects in
403  * @handles are signalled.  %WAIT_FAILED - an error occurred.
404  */
405 guint32 WaitForMultipleObjects(guint32 numobjects, WapiHandle **handles,
406                                gboolean waitall, guint32 timeout)
407 {
408         WaitQueueItem *item;
409         GHashTable *dups;
410         gboolean duplicate=FALSE, bogustype=FALSE;
411         gboolean wait;
412         guint i;
413         guint32 lowest;
414         
415         pthread_once(&wait_once, wait_init);
416         
417         if(numobjects>MAXIMUM_WAIT_OBJECTS) {
418 #ifdef DEBUG
419                 g_message(G_GNUC_PRETTY_FUNCTION ": Too many handles: %d",
420                           numobjects);
421 #endif
422
423                 return(WAIT_FAILED);
424         }
425         
426         /* Check for duplicates */
427         dups=g_hash_table_new(g_direct_hash, g_direct_equal);
428         for(i=0; i<numobjects; i++) {
429                 gpointer exists=g_hash_table_lookup(dups, handles[i]);
430                 if(exists!=NULL) {
431 #ifdef DEBUG
432                         g_message(G_GNUC_PRETTY_FUNCTION
433                                   ": Handle %p duplicated", handles[i]);
434 #endif
435
436                         duplicate=TRUE;
437                         break;
438                 }
439
440                 if(handles[i]->ops->wait_multiple==NULL) {
441 #ifdef DEBUG
442                         g_message(G_GNUC_PRETTY_FUNCTION
443                                   ": Handle %p can't be waited for (type %d)",
444                                   handles[i], handles[i]->type);
445 #endif
446
447                         bogustype=TRUE;
448                 }
449
450                 g_hash_table_insert(dups, handles[i], handles[i]);
451         }
452         g_hash_table_destroy(dups);
453
454         if(duplicate==TRUE) {
455 #ifdef DEBUG
456                 g_message(G_GNUC_PRETTY_FUNCTION
457                           ": Returning due to duplicates");
458 #endif
459
460                 return(WAIT_FAILED);
461         }
462
463         if(bogustype==TRUE) {
464 #ifdef DEBUG
465                 g_message(G_GNUC_PRETTY_FUNCTION
466                           ": Returning due to bogus type");
467 #endif
468
469                 return(WAIT_FAILED);
470         }
471         
472         item=wait_item_new(timeout, waitall);
473
474         /* Sort the handles by type */
475         for(i=0; i<numobjects; i++) {
476                 g_ptr_array_add(item->handles[handles[i]->type], handles[i]);
477                 g_array_append_val(item->waitindex[handles[i]->type], i);
478         }
479         
480         wait=wait_for_item(item, &lowest);
481         wait_dequeue_item(item);
482         wait_item_destroy(item);
483
484         if(wait==FALSE) {
485                 /* Wait timed out */
486                 return(WAIT_TIMEOUT);
487         }
488
489         return(WAIT_OBJECT_0+lowest);
490 }