Fix mono-cq under sgen.
[mono.git] / mono / metadata / mono-cq.c
1 /*
2  * mono-cq.c: concurrent queue
3  *
4  * Authors:
5  *   Gonzalo Paniagua Javier (gonzalo@novell.com)
6  *
7  * Copyright (c) 2011 Novell, Inc (http://www.novell.com)
8  */
9
10 #include <mono/metadata/object.h>
11 #include <mono/metadata/mono-cq.h>
12 #include <mono/metadata/mono-mlist.h>
13
14 #define CQ_DEBUG(...)
15 //#define CQ_DEBUG(...) g_message(__VA_ARGS__)
16
17 struct _MonoCQ {
18         MonoMList *head;
19         MonoMList *tail;
20         volatile gint32 count;
21 };
22
23 /* matches the System.MonoListItem object */
24 struct _MonoMList {
25         MonoObject object;
26         MonoMList *next;
27         MonoObject *data;
28 };
29
30 /* matches the System.MonoCQItem object */
31 struct _MonoCQItem {
32         MonoObject object;
33         MonoArray *array; // MonoObjects
34         MonoArray *array_state; // byte array
35         volatile gint32 first;
36         volatile gint32 last;
37 };
38
39 typedef struct _MonoCQItem MonoCQItem;
40 #define CQ_ARRAY_SIZE   64
41
42 static MonoVTable *monocq_item_vtable = NULL;
43
44 static MonoCQItem *
45 mono_cqitem_alloc (void)
46 {
47         MonoCQItem *queue;
48         MonoDomain *domain = mono_get_root_domain ();
49
50         if (!monocq_item_vtable) {
51                 MonoClass *klass = mono_class_from_name (mono_defaults.corlib, "System", "MonoCQItem");
52                 monocq_item_vtable = mono_class_vtable (domain, klass);
53                 g_assert (monocq_item_vtable);
54         }
55         queue = (MonoCQItem *) mono_object_new_fast (monocq_item_vtable);
56         MONO_OBJECT_SETREF (queue, array, mono_array_new (domain, mono_defaults.object_class, CQ_ARRAY_SIZE));
57         MONO_OBJECT_SETREF (queue, array_state, mono_array_new (domain, mono_defaults.byte_class, CQ_ARRAY_SIZE));
58         return queue;
59 }
60
61 MonoCQ *
62 mono_cq_create ()
63 {
64         MonoCQ *cq;
65
66         cq = g_new0 (MonoCQ, 1);
67         MONO_GC_REGISTER_ROOT (cq->head);
68         MONO_GC_REGISTER_ROOT (cq->tail);
69         cq->head = mono_mlist_alloc ((MonoObject *) mono_cqitem_alloc ());
70         cq->tail = cq->head;
71         CQ_DEBUG ("Created %p", cq);
72         return cq;
73 }
74
75 void
76 mono_cq_destroy (MonoCQ *cq)
77 {
78         CQ_DEBUG ("Destroy %p", cq);
79         if (!cq)
80                 return;
81
82         mono_gc_bzero (cq, sizeof (MonoCQ));
83         MONO_GC_UNREGISTER_ROOT (cq->tail);
84         MONO_GC_UNREGISTER_ROOT (cq->head);
85         g_free (cq);
86 }
87
88 gint32
89 mono_cq_count (MonoCQ *cq)
90 {
91         if (!cq)
92                 return 0;
93
94         CQ_DEBUG ("Count %d", cq->count);
95         return cq->count;
96 }
97
98 static void
99 mono_cq_add_node (MonoCQ *cq)
100 {
101         MonoMList *n;
102         MonoMList *prev_tail;
103
104         CQ_DEBUG ("Adding node");
105         n = mono_mlist_alloc ((MonoObject *) mono_cqitem_alloc ());
106         prev_tail = cq->tail;
107         MONO_OBJECT_SETREF (prev_tail, next, n);
108         cq->tail = n;
109 }
110
111 static gboolean
112 mono_cqitem_try_enqueue (MonoCQ *cq, MonoObject *obj)
113 {
114         MonoCQItem *queue;
115         MonoMList *tail;
116         gint32 pos;
117
118         tail = cq->tail;
119         queue = (MonoCQItem *) tail->data;
120         do {
121                 pos = queue->last;
122                 if (pos >= CQ_ARRAY_SIZE) {
123                         CQ_DEBUG ("enqueue(): pos >= CQ_ARRAY_SIZE, %d >= %d", pos, CQ_ARRAY_SIZE);
124                         return FALSE;
125                 }
126
127                 if (InterlockedCompareExchange (&queue->last, pos + 1, pos) == pos) {
128                         mono_array_setref (queue->array, pos, obj);
129                         mono_array_set (queue->array_state, char, pos, TRUE);
130                         if ((pos + 1) == CQ_ARRAY_SIZE) {
131                                 CQ_DEBUG ("enqueue(): pos + 1 == CQ_ARRAY_SIZE, %d. Adding node.", CQ_ARRAY_SIZE);
132                                 mono_cq_add_node (cq);
133                         }
134                         return TRUE;
135                 }
136         } while (TRUE);
137         g_assert_not_reached ();
138 }
139
140 void
141 mono_cq_enqueue (MonoCQ *cq, MonoObject *obj)
142 {
143         if (cq == NULL || obj == NULL)
144                 return;
145
146         do {
147                 if (mono_cqitem_try_enqueue (cq, obj)) {
148                         CQ_DEBUG ("Queued one");
149                         InterlockedIncrement (&cq->count);
150                         break;
151                 }
152                 SleepEx (0, FALSE);
153         } while (TRUE);
154 }
155
156 static void
157 mono_cq_remove_node (MonoCQ *cq)
158 {
159         MonoMList *old_head;
160
161         CQ_DEBUG ("Removing node");
162         old_head = cq->head;
163         /* Not needed now that array_state is GC memory
164         MonoCQItem *queue;
165         int i;
166         gboolean retry;
167         queue = (MonoCQItem *) old_head->data;
168         do {
169                 retry = FALSE;
170                 for (i = 0; i < CQ_ARRAY_SIZE; i++) {
171                         if (mono_array_get (queue->array_state, char, i) == TRUE) {
172                                 retry = TRUE;
173                                 break;
174                         }
175                 }
176                 if (retry)
177                         SleepEx (0, FALSE);
178         } while (retry);
179          */
180         while (old_head->next == NULL)
181                 SleepEx (0, FALSE);
182         cq->head = old_head->next;
183         old_head = NULL;
184 }
185
186 static gboolean
187 mono_cqitem_try_dequeue (MonoCQ *cq, MonoObject **obj)
188 {
189         MonoCQItem *queue;
190         MonoMList *head;
191         gint32 pos;
192
193         head = cq->head;
194         queue = (MonoCQItem *) head->data;
195         do {
196                 pos = queue->first;
197                 if (pos >= queue->last || pos >= CQ_ARRAY_SIZE)
198                         return FALSE;
199
200                 if (InterlockedCompareExchange (&queue->first, pos + 1, pos) == pos) {
201                         while (mono_array_get (queue->array_state, char, pos) == FALSE) {
202                                 SleepEx (0, FALSE);
203                         }
204                         *obj = mono_array_get (queue->array, MonoObject *, pos);
205                         mono_array_set (queue->array, MonoObject *, pos, NULL);
206                         mono_array_set (queue->array_state, char, pos, FALSE);
207                         if ((pos + 1) == CQ_ARRAY_SIZE) {
208                                 mono_cq_remove_node (cq);
209                         }
210                         return TRUE;
211                 }
212         } while (TRUE);
213         g_assert_not_reached ();
214 }
215
216 gboolean
217 mono_cq_dequeue (MonoCQ *cq, MonoObject **result)
218 {
219         while (cq->count > 0) {
220                 if (mono_cqitem_try_dequeue (cq, result)) {
221                         CQ_DEBUG ("Dequeued one");
222                         InterlockedDecrement (&cq->count);
223                         return TRUE;
224                 }
225                 SleepEx (0, FALSE);
226         }
227         return FALSE;
228 }
229