Merge pull request #901 from Blewzman/FixAggregateExceptionGetBaseException
[mono.git] / mono / metadata / mono-cq.c
1 /*
2  * mono-cq.c: concurrent queue
3  *
4  * Authors:
5  *   Gonzalo Paniagua Javier (gonzalo@novell.com)
6  *
7  * Copyright (c) 2011 Novell, Inc (http://www.novell.com)
8  * Copyright 2011 Xamarin Inc
9  */
10
11 #include <mono/metadata/object.h>
12 #include <mono/metadata/mono-cq.h>
13 #include <mono/metadata/mono-mlist.h>
14 #include <mono/utils/mono-memory-model.h>
15 #include <mono/utils/atomic.h>
16
17 #define CQ_DEBUG(...)
18 //#define CQ_DEBUG(...) g_message(__VA_ARGS__)
19
20 struct _MonoCQ {
21         MonoMList *head;
22         MonoMList *tail;
23         volatile gint32 count;
24 };
25
26 /* matches the System.MonoListItem object */
27 struct _MonoMList {
28         MonoObject object;
29         MonoMList *next;
30         MonoObject *data;
31 };
32
33 /* matches the System.MonoCQItem object */
34 struct _MonoCQItem {
35         MonoObject object;
36         MonoArray *array; // MonoObjects
37         MonoArray *array_state; // byte array
38         volatile gint32 first;
39         volatile gint32 last;
40 };
41
42 typedef struct _MonoCQItem MonoCQItem;
43 #define CQ_ARRAY_SIZE   64
44
45 static MonoVTable *monocq_item_vtable = NULL;
46
47 static MonoCQItem *
48 mono_cqitem_alloc (void)
49 {
50         MonoCQItem *queue;
51         MonoDomain *domain = mono_get_root_domain ();
52
53         if (!monocq_item_vtable) {
54                 MonoClass *klass = mono_class_from_name (mono_defaults.corlib, "System", "MonoCQItem");
55                 monocq_item_vtable = mono_class_vtable (domain, klass);
56                 g_assert (monocq_item_vtable);
57         }
58         queue = (MonoCQItem *) mono_object_new_fast (monocq_item_vtable);
59         MONO_OBJECT_SETREF (queue, array, mono_array_new (domain, mono_defaults.object_class, CQ_ARRAY_SIZE));
60         MONO_OBJECT_SETREF (queue, array_state, mono_array_new (domain, mono_defaults.byte_class, CQ_ARRAY_SIZE));
61         return queue;
62 }
63
64 MonoCQ *
65 mono_cq_create ()
66 {
67         MonoCQ *cq;
68
69         cq = g_new0 (MonoCQ, 1);
70         MONO_GC_REGISTER_ROOT_SINGLE (cq->head);
71         MONO_GC_REGISTER_ROOT_SINGLE (cq->tail);
72         cq->head = mono_mlist_alloc ((MonoObject *) mono_cqitem_alloc ());
73         cq->tail = cq->head;
74         CQ_DEBUG ("Created %p", cq);
75         return cq;
76 }
77
78 void
79 mono_cq_destroy (MonoCQ *cq)
80 {
81         CQ_DEBUG ("Destroy %p", cq);
82         if (!cq)
83                 return;
84
85         mono_gc_bzero_aligned (cq, sizeof (MonoCQ));
86         MONO_GC_UNREGISTER_ROOT (cq->tail);
87         MONO_GC_UNREGISTER_ROOT (cq->head);
88         g_free (cq);
89 }
90
91 gint32
92 mono_cq_count (MonoCQ *cq)
93 {
94         if (!cq)
95                 return 0;
96
97         CQ_DEBUG ("Count %d", cq->count);
98         return cq->count;
99 }
100
101 static void
102 mono_cq_add_node (MonoCQ *cq)
103 {
104         MonoMList *n;
105         MonoMList *prev_tail;
106
107         CQ_DEBUG ("Adding node");
108         n = mono_mlist_alloc ((MonoObject *) mono_cqitem_alloc ());
109         prev_tail = cq->tail;
110         MONO_OBJECT_SETREF (prev_tail, next, n);
111
112         /* prev_tail->next must be visible before the new tail is */
113         STORE_STORE_FENCE;
114
115         cq->tail = n;
116 }
117
118 static gboolean
119 mono_cqitem_try_enqueue (MonoCQ *cq, MonoObject *obj)
120 {
121         MonoCQItem *queue;
122         MonoMList *tail;
123         gint32 pos;
124
125         tail = cq->tail;
126         queue = (MonoCQItem *) tail->data;
127         do {
128                 pos = queue->last;
129                 if (pos >= CQ_ARRAY_SIZE) {
130                         CQ_DEBUG ("enqueue(): pos >= CQ_ARRAY_SIZE, %d >= %d", pos, CQ_ARRAY_SIZE);
131                         return FALSE;
132                 }
133
134                 if (InterlockedCompareExchange (&queue->last, pos + 1, pos) == pos) {
135                         mono_array_setref_fast (queue->array, pos, obj);
136                         STORE_STORE_FENCE;
137                         mono_array_set_fast (queue->array_state, char, pos, TRUE);
138                         if ((pos + 1) == CQ_ARRAY_SIZE) {
139                                 CQ_DEBUG ("enqueue(): pos + 1 == CQ_ARRAY_SIZE, %d. Adding node.", CQ_ARRAY_SIZE);
140                                 mono_cq_add_node (cq);
141                         }
142                         return TRUE;
143                 }
144         } while (TRUE);
145         g_assert_not_reached ();
146 }
147
148 void
149 mono_cq_enqueue (MonoCQ *cq, MonoObject *obj)
150 {
151         if (cq == NULL || obj == NULL)
152                 return;
153
154         do {
155                 if (mono_cqitem_try_enqueue (cq, obj)) {
156                         CQ_DEBUG ("Queued one");
157                         InterlockedIncrement (&cq->count);
158                         break;
159                 }
160                 SleepEx (0, FALSE);
161         } while (TRUE);
162 }
163
164 static void
165 mono_cq_remove_node (MonoCQ *cq)
166 {
167         MonoMList *old_head;
168
169         CQ_DEBUG ("Removing node");
170         old_head = cq->head;
171         /* Not needed now that array_state is GC memory
172         MonoCQItem *queue;
173         int i;
174         gboolean retry;
175         queue = (MonoCQItem *) old_head->data;
176         do {
177                 retry = FALSE;
178                 for (i = 0; i < CQ_ARRAY_SIZE; i++) {
179                         if (mono_array_get (queue->array_state, char, i) == TRUE) {
180                                 retry = TRUE;
181                                 break;
182                         }
183                 }
184                 if (retry)
185                         SleepEx (0, FALSE);
186         } while (retry);
187          */
188         while (old_head->next == NULL)
189                 SleepEx (0, FALSE);
190         cq->head = old_head->next;
191         old_head = NULL;
192 }
193
194 static gboolean
195 mono_cqitem_try_dequeue (MonoCQ *cq, MonoObject **obj)
196 {
197         MonoCQItem *queue;
198         MonoMList *head;
199         gint32 pos;
200
201         head = cq->head;
202         queue = (MonoCQItem *) head->data;
203         do {
204                 pos = queue->first;
205                 if (pos >= queue->last || pos >= CQ_ARRAY_SIZE)
206                         return FALSE;
207
208                 if (InterlockedCompareExchange (&queue->first, pos + 1, pos) == pos) {
209                         while (mono_array_get (queue->array_state, char, pos) == FALSE) {
210                                 SleepEx (0, FALSE);
211                         }
212                         LOAD_LOAD_FENCE;
213                         *obj = mono_array_get (queue->array, MonoObject *, pos);
214
215                         /*
216                         Here don't need to fence since the only spot that reads it is the one above.
217                         Additionally, the first store is superfluous, so it can happen OOO with the second.
218                         */
219                         mono_array_set (queue->array, MonoObject *, pos, NULL);
220                         mono_array_set (queue->array_state, char, pos, FALSE);
221                         
222                         /*
223                         We should do a STORE_LOAD fence here to make sure subsequent loads see new state instead
224                         of the above stores. We can safely ignore this as the only issue of seeing a stale value
225                         is the thread yielding. Given how unfrequent this will be in practice, we better avoid the
226                         very expensive STORE_LOAD fence.
227                         */
228                         
229                         if ((pos + 1) == CQ_ARRAY_SIZE) {
230                                 mono_cq_remove_node (cq);
231                         }
232                         return TRUE;
233                 }
234         } while (TRUE);
235         g_assert_not_reached ();
236 }
237
238 gboolean
239 mono_cq_dequeue (MonoCQ *cq, MonoObject **result)
240 {
241         while (cq->count > 0) {
242                 if (mono_cqitem_try_dequeue (cq, result)) {
243                         CQ_DEBUG ("Dequeued one");
244                         InterlockedDecrement (&cq->count);
245                         return TRUE;
246                 }
247                 SleepEx (0, FALSE);
248         }
249         return FALSE;
250 }
251