Merge pull request #225 from mistoll/master
[mono.git] / mono / metadata / mono-cq.c
1 /*
2  * mono-cq.c: concurrent queue
3  *
4  * Authors:
5  *   Gonzalo Paniagua Javier (gonzalo@novell.com)
6  *
7  * Copyright (c) 2011 Novell, Inc (http://www.novell.com)
8  */
9
10 #include <mono/metadata/object.h>
11 #include <mono/metadata/mono-cq.h>
12 #include <mono/metadata/mono-mlist.h>
13 #include <mono/utils/mono-memory-model.h>
14
15 #define CQ_DEBUG(...)
16 //#define CQ_DEBUG(...) g_message(__VA_ARGS__)
17
18 struct _MonoCQ {
19         MonoMList *head;
20         MonoMList *tail;
21         volatile gint32 count;
22 };
23
24 /* matches the System.MonoListItem object */
25 struct _MonoMList {
26         MonoObject object;
27         MonoMList *next;
28         MonoObject *data;
29 };
30
31 /* matches the System.MonoCQItem object */
32 struct _MonoCQItem {
33         MonoObject object;
34         MonoArray *array; // MonoObjects
35         MonoArray *array_state; // byte array
36         volatile gint32 first;
37         volatile gint32 last;
38 };
39
40 typedef struct _MonoCQItem MonoCQItem;
41 #define CQ_ARRAY_SIZE   64
42
43 static MonoVTable *monocq_item_vtable = NULL;
44
45 static MonoCQItem *
46 mono_cqitem_alloc (void)
47 {
48         MonoCQItem *queue;
49         MonoDomain *domain = mono_get_root_domain ();
50
51         if (!monocq_item_vtable) {
52                 MonoClass *klass = mono_class_from_name (mono_defaults.corlib, "System", "MonoCQItem");
53                 monocq_item_vtable = mono_class_vtable (domain, klass);
54                 g_assert (monocq_item_vtable);
55         }
56         queue = (MonoCQItem *) mono_object_new_fast (monocq_item_vtable);
57         MONO_OBJECT_SETREF (queue, array, mono_array_new (domain, mono_defaults.object_class, CQ_ARRAY_SIZE));
58         MONO_OBJECT_SETREF (queue, array_state, mono_array_new (domain, mono_defaults.byte_class, CQ_ARRAY_SIZE));
59         return queue;
60 }
61
62 MonoCQ *
63 mono_cq_create ()
64 {
65         MonoCQ *cq;
66
67         cq = g_new0 (MonoCQ, 1);
68         MONO_GC_REGISTER_ROOT (cq->head);
69         MONO_GC_REGISTER_ROOT (cq->tail);
70         cq->head = mono_mlist_alloc ((MonoObject *) mono_cqitem_alloc ());
71         cq->tail = cq->head;
72         CQ_DEBUG ("Created %p", cq);
73         return cq;
74 }
75
76 void
77 mono_cq_destroy (MonoCQ *cq)
78 {
79         CQ_DEBUG ("Destroy %p", cq);
80         if (!cq)
81                 return;
82
83         mono_gc_bzero (cq, sizeof (MonoCQ));
84         MONO_GC_UNREGISTER_ROOT (cq->tail);
85         MONO_GC_UNREGISTER_ROOT (cq->head);
86         g_free (cq);
87 }
88
89 gint32
90 mono_cq_count (MonoCQ *cq)
91 {
92         if (!cq)
93                 return 0;
94
95         CQ_DEBUG ("Count %d", cq->count);
96         return cq->count;
97 }
98
99 static void
100 mono_cq_add_node (MonoCQ *cq)
101 {
102         MonoMList *n;
103         MonoMList *prev_tail;
104
105         CQ_DEBUG ("Adding node");
106         n = mono_mlist_alloc ((MonoObject *) mono_cqitem_alloc ());
107         prev_tail = cq->tail;
108         MONO_OBJECT_SETREF (prev_tail, next, n);
109         cq->tail = n;
110 }
111
112 static gboolean
113 mono_cqitem_try_enqueue (MonoCQ *cq, MonoObject *obj)
114 {
115         MonoCQItem *queue;
116         MonoMList *tail;
117         gint32 pos;
118
119         tail = cq->tail;
120         queue = (MonoCQItem *) tail->data;
121         do {
122                 pos = queue->last;
123                 if (pos >= CQ_ARRAY_SIZE) {
124                         CQ_DEBUG ("enqueue(): pos >= CQ_ARRAY_SIZE, %d >= %d", pos, CQ_ARRAY_SIZE);
125                         return FALSE;
126                 }
127
128                 if (InterlockedCompareExchange (&queue->last, pos + 1, pos) == pos) {
129                         mono_array_setref (queue->array, pos, obj);
130                         STORE_STORE_FENCE;
131                         mono_array_set (queue->array_state, char, pos, TRUE);
132                         if ((pos + 1) == CQ_ARRAY_SIZE) {
133                                 CQ_DEBUG ("enqueue(): pos + 1 == CQ_ARRAY_SIZE, %d. Adding node.", CQ_ARRAY_SIZE);
134                                 mono_cq_add_node (cq);
135                         }
136                         return TRUE;
137                 }
138         } while (TRUE);
139         g_assert_not_reached ();
140 }
141
142 void
143 mono_cq_enqueue (MonoCQ *cq, MonoObject *obj)
144 {
145         if (cq == NULL || obj == NULL)
146                 return;
147
148         do {
149                 if (mono_cqitem_try_enqueue (cq, obj)) {
150                         CQ_DEBUG ("Queued one");
151                         InterlockedIncrement (&cq->count);
152                         break;
153                 }
154                 SleepEx (0, FALSE);
155         } while (TRUE);
156 }
157
158 static void
159 mono_cq_remove_node (MonoCQ *cq)
160 {
161         MonoMList *old_head;
162
163         CQ_DEBUG ("Removing node");
164         old_head = cq->head;
165         /* Not needed now that array_state is GC memory
166         MonoCQItem *queue;
167         int i;
168         gboolean retry;
169         queue = (MonoCQItem *) old_head->data;
170         do {
171                 retry = FALSE;
172                 for (i = 0; i < CQ_ARRAY_SIZE; i++) {
173                         if (mono_array_get (queue->array_state, char, i) == TRUE) {
174                                 retry = TRUE;
175                                 break;
176                         }
177                 }
178                 if (retry)
179                         SleepEx (0, FALSE);
180         } while (retry);
181          */
182         while (old_head->next == NULL)
183                 SleepEx (0, FALSE);
184         cq->head = old_head->next;
185         old_head = NULL;
186 }
187
188 static gboolean
189 mono_cqitem_try_dequeue (MonoCQ *cq, MonoObject **obj)
190 {
191         MonoCQItem *queue;
192         MonoMList *head;
193         gint32 pos;
194
195         head = cq->head;
196         queue = (MonoCQItem *) head->data;
197         do {
198                 pos = queue->first;
199                 if (pos >= queue->last || pos >= CQ_ARRAY_SIZE)
200                         return FALSE;
201
202                 if (InterlockedCompareExchange (&queue->first, pos + 1, pos) == pos) {
203                         while (mono_array_get (queue->array_state, char, pos) == FALSE) {
204                                 SleepEx (0, FALSE);
205                         }
206                         LOAD_LOAD_FENCE;
207                         *obj = mono_array_get (queue->array, MonoObject *, pos);
208
209                         /*
210                         Here don't need to fence since the only spot that reads it is the one above.
211                         Additionally, the first store is superfluous, so it can happen OOO with the second.
212                         */
213                         mono_array_set (queue->array, MonoObject *, pos, NULL);
214                         mono_array_set (queue->array_state, char, pos, FALSE);
215                         
216                         /*
217                         We should do a STORE_LOAD fence here to make sure subsequent loads see new state instead
218                         of the above stores. We can safely ignore this as the only issue of seeing a stale value
219                         is the thread yielding. Given how unfrequent this will be in practice, we better avoid the
220                         very expensive STORE_LOAD fence.
221                         */
222                         
223                         if ((pos + 1) == CQ_ARRAY_SIZE) {
224                                 mono_cq_remove_node (cq);
225                         }
226                         return TRUE;
227                 }
228         } while (TRUE);
229         g_assert_not_reached ();
230 }
231
232 gboolean
233 mono_cq_dequeue (MonoCQ *cq, MonoObject **result)
234 {
235         while (cq->count > 0) {
236                 if (mono_cqitem_try_dequeue (cq, result)) {
237                         CQ_DEBUG ("Dequeued one");
238                         InterlockedDecrement (&cq->count);
239                         return TRUE;
240                 }
241                 SleepEx (0, FALSE);
242         }
243         return FALSE;
244 }
245