掘金 后端 ( ) • 2024-04-02 17:07

简介

DPDK中ring(无锁环形队列)在mempool(内存池)中使用,同时常用于异步消息通信的场景,这都可以归功于其独特的结构设计,在保证使用方便的同时也保障了高性能

特点

  • FIFO(First In First Out),即先进先出队列
  • Maximum size is fixed, the objects are stored in a table,固定大小
  • Objects can be pointers or elements of multiple of 4 byte size,
  • Lockless implementation,无锁实现
  • Multi-consumer or single-consumer dequeue,多消费者、单消费者出队列
  • Multi-producer or single-producer enqueue,多生产者、单生产者如队列
  • Bulk dequeue - Dequeues the specified count of objects if successful; otherwise fails,批量出队列
  • Bulk enqueue - Enqueues the specified count of objects if successful; otherwise fails,批量入队列
  • Burst dequeue - Dequeue the maximum available objects if the specified count cannot be fulfilled,全部出队列
  • Burst enqueue - Enqueue the maximum available objects if the specified count cannot be fulfilled,全部入队列

结构

  • 消费者头尾指针,在消费者出队列时更新。
  • 生产者头尾指针,在生产者入队列时更新。

image.png

示例

函数源码

#include <rte_errno.h>
#include <rte_ring.h>

#define RING_SIZE 64

static void ring_test(void)
{
	printf("ring test\n");
	int ret;
	struct rte_ring *rt;
	// 使用单生产者入队列、单消费者出队列模式
	rt = rte_ring_create("ring_test", RING_SIZE, rte_socket_id(), RING_F_SP_ENQ | RING_F_SC_DEQ);
	if (rt == NULL)
		rte_exit(EXIT_FAILURE, "%s\n", rte_strerror(rte_errno));

	// 打印队列信息
	rte_ring_dump(stdout, rt);

	// 消息入队列
	int msg[8] = {1,2,3,4};	
	char msg2[8] = "hello";	
	rte_ring_enqueue(rt, msg);
	rte_ring_enqueue(rt, msg2);

	// 打印队列信息
	rte_ring_dump(stdout, rt);

	// 消息出队列
	int *obj = NULL;
	ret = rte_ring_dequeue(rt, (void **)&obj);
	if (ret == 0) {
		printf("obj:%d,%d,%d,%d,%d,%d,%d,%d\n", *obj, *(obj+1), *(obj+2), *(obj+3), *(obj+4), *(obj+5), *(obj+6), *(obj+7));
	}
	
	// 消息出队列
	char *obj2 = NULL;
	ret = rte_ring_dequeue(rt, (void **)&obj2);
	if (ret == 0) {
		printf("obj2:%c,%c,%c,%c,%c,%c,%c,%c\n", *obj2, *(obj2+1), *(obj2+2), *(obj2+3), *(obj2+4), *(obj2+5), *(obj2+6), *(obj2+7));
	}

	// 打印队列信息
	rte_ring_dump(stdout, rt);
}

运行结果

ring test
ring <ring_test>@0x1003fb6c0
  flags=3
  size=64
  capacity=63
  ct=0
  ch=0
  pt=0
  ph=0
  used=0
  avail=63
ring <ring_test>@0x1003fb6c0
  flags=3
  size=64
  capacity=63
  ct=0
  ch=0
  pt=2
  ph=2
  used=2
  avail=61
obj:1,2,3,4,0,0,0,0
obj2:h,e,l,l,o,,,
ring <ring_test>@0x1003fb6c0
  flags=3
  size=64
  capacity=63
  ct=2
  ch=2
  pt=2
  ph=2
  used=0
  avail=63

结构分析

初始化

  • 使用memzone_reserve()分配内存
  • 使用rte_ring_init()初始化一个空队列
  • count为队列的size,必须是2的倍数,实际队列的大小为count-1,用于区分满队列和空队列
struct rte_ring * rte_ring_create (const char *name, unsigned int count, int socket_id, unsigned int flags)

入队列

  • 将一个obj进行入队列
  • 根据flag决定调用多生产者、还是单生产者函数
  • 生产者头尾指针会更新
static __rte_always_inline int rte_ring_enqueue (struct rte_ring *r, void *obj)

image.png

入队实现

  1. 第一步

ring->prod_head and ring->cons_tail are copied in local variables. The prod_next local variable points to the next element of the table image.png

  1. 第二步

modify ring->prod_head in ring structure to point to the same location as prod_next.

The added object is copied in the ring (obj4).

image.png

  1. 第三步

ring->prod_tail in the ring structure is modified to point to the same location as ring->prod_head

image.png

出队列

  • 将一个obj进行出队列
  • 根据flag决定调用多消费者、还是单消费者函数
  • 消费者头尾指针会更新
static __rte_always_inline int rte_ring_dequeue (struct rte_ring *r, void **obj_p)
  1. 第一步

ring->cons_head and ring->prod_tail are copied in local variables. The cons_next local variable points to the next element of the table

image.png

  1. 第二步

to modify ring->cons_head in the ring structure to point to the same location as cons_next.

The dequeued object (obj1) is copied in the pointer given by the user

image.png

  1. 第三步

ring->cons_tail in the ring structure is modified to point to the same location as ring->cons_head image.png

总结

  • 介绍了SP/SC单生产者入队列和单消费者出队列的使用和实现
  • 默认模式为MP/MC多生产者入队列和多消费者出队列
  • 其他模式还有MP_RTS/MC_RTSMP_HTS/MC_HTS
  • 多核并发通过CAS机制如rte_atomic32_cmpset()来实现

参考

DPDK Ring API

DPDK Ring Library