2 * mono-cq.c: concurrent queue
5 * Gonzalo Paniagua Javier (gonzalo@novell.com)
7 * Copyright (c) 2011 Novell, Inc (http://www.novell.com)
8 * Copyright 2011 Xamarin Inc
11 #include <mono/metadata/object.h>
12 #include <mono/metadata/mono-cq.h>
13 #include <mono/metadata/mono-mlist.h>
14 #include <mono/utils/mono-memory-model.h>
17 //#define CQ_DEBUG(...) g_message(__VA_ARGS__)
22 volatile gint32 count;
25 /* matches the System.MonoListItem object */
32 /* matches the System.MonoCQItem object */
35 MonoArray *array; // MonoObjects
36 MonoArray *array_state; // byte array
37 volatile gint32 first;
41 typedef struct _MonoCQItem MonoCQItem;
42 #define CQ_ARRAY_SIZE 64
44 static MonoVTable *monocq_item_vtable = NULL;
47 mono_cqitem_alloc (void)
50 MonoDomain *domain = mono_get_root_domain ();
52 if (!monocq_item_vtable) {
53 MonoClass *klass = mono_class_from_name (mono_defaults.corlib, "System", "MonoCQItem");
54 monocq_item_vtable = mono_class_vtable (domain, klass);
55 g_assert (monocq_item_vtable);
57 queue = (MonoCQItem *) mono_object_new_fast (monocq_item_vtable);
58 MONO_OBJECT_SETREF (queue, array, mono_array_new (domain, mono_defaults.object_class, CQ_ARRAY_SIZE));
59 MONO_OBJECT_SETREF (queue, array_state, mono_array_new (domain, mono_defaults.byte_class, CQ_ARRAY_SIZE));
68 cq = g_new0 (MonoCQ, 1);
69 MONO_GC_REGISTER_ROOT_SINGLE (cq->head);
70 MONO_GC_REGISTER_ROOT_SINGLE (cq->tail);
71 cq->head = mono_mlist_alloc ((MonoObject *) mono_cqitem_alloc ());
73 CQ_DEBUG ("Created %p", cq);
78 mono_cq_destroy (MonoCQ *cq)
80 CQ_DEBUG ("Destroy %p", cq);
84 mono_gc_bzero (cq, sizeof (MonoCQ));
85 MONO_GC_UNREGISTER_ROOT (cq->tail);
86 MONO_GC_UNREGISTER_ROOT (cq->head);
91 mono_cq_count (MonoCQ *cq)
96 CQ_DEBUG ("Count %d", cq->count);
101 mono_cq_add_node (MonoCQ *cq)
104 MonoMList *prev_tail;
106 CQ_DEBUG ("Adding node");
107 n = mono_mlist_alloc ((MonoObject *) mono_cqitem_alloc ());
108 prev_tail = cq->tail;
109 MONO_OBJECT_SETREF (prev_tail, next, n);
111 /* prev_tail->next must be visible before the new tail is */
118 mono_cqitem_try_enqueue (MonoCQ *cq, MonoObject *obj)
125 queue = (MonoCQItem *) tail->data;
128 if (pos >= CQ_ARRAY_SIZE) {
129 CQ_DEBUG ("enqueue(): pos >= CQ_ARRAY_SIZE, %d >= %d", pos, CQ_ARRAY_SIZE);
133 if (InterlockedCompareExchange (&queue->last, pos + 1, pos) == pos) {
134 mono_array_setref (queue->array, pos, obj);
136 mono_array_set (queue->array_state, char, pos, TRUE);
137 if ((pos + 1) == CQ_ARRAY_SIZE) {
138 CQ_DEBUG ("enqueue(): pos + 1 == CQ_ARRAY_SIZE, %d. Adding node.", CQ_ARRAY_SIZE);
139 mono_cq_add_node (cq);
144 g_assert_not_reached ();
148 mono_cq_enqueue (MonoCQ *cq, MonoObject *obj)
150 if (cq == NULL || obj == NULL)
154 if (mono_cqitem_try_enqueue (cq, obj)) {
155 CQ_DEBUG ("Queued one");
156 InterlockedIncrement (&cq->count);
164 mono_cq_remove_node (MonoCQ *cq)
168 CQ_DEBUG ("Removing node");
170 /* Not needed now that array_state is GC memory
174 queue = (MonoCQItem *) old_head->data;
177 for (i = 0; i < CQ_ARRAY_SIZE; i++) {
178 if (mono_array_get (queue->array_state, char, i) == TRUE) {
187 while (old_head->next == NULL)
189 cq->head = old_head->next;
194 mono_cqitem_try_dequeue (MonoCQ *cq, MonoObject **obj)
201 queue = (MonoCQItem *) head->data;
204 if (pos >= queue->last || pos >= CQ_ARRAY_SIZE)
207 if (InterlockedCompareExchange (&queue->first, pos + 1, pos) == pos) {
208 while (mono_array_get (queue->array_state, char, pos) == FALSE) {
212 *obj = mono_array_get (queue->array, MonoObject *, pos);
215 We don't need a fence here since the only spot that reads it is the one above.
216 Additionally, the first store is superfluous, so it can happen out of order with the second.
218 mono_array_set (queue->array, MonoObject *, pos, NULL);
219 mono_array_set (queue->array_state, char, pos, FALSE);
222 We should do a STORE_LOAD fence here to make sure subsequent loads see new state instead
223 of the above stores. We can safely ignore this as the only issue of seeing a stale value
224 is the thread yielding. Given how infrequent this will be in practice, we are better off avoiding the
225 very expensive STORE_LOAD fence.
228 if ((pos + 1) == CQ_ARRAY_SIZE) {
229 mono_cq_remove_node (cq);
234 g_assert_not_reached ();
238 mono_cq_dequeue (MonoCQ *cq, MonoObject **result)
240 while (cq->count > 0) {
241 if (mono_cqitem_try_dequeue (cq, result)) {
242 CQ_DEBUG ("Dequeued one");
243 InterlockedDecrement (&cq->count);