Merge pull request #205 from m3rlinez/master
[mono.git] / mono / metadata / mono-cq.c
1 /*
2  * mono-cq.c: concurrent queue
3  *
4  * Authors:
5  *   Gonzalo Paniagua Javier (gonzalo@novell.com)
6  *
7  * Copyright (c) 2011 Novell, Inc (http://www.novell.com)
8  * Copyright 2011 Xamarin Inc
9  */
10
11 #include <mono/metadata/object.h>
12 #include <mono/metadata/mono-cq.h>
13 #include <mono/metadata/mono-mlist.h>
14 #include <mono/utils/mono-memory-model.h>
15
16 #define CQ_DEBUG(...)
17 //#define CQ_DEBUG(...) g_message(__VA_ARGS__)
18
19 struct _MonoCQ {
20         MonoMList *head;
21         MonoMList *tail;
22         volatile gint32 count;
23 };
24
25 /* matches the System.MonoListItem object */
26 struct _MonoMList {
27         MonoObject object;
28         MonoMList *next;
29         MonoObject *data;
30 };
31
32 /* matches the System.MonoCQItem object */
33 struct _MonoCQItem {
34         MonoObject object;
35         MonoArray *array; // MonoObjects
36         MonoArray *array_state; // byte array
37         volatile gint32 first;
38         volatile gint32 last;
39 };
40
41 typedef struct _MonoCQItem MonoCQItem;
42 #define CQ_ARRAY_SIZE   64
43
44 static MonoVTable *monocq_item_vtable = NULL;
45
46 static MonoCQItem *
47 mono_cqitem_alloc (void)
48 {
49         MonoCQItem *queue;
50         MonoDomain *domain = mono_get_root_domain ();
51
52         if (!monocq_item_vtable) {
53                 MonoClass *klass = mono_class_from_name (mono_defaults.corlib, "System", "MonoCQItem");
54                 monocq_item_vtable = mono_class_vtable (domain, klass);
55                 g_assert (monocq_item_vtable);
56         }
57         queue = (MonoCQItem *) mono_object_new_fast (monocq_item_vtable);
58         MONO_OBJECT_SETREF (queue, array, mono_array_new (domain, mono_defaults.object_class, CQ_ARRAY_SIZE));
59         MONO_OBJECT_SETREF (queue, array_state, mono_array_new (domain, mono_defaults.byte_class, CQ_ARRAY_SIZE));
60         return queue;
61 }
62
63 MonoCQ *
64 mono_cq_create ()
65 {
66         MonoCQ *cq;
67
68         cq = g_new0 (MonoCQ, 1);
69         MONO_GC_REGISTER_ROOT (cq->head);
70         MONO_GC_REGISTER_ROOT (cq->tail);
71         cq->head = mono_mlist_alloc ((MonoObject *) mono_cqitem_alloc ());
72         cq->tail = cq->head;
73         CQ_DEBUG ("Created %p", cq);
74         return cq;
75 }
76
77 void
78 mono_cq_destroy (MonoCQ *cq)
79 {
80         CQ_DEBUG ("Destroy %p", cq);
81         if (!cq)
82                 return;
83
84         mono_gc_bzero (cq, sizeof (MonoCQ));
85         MONO_GC_UNREGISTER_ROOT (cq->tail);
86         MONO_GC_UNREGISTER_ROOT (cq->head);
87         g_free (cq);
88 }
89
90 gint32
91 mono_cq_count (MonoCQ *cq)
92 {
93         if (!cq)
94                 return 0;
95
96         CQ_DEBUG ("Count %d", cq->count);
97         return cq->count;
98 }
99
100 static void
101 mono_cq_add_node (MonoCQ *cq)
102 {
103         MonoMList *n;
104         MonoMList *prev_tail;
105
106         CQ_DEBUG ("Adding node");
107         n = mono_mlist_alloc ((MonoObject *) mono_cqitem_alloc ());
108         prev_tail = cq->tail;
109         MONO_OBJECT_SETREF (prev_tail, next, n);
110         cq->tail = n;
111 }
112
113 static gboolean
114 mono_cqitem_try_enqueue (MonoCQ *cq, MonoObject *obj)
115 {
116         MonoCQItem *queue;
117         MonoMList *tail;
118         gint32 pos;
119
120         tail = cq->tail;
121         queue = (MonoCQItem *) tail->data;
122         do {
123                 pos = queue->last;
124                 if (pos >= CQ_ARRAY_SIZE) {
125                         CQ_DEBUG ("enqueue(): pos >= CQ_ARRAY_SIZE, %d >= %d", pos, CQ_ARRAY_SIZE);
126                         return FALSE;
127                 }
128
129                 if (InterlockedCompareExchange (&queue->last, pos + 1, pos) == pos) {
130                         mono_array_setref (queue->array, pos, obj);
131                         STORE_STORE_FENCE;
132                         mono_array_set (queue->array_state, char, pos, TRUE);
133                         if ((pos + 1) == CQ_ARRAY_SIZE) {
134                                 CQ_DEBUG ("enqueue(): pos + 1 == CQ_ARRAY_SIZE, %d. Adding node.", CQ_ARRAY_SIZE);
135                                 mono_cq_add_node (cq);
136                         }
137                         return TRUE;
138                 }
139         } while (TRUE);
140         g_assert_not_reached ();
141 }
142
143 void
144 mono_cq_enqueue (MonoCQ *cq, MonoObject *obj)
145 {
146         if (cq == NULL || obj == NULL)
147                 return;
148
149         do {
150                 if (mono_cqitem_try_enqueue (cq, obj)) {
151                         CQ_DEBUG ("Queued one");
152                         InterlockedIncrement (&cq->count);
153                         break;
154                 }
155                 SleepEx (0, FALSE);
156         } while (TRUE);
157 }
158
159 static void
160 mono_cq_remove_node (MonoCQ *cq)
161 {
162         MonoMList *old_head;
163
164         CQ_DEBUG ("Removing node");
165         old_head = cq->head;
166         /* Not needed now that array_state is GC memory
167         MonoCQItem *queue;
168         int i;
169         gboolean retry;
170         queue = (MonoCQItem *) old_head->data;
171         do {
172                 retry = FALSE;
173                 for (i = 0; i < CQ_ARRAY_SIZE; i++) {
174                         if (mono_array_get (queue->array_state, char, i) == TRUE) {
175                                 retry = TRUE;
176                                 break;
177                         }
178                 }
179                 if (retry)
180                         SleepEx (0, FALSE);
181         } while (retry);
182          */
183         while (old_head->next == NULL)
184                 SleepEx (0, FALSE);
185         cq->head = old_head->next;
186         old_head = NULL;
187 }
188
189 static gboolean
190 mono_cqitem_try_dequeue (MonoCQ *cq, MonoObject **obj)
191 {
192         MonoCQItem *queue;
193         MonoMList *head;
194         gint32 pos;
195
196         head = cq->head;
197         queue = (MonoCQItem *) head->data;
198         do {
199                 pos = queue->first;
200                 if (pos >= queue->last || pos >= CQ_ARRAY_SIZE)
201                         return FALSE;
202
203                 if (InterlockedCompareExchange (&queue->first, pos + 1, pos) == pos) {
204                         while (mono_array_get (queue->array_state, char, pos) == FALSE) {
205                                 SleepEx (0, FALSE);
206                         }
207                         LOAD_LOAD_FENCE;
208                         *obj = mono_array_get (queue->array, MonoObject *, pos);
209
210                         /*
211                         Here don't need to fence since the only spot that reads it is the one above.
212                         Additionally, the first store is superfluous, so it can happen OOO with the second.
213                         */
214                         mono_array_set (queue->array, MonoObject *, pos, NULL);
215                         mono_array_set (queue->array_state, char, pos, FALSE);
216                         
217                         /*
218                         We should do a STORE_LOAD fence here to make sure subsequent loads see new state instead
219                         of the above stores. We can safely ignore this as the only issue of seeing a stale value
220                         is the thread yielding. Given how unfrequent this will be in practice, we better avoid the
221                         very expensive STORE_LOAD fence.
222                         */
223                         
224                         if ((pos + 1) == CQ_ARRAY_SIZE) {
225                                 mono_cq_remove_node (cq);
226                         }
227                         return TRUE;
228                 }
229         } while (TRUE);
230         g_assert_not_reached ();
231 }
232
233 gboolean
234 mono_cq_dequeue (MonoCQ *cq, MonoObject **result)
235 {
236         while (cq->count > 0) {
237                 if (mono_cqitem_try_dequeue (cq, result)) {
238                         CQ_DEBUG ("Dequeued one");
239                         InterlockedDecrement (&cq->count);
240                         return TRUE;
241                 }
242                 SleepEx (0, FALSE);
243         }
244         return FALSE;
245 }
246