RCU
更新于 2023-11-26
注意
DPDK-RCU和内存RCU实现略有不同
静态
Quiescent State (QS)
线程执行时未访问临界资源
当读线程进入静态时,写线程可以无需加锁即可删除或修改临界资源
设计原理
- 使用QS变量(原子自增类型变量)
读线程可以加入或离开检测数组
读线程访问临界资源前要获取token,访问完后更新token或下线。
写线程操作前更新token,通过遍历读线程token确认读线程是否进入静态
使用方法
数据结构
| C |
|---|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37 | struct rte_rcu_qsbr_cnt {
uint64_t cnt; //静态计数
/**< Quiescent state counter. Value 0 indicates the thread is offline
* 64b counter is used to avoid adding more code to address
* counter overflow. Changing this to 32b would require additional
* changes to various APIs.
*/
uint32_t lock_cnt; //加锁计数,调试使用
/**< Lock counter. Used when RTE_LIBRTE_RCU_DEBUG is enabled */
} __rte_cache_aligned;
struct rte_rcu_qsbr {
uint64_t token __rte_cache_aligned;
/**< Counter to allow for multiple concurrent quiescent state queries */
uint64_t acked_token;
/**< Least token acked by all the threads in the last call to
* rte_rcu_qsbr_check API.
*/
uint32_t num_elems __rte_cache_aligned;
/**< Number of elements in the thread ID array */
uint32_t num_threads; //访问线程个数
/**< Number of threads currently using this QS variable */
uint32_t max_threads; //最大线程个数
/**< Maximum number of threads using this QS variable */
//此处内存要动态申请
//第一块是各线程状态计数
//第二块是线程位图
struct rte_rcu_qsbr_cnt qsbr_cnt[0] __rte_cache_aligned;
/**< Quiescent state counter array of 'max_threads' elements */
/**< Registered thread IDs are stored in a bitmap array,
* after the quiescent state counter array.
*/
} __rte_cache_aligned;
|
初始化
| C |
|---|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53 | int
rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads) //初始化函数
{
size_t sz;
if (v == NULL) {
rte_log(RTE_LOG_ERR, rte_rcu_log_type,
"%s(): Invalid input parameter\n", __func__);
rte_errno = EINVAL;
return 1;
}
sz = rte_rcu_qsbr_get_memsize(max_threads); //根据线程个数计算内存
if (sz == 1)
return 1;
/* Set all the threads to offline */
memset(v, 0, sz); //全清空
v->max_threads = max_threads; //记录最大线程个数
v->num_elems = RTE_ALIGN_MUL_CEIL(max_threads,
__RTE_QSBR_THRID_ARRAY_ELM_SIZE) /
__RTE_QSBR_THRID_ARRAY_ELM_SIZE; //关于64向上取整,而后除64
v->token = __RTE_QSBR_CNT_INIT;
v->acked_token = __RTE_QSBR_CNT_INIT - 1;
return 0;
}
size_t
rte_rcu_qsbr_get_memsize(uint32_t max_threads) //计算qsbr占用内存
{
size_t sz;
if (max_threads == 0) {
rte_log(RTE_LOG_ERR, rte_rcu_log_type,
"%s(): Invalid max_threads %u\n",
__func__, max_threads);
rte_errno = EINVAL;
return 1;
}
sz = sizeof(struct rte_rcu_qsbr); //结构体内存大小
/* Add the size of quiescent state counter array */
sz += sizeof(struct rte_rcu_qsbr_cnt) * max_threads; //线程状态占用内存大小
/* Add the size of the registered thread ID bitmap array */
sz += __RTE_QSBR_THRID_ARRAY_SIZE(max_threads); //线程ID位图数组
return sz;
}
|
读线程
线程注册
注册即记录线程
| C |
|---|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34 | int
rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
unsigned int i, id, success;
uint64_t old_bmap, new_bmap;
id = thread_id & __RTE_QSBR_THRID_MASK; //低6位做偏移
i = thread_id >> __RTE_QSBR_THRID_INDEX_SHIFT; //高26位做索引
old_bmap = __atomic_load_n(__RTE_QSBR_THRID_ARRAY_ELM(v, i),
__ATOMIC_RELAXED); //取出64bit位图
if (old_bmap & 1UL << id) //若注册过,直接返回
return 0;
do {
new_bmap = old_bmap | (1UL << id); //未注册过,更新位图
success = __atomic_compare_exchange(
__RTE_QSBR_THRID_ARRAY_ELM(v, i),
&old_bmap, &new_bmap, 0,
__ATOMIC_RELEASE, __ATOMIC_RELAXED); //写入位图
if (success) //写成功,读线程总数自增一
__atomic_fetch_add(&v->num_threads,
1, __ATOMIC_RELAXED);
else if (old_bmap & (1UL << id))
/* Someone else registered this thread.
* Counter should not be incremented.
*/
return 0;
} while (success == 0);
return 0;
}
|
去注册
| C |
|---|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 | int
rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
unsigned int i, id, success;
uint64_t old_bmap, new_bmap;
id = thread_id & __RTE_QSBR_THRID_MASK; //低6位做偏移
i = thread_id >> __RTE_QSBR_THRID_INDEX_SHIFT; //高26位做索引
old_bmap = __atomic_load_n(__RTE_QSBR_THRID_ARRAY_ELM(v, i),
__ATOMIC_RELAXED); //读取位图
if (!(old_bmap & (1UL << id))) //未注册过直接返回
return 0;
do {
new_bmap = old_bmap & ~(1UL << id); //清空特定位
success = __atomic_compare_exchange(
__RTE_QSBR_THRID_ARRAY_ELM(v, i),
&old_bmap, &new_bmap, 0,
__ATOMIC_RELEASE, __ATOMIC_RELAXED); //更新位图
if (success) //注册线程数减一
__atomic_fetch_sub(&v->num_threads,
1, __ATOMIC_RELAXED);
else if (!(old_bmap & (1UL << id)))
return 0;
} while (success == 0);
return 0;
}
|
线程上线
| C |
|---|
1
2
3
4
5
6
7
8
9
10
11
12 | static __rte_always_inline void
rte_rcu_qsbr_thread_online(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
uint64_t t;
t = __atomic_load_n(&v->token, __ATOMIC_RELAXED); //读取token
__atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
t, __ATOMIC_RELAXED); //记录token到线程变量
rte_atomic_thread_fence(__ATOMIC_SEQ_CST);
}
|
线程下线
| C |
|---|
| static __rte_always_inline void
rte_rcu_qsbr_thread_offline(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
__atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
__RTE_QSBR_CNT_THR_OFFLINE, __ATOMIC_RELEASE); //将线程token置零
}
|
状态上报
| C |
|---|
1
2
3
4
5
6
7
8
9
10
11
12 | static __rte_always_inline void
rte_rcu_qsbr_quiescent(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
uint64_t t;
t = __atomic_load_n(&v->token, __ATOMIC_ACQUIRE); //读取最新token
if (t != __atomic_load_n(&v->qsbr_cnt[thread_id].cnt, __ATOMIC_RELAXED)) //若token不一致
__atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
t, __ATOMIC_RELEASE); //更新线程token
}
|
写线程
允许上报
| C |
|---|
| static __rte_always_inline uint64_t
rte_rcu_qsbr_start(struct rte_rcu_qsbr *v)
{
uint64_t t;
t = __atomic_fetch_add(&v->token, 1, __ATOMIC_RELEASE) + 1; //更新token
return t;
}
|
检测
| C |
|---|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47 | static __rte_always_inline int
rte_rcu_qsbr_check(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
{
if (likely(t <= v->acked_token)) { //若当前检测token比读线程得都小,直接返回成功
return 1;
}
if (likely(v->num_threads == v->max_threads))
return __rte_rcu_qsbr_check_all(v, t, wait); //检测所有读线程
else
return __rte_rcu_qsbr_check_selective(v, t, wait); //检测特定线程
}
static __rte_always_inline int
__rte_rcu_qsbr_check_all(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
{
uint32_t i;
struct rte_rcu_qsbr_cnt *cnt;
uint64_t c;
uint64_t acked_token = __RTE_QSBR_CNT_MAX;
for (i = 0, cnt = v->qsbr_cnt; i < v->max_threads; i++, cnt++) { //遍历所有线程
while (1) {
c = __atomic_load_n(&cnt->cnt, __ATOMIC_ACQUIRE); //读取读线程得token
if (likely(c == __RTE_QSBR_CNT_THR_OFFLINE || c >= t)) //若线程下线或token已更新,则跳出
break;
if (!wait) //是否等待
return 0; //若不等,直接返回失败
rte_pause(); //等一下
}
if (likely(c != __RTE_QSBR_CNT_THR_OFFLINE && acked_token > c)) //记录目前检测到最小token
acked_token = c;
}
if (acked_token != __RTE_QSBR_CNT_MAX) //记录检测到得token
__atomic_store_n(&v->acked_token, acked_token,
__ATOMIC_RELAXED);
return 1;
}
|
资源回收
入队
| C |
|---|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 | int rte_rcu_qsbr_dq_enqueue(struct rte_rcu_qsbr_dq *dq, void *e)
{
__rte_rcu_qsbr_dq_elem_t *dq_elem;
uint32_t cur_size;
char data[dq->esize];
dq_elem = (__rte_rcu_qsbr_dq_elem_t *)data;
/* Start the grace period */
dq_elem->token = rte_rcu_qsbr_start(dq->v); //更新token
cur_size = rte_ring_count(dq->r);
if (cur_size > dq->trigger_reclaim_limit) { //超过阈值触发回收
rte_rcu_qsbr_dq_reclaim(dq, dq->max_reclaim_size,
NULL, NULL, NULL);
}
memcpy(dq_elem->elem, e, dq->esize - __RTE_QSBR_TOKEN_SIZE); //拷贝数据
if (rte_ring_enqueue_elem(dq->r, data, dq->esize) != 0) { //入队
return 1;
}
return 0;
}
|
回收
| C |
|---|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36 | /* Reclaim resources from the defer queue. */
int
rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq, unsigned int n,
unsigned int *freed, unsigned int *pending,
unsigned int *available)
{
uint32_t cnt;
__rte_rcu_qsbr_dq_elem_t *dq_elem;
cnt = 0;
char data[dq->esize];
while (cnt < n && //释放上限
rte_ring_dequeue_bulk_elem_start(dq->r, &data,
dq->esize, 1, available) != 0) { //PEEK数据
dq_elem = (__rte_rcu_qsbr_dq_elem_t *)data;
if (rte_rcu_qsbr_check(dq->v, dq_elem->token, false) != 1) { //检测读线程是否进入静态
rte_ring_dequeue_elem_finish(dq->r, 0);
break; //否,返回
}
rte_ring_dequeue_elem_finish(dq->r, 1); //是,
dq->free_fn(dq->p, dq_elem->elem, 1); //释放资源
cnt++;
}
if (freed != NULL)
*freed = cnt;
if (pending != NULL)
*pending = rte_ring_count(dq->r);
return 0;
}
|