git.haldean.org ubik / bf7a294
restructure jobq to defer to deque, prevent global queue starvation Haldean Brown 5 years ago
7 changed file(s) with 61 addition(s) and 116 deletion(s). Raw diff Collapse all Expand all
5252
5353 bool
5454 ubik_deque_empty(struct ubik_deque *d);
55
56 /* Node structs are recycled internally by the deque; this frees all memory
57 * associated with the recycler. Calling this will probably reduce memory usage
58 * of the interpreter momentarily, but usage will eventually grow again when
59 * more elements are enqueued and dequeued. This function, unlike the other
60 * deque functions, is not MT-safe, and cannot be called while any other
61 * threads are attempting to access any deque. */
62 void
63 ubik_deque_empty_recycler();
2727 global queue.
2828 */
2929
30 #include "ubik/deque.h"
31
3032 #include <pthread.h>
3133 #include <stdlib.h>
3234
4547 struct ubik_jobq_node *tail;
4648 struct ubik_jobq_node *recycle;
4749 size_t size;
50
51 /* This counts the number of pops we've taken from the local queue
52 * without checking the global queue. When this hits a threshold value,
53 * we check the global queue for work. This prevents the evaluator from
54 * spinning on the local queue while the tasks that are blocking those
55 * local tasks from being evaluated are sitting on the global queue not
56 * getting done. */
57 uint16_t since_global_check;
4858 };
4959
5060 struct ubik_jobq
5161 {
62 struct ubik_deque d;
5263 struct ubik_jobq_subq *qs;
53 struct ubik_jobq_node *global_head;
54 struct ubik_jobq_node *global_tail;
55 pthread_mutex_t global_lock;
5664 size_t n_queues;
5765 };
5866
594594 ets = (struct eval_thread_state *) e;
595595 evaluator = ets->e;
596596 wid = ets->id;
597 free(ets);
598
597599 err = OK;
598600
599601 while (!evaluator->die)
2121 #include "ubik/jobq.h"
2222 #include "ubik/util.h"
2323
24 #define CHECK_GLOBAL_PERIOD 20
25
2426 static const size_t max_subqueue_size = 32;
2527
2628 void
2729 ubik_jobq_init(struct ubik_jobq *q, size_t n_workers)
2830 {
29 pthread_mutex_init(&q->global_lock, NULL);
31 ubik_deque_init(&q->d);
3032 ubik_galloc((void**) &q->qs, n_workers, sizeof(struct ubik_jobq_subq));
3133 q->n_queues = n_workers;
32 q->global_head = NULL;
33 q->global_tail = NULL;
3434 }
3535
3636 void
4040 struct ubik_jobq_node *n;
4141
4242 sq = q->qs + worker_id;
43
44 if (unlikely(sq->size >= max_subqueue_size))
45 {
46 ubik_deque_pushl(&q->d, e);
47 return;
48 }
49
4350 if (sq->recycle != NULL)
4451 {
4552 n = sq->recycle;
5259 }
5360 n->elem = e;
5461
55 if (likely(sq->size < max_subqueue_size))
56 {
57 n->right = sq->tail;
58 if (sq->tail != NULL)
59 sq->tail->left = n;
60 else
61 sq->head = n;
62 sq->tail = n;
63 sq->size++;
64 return;
65 }
66
67 pthread_mutex_lock(&q->global_lock);
68 n->right = q->global_tail;
69 if (q->global_tail != NULL)
70 q->global_tail->left = n;
71 if (q->global_head == NULL)
72 q->global_head = n;
73 q->global_tail = n;
74 pthread_mutex_unlock(&q->global_lock);
62 n->right = sq->tail;
63 if (sq->tail != NULL)
64 sq->tail->left = n;
65 else
66 sq->head = n;
67 sq->tail = n;
68 sq->size++;
7569 }
7670
7771 void *
8377
8478 sq = q->qs + worker_id;
8579 n = NULL;
80 sq->since_global_check++;
8681
87 if (likely(sq->size > 0))
82 if (sq->since_global_check == CHECK_GLOBAL_PERIOD)
83 {
84 elem = ubik_deque_popr(&q->d);
85 sq->since_global_check = 0;
86 if (elem != NULL)
87 return elem;
88 }
89
90 if (sq->size > 0)
8891 {
8992 n = sq->head;
9093 if (n != NULL)
9598 else
9699 sq->head->right = NULL;
97100 sq->size--;
101
102 elem = n->elem;
103
104 n->elem = NULL;
105 n->left = NULL;
106 n->right = sq->recycle;
107 sq->recycle = n;
108
109 return elem;
98110 }
99111 }
100 if (n == NULL)
101 {
102 pthread_mutex_lock(&q->global_lock);
103 n = q->global_head;
104 if (n != NULL)
105 q->global_head = n->left;
106 if (q->global_head == NULL)
107 q->global_tail = NULL;
108 else
109 q->global_head->right = NULL;
110 pthread_mutex_unlock(&q->global_lock);
111 }
112 if (n == NULL)
113 return NULL;
114112
115 elem = n->elem;
116
117 n->elem = NULL;
118 n->left = NULL;
119 n->right = sq->recycle;
120 sq->recycle = n;
121
122 return elem;
113 sq->since_global_check = 0;
114 return ubik_deque_popr(&q->d);
123115 }
124116
125117 static void
148140 free_ll(&sq->tail);
149141 free_ll(&sq->recycle);
150142 }
151 free_ll(&q->global_tail);
152143 free(q->qs);
153144 }
154145
2121
2222 #include <stdatomic.h>
2323
24 /* Node recycling; this becomes a linked list of free nodes available for use. */
25 static struct ubik_deque_elem *elem_pool = NULL;
26
27 static inline struct ubik_deque_elem *
28 obtain()
29 {
30 struct ubik_deque_elem *e;
31
32 for (;;)
33 {
34 e = atomic_load(&elem_pool);
35 if (e == NULL)
36 {
37 e = calloc(1, sizeof(struct ubik_deque_elem));
38 ubik_assert(e != NULL);
39 return e;
40 }
41 if (atomic_compare_exchange_weak(&elem_pool, &e, e->right))
42 return e;
43 }
44 }
45
46 static inline void
47 recycle(struct ubik_deque_elem *e)
48 {
49 struct ubik_deque_elem *current;
50
51 for (;;)
52 {
53 current = elem_pool;
54 if (atomic_compare_exchange_weak(&elem_pool, &current, e))
55 {
56 e->right = current;
57 return;
58 }
59 }
60 }
61
62 void
63 ubik_deque_empty_recycler()
64 {
65 struct ubik_deque_elem *e;
66
67 while (elem_pool != NULL)
68 {
69 e = elem_pool;
70 elem_pool = e->right;
71 free(e);
72 }
73 }
74
7524 void
7625 ubik_deque_init(struct ubik_deque *d)
7726 {
7827 pthread_mutex_init(&d->lock, NULL);
28 d->left = NULL;
29 d->right = NULL;
7930 }
8031
8132 void
8233 ubik_deque_pushl(struct ubik_deque *d, void *e)
8334 {
8435 struct ubik_deque_elem *elem;
85 elem = obtain();
36
37 ubik_galloc((void**) &elem, 1, sizeof(struct ubik_deque_elem));
8638
8739 pthread_mutex_lock(&d->lock);
8840 elem->e = e;
10052 ubik_deque_pushr(struct ubik_deque *d, void *e)
10153 {
10254 struct ubik_deque_elem *elem;
103 elem = obtain();
55
56 ubik_galloc((void**) &elem, 1, sizeof(struct ubik_deque_elem));
10457
10558 pthread_mutex_lock(&d->lock);
10659 elem->e = e;
13689 pthread_mutex_unlock(&d->lock);
13790
13891 v = e->e;
139 recycle(e);
92 free(e);
14093 return v;
14194 }
14295
162115 pthread_mutex_unlock(&d->lock);
163116
164117 v = e->e;
165 recycle(e);
118 free(e);
166119 return v;
167120 }
168121
118118 return err;
119119
120120 ubik_hooks_teardown();
121 ubik_deque_empty_recycler();
122121 return OK;
123122 }
2727 int x, y;
2828
2929 ubik_jobq_init(&q, 2);
30 assert(ubik_deque_empty(&q.d));
3031
3132 ubik_jobq_push(&q, 0, &x);
3233 ubik_jobq_push(&q, 1, &y);
3334 assert(q.qs[0].size == 1);
3435 assert(q.qs[1].size == 1);
35 assert(q.global_head == NULL);
36 assert(ubik_deque_empty(&q.d));
3637
3738 assert(ubik_jobq_pop(&q, 0) == &x);
3839 assert(ubik_jobq_pop(&q, 0) == NULL);