2 * mono-cq.c: concurrent queue
5 * Gonzalo Paniagua Javier (gonzalo@novell.com)
7 * Copyright (c) 2011 Novell, Inc (http://www.novell.com)
10 #include <mono/metadata/object.h>
11 #include <mono/metadata/mono-cq.h>
12 #include <mono/metadata/mono-mlist.h>
13 #include <mono/utils/mono-memory-model.h>
16 //#define CQ_DEBUG(...) g_message(__VA_ARGS__)
// NOTE(review): the struct headers and closing braces around these fields are
// not visible in this excerpt; field ownership below is taken from how the
// fields are accessed elsewhere in this file.
// Element counter, accessed as cq->count and adjusted with
// InterlockedIncrement / InterlockedDecrement by the enqueue/dequeue entry points.
21 volatile gint32 count;
24 /* matches the System.MonoListItem object */
31 /* matches the System.MonoCQItem object */
// Slot storage of one queue segment (accessed as queue->array on a MonoCQItem).
34 MonoArray *array; // MonoObjects
// Per-slot flag: set to TRUE after a slot is written, reset to FALSE after it is read.
35 MonoArray *array_state; // byte array
// Dequeue cursor of the segment; advanced with InterlockedCompareExchange
// in mono_cqitem_try_dequeue.
36 volatile gint32 first;
40 typedef struct _MonoCQItem MonoCQItem;
// Length used for both array and array_state when a segment is allocated.
41 #define CQ_ARRAY_SIZE 64
// Cached vtable for System.MonoCQItem; filled in on first use by mono_cqitem_alloc.
43 static MonoVTable *monocq_item_vtable = NULL;
// Allocates a new System.MonoCQItem object in the root domain and attaches its
// two backing arrays: an object array and a byte array, each CQ_ARRAY_SIZE long.
// NOTE(review): the return type, the declaration of queue and the return
// statement are not visible in this excerpt.
46 mono_cqitem_alloc (void)
49 MonoDomain *domain = mono_get_root_domain ();
// Look up the System.MonoCQItem class in corlib once and cache its vtable.
51 if (!monocq_item_vtable) {
52 MonoClass *klass = mono_class_from_name (mono_defaults.corlib, "System", "MonoCQItem");
53 monocq_item_vtable = mono_class_vtable (domain, klass);
54 g_assert (monocq_item_vtable);
56 queue = (MonoCQItem *) mono_object_new_fast (monocq_item_vtable);
// Both arrays are stored through MONO_OBJECT_SETREF because they are managed references.
57 MONO_OBJECT_SETREF (queue, array, mono_array_new (domain, mono_defaults.object_class, CQ_ARRAY_SIZE));
58 MONO_OBJECT_SETREF (queue, array_state, mono_array_new (domain, mono_defaults.byte_class, CQ_ARRAY_SIZE));
// Fragment of the queue constructor; its signature, local declaration and
// return statement are not visible in this excerpt.
// Allocates a zero-filled MonoCQ, registers its head and tail fields with the
// GC, then stores the first list node (wrapping a fresh MonoCQItem) in head.
67 cq = g_new0 (MonoCQ, 1);
// The fields are registered before any managed pointer is stored in them.
68 MONO_GC_REGISTER_ROOT (cq->head);
69 MONO_GC_REGISTER_ROOT (cq->tail);
70 cq->head = mono_mlist_alloc ((MonoObject *) mono_cqitem_alloc ());
// NOTE(review): the initialisation of cq->tail is not visible here; presumably
// it is pointed at the same node as head - confirm against the full file.
72 CQ_DEBUG ("Created %p", cq);
// Tears down a queue: zeroes the MonoCQ structure and unregisters the tail and
// head fields from the GC.
// NOTE(review): any NULL check on cq and the release of the structure's memory
// are not visible in this excerpt.
77 mono_cq_destroy (MonoCQ *cq)
79 CQ_DEBUG ("Destroy %p", cq);
// The structure is cleared before the roots are unregistered.
// NOTE(review): this ordering presumably relies on the unregister macros using
// only the field addresses, not their contents - confirm against the macro definitions.
83 mono_gc_bzero (cq, sizeof (MonoCQ));
84 MONO_GC_UNREGISTER_ROOT (cq->tail);
85 MONO_GC_UNREGISTER_ROOT (cq->head);
// Accessor for the queue's element counter (cq->count).
// NOTE(review): only the debug trace is visible in this excerpt; the return
// type, any NULL check and the return statement are not shown.
90 mono_cq_count (MonoCQ *cq)
95 CQ_DEBUG ("Count %d", cq->count);
// Grows the queue by one segment: allocates a list node wrapping a fresh
// MonoCQItem and links it after the current tail node.
// NOTE(review): the declaration of n and the update of cq->tail are not
// visible in this excerpt.
100 mono_cq_add_node (MonoCQ *cq)
103 MonoMList *prev_tail;
105 CQ_DEBUG ("Adding node");
106 n = mono_mlist_alloc ((MonoObject *) mono_cqitem_alloc ());
107 prev_tail = cq->tail;
// Link the new node through the old tail's next field; mono_cq_remove_node
// loops on that field being NULL before it moves head forward.
108 MONO_OBJECT_SETREF (prev_tail, next, n);
// Attempts to store obj in the segment referenced by the tail list node.
// NOTE(review): the loads of tail and pos, the enclosing loop (if any) and the
// return statements are not visible in this excerpt.
113 mono_cqitem_try_enqueue (MonoCQ *cq, MonoObject *obj)
120 queue = (MonoCQItem *) tail->data;
// pos at or beyond the segment length: there is no slot to claim at this index.
123 if (pos >= CQ_ARRAY_SIZE) {
124 CQ_DEBUG ("enqueue(): pos >= CQ_ARRAY_SIZE, %d >= %d", pos, CQ_ARRAY_SIZE);
// Claim slot pos by moving queue->last from pos to pos + 1; the branch is
// taken only when the compare-exchange observed pos as the previous value.
128 if (InterlockedCompareExchange (&queue->last, pos + 1, pos) == pos) {
129 mono_array_setref (queue->array, pos, obj);
// Flag the slot as filled; mono_cqitem_try_dequeue loops on this flag before
// reading the slot.
131 mono_array_set (queue->array_state, char, pos, TRUE);
// The caller that claimed the last slot of the segment appends the next segment.
132 if ((pos + 1) == CQ_ARRAY_SIZE) {
133 CQ_DEBUG ("enqueue(): pos + 1 == CQ_ARRAY_SIZE, %d. Adding node.", CQ_ARRAY_SIZE);
134 mono_cq_add_node (cq);
139 g_assert_not_reached ();
// Public enqueue entry point: hands obj to mono_cqitem_try_enqueue and
// increments cq->count when that call reports success.
// NOTE(review): the return type, any retry loop and the statements following
// each condition are only partly visible in this excerpt.
143 mono_cq_enqueue (MonoCQ *cq, MonoObject *obj)
// Guard against a NULL queue or a NULL object; the action taken is not visible here.
145 if (cq == NULL || obj == NULL)
149 if (mono_cqitem_try_enqueue (cq, obj)) {
150 CQ_DEBUG ("Queued one");
// The counter is updated atomically, and only after the attempt succeeded.
151 InterlockedIncrement (&cq->count);
// Moves cq->head to the node that follows the current head node.
// NOTE(review): the declaration and load of old_head and the body of the final
// loop are not visible in this excerpt.
// NOTE(review): the block comment opened on the "Not needed" line is not closed
// within the visible lines, so it cannot be determined from here whether the
// array_state scan that follows is live code or commented out - confirm.
159 mono_cq_remove_node (MonoCQ *cq)
163 CQ_DEBUG ("Removing node");
165 /* Not needed now that array_state is GC memory
169 queue = (MonoCQItem *) old_head->data;
172 for (i = 0; i < CQ_ARRAY_SIZE; i++) {
173 if (mono_array_get (queue->array_state, char, i) == TRUE) {
// Loops while the successor link is still NULL (mono_cq_add_node stores it),
// then publishes the successor as the new head.
182 while (old_head->next == NULL)
184 cq->head = old_head->next;
// Attempts to take one element from the segment referenced by the head list
// node and store it in *obj.
// NOTE(review): the loads of head and pos, the enclosing loop (if any), the
// return statements and the delimiters of the interior comments are not
// visible in this excerpt.
189 mono_cqitem_try_dequeue (MonoCQ *cq, MonoObject **obj)
196 queue = (MonoCQItem *) head->data;
// Nothing to take at this index: pos has reached queue->last or the segment length.
199 if (pos >= queue->last || pos >= CQ_ARRAY_SIZE)
// Claim slot pos by moving queue->first from pos to pos + 1; the branch is
// taken only when the compare-exchange observed pos as the previous value.
202 if (InterlockedCompareExchange (&queue->first, pos + 1, pos) == pos) {
// The enqueue side bumps queue->last before it stores the object and sets the
// flag, so loop until the flag for this slot reads as set.
203 while (mono_array_get (queue->array_state, char, pos) == FALSE) {
207 *obj = mono_array_get (queue->array, MonoObject *, pos);
210 Here don't need to fence since the only spot that reads it is the one above.
211 Additionally, the first store is superfluous, so it can happen OOO with the second.
// Clear the slot and reset its flag.
213 mono_array_set (queue->array, MonoObject *, pos, NULL);
214 mono_array_set (queue->array_state, char, pos, FALSE);
217 We should do a STORE_LOAD fence here to make sure subsequent loads see new state instead
218 of the above stores. We can safely ignore this as the only issue of seeing a stale value
219 is the thread yielding. Given how unfrequent this will be in practice, we better avoid the
220 very expensive STORE_LOAD fence.
// The caller that consumed the last slot of the segment advances the queue head.
223 if ((pos + 1) == CQ_ARRAY_SIZE) {
224 mono_cq_remove_node (cq);
229 g_assert_not_reached ();
// Public dequeue entry point: while cq->count is positive, tries to take an
// element into *result and decrements cq->count when an attempt succeeds.
// NOTE(review): the return type, the return statements and the rest of the
// loop body are not visible in this excerpt.
233 mono_cq_dequeue (MonoCQ *cq, MonoObject **result)
// cq->count is read without an interlocked operation here (the field is declared volatile).
235 while (cq->count > 0) {
236 if (mono_cqitem_try_dequeue (cq, result)) {
237 CQ_DEBUG ("Dequeued one");
238 InterlockedDecrement (&cq->count);