Zero-copy mmap in libpcap

1. User space

After creating the fd with socket(AF_PACKET,…), libpcap establishes the receive ring.
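For context, a minimal sketch of that first step outside libpcap; open_packet_socket is a hypothetical helper, and CAP_NET_RAW is required:

#include <stdio.h>
#include <sys/socket.h>
#include <linux/if_ether.h>   /* ETH_P_ALL */
#include <arpa/inet.h>        /* htons */

/* Hypothetical helper: open a packet socket that sees every protocol. */
static int open_packet_socket(void)
{
	int fd = socket(AF_PACKET, SOCK_RAW, htons(ETH_P_ALL));
	if (fd == -1)
		perror("socket(AF_PACKET)");
	return fd;
}

With the socket in hand, pcap_activate_linux() drives the ring setup: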

//pcap-linux.c
static int
pcap_activate_linux(pcap_t *handle) {
	...
	ret = setup_mmapped(handle, &status);
	...
}

1.1 Set the default ring buffer size

static int
setup_mmapped(pcap_t *handle, int *status)
{
	...
	//1.
	if (handle->opt.buffer_size == 0) {
		/* by default request 2M for the ring buffer */
		handle->opt.buffer_size = 2*1024*1024;
	}
	...
}
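The 2 MB default applies only when the caller has not requested a size. An application can override it before activation; a small hypothetical example using the public libpcap API ("eth0" and the 8 MB figure are arbitrary choices):

#include <pcap/pcap.h>

pcap_t *open_with_big_ring(void)
{
	char errbuf[PCAP_ERRBUF_SIZE];
	pcap_t *p = pcap_create("eth0", errbuf);
	if (p == NULL)
		return NULL;
	pcap_set_buffer_size(p, 8*1024*1024);	/* requested ring size, bytes */
	if (pcap_activate(p) < 0) {		/* setup_mmapped() runs here */
		pcap_close(p);
		return NULL;
	}
	return p;
}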

1.2 Determine tpacket version

Different TPACKET versions use slightly different frame layouts, so the version is negotiated first.

static int
setup_mmapped(pcap_t *handle, int *status)
{
	...
	//1.
	...
	//2.
	ret = prepare_tpacket_socket(handle);
	...
}
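The negotiation itself is an ordinary setsockopt. A sketch of just the version switch (try_tpacket_v3 is a hypothetical name; the real prepare_tpacket_socket() also probes header sizes via getsockopt(PACKET_HDRLEN)):

#include <sys/socket.h>
#include <linux/if_packet.h>

static int try_tpacket_v3(int fd)
{
	int val = TPACKET_V3;
	/* Fails with EINVAL on kernels without V3 support, in which
	 * case libpcap falls back to an older TPACKET version. */
	return setsockopt(fd, SOL_PACKET, PACKET_VERSION, &val, sizeof(val));
}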

1.3 Create ring

static int
setup_mmapped(pcap_t *handle, int *status)
{
	...
	//1.
	...
	//2.
	...
	//3.
	ret = create_ring(handle, status);
	...
}

Calculate block_size, frame_size, etc.

Take TPACKET_V3 as an example:

static int
create_ring(pcap_t *handle, int *status)
{
	struct pcap_linux *handlep = handle->priv;
	unsigned i, j, frames_per_block;
	...
	struct tpacket_req3 req;
	...
	socklen_t len;
	unsigned int sk_type, tp_reserve, maclen, tp_hdrlen, netoff, macoff;
	unsigned int frame_size;
	...

	switch (handlep->tp_version) {

	case TPACKET_V2:
		...
		break;

	case TPACKET_V3:
		req.tp_frame_size = MAXIMUM_SNAPLEN;
		req.tp_frame_nr = (handle->opt.buffer_size + req.tp_frame_size - 1)/req.tp_frame_size;
		break;

	default:
		...
		*status = PCAP_ERROR;
		return -1;
	}

	req.tp_block_size = getpagesize();
	while (req.tp_block_size < req.tp_frame_size)
		req.tp_block_size <<= 1;

...
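With the 2 MB default, the sizing arithmetic works out as follows (assuming MAXIMUM_SNAPLEN is 262144 bytes, its value in current libpcap, and a 4 KB page size):

	tp_frame_size    = MAXIMUM_SNAPLEN                     = 262144
	tp_frame_nr      = (2*1024*1024 + 262144 - 1) / 262144 = 8
	tp_block_size    = 4096 doubled until >= 262144        = 262144
	frames_per_block = 262144 / 262144                     = 1
	tp_block_nr      = 8 / 1                               = 8
	total mapping    = 8 * 262144                          = 2 MB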

Request that the kernel create the receive ring buffer:

	...
	frames_per_block = req.tp_block_size/req.tp_frame_size;
	...
	/* ask the kernel to create the ring */
retry:
	req.tp_block_nr = req.tp_frame_nr / frames_per_block;

	/* req.tp_frame_nr is requested to match frames_per_block*req.tp_block_nr */
	req.tp_frame_nr = req.tp_block_nr * frames_per_block;
	...
	if (setsockopt(handle->fd, SOL_PACKET, PACKET_RX_RING,
	    (void *) &req, sizeof(req))) {
		...
	}
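The elided body of that if is what gives the retry: label its purpose. A hedged paraphrase of libpcap's handling: on ENOMEM the frame count is halved and the request retried; any other errno is fatal:

		if (errno == ENOMEM && req.tp_frame_nr > 1) {
			/* Memory failed; ask for half as many frames and retry. */
			req.tp_frame_nr >>= 1;
			goto retry;
		}
		/* other errors are reported to the caller as PCAP_ERROR */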

Map the ring buffer created by the kernel into user space:

	/* memory map the rx ring */
	handlep->mmapbuflen = req.tp_block_nr * req.tp_block_size;
	handlep->mmapbuf = mmap(0, handlep->mmapbuflen,
	    PROT_READ|PROT_WRITE, MAP_SHARED, handle->fd, 0);
	...
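Teardown is the mirror image; a hedged paraphrase of libpcap's cleanup path:

	/* hedged paraphrase of the ring teardown */
	munmap(handlep->mmapbuf, handlep->mmapbuflen);
	handlep->mmapbuf = NULL;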

Initialize the per-frame pointers in the user-space mapping:

	...
	/* fill the header ring with proper frame ptr */
	handle->offset = 0;
	for (i=0; i<req.tp_block_nr; ++i) {
		u_char *base = &handlep->mmapbuf[i*req.tp_block_size];
		for (j=0; j<frames_per_block; ++j, ++handle->offset) {
			RING_GET_CURRENT_FRAME(handle) = base;
			base += req.tp_frame_size;
		}
	}

	handle->bufsize = req.tp_frame_size;
	handle->offset = 0;
	...
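The reason the mapping is PROT_WRITE as well as PROT_READ is the status handshake: the kernel marks a frame TP_STATUS_USER once it holds a packet, and the consumer returns it by storing TP_STATUS_KERNEL. A sketch of one iteration under the TPACKET_V2 layout (the real loop is pcap_read_linux_mmap_v2() in pcap-linux.c):

	/* Hedged sketch of consuming one frame (TPACKET_V2 layout). */
	struct tpacket2_hdr *hdr =
	    (struct tpacket2_hdr *)RING_GET_CURRENT_FRAME(handle);

	if (hdr->tp_status & TP_STATUS_USER) {	/* kernel filled this frame */
		const u_char *pkt = (u_char *)hdr + hdr->tp_mac;
		/* ... deliver pkt / hdr->tp_snaplen to the callback, in place ... */

		__sync_synchronize();			/* finish reading first */
		hdr->tp_status = TP_STATUS_KERNEL;	/* hand the frame back */
		/* advance handle->offset to the next frame, wrapping at tp_frame_nr */
	}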

2. Kernel space

2.1 Create rx ring

// net/packet/af_packet.c
static int
packet_setsockopt(struct socket *sock, int level, int optname, char __user *optval, unsigned int optlen)
{
	struct sock *sk = sock->sk;
	struct packet_sock *po = pkt_sk(sk);
	int ret;

	if (level != SOL_PACKET)
		return -ENOPROTOOPT;

	switch (optname) {
	...
	case PACKET_RX_RING:
	case PACKET_TX_RING:
	{
		union tpacket_req_u req_u;
		int len;

		lock_sock(sk);
		switch (po->tp_version) {
		case TPACKET_V1:
		case TPACKET_V2:
			len = sizeof(req_u.req);
			break;
		case TPACKET_V3:
		default:
			len = sizeof(req_u.req3);
			break;
		}
		if (optlen < len) {
			ret = -EINVAL;
		} else {
			if (copy_from_user(&req_u.req, optval, len))
				ret = -EFAULT;
			else
				ret = packet_set_ring(sk, &req_u, 0,
						      optname == PACKET_TX_RING);
		}
		release_sock(sk);
		return ret;
	}

	default:
		return -ENOPROTOOPT;
	}
	...
}

Allocate space

static int packet_set_ring(struct sock *sk, union tpacket_req_u *req_u,
int closing, int tx_ring)
{
	struct pgv *pg_vec = NULL;
	struct packet_sock *po = pkt_sk(sk);
	...
	int was_running, order = 0;
	...
	/* Added to avoid minimal code churn */
	struct tpacket_req *req = &req_u->req;

	...
	order = get_order(req->tp_block_size);
	pg_vec = alloc_pg_vec(req, order);
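alloc_pg_vec() is where the physical memory comes from: one physically contiguous allocation of 2^order pages per block, tracked in an array of struct pgv. A simplified paraphrase of the kernel function, with the error unwinding elided:

static struct pgv *alloc_pg_vec(struct tpacket_req *req, int order)
{
	unsigned int block_nr = req->tp_block_nr;
	struct pgv *pg_vec;
	int i;

	pg_vec = kcalloc(block_nr, sizeof(struct pgv), GFP_KERNEL);
	if (!pg_vec)
		return NULL;

	for (i = 0; i < block_nr; i++) {
		/* 2^order pages, physically contiguous, one per block */
		pg_vec[i].buffer = alloc_one_pg_vec_page(order);
		if (!pg_vec[i].buffer)
			return NULL;	/* real code frees the partial vector */
	}
	return pg_vec;
}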

Version-specific initialization:

	...
	switch (po->tp_version) {
	case TPACKET_V3:
		/* Block transmit is not supported yet */
		if (!tx_ring) {
			init_prb_bdqc(po, rb, pg_vec, req_u);
		} else {
			...
		}
		break;
	default:
		break;
	}

Swap the new ring into place. Note that installing pg_vec also switches the socket's receive hook from packet_rcv to tpacket_rcv, so subsequent packets are written directly into the mapped ring:

	mutex_lock(&po->pg_vec_lock);
	if (closing || atomic_read(&po->mapped) == 0) {
		err = 0;
		spin_lock_bh(&rb_queue->lock);
		swap(rb->pg_vec, pg_vec);
		rb->frame_max = (req->tp_frame_nr - 1);
		rb->head = 0;
		rb->frame_size = req->tp_frame_size;
		spin_unlock_bh(&rb_queue->lock);

		swap(rb->pg_vec_order, order);
		swap(rb->pg_vec_len, req->tp_block_nr);

		rb->pg_vec_pages = req->tp_block_size/PAGE_SIZE;
		po->prot_hook.func = (po->rx_ring.pg_vec) ?
						tpacket_rcv : packet_rcv;
		skb_queue_purge(rb_queue);
		if (atomic_read(&po->mapped))
			pr_err("packet_mmap: vma is busy: %d\n",
			       atomic_read(&po->mapped));
	}
	mutex_unlock(&po->pg_vec_lock);

2.2 Map the ring buffer into user space

static int packet_mmap(struct file *file, struct socket *sock,
struct vm_area_struct *vma)
{
	struct sock *sk = sock->sk;
	struct packet_sock *po = pkt_sk(sk);
	unsigned long size, expected_size;
	struct packet_ring_buffer *rb;
	unsigned long start;
	int err = -EINVAL;
	int i;

	...

	start = vma->vm_start;
	for (rb = &po->rx_ring; rb <= &po->tx_ring; rb++) {
		if (rb->pg_vec == NULL)
			continue;

		for (i = 0; i < rb->pg_vec_len; i++) {
			struct page *page;
			void *kaddr = rb->pg_vec[i].buffer;
			int pg_num;

			for (pg_num = 0; pg_num < rb->pg_vec_pages; pg_num++) {
				page = pgv_to_page(kaddr);
				err = vm_insert_page(vma, start, page);
				if (unlikely(err))
					goto out;
				start += PAGE_SIZE;
				kaddr += PAGE_SIZE;
			}
		}
	}

	atomic_inc(&po->mapped);
	vma->vm_ops = &packet_mmap_ops;
	err = 0;

out:
	mutex_unlock(&po->pg_vec_lock);
	return err;
}
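Putting the two halves together: after packet_mmap() returns, the consumer never copies packet data. It sleeps in poll() until the kernel flips a block to TP_STATUS_USER, walks the packets inside the block, and stores TP_STATUS_KERNEL to hand it back. A minimal, hedged TPACKET_V3 consumer sketch (fd and ring come from the setup in section 1; consume is a hypothetical name; error handling omitted):

#include <poll.h>
#include <stdint.h>
#include <linux/if_packet.h>

/* ring: start of the mmap'ed area; blocks are laid out back to back. */
static void consume(int fd, uint8_t *ring, unsigned block_nr, unsigned block_size)
{
	unsigned cur = 0;

	for (;;) {
		struct tpacket_block_desc *bd =
		    (struct tpacket_block_desc *)(ring + cur * block_size);

		/* sleep until the kernel retires a block to user space */
		if (!(bd->hdr.bh1.block_status & TP_STATUS_USER)) {
			struct pollfd pfd = { .fd = fd, .events = POLLIN };
			poll(&pfd, 1, -1);
			continue;
		}

		/* walk the packets stored in this block, in place */
		struct tpacket3_hdr *ppd = (struct tpacket3_hdr *)
		    ((uint8_t *)bd + bd->hdr.bh1.offset_to_first_pkt);
		for (unsigned i = 0; i < bd->hdr.bh1.num_pkts; i++) {
			const uint8_t *pkt = (const uint8_t *)ppd + ppd->tp_mac;
			/* ... process pkt, ppd->tp_snaplen bytes long ... */
			(void)pkt;
			ppd = (struct tpacket3_hdr *)
			    ((uint8_t *)ppd + ppd->tp_next_offset);
		}

		bd->hdr.bh1.block_status = TP_STATUS_KERNEL;	/* hand back */
		cur = (cur + 1) % block_nr;
	}
}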