diff options
Diffstat (limited to 'net/xdp')
-rw-r--r-- | net/xdp/xdp_umem.c | 93 | ||||
-rw-r--r-- | net/xdp/xsk.c | 483 | ||||
-rw-r--r-- | net/xdp/xsk.h | 13 | ||||
-rw-r--r-- | net/xdp/xsk_diag.c | 5 | ||||
-rw-r--r-- | net/xdp/xsk_queue.c | 15 | ||||
-rw-r--r-- | net/xdp/xsk_queue.h | 344 |
6 files changed, 690 insertions, 263 deletions
diff --git a/net/xdp/xdp_umem.c b/net/xdp/xdp_umem.c index 83de74ca729a..fa7bb5e060d0 100644 --- a/net/xdp/xdp_umem.c +++ b/net/xdp/xdp_umem.c @@ -14,6 +14,7 @@ #include <linux/netdevice.h> #include <linux/rtnetlink.h> #include <linux/idr.h> +#include <linux/vmalloc.h> #include "xdp_umem.h" #include "xsk_queue.h" @@ -26,6 +27,9 @@ void xdp_add_sk_umem(struct xdp_umem *umem, struct xdp_sock *xs) { unsigned long flags; + if (!xs->tx) + return; + spin_lock_irqsave(&umem->xsk_list_lock, flags); list_add_rcu(&xs->list, &umem->xsk_list); spin_unlock_irqrestore(&umem->xsk_list_lock, flags); @@ -35,6 +39,9 @@ void xdp_del_sk_umem(struct xdp_umem *umem, struct xdp_sock *xs) { unsigned long flags; + if (!xs->tx) + return; + spin_lock_irqsave(&umem->xsk_list_lock, flags); list_del_rcu(&xs->list); spin_unlock_irqrestore(&umem->xsk_list_lock, flags); @@ -105,14 +112,22 @@ int xdp_umem_assign_dev(struct xdp_umem *umem, struct net_device *dev, umem->dev = dev; umem->queue_id = queue_id; + if (flags & XDP_USE_NEED_WAKEUP) { + umem->flags |= XDP_UMEM_USES_NEED_WAKEUP; + /* Tx needs to be explicitly woken up the first time. + * Also for supporting drivers that do not implement this + * feature. They will always have to call sendto(). + */ + xsk_set_tx_need_wakeup(umem); + } + dev_hold(dev); if (force_copy) /* For copy-mode, we are done. */ return 0; - if (!dev->netdev_ops->ndo_bpf || - !dev->netdev_ops->ndo_xsk_async_xmit) { + if (!dev->netdev_ops->ndo_bpf || !dev->netdev_ops->ndo_xsk_wakeup) { err = -EOPNOTSUPP; goto err_unreg_umem; } @@ -164,17 +179,41 @@ void xdp_umem_clear_dev(struct xdp_umem *umem) umem->zc = false; } -static void xdp_umem_unpin_pages(struct xdp_umem *umem) +static void xdp_umem_unmap_pages(struct xdp_umem *umem) { unsigned int i; + for (i = 0; i < umem->npgs; i++) + if (PageHighMem(umem->pgs[i])) + vunmap(umem->pages[i].addr); +} + +static int xdp_umem_map_pages(struct xdp_umem *umem) +{ + unsigned int i; + void *addr; + for (i = 0; i < umem->npgs; i++) { - struct page *page = umem->pgs[i]; + if (PageHighMem(umem->pgs[i])) + addr = vmap(&umem->pgs[i], 1, VM_MAP, PAGE_KERNEL); + else + addr = page_address(umem->pgs[i]); + + if (!addr) { + xdp_umem_unmap_pages(umem); + return -ENOMEM; + } - set_page_dirty_lock(page); - put_page(page); + umem->pages[i].addr = addr; } + return 0; +} + +static void xdp_umem_unpin_pages(struct xdp_umem *umem) +{ + unpin_user_pages_dirty_lock(umem->pgs, umem->npgs, true); + kfree(umem->pgs); umem->pgs = NULL; } @@ -207,9 +246,10 @@ static void xdp_umem_release(struct xdp_umem *umem) xsk_reuseq_destroy(umem); + xdp_umem_unmap_pages(umem); xdp_umem_unpin_pages(umem); - kfree(umem->pages); + kvfree(umem->pages); umem->pages = NULL; xdp_umem_unaccount_pages(umem); @@ -251,7 +291,7 @@ static int xdp_umem_pin_pages(struct xdp_umem *umem) return -ENOMEM; down_read(¤t->mm->mmap_sem); - npgs = get_user_pages(umem->address, umem->npgs, + npgs = pin_user_pages(umem->address, umem->npgs, gup_flags | FOLL_LONGTERM, &umem->pgs[0], NULL); up_read(¤t->mm->mmap_sem); @@ -299,10 +339,11 @@ static int xdp_umem_account_pages(struct xdp_umem *umem) static int xdp_umem_reg(struct xdp_umem *umem, struct xdp_umem_reg *mr) { + bool unaligned_chunks = mr->flags & XDP_UMEM_UNALIGNED_CHUNK_FLAG; u32 chunk_size = mr->chunk_size, headroom = mr->headroom; unsigned int chunks, chunks_per_page; u64 addr = mr->addr, size = mr->len; - int size_chk, err, i; + int size_chk, err; if (chunk_size < XDP_UMEM_MIN_CHUNK_SIZE || chunk_size > PAGE_SIZE) { /* Strictly speaking we could support this, if: @@ -314,7 +355,11 @@ static int xdp_umem_reg(struct xdp_umem *umem, struct xdp_umem_reg *mr) return -EINVAL; } - if (!is_power_of_2(chunk_size)) + if (mr->flags & ~(XDP_UMEM_UNALIGNED_CHUNK_FLAG | + XDP_UMEM_USES_NEED_WAKEUP)) + return -EINVAL; + + if (!unaligned_chunks && !is_power_of_2(chunk_size)) return -EINVAL; if (!PAGE_ALIGNED(addr)) { @@ -331,24 +376,26 @@ static int xdp_umem_reg(struct xdp_umem *umem, struct xdp_umem_reg *mr) if (chunks == 0) return -EINVAL; - chunks_per_page = PAGE_SIZE / chunk_size; - if (chunks < chunks_per_page || chunks % chunks_per_page) - return -EINVAL; - - headroom = ALIGN(headroom, 64); + if (!unaligned_chunks) { + chunks_per_page = PAGE_SIZE / chunk_size; + if (chunks < chunks_per_page || chunks % chunks_per_page) + return -EINVAL; + } size_chk = chunk_size - headroom - XDP_PACKET_HEADROOM; if (size_chk < 0) return -EINVAL; umem->address = (unsigned long)addr; - umem->chunk_mask = ~((u64)chunk_size - 1); + umem->chunk_mask = unaligned_chunks ? XSK_UNALIGNED_BUF_ADDR_MASK + : ~((u64)chunk_size - 1); umem->size = size; umem->headroom = headroom; umem->chunk_size_nohr = chunk_size - headroom; umem->npgs = size / PAGE_SIZE; umem->pgs = NULL; umem->user = NULL; + umem->flags = mr->flags; INIT_LIST_HEAD(&umem->xsk_list); spin_lock_init(&umem->xsk_list_lock); @@ -362,17 +409,21 @@ static int xdp_umem_reg(struct xdp_umem *umem, struct xdp_umem_reg *mr) if (err) goto out_account; - umem->pages = kcalloc(umem->npgs, sizeof(*umem->pages), GFP_KERNEL); + umem->pages = kvcalloc(umem->npgs, sizeof(*umem->pages), + GFP_KERNEL_ACCOUNT); if (!umem->pages) { err = -ENOMEM; - goto out_account; + goto out_pin; } - for (i = 0; i < umem->npgs; i++) - umem->pages[i].addr = page_address(umem->pgs[i]); + err = xdp_umem_map_pages(umem); + if (!err) + return 0; - return 0; + kvfree(umem->pages); +out_pin: + xdp_umem_unpin_pages(umem); out_account: xdp_umem_unaccount_pages(umem); return err; diff --git a/net/xdp/xsk.c b/net/xdp/xsk.c index 59b57d708697..df600487a68d 100644 --- a/net/xdp/xsk.c +++ b/net/xdp/xsk.c @@ -31,6 +31,8 @@ #define TX_BATCH_SIZE 16 +static DEFINE_PER_CPU(struct list_head, xskmap_flush_list); + bool xsk_is_setup_for_bpf_map(struct xdp_sock *xs) { return READ_ONCE(xs->rx) && READ_ONCE(xs->umem) && @@ -39,37 +41,119 @@ bool xsk_is_setup_for_bpf_map(struct xdp_sock *xs) bool xsk_umem_has_addrs(struct xdp_umem *umem, u32 cnt) { - return xskq_has_addrs(umem->fq, cnt); + return xskq_cons_has_entries(umem->fq, cnt); } EXPORT_SYMBOL(xsk_umem_has_addrs); -u64 *xsk_umem_peek_addr(struct xdp_umem *umem, u64 *addr) +bool xsk_umem_peek_addr(struct xdp_umem *umem, u64 *addr) { - return xskq_peek_addr(umem->fq, addr); + return xskq_cons_peek_addr(umem->fq, addr, umem); } EXPORT_SYMBOL(xsk_umem_peek_addr); -void xsk_umem_discard_addr(struct xdp_umem *umem) +void xsk_umem_release_addr(struct xdp_umem *umem) { - xskq_discard_addr(umem->fq); + xskq_cons_release(umem->fq); +} +EXPORT_SYMBOL(xsk_umem_release_addr); + +void xsk_set_rx_need_wakeup(struct xdp_umem *umem) +{ + if (umem->need_wakeup & XDP_WAKEUP_RX) + return; + + umem->fq->ring->flags |= XDP_RING_NEED_WAKEUP; + umem->need_wakeup |= XDP_WAKEUP_RX; +} +EXPORT_SYMBOL(xsk_set_rx_need_wakeup); + +void xsk_set_tx_need_wakeup(struct xdp_umem *umem) +{ + struct xdp_sock *xs; + + if (umem->need_wakeup & XDP_WAKEUP_TX) + return; + + rcu_read_lock(); + list_for_each_entry_rcu(xs, &umem->xsk_list, list) { + xs->tx->ring->flags |= XDP_RING_NEED_WAKEUP; + } + rcu_read_unlock(); + + umem->need_wakeup |= XDP_WAKEUP_TX; +} +EXPORT_SYMBOL(xsk_set_tx_need_wakeup); + +void xsk_clear_rx_need_wakeup(struct xdp_umem *umem) +{ + if (!(umem->need_wakeup & XDP_WAKEUP_RX)) + return; + + umem->fq->ring->flags &= ~XDP_RING_NEED_WAKEUP; + umem->need_wakeup &= ~XDP_WAKEUP_RX; +} +EXPORT_SYMBOL(xsk_clear_rx_need_wakeup); + +void xsk_clear_tx_need_wakeup(struct xdp_umem *umem) +{ + struct xdp_sock *xs; + + if (!(umem->need_wakeup & XDP_WAKEUP_TX)) + return; + + rcu_read_lock(); + list_for_each_entry_rcu(xs, &umem->xsk_list, list) { + xs->tx->ring->flags &= ~XDP_RING_NEED_WAKEUP; + } + rcu_read_unlock(); + + umem->need_wakeup &= ~XDP_WAKEUP_TX; +} +EXPORT_SYMBOL(xsk_clear_tx_need_wakeup); + +bool xsk_umem_uses_need_wakeup(struct xdp_umem *umem) +{ + return umem->flags & XDP_UMEM_USES_NEED_WAKEUP; +} +EXPORT_SYMBOL(xsk_umem_uses_need_wakeup); + +/* If a buffer crosses a page boundary, we need to do 2 memcpy's, one for + * each page. This is only required in copy mode. + */ +static void __xsk_rcv_memcpy(struct xdp_umem *umem, u64 addr, void *from_buf, + u32 len, u32 metalen) +{ + void *to_buf = xdp_umem_get_data(umem, addr); + + addr = xsk_umem_add_offset_to_addr(addr); + if (xskq_cons_crosses_non_contig_pg(umem, addr, len + metalen)) { + void *next_pg_addr = umem->pages[(addr >> PAGE_SHIFT) + 1].addr; + u64 page_start = addr & ~(PAGE_SIZE - 1); + u64 first_len = PAGE_SIZE - (addr - page_start); + + memcpy(to_buf, from_buf, first_len + metalen); + memcpy(next_pg_addr, from_buf + first_len, len - first_len); + + return; + } + + memcpy(to_buf, from_buf, len + metalen); } -EXPORT_SYMBOL(xsk_umem_discard_addr); static int __xsk_rcv(struct xdp_sock *xs, struct xdp_buff *xdp, u32 len) { - void *to_buf, *from_buf; + u64 offset = xs->umem->headroom; + u64 addr, memcpy_addr; + void *from_buf; u32 metalen; - u64 addr; int err; - if (!xskq_peek_addr(xs->umem->fq, &addr) || + if (!xskq_cons_peek_addr(xs->umem->fq, &addr, xs->umem) || len > xs->umem->chunk_size_nohr - XDP_PACKET_HEADROOM) { xs->rx_dropped++; return -ENOSPC; } - addr += xs->umem->headroom; - if (unlikely(xdp_data_meta_unsupported(xdp))) { from_buf = xdp->data; metalen = 0; @@ -78,12 +162,14 @@ static int __xsk_rcv(struct xdp_sock *xs, struct xdp_buff *xdp, u32 len) metalen = xdp->data - xdp->data_meta; } - to_buf = xdp_umem_get_data(xs->umem, addr); - memcpy(to_buf, from_buf, len + metalen); - addr += metalen; - err = xskq_produce_batch_desc(xs->rx, addr, len); + memcpy_addr = xsk_umem_adjust_offset(xs->umem, addr, offset); + __xsk_rcv_memcpy(xs->umem, memcpy_addr, from_buf, len, metalen); + + offset += metalen; + addr = xsk_umem_adjust_offset(xs->umem, addr, offset); + err = xskq_prod_reserve_desc(xs->rx, addr, len); if (!err) { - xskq_discard_addr(xs->umem->fq); + xskq_cons_release(xs->umem->fq); xdp_return_buff(xdp); return 0; } @@ -94,7 +180,7 @@ static int __xsk_rcv(struct xdp_sock *xs, struct xdp_buff *xdp, u32 len) static int __xsk_rcv_zc(struct xdp_sock *xs, struct xdp_buff *xdp, u32 len) { - int err = xskq_produce_batch_desc(xs->rx, (u64)xdp->handle, len); + int err = xskq_prod_reserve_desc(xs->rx, xdp->handle, len); if (err) xs->rx_dropped++; @@ -102,10 +188,23 @@ static int __xsk_rcv_zc(struct xdp_sock *xs, struct xdp_buff *xdp, u32 len) return err; } -int xsk_rcv(struct xdp_sock *xs, struct xdp_buff *xdp) +static bool xsk_is_bound(struct xdp_sock *xs) +{ + if (READ_ONCE(xs->state) == XSK_BOUND) { + /* Matches smp_wmb() in bind(). */ + smp_rmb(); + return true; + } + return false; +} + +static int xsk_rcv(struct xdp_sock *xs, struct xdp_buff *xdp) { u32 len; + if (!xsk_is_bound(xs)) + return -EINVAL; + if (xs->dev != xdp->rxq->dev || xs->queue_id != xdp->rxq->queue_index) return -EINVAL; @@ -115,16 +214,17 @@ int xsk_rcv(struct xdp_sock *xs, struct xdp_buff *xdp) __xsk_rcv_zc(xs, xdp, len) : __xsk_rcv(xs, xdp, len); } -void xsk_flush(struct xdp_sock *xs) +static void xsk_flush(struct xdp_sock *xs) { - xskq_produce_flush_desc(xs->rx); - xs->sk.sk_data_ready(&xs->sk); + xskq_prod_submit(xs->rx); + sock_def_readable(&xs->sk); } int xsk_generic_rcv(struct xdp_sock *xs, struct xdp_buff *xdp) { u32 metalen = xdp->data - xdp->data_meta; u32 len = xdp->data_end - xdp->data; + u64 offset = xs->umem->headroom; void *buffer; u64 addr; int err; @@ -136,23 +236,23 @@ int xsk_generic_rcv(struct xdp_sock *xs, struct xdp_buff *xdp) goto out_unlock; } - if (!xskq_peek_addr(xs->umem->fq, &addr) || + if (!xskq_cons_peek_addr(xs->umem->fq, &addr, xs->umem) || len > xs->umem->chunk_size_nohr - XDP_PACKET_HEADROOM) { err = -ENOSPC; goto out_drop; } - addr += xs->umem->headroom; - + addr = xsk_umem_adjust_offset(xs->umem, addr, offset); buffer = xdp_umem_get_data(xs->umem, addr); memcpy(buffer, xdp->data_meta, len + metalen); - addr += metalen; - err = xskq_produce_batch_desc(xs->rx, addr, len); + + addr = xsk_umem_adjust_offset(xs->umem, addr, metalen); + err = xskq_prod_reserve_desc(xs->rx, addr, len); if (err) goto out_drop; - xskq_discard_addr(xs->umem->fq); - xskq_produce_flush_desc(xs->rx); + xskq_cons_release(xs->umem->fq); + xskq_prod_submit(xs->rx); spin_unlock_bh(&xs->rx_lock); @@ -166,9 +266,35 @@ out_unlock: return err; } +int __xsk_map_redirect(struct xdp_sock *xs, struct xdp_buff *xdp) +{ + struct list_head *flush_list = this_cpu_ptr(&xskmap_flush_list); + int err; + + err = xsk_rcv(xs, xdp); + if (err) + return err; + + if (!xs->flush_node.prev) + list_add(&xs->flush_node, flush_list); + + return 0; +} + +void __xsk_map_flush(void) +{ + struct list_head *flush_list = this_cpu_ptr(&xskmap_flush_list); + struct xdp_sock *xs, *tmp; + + list_for_each_entry_safe(xs, tmp, flush_list, flush_node) { + xsk_flush(xs); + __list_del_clearprev(&xs->flush_node); + } +} + void xsk_umem_complete_tx(struct xdp_umem *umem, u32 nb_entries) { - xskq_produce_flush_addr_n(umem->cq, nb_entries); + xskq_prod_submit_n(umem->cq, nb_entries); } EXPORT_SYMBOL(xsk_umem_complete_tx); @@ -190,13 +316,18 @@ bool xsk_umem_consume_tx(struct xdp_umem *umem, struct xdp_desc *desc) rcu_read_lock(); list_for_each_entry_rcu(xs, &umem->xsk_list, list) { - if (!xskq_peek_desc(xs->tx, desc)) + if (!xskq_cons_peek_desc(xs->tx, desc, umem)) continue; - if (xskq_produce_addr_lazy(umem->cq, desc->addr)) + /* This is the backpreassure mechanism for the Tx path. + * Reserve space in the completion queue and only proceed + * if there is space in it. This avoids having to implement + * any buffering in the Tx path. + */ + if (xskq_prod_reserve_addr(umem->cq, desc->addr)) goto out; - xskq_discard_desc(xs->tx); + xskq_cons_release(xs->tx); rcu_read_unlock(); return true; } @@ -207,12 +338,21 @@ out: } EXPORT_SYMBOL(xsk_umem_consume_tx); -static int xsk_zc_xmit(struct sock *sk) +static int xsk_wakeup(struct xdp_sock *xs, u8 flags) { - struct xdp_sock *xs = xdp_sk(sk); struct net_device *dev = xs->dev; + int err; - return dev->netdev_ops->ndo_xsk_async_xmit(dev, xs->queue_id); + rcu_read_lock(); + err = dev->netdev_ops->ndo_xsk_wakeup(dev, xs->queue_id, flags); + rcu_read_unlock(); + + return err; +} + +static int xsk_zc_xmit(struct xdp_sock *xs) +{ + return xsk_wakeup(xs, XDP_WAKEUP_TX); } static void xsk_destruct_skb(struct sk_buff *skb) @@ -222,17 +362,16 @@ static void xsk_destruct_skb(struct sk_buff *skb) unsigned long flags; spin_lock_irqsave(&xs->tx_completion_lock, flags); - WARN_ON_ONCE(xskq_produce_addr(xs->umem->cq, addr)); + xskq_prod_submit_addr(xs->umem->cq, addr); spin_unlock_irqrestore(&xs->tx_completion_lock, flags); sock_wfree(skb); } -static int xsk_generic_xmit(struct sock *sk, struct msghdr *m, - size_t total_len) +static int xsk_generic_xmit(struct sock *sk) { - u32 max_batch = TX_BATCH_SIZE; struct xdp_sock *xs = xdp_sk(sk); + u32 max_batch = TX_BATCH_SIZE; bool sent_frame = false; struct xdp_desc desc; struct sk_buff *skb; @@ -243,7 +382,7 @@ static int xsk_generic_xmit(struct sock *sk, struct msghdr *m, if (xs->queue_id >= xs->dev->real_num_tx_queues) goto out; - while (xskq_peek_desc(xs->tx, &desc)) { + while (xskq_cons_peek_desc(xs->tx, &desc, xs->umem)) { char *buffer; u64 addr; u32 len; @@ -264,7 +403,12 @@ static int xsk_generic_xmit(struct sock *sk, struct msghdr *m, addr = desc.addr; buffer = xdp_umem_get_data(xs->umem, addr); err = skb_store_bits(skb, 0, buffer, len); - if (unlikely(err) || xskq_reserve_addr(xs->umem->cq)) { + /* This is the backpreassure mechanism for the Tx path. + * Reserve space in the completion queue and only proceed + * if there is space in it. This avoids having to implement + * any buffering in the Tx path. + */ + if (unlikely(err) || xskq_prod_reserve(xs->umem->cq)) { kfree_skb(skb); goto out; } @@ -272,11 +416,11 @@ static int xsk_generic_xmit(struct sock *sk, struct msghdr *m, skb->dev = xs->dev; skb->priority = sk->sk_priority; skb->mark = sk->sk_mark; - skb_shinfo(skb)->destructor_arg = (void *)(long)addr; + skb_shinfo(skb)->destructor_arg = (void *)(long)desc.addr; skb->destructor = xsk_destruct_skb; err = dev_direct_xmit(skb, xs->queue_id); - xskq_discard_desc(xs->tx); + xskq_cons_release(xs->tx); /* Ignore NET_XMIT_CN as packet might have been sent */ if (err == NET_XMIT_DROP || err == NETDEV_TX_BUSY) { /* SKB completed but not sent */ @@ -295,35 +439,57 @@ out: return err; } -static int xsk_sendmsg(struct socket *sock, struct msghdr *m, size_t total_len) +static int __xsk_sendmsg(struct sock *sk) { - bool need_wait = !(m->msg_flags & MSG_DONTWAIT); - struct sock *sk = sock->sk; struct xdp_sock *xs = xdp_sk(sk); - if (unlikely(!xs->dev)) - return -ENXIO; if (unlikely(!(xs->dev->flags & IFF_UP))) return -ENETDOWN; if (unlikely(!xs->tx)) return -ENOBUFS; - if (need_wait) + + return xs->zc ? xsk_zc_xmit(xs) : xsk_generic_xmit(sk); +} + +static int xsk_sendmsg(struct socket *sock, struct msghdr *m, size_t total_len) +{ + bool need_wait = !(m->msg_flags & MSG_DONTWAIT); + struct sock *sk = sock->sk; + struct xdp_sock *xs = xdp_sk(sk); + + if (unlikely(!xsk_is_bound(xs))) + return -ENXIO; + if (unlikely(need_wait)) return -EOPNOTSUPP; - return (xs->zc) ? xsk_zc_xmit(sk) : xsk_generic_xmit(sk, m, total_len); + return __xsk_sendmsg(sk); } -static unsigned int xsk_poll(struct file *file, struct socket *sock, +static __poll_t xsk_poll(struct file *file, struct socket *sock, struct poll_table_struct *wait) { - unsigned int mask = datagram_poll(file, sock, wait); + __poll_t mask = datagram_poll(file, sock, wait); struct sock *sk = sock->sk; struct xdp_sock *xs = xdp_sk(sk); + struct xdp_umem *umem; - if (xs->rx && !xskq_empty_desc(xs->rx)) - mask |= POLLIN | POLLRDNORM; - if (xs->tx && !xskq_full_desc(xs->tx)) - mask |= POLLOUT | POLLWRNORM; + if (unlikely(!xsk_is_bound(xs))) + return mask; + + umem = xs->umem; + + if (umem->need_wakeup) { + if (xs->zc) + xsk_wakeup(xs, umem->need_wakeup); + else + /* Poll needs to drive Tx also in copy mode */ + __xsk_sendmsg(sk); + } + + if (xs->rx && !xskq_prod_is_empty(xs->rx)) + mask |= EPOLLIN | EPOLLRDNORM; + if (xs->tx && !xskq_cons_is_full(xs->tx)) + mask |= EPOLLOUT | EPOLLWRNORM; return mask; } @@ -342,7 +508,7 @@ static int xsk_init_queue(u32 entries, struct xsk_queue **queue, /* Make sure queue is ready before it can be seen by others */ smp_wmb(); - *queue = q; + WRITE_ONCE(*queue, q); return 0; } @@ -350,10 +516,9 @@ static void xsk_unbind_dev(struct xdp_sock *xs) { struct net_device *dev = xs->dev; - if (!dev || xs->state != XSK_BOUND) + if (xs->state != XSK_BOUND) return; - - xs->state = XSK_UNBOUND; + WRITE_ONCE(xs->state, XSK_UNBOUND); /* Wait for driver to stop using the xdp socket. */ xdp_del_sk_umem(xs->umem, xs); @@ -362,6 +527,52 @@ static void xsk_unbind_dev(struct xdp_sock *xs) dev_put(dev); } +static struct xsk_map *xsk_get_map_list_entry(struct xdp_sock *xs, + struct xdp_sock ***map_entry) +{ + struct xsk_map *map = NULL; + struct xsk_map_node *node; + + *map_entry = NULL; + + spin_lock_bh(&xs->map_list_lock); + node = list_first_entry_or_null(&xs->map_list, struct xsk_map_node, + node); + if (node) { + WARN_ON(xsk_map_inc(node->map)); + map = node->map; + *map_entry = node->map_entry; + } + spin_unlock_bh(&xs->map_list_lock); + return map; +} + +static void xsk_delete_from_maps(struct xdp_sock *xs) +{ + /* This function removes the current XDP socket from all the + * maps it resides in. We need to take extra care here, due to + * the two locks involved. Each map has a lock synchronizing + * updates to the entries, and each socket has a lock that + * synchronizes access to the list of maps (map_list). For + * deadlock avoidance the locks need to be taken in the order + * "map lock"->"socket map list lock". We start off by + * accessing the socket map list, and take a reference to the + * map to guarantee existence between the + * xsk_get_map_list_entry() and xsk_map_try_sock_delete() + * calls. Then we ask the map to remove the socket, which + * tries to remove the socket from the map. Note that there + * might be updates to the map between + * xsk_get_map_list_entry() and xsk_map_try_sock_delete(). + */ + struct xdp_sock **map_entry = NULL; + struct xsk_map *map; + + while ((map = xsk_get_map_list_entry(xs, &map_entry))) { + xsk_map_try_sock_delete(map, xs, map_entry); + xsk_map_put(map); + } +} + static int xsk_release(struct socket *sock) { struct sock *sk = sock->sk; @@ -381,7 +592,10 @@ static int xsk_release(struct socket *sock) sock_prot_inuse_add(net, sk->sk_prot, -1); local_bh_enable(); + xsk_delete_from_maps(xs); + mutex_lock(&xs->mutex); xsk_unbind_dev(xs); + mutex_unlock(&xs->mutex); xskq_destroy(xs->rx); xskq_destroy(xs->tx); @@ -412,6 +626,24 @@ static struct socket *xsk_lookup_xsk_from_fd(int fd) return sock; } +/* Check if umem pages are contiguous. + * If zero-copy mode, use the DMA address to do the page contiguity check + * For all other modes we use addr (kernel virtual address) + * Store the result in the low bits of addr. + */ +static void xsk_check_page_contiguity(struct xdp_umem *umem, u32 flags) +{ + struct xdp_umem_page *pgs = umem->pages; + int i, is_contig; + + for (i = 0; i < umem->npgs - 1; i++) { + is_contig = (flags & XDP_ZEROCOPY) ? + (pgs[i].dma + PAGE_SIZE == pgs[i + 1].dma) : + (pgs[i].addr + PAGE_SIZE == pgs[i + 1].addr); + pgs[i].addr += is_contig << XSK_NEXT_PG_CONTIG_SHIFT; + } +} + static int xsk_bind(struct socket *sock, struct sockaddr *addr, int addr_len) { struct sockaddr_xdp *sxdp = (struct sockaddr_xdp *)addr; @@ -427,7 +659,8 @@ static int xsk_bind(struct socket *sock, struct sockaddr *addr, int addr_len) return -EINVAL; flags = sxdp->sxdp_flags; - if (flags & ~(XDP_SHARED_UMEM | XDP_COPY | XDP_ZEROCOPY)) + if (flags & ~(XDP_SHARED_UMEM | XDP_COPY | XDP_ZEROCOPY | + XDP_USE_NEED_WAKEUP)) return -EINVAL; rtnl_lock(); @@ -454,7 +687,8 @@ static int xsk_bind(struct socket *sock, struct sockaddr *addr, int addr_len) struct xdp_sock *umem_xs; struct socket *sock; - if ((flags & XDP_COPY) || (flags & XDP_ZEROCOPY)) { + if ((flags & XDP_COPY) || (flags & XDP_ZEROCOPY) || + (flags & XDP_USE_NEED_WAKEUP)) { /* Cannot specify flags for shared sockets. */ err = -EINVAL; goto out_unlock; @@ -473,19 +707,19 @@ static int xsk_bind(struct socket *sock, struct sockaddr *addr, int addr_len) } umem_xs = xdp_sk(sock->sk); - if (!umem_xs->umem) { - /* No umem to inherit. */ + if (!xsk_is_bound(umem_xs)) { err = -EBADF; sockfd_put(sock); goto out_unlock; - } else if (umem_xs->dev != dev || umem_xs->queue_id != qid) { + } + if (umem_xs->dev != dev || umem_xs->queue_id != qid) { err = -EINVAL; sockfd_put(sock); goto out_unlock; } xdp_get_umem(umem_xs->umem); - xs->umem = umem_xs->umem; + WRITE_ONCE(xs->umem, umem_xs->umem); sockfd_put(sock); } else if (!xs->umem || !xdp_umem_validate_queues(xs->umem)) { err = -EINVAL; @@ -500,6 +734,8 @@ static int xsk_bind(struct socket *sock, struct sockaddr *addr, int addr_len) err = xdp_umem_assign_dev(xs->umem, dev, qid, flags); if (err) goto out_unlock; + + xsk_check_page_contiguity(xs->umem, flags); } xs->dev = dev; @@ -510,16 +746,28 @@ static int xsk_bind(struct socket *sock, struct sockaddr *addr, int addr_len) xdp_add_sk_umem(xs->umem, xs); out_unlock: - if (err) + if (err) { dev_put(dev); - else - xs->state = XSK_BOUND; + } else { + /* Matches smp_rmb() in bind() for shared umem + * sockets, and xsk_is_bound(). + */ + smp_wmb(); + WRITE_ONCE(xs->state, XSK_BOUND); + } out_release: mutex_unlock(&xs->mutex); rtnl_unlock(); return err; } +struct xdp_umem_reg_v1 { + __u64 addr; /* Start of packet data area */ + __u64 len; /* Length of packet data area */ + __u32 chunk_size; + __u32 headroom; +}; + static int xsk_setsockopt(struct socket *sock, int level, int optname, char __user *optval, unsigned int optlen) { @@ -549,15 +797,24 @@ static int xsk_setsockopt(struct socket *sock, int level, int optname, } q = (optname == XDP_TX_RING) ? &xs->tx : &xs->rx; err = xsk_init_queue(entries, q, false); + if (!err && optname == XDP_TX_RING) + /* Tx needs to be explicitly woken up the first time */ + xs->tx->ring->flags |= XDP_RING_NEED_WAKEUP; mutex_unlock(&xs->mutex); return err; } case XDP_UMEM_REG: { - struct xdp_umem_reg mr; + size_t mr_size = sizeof(struct xdp_umem_reg); + struct xdp_umem_reg mr = {}; struct xdp_umem *umem; - if (copy_from_user(&mr, optval, sizeof(mr))) + if (optlen < sizeof(struct xdp_umem_reg_v1)) + return -EINVAL; + else if (optlen < sizeof(mr)) + mr_size = sizeof(struct xdp_umem_reg_v1); + + if (copy_from_user(&mr, optval, mr_size)) return -EFAULT; mutex_lock(&xs->mutex); @@ -574,7 +831,7 @@ static int xsk_setsockopt(struct socket *sock, int level, int optname, /* Make sure umem is ready before it can be seen by others */ smp_wmb(); - xs->umem = umem; + WRITE_ONCE(xs->umem, umem); mutex_unlock(&xs->mutex); return 0; } @@ -610,6 +867,20 @@ static int xsk_setsockopt(struct socket *sock, int level, int optname, return -ENOPROTOOPT; } +static void xsk_enter_rxtx_offsets(struct xdp_ring_offset_v1 *ring) +{ + ring->producer = offsetof(struct xdp_rxtx_ring, ptrs.producer); + ring->consumer = offsetof(struct xdp_rxtx_ring, ptrs.consumer); + ring->desc = offsetof(struct xdp_rxtx_ring, desc); +} + +static void xsk_enter_umem_offsets(struct xdp_ring_offset_v1 *ring) +{ + ring->producer = offsetof(struct xdp_umem_ring, ptrs.producer); + ring->consumer = offsetof(struct xdp_umem_ring, ptrs.consumer); + ring->desc = offsetof(struct xdp_umem_ring, desc); +} + static int xsk_getsockopt(struct socket *sock, int level, int optname, char __user *optval, int __user *optlen) { @@ -649,26 +920,49 @@ static int xsk_getsockopt(struct socket *sock, int level, int optname, case XDP_MMAP_OFFSETS: { struct xdp_mmap_offsets off; + struct xdp_mmap_offsets_v1 off_v1; + bool flags_supported = true; + void *to_copy; - if (len < sizeof(off)) + if (len < sizeof(off_v1)) return -EINVAL; + else if (len < sizeof(off)) + flags_supported = false; + + if (flags_supported) { + /* xdp_ring_offset is identical to xdp_ring_offset_v1 + * except for the flags field added to the end. + */ + xsk_enter_rxtx_offsets((struct xdp_ring_offset_v1 *) + &off.rx); + xsk_enter_rxtx_offsets((struct xdp_ring_offset_v1 *) + &off.tx); + xsk_enter_umem_offsets((struct xdp_ring_offset_v1 *) + &off.fr); + xsk_enter_umem_offsets((struct xdp_ring_offset_v1 *) + &off.cr); + off.rx.flags = offsetof(struct xdp_rxtx_ring, + ptrs.flags); + off.tx.flags = offsetof(struct xdp_rxtx_ring, + ptrs.flags); + off.fr.flags = offsetof(struct xdp_umem_ring, + ptrs.flags); + off.cr.flags = offsetof(struct xdp_umem_ring, + ptrs.flags); + + len = sizeof(off); + to_copy = &off; + } else { + xsk_enter_rxtx_offsets(&off_v1.rx); + xsk_enter_rxtx_offsets(&off_v1.tx); + xsk_enter_umem_offsets(&off_v1.fr); + xsk_enter_umem_offsets(&off_v1.cr); + + len = sizeof(off_v1); + to_copy = &off_v1; + } - off.rx.producer = offsetof(struct xdp_rxtx_ring, ptrs.producer); - off.rx.consumer = offsetof(struct xdp_rxtx_ring, ptrs.consumer); - off.rx.desc = offsetof(struct xdp_rxtx_ring, desc); - off.tx.producer = offsetof(struct xdp_rxtx_ring, ptrs.producer); - off.tx.consumer = offsetof(struct xdp_rxtx_ring, ptrs.consumer); - off.tx.desc = offsetof(struct xdp_rxtx_ring, desc); - - off.fr.producer = offsetof(struct xdp_umem_ring, ptrs.producer); - off.fr.consumer = offsetof(struct xdp_umem_ring, ptrs.consumer); - off.fr.desc = offsetof(struct xdp_umem_ring, desc); - off.cr.producer = offsetof(struct xdp_umem_ring, ptrs.producer); - off.cr.consumer = offsetof(struct xdp_umem_ring, ptrs.consumer); - off.cr.desc = offsetof(struct xdp_umem_ring, desc); - - len = sizeof(off); - if (copy_to_user(optval, &off, len)) + if (copy_to_user(optval, to_copy, len)) return -EFAULT; if (put_user(len, optlen)) return -EFAULT; @@ -713,7 +1007,7 @@ static int xsk_mmap(struct file *file, struct socket *sock, unsigned long pfn; struct page *qpg; - if (xs->state != XSK_READY) + if (READ_ONCE(xs->state) != XSK_READY) return -EBUSY; if (offset == XDP_PGOFF_RX_RING) { @@ -739,7 +1033,7 @@ static int xsk_mmap(struct file *file, struct socket *sock, /* Matches the smp_wmb() in xsk_init_queue */ smp_rmb(); qpg = virt_to_head_page(q->ring); - if (size > (PAGE_SIZE << compound_order(qpg))) + if (size > page_size(qpg)) return -EINVAL; pfn = virt_to_phys(q->ring) >> PAGE_SHIFT; @@ -855,6 +1149,9 @@ static int xsk_create(struct net *net, struct socket *sock, int protocol, spin_lock_init(&xs->rx_lock); spin_lock_init(&xs->tx_completion_lock); + INIT_LIST_HEAD(&xs->map_list); + spin_lock_init(&xs->map_list_lock); + mutex_lock(&net->xdp.lock); sk_add_node_rcu(sk, &net->xdp.list); mutex_unlock(&net->xdp.lock); @@ -895,7 +1192,7 @@ static struct pernet_operations xsk_net_ops = { static int __init xsk_init(void) { - int err; + int err, cpu; err = proto_register(&xsk_proto, 0 /* no slab */); if (err) @@ -913,6 +1210,8 @@ static int __init xsk_init(void) if (err) goto out_pernet; + for_each_possible_cpu(cpu) + INIT_LIST_HEAD(&per_cpu(xskmap_flush_list, cpu)); return 0; out_pernet: diff --git a/net/xdp/xsk.h b/net/xdp/xsk.h index ba8120610426..4cfd106bdb53 100644 --- a/net/xdp/xsk.h +++ b/net/xdp/xsk.h @@ -4,6 +4,19 @@ #ifndef XSK_H_ #define XSK_H_ +struct xdp_ring_offset_v1 { + __u64 producer; + __u64 consumer; + __u64 desc; +}; + +struct xdp_mmap_offsets_v1 { + struct xdp_ring_offset_v1 rx; + struct xdp_ring_offset_v1 tx; + struct xdp_ring_offset_v1 fr; + struct xdp_ring_offset_v1 cr; +}; + static inline struct xdp_sock *xdp_sk(struct sock *sk) { return (struct xdp_sock *)sk; diff --git a/net/xdp/xsk_diag.c b/net/xdp/xsk_diag.c index d5e06c8e0cbf..f59791ba43a0 100644 --- a/net/xdp/xsk_diag.c +++ b/net/xdp/xsk_diag.c @@ -56,7 +56,7 @@ static int xsk_diag_put_umem(const struct xdp_sock *xs, struct sk_buff *nlskb) du.id = umem->id; du.size = umem->size; du.num_pages = umem->npgs; - du.chunk_size = (__u32)(~umem->chunk_mask + 1); + du.chunk_size = umem->chunk_size_nohr + umem->headroom; du.headroom = umem->headroom; du.ifindex = umem->dev ? umem->dev->ifindex : 0; du.queue_id = umem->queue_id; @@ -97,6 +97,7 @@ static int xsk_diag_fill(struct sock *sk, struct sk_buff *nlskb, msg->xdiag_ino = sk_ino; sock_diag_save_cookie(sk, msg->xdiag_cookie); + mutex_lock(&xs->mutex); if ((req->xdiag_show & XDP_SHOW_INFO) && xsk_diag_put_info(xs, nlskb)) goto out_nlmsg_trim; @@ -117,10 +118,12 @@ static int xsk_diag_fill(struct sock *sk, struct sk_buff *nlskb, sock_diag_put_meminfo(sk, nlskb, XDP_DIAG_MEMINFO)) goto out_nlmsg_trim; + mutex_unlock(&xs->mutex); nlmsg_end(nlskb, nlh); return 0; out_nlmsg_trim: + mutex_unlock(&xs->mutex); nlmsg_cancel(nlskb, nlh); return -EMSGSIZE; } diff --git a/net/xdp/xsk_queue.c b/net/xdp/xsk_queue.c index b66504592d9b..c90e9c1e3c63 100644 --- a/net/xdp/xsk_queue.c +++ b/net/xdp/xsk_queue.c @@ -18,14 +18,14 @@ void xskq_set_umem(struct xsk_queue *q, u64 size, u64 chunk_mask) q->chunk_mask = chunk_mask; } -static u32 xskq_umem_get_ring_size(struct xsk_queue *q) +static size_t xskq_get_ring_size(struct xsk_queue *q, bool umem_queue) { - return sizeof(struct xdp_umem_ring) + q->nentries * sizeof(u64); -} + struct xdp_umem_ring *umem_ring; + struct xdp_rxtx_ring *rxtx_ring; -static u32 xskq_rxtx_get_ring_size(struct xsk_queue *q) -{ - return sizeof(struct xdp_ring) + q->nentries * sizeof(struct xdp_desc); + if (umem_queue) + return struct_size(umem_ring, desc, q->nentries); + return struct_size(rxtx_ring, desc, q->nentries); } struct xsk_queue *xskq_create(u32 nentries, bool umem_queue) @@ -43,8 +43,7 @@ struct xsk_queue *xskq_create(u32 nentries, bool umem_queue) gfp_flags = GFP_KERNEL | __GFP_ZERO | __GFP_NOWARN | __GFP_COMP | __GFP_NORETRY; - size = umem_queue ? xskq_umem_get_ring_size(q) : - xskq_rxtx_get_ring_size(q); + size = xskq_get_ring_size(q, umem_queue); q->ring = (struct xdp_ring *)__get_free_pages(gfp_flags, get_order(size)); diff --git a/net/xdp/xsk_queue.h b/net/xdp/xsk_queue.h index 909c5168ed0f..bec2af11853a 100644 --- a/net/xdp/xsk_queue.h +++ b/net/xdp/xsk_queue.h @@ -10,12 +10,10 @@ #include <linux/if_xdp.h> #include <net/xdp_sock.h> -#define RX_BATCH_SIZE 16 -#define LAZY_UPDATE_THRESHOLD 128 - struct xdp_ring { u32 producer ____cacheline_aligned_in_smp; u32 consumer ____cacheline_aligned_in_smp; + u32 flags; }; /* Used for the RX and TX queues for packets */ @@ -35,10 +33,8 @@ struct xsk_queue { u64 size; u32 ring_mask; u32 nentries; - u32 prod_head; - u32 prod_tail; - u32 cons_head; - u32 cons_tail; + u32 cached_prod; + u32 cached_cons; struct xdp_ring *ring; u64 invalid_descs; }; @@ -85,57 +81,116 @@ struct xsk_queue { * now and again after circling through the ring. */ -/* Common functions operating for both RXTX and umem queues */ +/* The operations on the rings are the following: + * + * producer consumer + * + * RESERVE entries PEEK in the ring for entries + * WRITE data into the ring READ data from the ring + * SUBMIT entries RELEASE entries + * + * The producer reserves one or more entries in the ring. It can then + * fill in these entries and finally submit them so that they can be + * seen and read by the consumer. + * + * The consumer peeks into the ring to see if the producer has written + * any new entries. If so, the producer can then read these entries + * and when it is done reading them release them back to the producer + * so that the producer can use these slots to fill in new entries. + * + * The function names below reflect these operations. + */ + +/* Functions that read and validate content from consumer rings. */ -static inline u64 xskq_nb_invalid_descs(struct xsk_queue *q) +static inline bool xskq_cons_crosses_non_contig_pg(struct xdp_umem *umem, + u64 addr, + u64 length) { - return q ? q->invalid_descs : 0; + bool cross_pg = (addr & (PAGE_SIZE - 1)) + length > PAGE_SIZE; + bool next_pg_contig = + (unsigned long)umem->pages[(addr >> PAGE_SHIFT)].addr & + XSK_NEXT_PG_CONTIG_MASK; + + return cross_pg && !next_pg_contig; } -static inline u32 xskq_nb_avail(struct xsk_queue *q, u32 dcnt) +static inline bool xskq_cons_is_valid_unaligned(struct xsk_queue *q, + u64 addr, + u64 length, + struct xdp_umem *umem) { - u32 entries = q->prod_tail - q->cons_tail; + u64 base_addr = xsk_umem_extract_addr(addr); - if (entries == 0) { - /* Refresh the local pointer */ - q->prod_tail = READ_ONCE(q->ring->producer); - entries = q->prod_tail - q->cons_tail; + addr = xsk_umem_add_offset_to_addr(addr); + if (base_addr >= q->size || addr >= q->size || + xskq_cons_crosses_non_contig_pg(umem, addr, length)) { + q->invalid_descs++; + return false; } - return (entries > dcnt) ? dcnt : entries; + return true; } -static inline u32 xskq_nb_free(struct xsk_queue *q, u32 producer, u32 dcnt) +static inline bool xskq_cons_is_valid_addr(struct xsk_queue *q, u64 addr) { - u32 free_entries = q->nentries - (producer - q->cons_tail); - - if (free_entries >= dcnt) - return free_entries; + if (addr >= q->size) { + q->invalid_descs++; + return false; + } - /* Refresh the local tail pointer */ - q->cons_tail = READ_ONCE(q->ring->consumer); - return q->nentries - (producer - q->cons_tail); + return true; } -static inline bool xskq_has_addrs(struct xsk_queue *q, u32 cnt) +static inline bool xskq_cons_read_addr(struct xsk_queue *q, u64 *addr, + struct xdp_umem *umem) { - u32 entries = q->prod_tail - q->cons_tail; + struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring; - if (entries >= cnt) - return true; + while (q->cached_cons != q->cached_prod) { + u32 idx = q->cached_cons & q->ring_mask; - /* Refresh the local pointer. */ - q->prod_tail = READ_ONCE(q->ring->producer); - entries = q->prod_tail - q->cons_tail; + *addr = ring->desc[idx] & q->chunk_mask; - return entries >= cnt; -} + if (umem->flags & XDP_UMEM_UNALIGNED_CHUNK_FLAG) { + if (xskq_cons_is_valid_unaligned(q, *addr, + umem->chunk_size_nohr, + umem)) + return true; + goto out; + } -/* UMEM queue */ + if (xskq_cons_is_valid_addr(q, *addr)) + return true; -static inline bool xskq_is_valid_addr(struct xsk_queue *q, u64 addr) +out: + q->cached_cons++; + } + + return false; +} + +static inline bool xskq_cons_is_valid_desc(struct xsk_queue *q, + struct xdp_desc *d, + struct xdp_umem *umem) { - if (addr >= q->size) { + if (umem->flags & XDP_UMEM_UNALIGNED_CHUNK_FLAG) { + if (!xskq_cons_is_valid_unaligned(q, d->addr, d->len, umem)) + return false; + + if (d->len > umem->chunk_size_nohr || d->options) { + q->invalid_descs++; + return false; + } + + return true; + } + + if (!xskq_cons_is_valid_addr(q, d->addr)) + return false; + + if (((d->addr + d->len) & q->chunk_mask) != (d->addr & q->chunk_mask) || + d->options) { q->invalid_descs++; return false; } @@ -143,177 +198,184 @@ static inline bool xskq_is_valid_addr(struct xsk_queue *q, u64 addr) return true; } -static inline u64 *xskq_validate_addr(struct xsk_queue *q, u64 *addr) +static inline bool xskq_cons_read_desc(struct xsk_queue *q, + struct xdp_desc *desc, + struct xdp_umem *umem) { - while (q->cons_tail != q->cons_head) { - struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring; - unsigned int idx = q->cons_tail & q->ring_mask; + while (q->cached_cons != q->cached_prod) { + struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring; + u32 idx = q->cached_cons & q->ring_mask; - *addr = READ_ONCE(ring->desc[idx]) & q->chunk_mask; - if (xskq_is_valid_addr(q, *addr)) - return addr; + *desc = ring->desc[idx]; + if (xskq_cons_is_valid_desc(q, desc, umem)) + return true; - q->cons_tail++; + q->cached_cons++; } - return NULL; + return false; } -static inline u64 *xskq_peek_addr(struct xsk_queue *q, u64 *addr) -{ - if (q->cons_tail == q->cons_head) { - smp_mb(); /* D, matches A */ - WRITE_ONCE(q->ring->consumer, q->cons_tail); - q->cons_head = q->cons_tail + xskq_nb_avail(q, RX_BATCH_SIZE); - - /* Order consumer and data */ - smp_rmb(); - } +/* Functions for consumers */ - return xskq_validate_addr(q, addr); +static inline void __xskq_cons_release(struct xsk_queue *q) +{ + smp_mb(); /* D, matches A */ + WRITE_ONCE(q->ring->consumer, q->cached_cons); } -static inline void xskq_discard_addr(struct xsk_queue *q) +static inline void __xskq_cons_peek(struct xsk_queue *q) { - q->cons_tail++; + /* Refresh the local pointer */ + q->cached_prod = READ_ONCE(q->ring->producer); + smp_rmb(); /* C, matches B */ } -static inline int xskq_produce_addr(struct xsk_queue *q, u64 addr) +static inline void xskq_cons_get_entries(struct xsk_queue *q) { - struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring; + __xskq_cons_release(q); + __xskq_cons_peek(q); +} - if (xskq_nb_free(q, q->prod_tail, 1) == 0) - return -ENOSPC; +static inline bool xskq_cons_has_entries(struct xsk_queue *q, u32 cnt) +{ + u32 entries = q->cached_prod - q->cached_cons; - /* A, matches D */ - ring->desc[q->prod_tail++ & q->ring_mask] = addr; + if (entries >= cnt) + return true; - /* Order producer and data */ - smp_wmb(); /* B, matches C */ + __xskq_cons_peek(q); + entries = q->cached_prod - q->cached_cons; - WRITE_ONCE(q->ring->producer, q->prod_tail); - return 0; + return entries >= cnt; } -static inline int xskq_produce_addr_lazy(struct xsk_queue *q, u64 addr) +static inline bool xskq_cons_peek_addr(struct xsk_queue *q, u64 *addr, + struct xdp_umem *umem) { - struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring; - - if (xskq_nb_free(q, q->prod_head, LAZY_UPDATE_THRESHOLD) == 0) - return -ENOSPC; - - /* A, matches D */ - ring->desc[q->prod_head++ & q->ring_mask] = addr; - return 0; + if (q->cached_prod == q->cached_cons) + xskq_cons_get_entries(q); + return xskq_cons_read_addr(q, addr, umem); } -static inline void xskq_produce_flush_addr_n(struct xsk_queue *q, - u32 nb_entries) +static inline bool xskq_cons_peek_desc(struct xsk_queue *q, + struct xdp_desc *desc, + struct xdp_umem *umem) { - /* Order producer and data */ - smp_wmb(); /* B, matches C */ - - q->prod_tail += nb_entries; - WRITE_ONCE(q->ring->producer, q->prod_tail); + if (q->cached_prod == q->cached_cons) + xskq_cons_get_entries(q); + return xskq_cons_read_desc(q, desc, umem); } -static inline int xskq_reserve_addr(struct xsk_queue *q) +static inline void xskq_cons_release(struct xsk_queue *q) { - if (xskq_nb_free(q, q->prod_head, 1) == 0) - return -ENOSPC; + /* To improve performance, only update local state here. + * Reflect this to global state when we get new entries + * from the ring in xskq_cons_get_entries(). + */ + q->cached_cons++; +} - /* A, matches D */ - q->prod_head++; - return 0; +static inline bool xskq_cons_is_full(struct xsk_queue *q) +{ + /* No barriers needed since data is not accessed */ + return READ_ONCE(q->ring->producer) - READ_ONCE(q->ring->consumer) == + q->nentries; } -/* Rx/Tx queue */ +/* Functions for producers */ -static inline bool xskq_is_valid_desc(struct xsk_queue *q, struct xdp_desc *d) +static inline bool xskq_prod_is_full(struct xsk_queue *q) { - if (!xskq_is_valid_addr(q, d->addr)) - return false; + u32 free_entries = q->nentries - (q->cached_prod - q->cached_cons); - if (((d->addr + d->len) & q->chunk_mask) != (d->addr & q->chunk_mask) || - d->options) { - q->invalid_descs++; + if (free_entries) return false; - } - return true; + /* Refresh the local tail pointer */ + q->cached_cons = READ_ONCE(q->ring->consumer); + free_entries = q->nentries - (q->cached_prod - q->cached_cons); + + return !free_entries; } -static inline struct xdp_desc *xskq_validate_desc(struct xsk_queue *q, - struct xdp_desc *desc) +static inline int xskq_prod_reserve(struct xsk_queue *q) { - while (q->cons_tail != q->cons_head) { - struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring; - unsigned int idx = q->cons_tail & q->ring_mask; - - *desc = READ_ONCE(ring->desc[idx]); - if (xskq_is_valid_desc(q, desc)) - return desc; - - q->cons_tail++; - } + if (xskq_prod_is_full(q)) + return -ENOSPC; - return NULL; + /* A, matches D */ + q->cached_prod++; + return 0; } -static inline struct xdp_desc *xskq_peek_desc(struct xsk_queue *q, - struct xdp_desc *desc) +static inline int xskq_prod_reserve_addr(struct xsk_queue *q, u64 addr) { - if (q->cons_tail == q->cons_head) { - smp_mb(); /* D, matches A */ - WRITE_ONCE(q->ring->consumer, q->cons_tail); - q->cons_head = q->cons_tail + xskq_nb_avail(q, RX_BATCH_SIZE); - - /* Order consumer and data */ - smp_rmb(); /* C, matches B */ - } + struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring; - return xskq_validate_desc(q, desc); -} + if (xskq_prod_is_full(q)) + return -ENOSPC; -static inline void xskq_discard_desc(struct xsk_queue *q) -{ - q->cons_tail++; + /* A, matches D */ + ring->desc[q->cached_prod++ & q->ring_mask] = addr; + return 0; } -static inline int xskq_produce_batch_desc(struct xsk_queue *q, - u64 addr, u32 len) +static inline int xskq_prod_reserve_desc(struct xsk_queue *q, + u64 addr, u32 len) { struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring; - unsigned int idx; + u32 idx; - if (xskq_nb_free(q, q->prod_head, 1) == 0) + if (xskq_prod_is_full(q)) return -ENOSPC; /* A, matches D */ - idx = (q->prod_head++) & q->ring_mask; + idx = q->cached_prod++ & q->ring_mask; ring->desc[idx].addr = addr; ring->desc[idx].len = len; return 0; } -static inline void xskq_produce_flush_desc(struct xsk_queue *q) +static inline void __xskq_prod_submit(struct xsk_queue *q, u32 idx) { - /* Order producer and data */ smp_wmb(); /* B, matches C */ - q->prod_tail = q->prod_head; - WRITE_ONCE(q->ring->producer, q->prod_tail); + WRITE_ONCE(q->ring->producer, idx); +} + +static inline void xskq_prod_submit(struct xsk_queue *q) +{ + __xskq_prod_submit(q, q->cached_prod); +} + +static inline void xskq_prod_submit_addr(struct xsk_queue *q, u64 addr) +{ + struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring; + u32 idx = q->ring->producer; + + ring->desc[idx++ & q->ring_mask] = addr; + + __xskq_prod_submit(q, idx); } -static inline bool xskq_full_desc(struct xsk_queue *q) +static inline void xskq_prod_submit_n(struct xsk_queue *q, u32 nb_entries) { - return xskq_nb_avail(q, q->nentries) == q->nentries; + __xskq_prod_submit(q, q->ring->producer + nb_entries); } -static inline bool xskq_empty_desc(struct xsk_queue *q) +static inline bool xskq_prod_is_empty(struct xsk_queue *q) { - return xskq_nb_free(q, q->prod_tail, q->nentries) == q->nentries; + /* No barriers needed since data is not accessed */ + return READ_ONCE(q->ring->consumer) == READ_ONCE(q->ring->producer); +} + +/* For both producers and consumers */ + +static inline u64 xskq_nb_invalid_descs(struct xsk_queue *q) +{ + return q ? q->invalid_descs : 0; } void xskq_set_umem(struct xsk_queue *q, u64 size, u64 chunk_mask); |