跳转至

RCU


更新于 2023-11-26

注意

DPDK-RCU 与内核 RCU 的实现略有不同

静态

Quiescent State (QS)
指线程当前没有访问临界资源(不持有对临界资源的引用)的状态
读线程进入静态后,写线程无需加锁即可删除或修改临界资源

设计原理

  • 使用QS变量(原子自增类型变量)
  • 读线程可以加入或离开检测数组(即线程注册与去注册)
  • 读线程访问临界资源前要获取token,访问完后更新token下线
  • 写线程操作前更新token,通过遍历读线程token确认读线程是否进入静态

使用方法

数据结构

C
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/* Per-thread quiescent state record. */
struct rte_rcu_qsbr_cnt {
    uint64_t cnt; // quiescent state counter
    /**< Quiescent state counter. Value 0 indicates the thread is offline
     *   64b counter is used to avoid adding more code to address
     *   counter overflow. Changing this to 32b would require additional
     *   changes to various APIs.
     */
    uint32_t lock_cnt; // lock counter, used for debugging
    /**< Lock counter. Used when RTE_LIBRTE_RCU_DEBUG is enabled */
} __rte_cache_aligned;

/* Quiescent state (QS) variable: global token state plus trailing per-thread storage. */
struct rte_rcu_qsbr {
    uint64_t token __rte_cache_aligned;
    /**< Counter to allow for multiple concurrent quiescent state queries */
    uint64_t acked_token;
    /**< Least token acked by all the threads in the last call to
     *   rte_rcu_qsbr_check API.
     */

    uint32_t num_elems __rte_cache_aligned;
    /**< Number of elements in the thread ID array */
    uint32_t num_threads; // number of threads currently accessing the variable
    /**< Number of threads currently using this QS variable */
    uint32_t max_threads; // maximum number of threads
    /**< Maximum number of threads using this QS variable */


    // The memory that follows must be allocated dynamically:
    //   first region  - the per-thread state counters
    //   second region - the thread ID bitmap
    struct rte_rcu_qsbr_cnt qsbr_cnt[0] __rte_cache_aligned;
    /**< Quiescent state counter array of 'max_threads' elements */

    /**< Registered thread IDs are stored in a bitmap array,
     *   after the quiescent state counter array.
     */
} __rte_cache_aligned;

初始化

C
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
/**
 * Initialise a QS variable for up to max_threads reader threads.
 *
 * The whole region (header, per-thread counters and thread ID bitmap) is
 * zeroed, which leaves every thread counter at 0, i.e. offline.
 *
 * @param v            Caller-provided memory for the QS variable; its size is
 *                     taken from rte_rcu_qsbr_get_memsize(max_threads).
 * @param max_threads  Maximum number of reader threads.
 * @return 0 on success, 1 on failure (v is NULL, or the size query failed).
 */
int
rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads)
{
    if (v == NULL) {
        rte_log(RTE_LOG_ERR, rte_rcu_log_type,
            "%s(): Invalid input parameter\n", __func__);
        rte_errno = EINVAL;
        return 1;
    }

    /* A size of 1 is the failure sentinel of the size query. */
    const size_t total_bytes = rte_rcu_qsbr_get_memsize(max_threads);
    if (total_bytes == 1) {
        return 1;
    }

    /* Zero everything: all threads start offline, bitmap starts empty. */
    memset(v, 0, total_bytes);

    v->max_threads = max_threads;
    /* Number of bitmap elements: max_threads rounded up to a whole element. */
    v->num_elems = RTE_ALIGN_MUL_CEIL(max_threads,
            __RTE_QSBR_THRID_ARRAY_ELM_SIZE) /
            __RTE_QSBR_THRID_ARRAY_ELM_SIZE;
    v->token = __RTE_QSBR_CNT_INIT;
    /* Start one behind the token so that nothing counts as acked yet. */
    v->acked_token = __RTE_QSBR_CNT_INIT - 1;

    return 0;
}

/**
 * Compute the number of bytes a QS variable needs for max_threads readers.
 *
 * The total is the sum of the fixed header, one counter record per thread,
 * and the registered thread ID bitmap.
 *
 * @param max_threads  Maximum number of reader threads; must be non-zero.
 * @return Required size in bytes, or 1 (with rte_errno set to EINVAL) when
 *         max_threads is 0.
 */
size_t
rte_rcu_qsbr_get_memsize(uint32_t max_threads)
{
    if (max_threads == 0) {
        rte_log(RTE_LOG_ERR, rte_rcu_log_type,
            "%s(): Invalid max_threads %u\n",
            __func__, max_threads);
        rte_errno = EINVAL;
        return 1;
    }

    /* Fixed part of the structure. */
    const size_t header_bytes = sizeof(struct rte_rcu_qsbr);
    /* Quiescent state counter array, one record per thread. */
    const size_t counter_bytes = sizeof(struct rte_rcu_qsbr_cnt) * max_threads;
    /* Registered thread ID bitmap array. */
    const size_t bitmap_bytes = __RTE_QSBR_THRID_ARRAY_SIZE(max_threads);

    return header_bytes + counter_bytes + bitmap_bytes;
}

读线程

线程注册

注册即记录线程

C
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
 * Register a reader thread with the QS variable by setting its bit in the
 * thread ID bitmap and bumping the registered-thread count.
 *
 * @param v          QS variable.
 * @param thread_id  Reader thread ID. No range check is performed here, so
 *                   the caller must pass an ID that fits the bitmap.
 * @return 0 in all cases, including when the thread was already registered.
 */
int
rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
    unsigned int i, id, success;
    uint64_t old_bmap, new_bmap;
    uint64_t bit;

    id = thread_id & __RTE_QSBR_THRID_MASK;          /* bit offset inside the bitmap word */
    i = thread_id >> __RTE_QSBR_THRID_INDEX_SHIFT;   /* index of the bitmap word */

    /* The bitmap words are 64-bit, so the mask must be built in 64-bit
     * arithmetic: `1UL << id` is undefined for id >= 32 where unsigned long
     * is only 32 bits wide.
     */
    bit = UINT64_C(1) << id;

    old_bmap = __atomic_load_n(__RTE_QSBR_THRID_ARRAY_ELM(v, i),
                    __ATOMIC_RELAXED);
    if (old_bmap & bit)                              /* already registered */
        return 0;

    do {
        new_bmap = old_bmap | bit;
        /* On failure the builtin refreshes old_bmap with the current word. */
        success = __atomic_compare_exchange(
                    __RTE_QSBR_THRID_ARRAY_ELM(v, i),
                    &old_bmap, &new_bmap, 0,
                    __ATOMIC_RELEASE, __ATOMIC_RELAXED);

        if (success)
            /* This call set the bit, so it owns the count increment. */
            __atomic_fetch_add(&v->num_threads,
                        1, __ATOMIC_RELAXED);
        else if (old_bmap & bit)
            /* Someone else registered this thread.
             * Counter should not be incremented.
             */
            return 0;
    } while (success == 0);

    return 0;
}

去注册

C
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
 * Remove a reader thread from the QS variable by clearing its bit in the
 * thread ID bitmap and decrementing the registered-thread count.
 *
 * @param v          QS variable.
 * @param thread_id  Reader thread ID. No range check is performed here, so
 *                   the caller must pass an ID that fits the bitmap.
 * @return 0 in all cases, including when the thread was not registered.
 */
int
rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
    unsigned int i, id, success;
    uint64_t old_bmap, new_bmap;
    uint64_t bit;

    id = thread_id & __RTE_QSBR_THRID_MASK;          /* bit offset inside the bitmap word */
    i = thread_id >> __RTE_QSBR_THRID_INDEX_SHIFT;   /* index of the bitmap word */

    /* The bitmap words are 64-bit, so the mask must be built in 64-bit
     * arithmetic: `1UL << id` is undefined for id >= 32 where unsigned long
     * is only 32 bits wide.
     */
    bit = UINT64_C(1) << id;

    old_bmap = __atomic_load_n(__RTE_QSBR_THRID_ARRAY_ELM(v, i),
                    __ATOMIC_RELAXED);
    if (!(old_bmap & bit))                           /* not registered: nothing to do */
        return 0;

    do {
        new_bmap = old_bmap & ~bit;                  /* clear only this thread's bit */

        /* On failure the builtin refreshes old_bmap with the current word. */
        success = __atomic_compare_exchange(
                    __RTE_QSBR_THRID_ARRAY_ELM(v, i),
                    &old_bmap, &new_bmap, 0,
                    __ATOMIC_RELEASE, __ATOMIC_RELAXED);

        if (success)
            /* This call cleared the bit, so it owns the count decrement. */
            __atomic_fetch_sub(&v->num_threads,
                        1, __ATOMIC_RELAXED);
        else if (!(old_bmap & bit))
            /* Someone else already cleared the bit; leave the count alone. */
            return 0;
    } while (success == 0);

    return 0;
}

线程上线

C
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
/**
 * Mark a reader thread as online by copying the current global token into
 * the thread's quiescent state counter.
 *
 * @param v          QS variable.
 * @param thread_id  Index of the calling reader thread in the counter array.
 */
static __rte_always_inline void
rte_rcu_qsbr_thread_online(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
    /* Snapshot the global token. */
    const uint64_t current_token = __atomic_load_n(&v->token, __ATOMIC_RELAXED);

    /* Publish it as this thread's counter. */
    __atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
        current_token, __ATOMIC_RELAXED);

    /* Full fence after the relaxed store; presumably keeps later
     * critical-section accesses from being ordered ahead of it.
     */
    rte_atomic_thread_fence(__ATOMIC_SEQ_CST);
}

线程下线

C
1
2
3
4
5
6
/**
 * Mark a reader thread as offline by resetting its quiescent state counter
 * to the offline marker value.
 *
 * @param v          QS variable.
 * @param thread_id  Index of the calling reader thread in the counter array.
 */
static __rte_always_inline void
rte_rcu_qsbr_thread_offline(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
    uint64_t *slot = &v->qsbr_cnt[thread_id].cnt;

    /* Release store of the offline marker into this thread's counter. */
    __atomic_store_n(slot, __RTE_QSBR_CNT_THR_OFFLINE, __ATOMIC_RELEASE);
}

状态上报

C
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
/**
 * Report a quiescent state for a reader thread: bring the thread's counter
 * up to the latest global token if it differs.
 *
 * @param v          QS variable.
 * @param thread_id  Index of the calling reader thread in the counter array.
 */
static __rte_always_inline void
rte_rcu_qsbr_quiescent(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
    /* Read the newest global token first. */
    const uint64_t latest = __atomic_load_n(&v->token, __ATOMIC_ACQUIRE);
    uint64_t *slot = &v->qsbr_cnt[thread_id].cnt;

    /* Counter already matches the token: skip the store. */
    if (__atomic_load_n(slot, __ATOMIC_RELAXED) == latest)
        return;

    __atomic_store_n(slot, latest, __ATOMIC_RELEASE);
}

写线程

允许上报

C
1
2
3
4
5
6
7
8
9
/**
 * Begin a new quiescent state query by advancing the global token.
 *
 * @param v  QS variable.
 * @return The token value after the increment.
 */
static __rte_always_inline uint64_t
rte_rcu_qsbr_start(struct rte_rcu_qsbr *v)
{
    /* fetch_add yields the previous value; +1 gives the token just published. */
    return __atomic_fetch_add(&v->token, 1, __ATOMIC_RELEASE) + 1;
}

检测

C
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/**
 * Check whether the reader threads have acknowledged token t.
 *
 * @param v     QS variable.
 * @param t     Token to check.
 * @param wait  Passed through to the per-thread scan; selects whether that
 *              scan waits for lagging readers.
 * @return 1 when the token is already covered by acked_token; otherwise the
 *         result of the full or selective per-thread scan.
 */
static __rte_always_inline int
rte_rcu_qsbr_check(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
{
    uint64_t acked_token;
    uint32_t num_threads;

    /* acked_token is written with an atomic store by the scanning code, so
     * read it atomically too; a plain read would be a data race.
     */
    acked_token = __atomic_load_n(&v->acked_token, __ATOMIC_RELAXED);

    /* Fast path: every reader has already acknowledged a token >= t. */
    if (likely(t <= acked_token))
        return 1;

    /* num_threads is updated with atomic add/sub on register/unregister. */
    num_threads = __atomic_load_n(&v->num_threads, __ATOMIC_RELAXED);

    if (likely(num_threads == v->max_threads))
        return __rte_rcu_qsbr_check_all(v, t, wait);          /* scan every thread slot */
    else
        return __rte_rcu_qsbr_check_selective(v, t, wait);    /* scan only selected threads */
}


/**
 * Scan every thread slot and check that each thread is either offline or
 * has a counter of at least t.
 *
 * @param v     QS variable.
 * @param t     Token being checked.
 * @param wait  When true, spin on a lagging thread until it catches up;
 *              when false, give up at the first lagging thread.
 * @return 1 when every slot passed, 0 when a lagging thread was found and
 *         wait is false.
 */
static __rte_always_inline int
__rte_rcu_qsbr_check_all(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
{
    uint64_t min_seen = __RTE_QSBR_CNT_MAX;
    uint32_t idx;

    for (idx = 0; idx < v->max_threads; idx++) {
        struct rte_rcu_qsbr_cnt *slot = &v->qsbr_cnt[idx];
        uint64_t observed;

        for (;;) {
            /* Read this thread's counter. */
            observed = __atomic_load_n(&slot->cnt, __ATOMIC_ACQUIRE);

            /* Offline, or already at/after the token: this slot is done. */
            if (likely(observed == __RTE_QSBR_CNT_THR_OFFLINE || observed >= t))
                break;

            /* Lagging reader and the caller does not want to wait. */
            if (!wait)
                return 0;

            rte_pause();
        }

        /* Track the smallest counter among the online threads. */
        if (likely(observed != __RTE_QSBR_CNT_THR_OFFLINE && observed < min_seen))
            min_seen = observed;
    }

    /* Record the smallest counter seen, if any online thread was found. */
    if (min_seen != __RTE_QSBR_CNT_MAX)
        __atomic_store_n(&v->acked_token, min_seen,
            __ATOMIC_RELAXED);

    return 1;
}

资源回收

入队

C
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
 * Queue a resource on the defer queue together with a freshly started token.
 *
 * @param dq  Defer queue.
 * @param e   Resource data; (esize - token size) bytes are copied from it.
 * @return 0 on success, 1 when the ring enqueue fails.
 */
int rte_rcu_qsbr_dq_enqueue(struct rte_rcu_qsbr_dq *dq, void *e)
{
    /* Stack staging buffer holding one queue element (token + payload). */
    char staging[dq->esize];
    __rte_rcu_qsbr_dq_elem_t *entry = (__rte_rcu_qsbr_dq_elem_t *)staging;

    /* Start the grace period and stamp the element with its token. */
    entry->token = rte_rcu_qsbr_start(dq->v);

    /* Trigger a reclaim pass once the queue has grown past the limit. */
    const uint32_t queued = rte_ring_count(dq->r);
    if (queued > dq->trigger_reclaim_limit) {
        rte_rcu_qsbr_dq_reclaim(dq, dq->max_reclaim_size,
                        NULL, NULL, NULL);
    }

    /* Copy the caller's payload behind the token. */
    memcpy(entry->elem, e, dq->esize - __RTE_QSBR_TOKEN_SIZE);

    return rte_ring_enqueue_elem(dq->r, staging, dq->esize) != 0 ? 1 : 0;
}

回收

C
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/**
 * Reclaim resources from the defer queue.
 *
 * Elements are peeked one at a time from the head of the ring. An element is
 * released only if its token passes a non-waiting rte_rcu_qsbr_check();
 * the first element that fails the check stops the pass and stays queued.
 *
 * @param dq         Defer queue.
 * @param n          Upper bound on the number of elements to free.
 * @param freed      If non-NULL, receives the number of elements freed.
 * @param pending    If non-NULL, receives the ring count after the pass.
 * @param available  Forwarded to the ring dequeue-start call.
 * @return Always 0.
 */
int
rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq, unsigned int n,
            unsigned int *freed, unsigned int *pending,
            unsigned int *available)
{
    uint32_t released = 0;
    char scratch[dq->esize];
    __rte_rcu_qsbr_dq_elem_t *entry = (__rte_rcu_qsbr_dq_elem_t *)scratch;

    for (; released < n; released++) {
        /* Peek one element; stop when nothing could be dequeued. */
        if (rte_ring_dequeue_bulk_elem_start(dq->r, &scratch,
                    dq->esize, 1, available) == 0)
            break;

        /* Check failed: finish with 0 so the element stays on the ring. */
        if (rte_rcu_qsbr_check(dq->v, entry->token, false) != 1) {
            rte_ring_dequeue_elem_finish(dq->r, 0);
            break;
        }

        /* Check passed: commit the dequeue, then free the resource. */
        rte_ring_dequeue_elem_finish(dq->r, 1);
        dq->free_fn(dq->p, entry->elem, 1);
    }

    if (freed != NULL)
        *freed = released;
    if (pending != NULL)
        *pending = rte_ring_count(dq->r);

    return 0;
}