Added MonoString<->UTF-32 conversion helper functions.
[mono.git] / mono / metadata / mono-cq.c
1 /*
2  * mono-cq.c: concurrent queue
3  *
4  * Authors:
5  *   Gonzalo Paniagua Javier (gonzalo@novell.com)
6  *
7  * Copyright (c) 2011 Novell, Inc (http://www.novell.com)
8  * Copyright 2011 Xamarin Inc
9  */
10
11 #include <mono/metadata/object.h>
12 #include <mono/metadata/mono-cq.h>
13 #include <mono/metadata/mono-mlist.h>
14 #include <mono/utils/mono-memory-model.h>
15 #include <mono/utils/atomic.h>
16
17 #define CQ_DEBUG(...)
18 //#define CQ_DEBUG(...) g_message(__VA_ARGS__)
19
20 struct _MonoCQ {
21         MonoMList *head;
22         MonoMList *tail;
23         volatile gint32 count;
24 };
25
26 /* matches the System.MonoListItem object */
27 struct _MonoMList {
28         MonoObject object;
29         MonoMList *next;
30         MonoObject *data;
31 };
32
33 /* matches the System.MonoCQItem object */
34 struct _MonoCQItem {
35         MonoObject object;
36         MonoArray *array; // MonoObjects
37         MonoArray *array_state; // byte array
38         volatile gint32 first;
39         volatile gint32 last;
40 };
41
42 typedef struct _MonoCQItem MonoCQItem;
43 #define CQ_ARRAY_SIZE   64
44
45 static MonoVTable *monocq_item_vtable = NULL;
46
47 static MonoCQItem *
48 mono_cqitem_alloc (void)
49 {
50         MonoCQItem *queue;
51         MonoDomain *domain = mono_get_root_domain ();
52
53         if (!monocq_item_vtable) {
54                 MonoClass *klass = mono_class_from_name (mono_defaults.corlib, "System", "MonoCQItem");
55                 monocq_item_vtable = mono_class_vtable (domain, klass);
56                 g_assert (monocq_item_vtable);
57         }
58         queue = (MonoCQItem *) mono_object_new_fast (monocq_item_vtable);
59         MONO_OBJECT_SETREF (queue, array, mono_array_new (domain, mono_defaults.object_class, CQ_ARRAY_SIZE));
60         MONO_OBJECT_SETREF (queue, array_state, mono_array_new (domain, mono_defaults.byte_class, CQ_ARRAY_SIZE));
61         return queue;
62 }
63
64 MonoCQ *
65 mono_cq_create ()
66 {
67         MonoCQ *cq;
68
69         cq = g_new0 (MonoCQ, 1);
70         MONO_GC_REGISTER_ROOT_SINGLE (cq->head);
71         MONO_GC_REGISTER_ROOT_SINGLE (cq->tail);
72         cq->head = mono_mlist_alloc ((MonoObject *) mono_cqitem_alloc ());
73         cq->tail = cq->head;
74         CQ_DEBUG ("Created %p", cq);
75         return cq;
76 }
77
78 void
79 mono_cq_destroy (MonoCQ *cq)
80 {
81         CQ_DEBUG ("Destroy %p", cq);
82         if (!cq)
83                 return;
84
85         mono_gc_bzero_aligned (cq, sizeof (MonoCQ));
86         MONO_GC_UNREGISTER_ROOT (cq->tail);
87         MONO_GC_UNREGISTER_ROOT (cq->head);
88         g_free (cq);
89 }
90
91 gint32
92 mono_cq_count (MonoCQ *cq)
93 {
94         if (!cq)
95                 return 0;
96
97         CQ_DEBUG ("Count %d", cq->count);
98         return cq->count;
99 }
100
101 static void
102 mono_cq_add_node (MonoCQ *cq)
103 {
104         MonoMList *n;
105         MonoMList *prev_tail;
106
107         CQ_DEBUG ("Adding node");
108         n = mono_mlist_alloc ((MonoObject *) mono_cqitem_alloc ());
109         prev_tail = cq->tail;
110         MONO_OBJECT_SETREF (prev_tail, next, n);
111
112         /* prev_tail->next must be visible before the new tail is */
113         STORE_STORE_FENCE;
114
115         cq->tail = n;
116 }
117
118 static gboolean
119 mono_cqitem_try_enqueue (MonoCQ *cq, MonoObject *obj)
120 {
121         MonoCQItem *queue;
122         MonoMList *tail;
123         gint32 pos;
124
125         tail = cq->tail;
126         queue = (MonoCQItem *) tail->data;
127         do {
128                 pos = queue->last;
129                 if (pos >= CQ_ARRAY_SIZE) {
130                         CQ_DEBUG ("enqueue(): pos >= CQ_ARRAY_SIZE, %d >= %d", pos, CQ_ARRAY_SIZE);
131                         return FALSE;
132                 }
133
134                 if (InterlockedCompareExchange (&queue->last, pos + 1, pos) == pos) {
135                         mono_array_setref_fast (queue->array, pos, obj);
136                         STORE_STORE_FENCE;
137                         mono_array_set_fast (queue->array_state, char, pos, TRUE);
138                         if ((pos + 1) == CQ_ARRAY_SIZE) {
139                                 CQ_DEBUG ("enqueue(): pos + 1 == CQ_ARRAY_SIZE, %d. Adding node.", CQ_ARRAY_SIZE);
140                                 mono_cq_add_node (cq);
141                         }
142                         return TRUE;
143                 }
144         } while (TRUE);
145         g_assert_not_reached ();
146 }
147
148 void
149 mono_cq_enqueue (MonoCQ *cq, MonoObject *obj)
150 {
151         if (cq == NULL || obj == NULL)
152                 return;
153
154         do {
155                 if (mono_cqitem_try_enqueue (cq, obj)) {
156                         CQ_DEBUG ("Queued one");
157                         InterlockedIncrement (&cq->count);
158                         break;
159                 }
160                 SleepEx (0, FALSE);
161         } while (TRUE);
162 }
163
164 static void
165 mono_cq_remove_node (MonoCQ *cq)
166 {
167         MonoMList *old_head;
168
169         CQ_DEBUG ("Removing node");
170         old_head = cq->head;
171         /* Not needed now that array_state is GC memory
172         MonoCQItem *queue;
173         int i;
174         gboolean retry;
175         queue = (MonoCQItem *) old_head->data;
176         do {
177                 retry = FALSE;
178                 for (i = 0; i < CQ_ARRAY_SIZE; i++) {
179                         if (mono_array_get (queue->array_state, char, i) == TRUE) {
180                                 retry = TRUE;
181                                 break;
182                         }
183                 }
184                 if (retry)
185                         SleepEx (0, FALSE);
186         } while (retry);
187          */
188         while (old_head->next == NULL)
189                 SleepEx (0, FALSE);
190         cq->head = old_head->next;
191         
192         MONO_OBJECT_SETREF (old_head, next, NULL);
193         old_head = NULL;
194 }
195
196 static gboolean
197 mono_cqitem_try_dequeue (MonoCQ *cq, MonoObject **obj)
198 {
199         MonoCQItem *queue;
200         MonoMList *head;
201         gint32 pos;
202
203         head = cq->head;
204         queue = (MonoCQItem *) head->data;
205         do {
206                 pos = queue->first;
207                 if (pos >= queue->last || pos >= CQ_ARRAY_SIZE)
208                         return FALSE;
209
210                 if (InterlockedCompareExchange (&queue->first, pos + 1, pos) == pos) {
211                         while (mono_array_get (queue->array_state, char, pos) == FALSE) {
212                                 SleepEx (0, FALSE);
213                         }
214                         LOAD_LOAD_FENCE;
215                         *obj = mono_array_get (queue->array, MonoObject *, pos);
216
217                         /*
218                         Here don't need to fence since the only spot that reads it is the one above.
219                         Additionally, the first store is superfluous, so it can happen OOO with the second.
220                         */
221                         mono_array_set (queue->array, MonoObject *, pos, NULL);
222                         mono_array_set (queue->array_state, char, pos, FALSE);
223                         
224                         /*
225                         We should do a STORE_LOAD fence here to make sure subsequent loads see new state instead
226                         of the above stores. We can safely ignore this as the only issue of seeing a stale value
227                         is the thread yielding. Given how unfrequent this will be in practice, we better avoid the
228                         very expensive STORE_LOAD fence.
229                         */
230                         
231                         if ((pos + 1) == CQ_ARRAY_SIZE) {
232                                 mono_cq_remove_node (cq);
233                         }
234                         return TRUE;
235                 }
236         } while (TRUE);
237         g_assert_not_reached ();
238 }
239
240 gboolean
241 mono_cq_dequeue (MonoCQ *cq, MonoObject **result)
242 {
243         while (cq->count > 0) {
244                 if (mono_cqitem_try_dequeue (cq, result)) {
245                         CQ_DEBUG ("Dequeued one");
246                         InterlockedDecrement (&cq->count);
247                         return TRUE;
248                 }
249                 SleepEx (0, FALSE);
250         }
251         return FALSE;
252 }
253