2 * mono-cq.c: concurrent queue
5 * Gonzalo Paniagua Javier (gonzalo@novell.com)
7 * Copyright (c) 2011 Novell, Inc (http://www.novell.com)
8 * Copyright 2011 Xamarin Inc
11 #include <mono/metadata/object.h>
12 #include <mono/metadata/mono-cq.h>
13 #include <mono/metadata/mono-mlist.h>
14 #include <mono/utils/mono-memory-model.h>
17 //#define CQ_DEBUG(...) g_message(__VA_ARGS__)
/* Item counter. The visible code adjusts cq->count with InterlockedIncrement
   and InterlockedDecrement; presumably this is that MonoCQ field (confirm
   against the full struct definition, which is not visible here). */
22 volatile gint32 count;
25 /* matches the System.MonoListItem object */
32 /* matches the System.MonoCQItem object */
/* Slot storage for one queue node: allocated with CQ_ARRAY_SIZE object slots. */
35 MonoArray *array; // MonoObjects
/* One flag byte per slot: set to TRUE after a slot is filled and reset to
   FALSE once the slot has been dequeued. */
36 MonoArray *array_state; // byte array
/* Index of the next slot to dequeue; advanced with a compare-and-swap. */
37 volatile gint32 first;
41 typedef struct _MonoCQItem MonoCQItem;
/* Number of slots in each node's `array` and `array_state`. */
42 #define CQ_ARRAY_SIZE 64
/* Cached vtable for System.MonoCQItem; filled in lazily by mono_cqitem_alloc. */
44 static MonoVTable *monocq_item_vtable = NULL;
/*
 * mono_cqitem_alloc:
 *
 * Allocates a new System.MonoCQItem object in the root domain and attaches
 * two freshly allocated arrays of CQ_ARRAY_SIZE elements to it: an object
 * array (`array`) and a byte array (`array_state`).
 *
 * The vtable for System.MonoCQItem is looked up in corlib on first use and
 * cached in monocq_item_vtable.
 *
 * NOTE(review): the lazy vtable initialisation is not synchronised in the
 * visible code -- confirm that concurrent first calls are harmless or cannot
 * happen. The return statement is not visible in this excerpt; presumably the
 * new item is returned.
 */
47 mono_cqitem_alloc (void)
50 MonoDomain *domain = mono_get_root_domain ();
/* First call only: resolve and cache the vtable. */
52 if (!monocq_item_vtable) {
53 MonoClass *klass = mono_class_from_name (mono_defaults.corlib, "System", "MonoCQItem");
54 monocq_item_vtable = mono_class_vtable (domain, klass);
55 g_assert (monocq_item_vtable);
57 queue = (MonoCQItem *) mono_object_new_fast (monocq_item_vtable);
/* Both arrays are stored through MONO_OBJECT_SETREF. */
58 MONO_OBJECT_SETREF (queue, array, mono_array_new (domain, mono_defaults.object_class, CQ_ARRAY_SIZE));
59 MONO_OBJECT_SETREF (queue, array_state, mono_array_new (domain, mono_defaults.byte_class, CQ_ARRAY_SIZE));
/*
 * Body of the queue constructor (its signature line is not visible in this
 * excerpt; presumably mono_cq_create -- confirm).
 *
 * Allocates a zero-filled MonoCQ with g_new0, registers cq->head and cq->tail
 * as GC roots, and then stores the first list node -- an MList element
 * wrapping a new MonoCQItem -- in cq->head. The assignment of cq->tail is not
 * visible here.
 */
68 cq = g_new0 (MonoCQ, 1);
/* The roots are registered before the first managed pointer is stored. */
69 MONO_GC_REGISTER_ROOT (cq->head);
70 MONO_GC_REGISTER_ROOT (cq->tail);
71 cq->head = mono_mlist_alloc ((MonoObject *) mono_cqitem_alloc ());
73 CQ_DEBUG ("Created %p", cq);
/*
 * mono_cq_destroy:
 * @cq: the queue to tear down
 *
 * Zeroes the MonoCQ structure with mono_gc_bzero and then unregisters the
 * `tail` and `head` fields as GC roots. Any NULL check on @cq and the release
 * of the structure's memory are not visible in this excerpt.
 */
78 mono_cq_destroy (MonoCQ *cq)
80 CQ_DEBUG ("Destroy %p", cq);
/* Clear the structure first, then drop the two root registrations. */
84 mono_gc_bzero (cq, sizeof (MonoCQ));
85 MONO_GC_UNREGISTER_ROOT (cq->tail);
86 MONO_GC_UNREGISTER_ROOT (cq->head);
/*
 * mono_cq_count:
 * @cq: the queue
 *
 * Only the debug trace of cq->count is visible in this excerpt; presumably
 * the function returns that counter -- confirm against the full source.
 */
91 mono_cq_count (MonoCQ *cq)
96 CQ_DEBUG ("Count %d", cq->count);
/*
 * mono_cq_add_node:
 * @cq: the queue
 *
 * Allocates a new list node wrapping a fresh MonoCQItem and links it after
 * the current tail by storing it in the tail's `next` field. Any update of
 * cq->tail itself is not visible in this excerpt.
 */
101 mono_cq_add_node (MonoCQ *cq)
104 MonoMList *prev_tail;
106 CQ_DEBUG ("Adding node");
107 n = mono_mlist_alloc ((MonoObject *) mono_cqitem_alloc ());
/* Read the tail once, then publish the new node through its `next` link. */
108 prev_tail = cq->tail;
109 MONO_OBJECT_SETREF (prev_tail, next, n);
/*
 * mono_cqitem_try_enqueue:
 * @cq: the queue
 * @obj: the object to store
 *
 * Attempts to place @obj in the MonoCQItem held by the tail node. A slot is
 * claimed by advancing queue->last from `pos` to `pos + 1` with a
 * compare-and-swap; the winner stores @obj in that slot and then sets the
 * slot's state byte to TRUE. When the claimed slot was the last one in the
 * node, a new node is appended with mono_cq_add_node.
 *
 * How `tail` and `pos` are obtained, the surrounding loop and the return
 * values are not visible in this excerpt.
 */
114 mono_cqitem_try_enqueue (MonoCQ *cq, MonoObject *obj)
121 queue = (MonoCQItem *) tail->data;
/* The node is already full; the action taken here is not visible. */
124 if (pos >= CQ_ARRAY_SIZE) {
125 CQ_DEBUG ("enqueue(): pos >= CQ_ARRAY_SIZE, %d >= %d", pos, CQ_ARRAY_SIZE);
/* Claim slot `pos`; only the caller whose CAS succeeds writes to it. */
129 if (InterlockedCompareExchange (&queue->last, pos + 1, pos) == pos) {
130 mono_array_setref (queue->array, pos, obj);
/* The slot is marked as filled after the object reference has been stored. */
132 mono_array_set (queue->array_state, char, pos, TRUE);
133 if ((pos + 1) == CQ_ARRAY_SIZE) {
134 CQ_DEBUG ("enqueue(): pos + 1 == CQ_ARRAY_SIZE, %d. Adding node.", CQ_ARRAY_SIZE);
135 mono_cq_add_node (cq);
140 g_assert_not_reached ();
/*
 * mono_cq_enqueue:
 * @cq: the queue
 * @obj: the object to add
 *
 * Checks @cq and @obj for NULL (the action taken in that case is not visible
 * in this excerpt), then calls mono_cqitem_try_enqueue. When that call
 * succeeds, cq->count is incremented with InterlockedIncrement.
 */
144 mono_cq_enqueue (MonoCQ *cq, MonoObject *obj)
146 if (cq == NULL || obj == NULL)
150 if (mono_cqitem_try_enqueue (cq, obj)) {
151 CQ_DEBUG ("Queued one");
/* The counter is bumped only after the item has been stored. */
152 InterlockedIncrement (&cq->count);
/*
 * mono_cq_remove_node:
 * @cq: the queue
 *
 * Retires the current head node. The visible code walks all CQ_ARRAY_SIZE
 * state bytes of the old head's MonoCQItem looking for entries that are still
 * TRUE (what is done with them is not visible in this excerpt), spins until
 * the old head's `next` link is non-NULL, and then advances cq->head to that
 * next node.
 */
160 mono_cq_remove_node (MonoCQ *cq)
164 CQ_DEBUG ("Removing node");
166 /* Not needed now that array_state is GC memory
170 queue = (MonoCQItem *) old_head->data;
173 for (i = 0; i < CQ_ARRAY_SIZE; i++) {
174 if (mono_array_get (queue->array_state, char, i) == TRUE) {
183 while (old_head->next == NULL)
185 cq->head = old_head->next;
/*
 * mono_cqitem_try_dequeue:
 * @cq: the queue
 * @obj: out parameter that receives the dequeued object
 *
 * Attempts to take one object from the MonoCQItem held by the head node. A
 * slot is claimed by advancing queue->first from `pos` to `pos + 1` with a
 * compare-and-swap; the winner waits until the slot's state byte is no longer
 * FALSE and then reads the object into *@obj.
 *
 * How `head` and `pos` are obtained, the surrounding loop and the return
 * values are not visible in this excerpt.
 */
190 mono_cqitem_try_dequeue (MonoCQ *cq, MonoObject **obj)
197 queue = (MonoCQItem *) head->data;
/* Nothing to take from this node: `pos` has caught up with `last`, or the
   node is exhausted. The action taken is not visible here. */
200 if (pos >= queue->last || pos >= CQ_ARRAY_SIZE)
/* Claim slot `pos`; only the caller whose CAS succeeds reads it. */
203 if (InterlockedCompareExchange (&queue->first, pos + 1, pos) == pos) {
/* The slot may have been claimed by an enqueuer that has not yet marked it as
   filled; loop until it has (the loop body is not visible here). */
204 while (mono_array_get (queue->array_state, char, pos) == FALSE) {
208 *obj = mono_array_get (queue->array, MonoObject *, pos);
211 We don't need a fence here since the only spot that reads it is the one above.
212 Additionally, the first store is superfluous, so it can happen out of order (OOO) with the second.
214 mono_array_set (queue->array, MonoObject *, pos, NULL);
215 mono_array_set (queue->array_state, char, pos, FALSE);
218 We should do a STORE_LOAD fence here to make sure subsequent loads see the new state instead
219 of the above stores. We can safely ignore this as the only issue of seeing a stale value
220 is the thread yielding. Given how infrequent this will be in practice, we had better avoid the
221 very expensive STORE_LOAD fence.
224 if ((pos + 1) == CQ_ARRAY_SIZE) {
225 mono_cq_remove_node (cq);
230 g_assert_not_reached ();
/*
 * mono_cq_dequeue:
 * @cq: the queue
 * @result: out parameter passed through to mono_cqitem_try_dequeue
 *
 * Keeps trying while cq->count is greater than zero. When
 * mono_cqitem_try_dequeue succeeds, cq->count is decremented with
 * InterlockedDecrement. The return statements are not visible in this
 * excerpt.
 */
234 mono_cq_dequeue (MonoCQ *cq, MonoObject **result)
/* cq->count is read without a lock; the field is declared volatile. */
236 while (cq->count > 0) {
237 if (mono_cqitem_try_dequeue (cq, result)) {
238 CQ_DEBUG ("Dequeued one");
239 InterlockedDecrement (&cq->count);