2001-12-23 Miguel de Icaza <miguel@ximian.com>
[mono.git] / mono / io-layer / wait.c
1 #include <config.h>
2 #include <glib.h>
3 #include <string.h>
4
5 #include "mono/io-layer/wapi.h"
6 #include "wait-private.h"
7 #include "timed-thread.h"
8 #include "handles-private.h"
9 #include "wapi-private.h"
10
11 #undef DEBUG
12
13 static pthread_once_t wait_once=PTHREAD_ONCE_INIT;
14
15 static GPtrArray *WaitQueue=NULL;
16
17 static pthread_t wait_monitor_thread_id;
18 static gboolean wait_monitor_thread_running=FALSE;
19 static pthread_mutex_t wait_monitor_mutex=PTHREAD_MUTEX_INITIALIZER;
20 static sem_t wait_monitor_sem;
21
22 static void launch_tidy(guint32 exitcode G_GNUC_UNUSED, gpointer user)
23 {
24         WaitQueueItem *item=(WaitQueueItem *)user;
25         
26 #ifdef DEBUG
27         g_message(G_GNUC_PRETTY_FUNCTION ": Informing monitor thread");
28 #endif
29
30         /* Update queue item */
31         pthread_mutex_lock(&item->mutex);
32         item->update++;
33         pthread_mutex_unlock(&item->mutex);
34         
35         /* Signal monitor */
36         sem_post(&wait_monitor_sem);
37 }
38
39 /* This function is called by the monitor thread to launch handle-type
40  * specific threads to block in particular ways.
41  *
42  * The item mutex is held by the monitor thread when this function is
43  * called.
44  */
45 static void launch_blocker_threads(WaitQueueItem *item)
46 {
47         int i, ret;
48         
49 #ifdef DEBUG
50         g_message(G_GNUC_PRETTY_FUNCTION ": Launching blocker threads");
51 #endif
52
53         for(i=0; i<WAPI_HANDLE_COUNT; i++) {
54                 if(item->handles[i]->len>0) {
55                         WapiHandle *handle;
56
57                         handle=g_ptr_array_index(item->handles[i], 0);
58                         g_assert(handle!=NULL);
59                         g_assert(handle->ops->wait_multiple!=NULL);
60                         
61 #ifdef DEBUG
62                         g_message("Handle type %d active", i);
63 #endif
64                         item->waited[i]=FALSE;
65                         
66                         ret=_wapi_timed_thread_create(
67                                 &item->thread[i], NULL,
68                                 handle->ops->wait_multiple, launch_tidy, item,
69                                 item);
70                         if(ret!=0) {
71                                 g_warning(G_GNUC_PRETTY_FUNCTION
72                                           ": Thread create error: %s",
73                                           strerror(ret));
74                                 return;
75                         }
76                 } else {
77                         /* Pretend to have already waited for the
78                          * thread; it makes life easier for the
79                          * monitor thread.
80                          */
81                         item->waited[i]=TRUE;
82                 }
83         }
84 }
85
86 static gboolean launch_threads_done(WaitQueueItem *item)
87 {
88         int i;
89         
90         for(i=0; i<WAPI_HANDLE_COUNT; i++) {
91                 if(item->waited[i]==FALSE) {
92                         return(FALSE);
93                 }
94         }
95
96         return(TRUE);
97 }
98
99 /* This is the main loop for the monitor thread.  It waits for a
100  * signal to check the wait queue, then takes any action necessary on
101  * any queue items that have indicated readiness.
102  */
103 static void *wait_monitor_thread(void *unused G_GNUC_UNUSED)
104 {
105         guint i;
106         
107         /* Signal that the monitor thread is ready */
108         wait_monitor_thread_running=TRUE;
109         
110         while(TRUE) {
111                 /* Use a semaphore rather than a cond so we dont miss
112                  * any signals
113                  */
114                 sem_wait(&wait_monitor_sem);
115                 
116 #ifdef DEBUG
117                 g_message(G_GNUC_PRETTY_FUNCTION
118                           ": Blocking thread doing stuff");
119 #endif
120                 
121                 /* We've been signalled, so scan the wait queue for
122                  * activity.
123                  */
124                 pthread_mutex_lock(&wait_monitor_mutex);
125                 for(i=0; i<WaitQueue->len; i++) {
126                         WaitQueueItem *item=g_ptr_array_index(WaitQueue, i);
127                         
128                         if(item->update > item->ack) {
129                                 /* Something changed */
130                                 pthread_mutex_lock(&item->mutex);
131                                 item->ack=item->update;
132                                 
133                                 switch(item->state) {
134                                 case WQ_NEW:
135                                         /* Launch a new thread for each type of
136                                          * handle to be waited for here.
137                                          */
138                                         launch_blocker_threads(item);
139                                         
140                                         item->state=WQ_WAITING;
141                                         break;
142                                         
143                                 case WQ_WAITING:
144                                         /* See if we have waited for
145                                          * the last blocker thread.
146                                          */
147                                         if(launch_threads_done(item)) {
148                                                 /* All handles have
149                                                  * been signalled, so
150                                                  * signal the waiting
151                                                  * thread.  Let the
152                                                  * waiting thread
153                                                  * remove this item
154                                                  * from the queue,
155                                                  * because it makes
156                                                  * the logic a lot
157                                                  * easier here.
158                                                  */
159                                                 item->state=WQ_SIGNALLED;
160                                                 sem_post(&item->wait_sem);
161                                         }
162                                         break;
163                                         
164                                 case WQ_SIGNALLED:
165                                         /* This shouldn't happen. Prod
166                                          * the blocking thread again
167                                          * just to make sure.
168                                          */
169                                         g_warning(G_GNUC_PRETTY_FUNCTION
170                                                   ": Prodding blocker again");
171                                         sem_post(&item->wait_sem);
172                                         break;
173                                 }
174                                 
175                                 pthread_mutex_unlock(&item->mutex);
176                         }
177                 }
178
179                 pthread_mutex_unlock(&wait_monitor_mutex);
180         }
181         
182         return(NULL);
183 }
184
185 static void wait_init(void)
186 {
187         int ret;
188         
189 #ifdef DEBUG
190         g_message(G_GNUC_PRETTY_FUNCTION ": Starting monitor thread");
191 #endif
192         
193         WaitQueue=g_ptr_array_new();
194         
195         sem_init(&wait_monitor_sem, 0, 0);
196         
197         /* Launch a thread which manages the wait queue, and deals
198          * with waiting for handles of various types.
199          */
200         ret=pthread_create(&wait_monitor_thread_id, NULL,
201                            wait_monitor_thread, NULL);
202         if(ret!=0) {
203                 g_warning(G_GNUC_PRETTY_FUNCTION
204                           ": Couldn't start handle monitor thread: %s",
205                           strerror(ret));
206         }
207         
208         /* Wait for the monitor thread to get going */
209         while(wait_monitor_thread_running==FALSE) {
210                 sched_yield();
211         }
212 }
213
214 static WaitQueueItem *wait_item_new(guint32 timeout, gboolean waitall)
215 {
216         WaitQueueItem *new;
217         int i;
218         
219         new=g_new0(WaitQueueItem, 1);
220         
221         pthread_mutex_init(&new->mutex, NULL);
222         sem_init(&new->wait_sem, 0, 0);
223
224         new->update=1;          /* As soon as this item is queued it
225                                  * will need attention.
226                                  */
227         new->state=WQ_NEW;
228         new->timeout=timeout;
229         new->waitall=waitall;
230         new->lowest_signal=MAXIMUM_WAIT_OBJECTS;
231         
232         for(i=0; i<WAPI_HANDLE_COUNT; i++) {
233                 new->handles[i]=g_ptr_array_new();
234                 new->waitindex[i]=g_array_new(FALSE, FALSE, sizeof(guint32));
235         }
236         
237         return(new);
238 }
239
240 /* Adds our queue item to the work queue, and blocks until the monitor
241  * thread thinks it's done the work.  Returns TRUE for done, FALSE for
242  * timed out.  Sets lowest to the index of the first signalled handle
243  * in the list.
244  */
245 static gboolean wait_for_item(WaitQueueItem *item, guint32 *lowest)
246 {
247         gboolean ret;
248         int i;
249         
250         /* Add the wait item to the monitor queue, and signal the
251          * monitor thread */
252         pthread_mutex_lock(&wait_monitor_mutex);
253         g_ptr_array_add(WaitQueue, item);
254         sem_post(&wait_monitor_sem);
255         pthread_mutex_unlock(&wait_monitor_mutex);
256         
257         /* Wait for the item to become ready */
258         sem_wait(&item->wait_sem);
259         
260         pthread_mutex_lock(&item->mutex);
261         
262         /* If waitall is TRUE, then the number signalled in each handle type
263          * must be the length of the handle type array for the wait to be
264          * successful.  Otherwise, any signalled handle is good enough
265          */
266         if(item->waitall==TRUE) {
267                 ret=TRUE;
268                 for(i=0; i<WAPI_HANDLE_COUNT; i++) {
269                         if(item->waitcount[i]!=item->handles[i]->len) {
270                                 ret=FALSE;
271                                 break;
272                         }
273                 }
274         } else {
275                 ret=FALSE;
276                 for(i=0; i<WAPI_HANDLE_COUNT; i++) {
277                         if(item->waitcount[i]>0) {
278                                 ret=TRUE;
279                                 break;
280                         }
281                 }
282         }
283
284         *lowest=item->lowest_signal;
285         
286         pthread_mutex_unlock(&item->mutex);
287
288         return(ret);
289 }
290
291 static gboolean wait_dequeue_item(WaitQueueItem *item)
292 {
293         gboolean ret;
294         
295         g_assert(WaitQueue!=NULL);
296         
297         pthread_mutex_lock(&wait_monitor_mutex);
298         ret=g_ptr_array_remove_fast(WaitQueue, item);
299         pthread_mutex_unlock(&wait_monitor_mutex);
300         
301         return(ret);
302 }
303
304 static void wait_item_destroy(WaitQueueItem *item)
305 {
306         int i;
307         
308         pthread_mutex_destroy(&item->mutex);
309         sem_destroy(&item->wait_sem);
310         
311         for(i=0; i<WAPI_HANDLE_COUNT; i++) {
312                 if(item->thread[i]!=NULL) {
313                         g_free(item->thread[i]);
314                 }
315                 g_ptr_array_free(item->handles[i], FALSE);
316                 g_array_free(item->waitindex[i], FALSE);
317         }
318 }
319
320
321 /**
322  * WaitForSingleObject:
323  * @handle: an object to wait for
324  * @timeout: the maximum time in milliseconds to wait for
325  *
326  * This function returns when either @handle is signalled, or @timeout
327  * ms elapses.  If @timeout is zero, the object's state is tested and
328  * the function returns immediately.  If @timeout is %INFINITE, the
329  * function waits forever.
330  *
331  * Return value: %WAIT_ABANDONED - @handle is a mutex that was not
332  * released by the owning thread when it exited.  Ownership of the
333  * mutex object is granted to the calling thread and the mutex is set
334  * to nonsignalled.  %WAIT_OBJECT_0 - The state of @handle is
335  * signalled.  %WAIT_TIMEOUT - The @timeout interval elapsed and
336  * @handle's state is still not signalled.  %WAIT_FAILED - an error
337  * occurred.
338  */
339 guint32 WaitForSingleObject(WapiHandle *handle, guint32 timeout)
340 {
341         gboolean wait;
342         
343         if(handle->ops->wait==NULL) {
344                 return(WAIT_FAILED);
345         }
346
347         wait=handle->ops->wait(handle, NULL, timeout);
348         if(wait==TRUE) {
349                 /* Object signalled before timeout expired */
350 #ifdef DEBUG
351                 g_message(G_GNUC_PRETTY_FUNCTION ": Object %p signalled",
352                           handle);
353 #endif
354                 return(WAIT_OBJECT_0);
355         } else {
356 #ifdef DEBUG
357                 g_message(G_GNUC_PRETTY_FUNCTION ": Object %p wait timed out",
358                           handle);
359 #endif
360                 return(WAIT_TIMEOUT);
361         }
362 }
363
364 /**
365  * WaitForMultipleObjects:
366  * @numobjects: The number of objects in @handles. The maximum allowed
367  * is %MAXIMUM_WAIT_OBJECTS.
368  * @handles: An array of object handles.  Duplicates are not allowed.
369  * @waitall: If %TRUE, this function waits until all of the handles
370  * are signalled.  If %FALSE, this function returns when any object is
371  * signalled.
372  * @timeout: The maximum time in milliseconds to wait for.
373  * 
374  * This function returns when either one or more of @handles is
375  * signalled, or @timeout ms elapses.  If @timeout is zero, the state
376  * of each item of @handles is tested and the function returns
377  * immediately.  If @timeout is %INFINITE, the function waits forever.
378  *
379  * Return value: %WAIT_OBJECT_0 to %WAIT_OBJECT_0 + @numobjects - 1 -
380  * if @waitall is %TRUE, indicates that all objects are signalled.  If
381  * @waitall is %FALSE, the return value minus %WAIT_OBJECT_0 indicates
382  * the first index into @handles of the objects that are signalled.
383  * %WAIT_ABANDONED_0 to %WAIT_ABANDONED_0 + @numobjects - 1 - if
384  * @waitall is %TRUE, indicates that all objects are signalled, and at
385  * least one object is an abandoned mutex object (See
386  * WaitForSingleObject() for a description of abandoned mutexes.)  If
387  * @waitall is %FALSE, the return value minus %WAIT_ABANDONED_0
388  * indicates the first index into @handles of an abandoned mutex.
389  * %WAIT_TIMEOUT - The @timeout interval elapsed and no objects in
390  * @handles are signalled.  %WAIT_FAILED - an error occurred.
391  */
392 guint32 WaitForMultipleObjects(guint32 numobjects, WapiHandle **handles,
393                                gboolean waitall, guint32 timeout)
394 {
395         WaitQueueItem *item;
396         GHashTable *dups;
397         gboolean duplicate=FALSE, bogustype=FALSE;
398         gboolean wait;
399         guint i;
400         guint32 lowest;
401         
402         pthread_once(&wait_once, wait_init);
403         
404         if(numobjects>MAXIMUM_WAIT_OBJECTS) {
405 #ifdef DEBUG
406                 g_message(G_GNUC_PRETTY_FUNCTION ": Too many handles: %d",
407                           numobjects);
408 #endif
409
410                 return(WAIT_FAILED);
411         }
412         
413         /* Check for duplicates */
414         dups=g_hash_table_new(g_direct_hash, g_direct_equal);
415         for(i=0; i<numobjects; i++) {
416                 gpointer exists=g_hash_table_lookup(dups, handles[i]);
417                 if(exists!=NULL) {
418 #ifdef DEBUG
419                         g_message(G_GNUC_PRETTY_FUNCTION
420                                   ": Handle %p duplicated", handles[i]);
421 #endif
422
423                         duplicate=TRUE;
424                         break;
425                 }
426
427                 if(handles[i]->ops->wait_multiple==NULL) {
428 #ifdef DEBUG
429                         g_message(G_GNUC_PRETTY_FUNCTION
430                                   ": Handle %p can't be waited for (type %d)",
431                                   handles[i], handles[i]->type);
432 #endif
433
434                         bogustype=TRUE;
435                 }
436
437                 g_hash_table_insert(dups, handles[i], handles[i]);
438         }
439         g_hash_table_destroy(dups);
440
441         if(duplicate==TRUE) {
442 #ifdef DEBUG
443                 g_message(G_GNUC_PRETTY_FUNCTION
444                           ": Returning due to duplicates");
445 #endif
446
447                 return(WAIT_FAILED);
448         }
449
450         if(bogustype==TRUE) {
451 #ifdef DEBUG
452                 g_message(G_GNUC_PRETTY_FUNCTION
453                           ": Returning due to bogus type");
454 #endif
455
456                 return(WAIT_FAILED);
457         }
458         
459         item=wait_item_new(timeout, waitall);
460
461         /* Sort the handles by type */
462         for(i=0; i<numobjects; i++) {
463                 g_ptr_array_add(item->handles[handles[i]->type], handles[i]);
464                 g_array_append_val(item->waitindex[handles[i]->type], i);
465         }
466         
467         wait=wait_for_item(item, &lowest);
468         wait_dequeue_item(item);
469         wait_item_destroy(item);
470
471         if(wait==FALSE) {
472                 /* Wait timed out */
473                 return(WAIT_TIMEOUT);
474         }
475
476         return(WAIT_OBJECT_0+lowest);
477 }