Merge pull request #347 from JamesB7/master
[mono.git] / mono / metadata / mono-cq.c
1 /*
2  * mono-cq.c: concurrent queue
3  *
4  * Authors:
5  *   Gonzalo Paniagua Javier (gonzalo@novell.com)
6  *
7  * Copyright (c) 2011 Novell, Inc (http://www.novell.com)
8  * Copyright 2011 Xamarin Inc
9  */
10
11 #include <mono/metadata/object.h>
12 #include <mono/metadata/mono-cq.h>
13 #include <mono/metadata/mono-mlist.h>
14 #include <mono/utils/mono-memory-model.h>
15
16 #define CQ_DEBUG(...)
17 //#define CQ_DEBUG(...) g_message(__VA_ARGS__)
18
19 struct _MonoCQ {
20         MonoMList *head;
21         MonoMList *tail;
22         volatile gint32 count;
23 };
24
25 /* matches the System.MonoListItem object */
26 struct _MonoMList {
27         MonoObject object;
28         MonoMList *next;
29         MonoObject *data;
30 };
31
32 /* matches the System.MonoCQItem object */
33 struct _MonoCQItem {
34         MonoObject object;
35         MonoArray *array; // MonoObjects
36         MonoArray *array_state; // byte array
37         volatile gint32 first;
38         volatile gint32 last;
39 };
40
41 typedef struct _MonoCQItem MonoCQItem;
42 #define CQ_ARRAY_SIZE   64
43
44 static MonoVTable *monocq_item_vtable = NULL;
45
46 static MonoCQItem *
47 mono_cqitem_alloc (void)
48 {
49         MonoCQItem *queue;
50         MonoDomain *domain = mono_get_root_domain ();
51
52         if (!monocq_item_vtable) {
53                 MonoClass *klass = mono_class_from_name (mono_defaults.corlib, "System", "MonoCQItem");
54                 monocq_item_vtable = mono_class_vtable (domain, klass);
55                 g_assert (monocq_item_vtable);
56         }
57         queue = (MonoCQItem *) mono_object_new_fast (monocq_item_vtable);
58         MONO_OBJECT_SETREF (queue, array, mono_array_new (domain, mono_defaults.object_class, CQ_ARRAY_SIZE));
59         MONO_OBJECT_SETREF (queue, array_state, mono_array_new (domain, mono_defaults.byte_class, CQ_ARRAY_SIZE));
60         return queue;
61 }
62
63 MonoCQ *
64 mono_cq_create ()
65 {
66         MonoCQ *cq;
67
68         cq = g_new0 (MonoCQ, 1);
69         MONO_GC_REGISTER_ROOT_SINGLE (cq->head);
70         MONO_GC_REGISTER_ROOT_SINGLE (cq->tail);
71         cq->head = mono_mlist_alloc ((MonoObject *) mono_cqitem_alloc ());
72         cq->tail = cq->head;
73         CQ_DEBUG ("Created %p", cq);
74         return cq;
75 }
76
77 void
78 mono_cq_destroy (MonoCQ *cq)
79 {
80         CQ_DEBUG ("Destroy %p", cq);
81         if (!cq)
82                 return;
83
84         mono_gc_bzero (cq, sizeof (MonoCQ));
85         MONO_GC_UNREGISTER_ROOT (cq->tail);
86         MONO_GC_UNREGISTER_ROOT (cq->head);
87         g_free (cq);
88 }
89
90 gint32
91 mono_cq_count (MonoCQ *cq)
92 {
93         if (!cq)
94                 return 0;
95
96         CQ_DEBUG ("Count %d", cq->count);
97         return cq->count;
98 }
99
100 static void
101 mono_cq_add_node (MonoCQ *cq)
102 {
103         MonoMList *n;
104         MonoMList *prev_tail;
105
106         CQ_DEBUG ("Adding node");
107         n = mono_mlist_alloc ((MonoObject *) mono_cqitem_alloc ());
108         prev_tail = cq->tail;
109         MONO_OBJECT_SETREF (prev_tail, next, n);
110
111         /* prev_tail->next must be visible before the new tail is */
112         STORE_STORE_FENCE;
113
114         cq->tail = n;
115 }
116
117 static gboolean
118 mono_cqitem_try_enqueue (MonoCQ *cq, MonoObject *obj)
119 {
120         MonoCQItem *queue;
121         MonoMList *tail;
122         gint32 pos;
123
124         tail = cq->tail;
125         queue = (MonoCQItem *) tail->data;
126         do {
127                 pos = queue->last;
128                 if (pos >= CQ_ARRAY_SIZE) {
129                         CQ_DEBUG ("enqueue(): pos >= CQ_ARRAY_SIZE, %d >= %d", pos, CQ_ARRAY_SIZE);
130                         return FALSE;
131                 }
132
133                 if (InterlockedCompareExchange (&queue->last, pos + 1, pos) == pos) {
134                         mono_array_setref (queue->array, pos, obj);
135                         STORE_STORE_FENCE;
136                         mono_array_set (queue->array_state, char, pos, TRUE);
137                         if ((pos + 1) == CQ_ARRAY_SIZE) {
138                                 CQ_DEBUG ("enqueue(): pos + 1 == CQ_ARRAY_SIZE, %d. Adding node.", CQ_ARRAY_SIZE);
139                                 mono_cq_add_node (cq);
140                         }
141                         return TRUE;
142                 }
143         } while (TRUE);
144         g_assert_not_reached ();
145 }
146
147 void
148 mono_cq_enqueue (MonoCQ *cq, MonoObject *obj)
149 {
150         if (cq == NULL || obj == NULL)
151                 return;
152
153         do {
154                 if (mono_cqitem_try_enqueue (cq, obj)) {
155                         CQ_DEBUG ("Queued one");
156                         InterlockedIncrement (&cq->count);
157                         break;
158                 }
159                 SleepEx (0, FALSE);
160         } while (TRUE);
161 }
162
163 static void
164 mono_cq_remove_node (MonoCQ *cq)
165 {
166         MonoMList *old_head;
167
168         CQ_DEBUG ("Removing node");
169         old_head = cq->head;
170         /* Not needed now that array_state is GC memory
171         MonoCQItem *queue;
172         int i;
173         gboolean retry;
174         queue = (MonoCQItem *) old_head->data;
175         do {
176                 retry = FALSE;
177                 for (i = 0; i < CQ_ARRAY_SIZE; i++) {
178                         if (mono_array_get (queue->array_state, char, i) == TRUE) {
179                                 retry = TRUE;
180                                 break;
181                         }
182                 }
183                 if (retry)
184                         SleepEx (0, FALSE);
185         } while (retry);
186          */
187         while (old_head->next == NULL)
188                 SleepEx (0, FALSE);
189         cq->head = old_head->next;
190         old_head = NULL;
191 }
192
193 static gboolean
194 mono_cqitem_try_dequeue (MonoCQ *cq, MonoObject **obj)
195 {
196         MonoCQItem *queue;
197         MonoMList *head;
198         gint32 pos;
199
200         head = cq->head;
201         queue = (MonoCQItem *) head->data;
202         do {
203                 pos = queue->first;
204                 if (pos >= queue->last || pos >= CQ_ARRAY_SIZE)
205                         return FALSE;
206
207                 if (InterlockedCompareExchange (&queue->first, pos + 1, pos) == pos) {
208                         while (mono_array_get (queue->array_state, char, pos) == FALSE) {
209                                 SleepEx (0, FALSE);
210                         }
211                         LOAD_LOAD_FENCE;
212                         *obj = mono_array_get (queue->array, MonoObject *, pos);
213
214                         /*
215                         Here don't need to fence since the only spot that reads it is the one above.
216                         Additionally, the first store is superfluous, so it can happen OOO with the second.
217                         */
218                         mono_array_set (queue->array, MonoObject *, pos, NULL);
219                         mono_array_set (queue->array_state, char, pos, FALSE);
220                         
221                         /*
222                         We should do a STORE_LOAD fence here to make sure subsequent loads see new state instead
223                         of the above stores. We can safely ignore this as the only issue of seeing a stale value
224                         is the thread yielding. Given how unfrequent this will be in practice, we better avoid the
225                         very expensive STORE_LOAD fence.
226                         */
227                         
228                         if ((pos + 1) == CQ_ARRAY_SIZE) {
229                                 mono_cq_remove_node (cq);
230                         }
231                         return TRUE;
232                 }
233         } while (TRUE);
234         g_assert_not_reached ();
235 }
236
237 gboolean
238 mono_cq_dequeue (MonoCQ *cq, MonoObject **result)
239 {
240         while (cq->count > 0) {
241                 if (mono_cqitem_try_dequeue (cq, result)) {
242                         CQ_DEBUG ("Dequeued one");
243                         InterlockedDecrement (&cq->count);
244                         return TRUE;
245                 }
246                 SleepEx (0, FALSE);
247         }
248         return FALSE;
249 }
250