DPDK源码解读-------ring结构体

RTE RING结构体内存挂接 挂接在全局rte_ring_tailq尾队列链表上

ring_list = RTE_TAILQ_CAST(rte_ring_tailq.head, rte_ring_list);

下面是相关的结构体声明及定义
/*尾队列表头TAILQ_HEAD 宏定义*/ #define TAILQ_HEAD(name, type)\ struct name {\ struct type *tqh_first; /* first element */\ struct type **tqh_last; /* addr of last next element */\ } /*声明 rte_tailq_entry_head 结构体*/ TAILQ_HEAD(rte_tailq_entry_head, rte_tailq_entry); /*声明 rte_ring_list 结构体*/ TAILQ_HEAD(rte_ring_list, rte_tailq_entry); /*可以看出来上面两个结构体是同一类型*//*尾队列表头TAILQ_ENTRY 宏定义*/ #define TAILQ_ENTRY(type)\ __MISMATCH_TAGS_PUSH\ struct {\ struct type *tqe_next; /* next element */\ struct type **tqe_prev; /* address of previous next element */\ TRACEBUF\ } struct rte_tailq_entry { TAILQ_ENTRY(rte_tailq_entry) next; /**< Pointer entries for a tailq list */ void *data; /**< Pointer to the data referenced by this tailq entry */ }; /*定义全局rte_ring_tailq尾队列*/ static struct rte_tailq_elem rte_ring_tailq = { .name = RTE_TAILQ_RING_NAME, }; struct rte_tailq_entry *te = rte_zmalloc("RING_TAILQ_ENTRY", sizeof(*te), 0);

下面是内存分布及挂接方式:
DPDK源码解读-------ring结构体
文章图片

/*上图标红的是尾队列插入时的顺序*/ TAILQ_INSERT_TAIL(ring_list, te, next); #define TAILQ_INSERT_TAIL(head, elm, field) do {\ TAILQ_NEXT((elm), field) = NULL; 1\ (elm)->field.tqe_prev = (head)->tqh_last; 2\ *(head)->tqh_last = (elm); 3\ (head)->tqh_last = &TAILQ_NEXT((elm), field); 4\ QMD_TRACE_HEAD(head); \ QMD_TRACE_ELEM(&(elm)->field); \ } while (0)

##RTE_ring结构体字段描述:
DPDK源码解读-------ring结构体
文章图片

ret_ring 创建 rte ring的创建是通过rte_ring_create()函数来实现的,具体代码分析如下:
函数参数分析:
1、socket_id这里的socket不是unix网络编程中的socket,而是指的numa节点,numa架构下,如果processer访问的内存和自己不在一个numa node上会产生非常严重的性能损耗。
2、flags决定了这个队列的性质,也就是是“什么性质的安全”,例如如果指定RING_F_SP_ENQ那么就会创建一个单生产者安全的队列(实际上完全是扯淡,创建时的flags实际上影响的并不是队列本身的性质而是调用队列的函数__rte_ring_do_enqueue参数)
/* create the ring */ struct rte_ring * rte_ring_create(const char *name, unsigned count, int socket_id, unsigned flags) { char mz_name[RTE_MEMZONE_NAMESIZE]; struct rte_ring *r; struct rte_tailq_entry *te; const struct rte_memzone *mz; ssize_t ring_size; int mz_flags = 0; struct rte_ring_list* ring_list = NULL; const unsigned int requested_count = count; int ret; /*首先找到ring_list,rte_ring_tailq是维护所有rte ring队列的,不区分socket*/ ring_list = RTE_TAILQ_CAST(rte_ring_tailq.head, rte_ring_list); /* for an exact size ring, round up from count to a power of two */ if (flags & RING_F_EXACT_SZ) count = rte_align32pow2(count + 1); /*获取ring的大小空间*/ ring_size = rte_ring_get_memsize(count); if (ring_size < 0) { rte_errno = ring_size; return NULL; } ret = snprintf(mz_name, sizeof(mz_name), "%s%s", RTE_RING_MZ_PREFIX, name); if (ret < 0 || ret >= (int)sizeof(mz_name)) { rte_errno = ENAMETOOLONG; return NULL; }/*分配一个struct rte_tailq_entry *te; 结构,在创建完成ring后,挂接这个队列元素到队列中去*/ te = rte_zmalloc("RING_TAILQ_ENTRY", sizeof(*te), 0); if (te == NULL) { RTE_LOG(ERR, RING, "Cannot reserve memory for tailq\n"); rte_errno = ENOMEM; return NULL; } rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK); /* reserve a memory zone for this ring. If we can't get rte_config or * we are secondary process, the memzone_reserve function will set * rte_errno for us appropriately - hence no check in this this function * 为新创建的ring分配内存空间咯,使用了rte_memzone_reserve_aligned()函数分配*/ mz = rte_memzone_reserve_aligned(mz_name, ring_size, socket_id, mz_flags, __alignof__(*r)); if (mz != NULL) { r = mz->addr; /* no need to check return value here, we already checked the * arguments above 队列初始化 */ rte_ring_init(r, name, requested_count, flags); te->data = https://www.it610.com/article/(void *) r; r->memzone = mz; /*te挂接到全局尾队列上,便于统一管理*/ TAILQ_INSERT_TAIL(ring_list, te, next); } else { r = NULL; RTE_LOG(ERR, RING, "Cannot reserve memory\n"); rte_free(te); } rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK); return r; }

ring队列入队操作
static __rte_always_inline unsigned int __rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table, unsigned int n, enum rte_ring_queue_behavior behavior, unsigned int is_sp, unsigned int *free_space) { uint32_t prod_head, prod_next; uint32_t free_entries; //第一步,先偏移头指针,抢占生产位置 n = __rte_ring_move_prod_head(r, is_sp, n, behavior, &prod_head, &prod_next, &free_entries); if (n == 0) goto end; //第二步,塞数据 ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *); //第三部,更新尾指针,让消费者可以消费 update_tail(&r->prod, prod_head, prod_next, is_sp, 1); end: if (free_space != NULL) *free_space = free_entries - n; return n; }static __rte_always_inline unsigned int __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp, unsigned int n, enum rte_ring_queue_behavior behavior, uint32_t *old_head, uint32_t *new_head, uint32_t *free_entries) { const uint32_t capacity = r->capacity; unsigned int max = n; int success; do { //1.先确定生产者要生产多少个元素 n = max; //2.拿到现在生产者的head位置,也就是即将生产的位置 *old_head = r->prod.head; //内存屏障 rte_smp_rmb(); //3.计算剩余的空间 *free_entries = (capacity + r->cons.tail - *old_head); //4.比较生产的元素个数和剩余空间 if (unlikely(n > *free_entries)) n = (behavior == RTE_RING_QUEUE_FIXED) ? : *free_entries; if (n == 0) return 0; //5.计算生产后的新位置 *new_head = *old_head + n; if (is_sp) r->prod.head = *new_head, success = 1; else //6.如果是多生产者的话调用cpmset函数实现生产位置抢占 success = rte_atomic32_cmpset(&r->prod.head, *old_head, *new_head); } while (unlikely(success == 0)); return n; } static __rte_always_inline void update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val, uint32_t single, uint32_t enqueue) { //1.内存屏障 if (enqueue) rte_smp_wmb(); else rte_smp_rmb(); //2.如果有其他生产者生产数据,那么需要等待其将数据生产完更新tail指针后,本生产者才能更新tail指针 if (!single) while (unlikely(ht->tail != old_val)) rte_pause(); //3.更新tail指针,更新的位置为最新的生产位置,意味着刚刚生产的数据已经全部可以被消费者消费 ht->tail = new_val; }

ring队列出队操作
static __rte_always_inline unsigned int __rte_ring_do_dequeue(struct rte_ring *r, void **obj_table, unsigned int n, enum rte_ring_queue_behavior behavior, unsigned int is_sc, unsigned int *available) { uint32_t cons_head, cons_next; uint32_t entries; n = __rte_ring_move_cons_head(r, (int)is_sc, n, behavior, &cons_head, &cons_next, &entries); if (n == 0) goto end; DEQUEUE_PTRS(r, &r[1], cons_head, obj_table, n, void *); update_tail(&r->cons, cons_head, cons_next, is_sc, 0); end: if (available != NULL) *available = entries - n; return n; }

【DPDK源码解读-------ring结构体】参考资料:
1、https://www.cnblogs.com/jungle1996/p/12194243.html

    推荐阅读