rte_ring结构是报文的缓存结构,Masscan实现时定义了transmit_thread,receive_thread两个线程。transmit线程发送探测报文,receive_thread处理接收报文。在有些场景下,处理报文后需要发送交互报文,但reveive_thread并不发送报文,而是将报文添加到缓存中,由transmit_thread负责发送报文。缓存使用rte_ring定义,如下所示:
struct rte_ring {int flags;
/**< Flags supplied at creation. *//** Ring producer status. */struct prod {uint32_t watermark;
/**< Maximum items before EDQUOT. */uint32_t sp_enqueue;
/**< True, if single producer. */uint32_t size;
/**< Size of ring. */uint32_t mask;
/**< Mask (size-1) of ring. */volatile uint32_t head;
/**< Producer head. */volatile uint32_t tail;
/**< Producer tail. */} prod __rte_cache_aligned;
/** Ring consumer status. */struct cons {uint32_t sc_dequeue;
/**< True, if single consumer. */uint32_t size;
/**< Size of the ring. */uint32_t mask;
/**< Mask (size-1) of ring. */volatile uint32_t head;
/**< Consumer head. */volatile uint32_t tail;
/**< Consumer tail. */} cons __rte_cache_aligned;
#ifdef RTE_LIBRTE_RING_DEBUGstruct rte_ring_debug_stats stats[RTE_MAX_LCORE];
#endifvoid * volatile ring[1] \__rte_cache_aligned;
/**< Memory space of ring starts here. */};
ring为指针数组,指针指向缓存的报文。
rte_ring_create函数创建rte_ring结构,申请内存,并对成员进行初始化。count为需要报文的数量。
r->flags = flags;
r->prod.watermark = count;
r->prod.sp_enqueue = !!(flags & RING_F_SP_ENQ);
r->cons.sc_dequeue = !!(flags & RING_F_SC_DEQ);
r->prod.size = r->cons.size = count;
r->prod.mask = r->cons.mask = count-1;
r->prod.head = r->cons.head = 0;
r->prod.tail = r->cons.tail = 0;
对缓存的处理采用了生产者-消费者模式,prod为生产者,cons为消费者。Pond定义缓存队列的头,cons定义了尾。prod和cons分别有head、tail指针,是为了多线程并行处理。多数情况下,prod.head==prod.tail,cons.head==cons.tail,如果两者不相等,表示存在线程进行入队或出队操作。
代码通过下面四个函数进行处理。
__rte_ring_mp_do_enqueue
将若干个对象插入缓存队列,线程安全。
__rte_ring_sp_do_enqueue
将若干个对象插入缓存队列。
__rte_ring_mc_do_dequeue
从队列中取出若干个对象,线程安全。
__rte_ring_sc_do_dequeue
从队列中取出若干个对象。
多线程处理时,实现了一种类似于自旋锁的机制实现空间分配。函数进入循环,循环先判断剩余空间是否满足要求,若满足要求,则使用原子操作修改r->prod.head,分配空间。
success = rte_atomic32_cmpset(&r->prod.head, prod_head,
prod_next);
局部变量prod_head初始化为r->prod.head,如果两者不相等,表明有进程正在分配空间,进入下一次循环。退出循环时,则表明成功分配了空间。
因为采用生产者消费者模式,对r->prod.tail的修改也需要互斥处理。代码如下:
while (unlikely(r->prod.tail != prod_head))rte_pause();
r->prod.tail = prod_next;
【Masscan缓存结构rte_ring分析】上面的代码保证了对r->prod.tail的修改按照获取缓存时的顺序进行。缓存是环状结构,只有前面分配到空间的线程copy完成之后,才能修改r->prod.tail。
推荐阅读
- Linux|CONFIG_RTC_SYSTOHC
- shiro|shiro中session实现的简单分析
- Android相关|butterknife及其背后的代码生成技术
- java|ReentrantLock详解
- java|CountDownLatch
- java|LockSupport