path: root/net/xdp
Diffstat (limited to 'net/xdp')
-rw-r--r--  net/xdp/xdp_umem.c    93
-rw-r--r--  net/xdp/xsk.c        483
-rw-r--r--  net/xdp/xsk.h         13
-rw-r--r--  net/xdp/xsk_diag.c     5
-rw-r--r--  net/xdp/xsk_queue.c   15
-rw-r--r--  net/xdp/xsk_queue.h  344
6 files changed, 690 insertions(+), 263 deletions(-)
diff --git a/net/xdp/xdp_umem.c b/net/xdp/xdp_umem.c
index 83de74ca729a..fa7bb5e060d0 100644
--- a/net/xdp/xdp_umem.c
+++ b/net/xdp/xdp_umem.c
@@ -14,6 +14,7 @@
#include <linux/netdevice.h>
#include <linux/rtnetlink.h>
#include <linux/idr.h>
+#include <linux/vmalloc.h>
#include "xdp_umem.h"
#include "xsk_queue.h"
@@ -26,6 +27,9 @@ void xdp_add_sk_umem(struct xdp_umem *umem, struct xdp_sock *xs)
{
unsigned long flags;
+ if (!xs->tx)
+ return;
+
spin_lock_irqsave(&umem->xsk_list_lock, flags);
list_add_rcu(&xs->list, &umem->xsk_list);
spin_unlock_irqrestore(&umem->xsk_list_lock, flags);
@@ -35,6 +39,9 @@ void xdp_del_sk_umem(struct xdp_umem *umem, struct xdp_sock *xs)
{
unsigned long flags;
+ if (!xs->tx)
+ return;
+
spin_lock_irqsave(&umem->xsk_list_lock, flags);
list_del_rcu(&xs->list);
spin_unlock_irqrestore(&umem->xsk_list_lock, flags);
@@ -105,14 +112,22 @@ int xdp_umem_assign_dev(struct xdp_umem *umem, struct net_device *dev,
umem->dev = dev;
umem->queue_id = queue_id;
+ if (flags & XDP_USE_NEED_WAKEUP) {
+ umem->flags |= XDP_UMEM_USES_NEED_WAKEUP;
+ /* Tx needs to be explicitly woken up the first time. This also
+ * covers drivers that do not implement this feature; they will
+ * always have to call sendto().
+ */
+ xsk_set_tx_need_wakeup(umem);
+ }
+
dev_hold(dev);
if (force_copy)
/* For copy-mode, we are done. */
return 0;
- if (!dev->netdev_ops->ndo_bpf ||
- !dev->netdev_ops->ndo_xsk_async_xmit) {
+ if (!dev->netdev_ops->ndo_bpf || !dev->netdev_ops->ndo_xsk_wakeup) {
err = -EOPNOTSUPP;
goto err_unreg_umem;
}
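From user space, the effect of XDP_USE_NEED_WAKEUP is that the application checks the ring's flags word before issuing a wakeup syscall. A minimal, hedged sketch of that check (ring setup and the mmap of the flags word are omitted; only XDP_RING_NEED_WAKEUP and the sendto() wakeup convention come from the AF_XDP uapi, the helper itself is illustrative):

#include <sys/socket.h>
#include <linux/if_xdp.h>

/* tx_ring_flags is assumed to point at the Tx ring's flags word, mmap'ed
 * via the ptrs.flags offset reported by XDP_MMAP_OFFSETS.
 */
static void kick_tx_if_needed(int xsk_fd, const volatile __u32 *tx_ring_flags)
{
	/* Only enter the kernel when the driver asked to be woken up. */
	if (*tx_ring_flags & XDP_RING_NEED_WAKEUP)
		sendto(xsk_fd, NULL, 0, MSG_DONTWAIT, NULL, 0);
}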
@@ -164,17 +179,41 @@ void xdp_umem_clear_dev(struct xdp_umem *umem)
umem->zc = false;
}
-static void xdp_umem_unpin_pages(struct xdp_umem *umem)
+static void xdp_umem_unmap_pages(struct xdp_umem *umem)
{
unsigned int i;
+ for (i = 0; i < umem->npgs; i++)
+ if (PageHighMem(umem->pgs[i]))
+ vunmap(umem->pages[i].addr);
+}
+
+static int xdp_umem_map_pages(struct xdp_umem *umem)
+{
+ unsigned int i;
+ void *addr;
+
for (i = 0; i < umem->npgs; i++) {
- struct page *page = umem->pgs[i];
+ if (PageHighMem(umem->pgs[i]))
+ addr = vmap(&umem->pgs[i], 1, VM_MAP, PAGE_KERNEL);
+ else
+ addr = page_address(umem->pgs[i]);
+
+ if (!addr) {
+ xdp_umem_unmap_pages(umem);
+ return -ENOMEM;
+ }
- set_page_dirty_lock(page);
- put_page(page);
+ umem->pages[i].addr = addr;
}
+ return 0;
+}
+
+static void xdp_umem_unpin_pages(struct xdp_umem *umem)
+{
+ unpin_user_pages_dirty_lock(umem->pgs, umem->npgs, true);
+
kfree(umem->pgs);
umem->pgs = NULL;
}
@@ -207,9 +246,10 @@ static void xdp_umem_release(struct xdp_umem *umem)
xsk_reuseq_destroy(umem);
+ xdp_umem_unmap_pages(umem);
xdp_umem_unpin_pages(umem);
- kfree(umem->pages);
+ kvfree(umem->pages);
umem->pages = NULL;
xdp_umem_unaccount_pages(umem);
@@ -251,7 +291,7 @@ static int xdp_umem_pin_pages(struct xdp_umem *umem)
return -ENOMEM;
down_read(&current->mm->mmap_sem);
- npgs = get_user_pages(umem->address, umem->npgs,
+ npgs = pin_user_pages(umem->address, umem->npgs,
gup_flags | FOLL_LONGTERM, &umem->pgs[0], NULL);
up_read(&current->mm->mmap_sem);
@@ -299,10 +339,11 @@ static int xdp_umem_account_pages(struct xdp_umem *umem)
static int xdp_umem_reg(struct xdp_umem *umem, struct xdp_umem_reg *mr)
{
+ bool unaligned_chunks = mr->flags & XDP_UMEM_UNALIGNED_CHUNK_FLAG;
u32 chunk_size = mr->chunk_size, headroom = mr->headroom;
unsigned int chunks, chunks_per_page;
u64 addr = mr->addr, size = mr->len;
- int size_chk, err, i;
+ int size_chk, err;
if (chunk_size < XDP_UMEM_MIN_CHUNK_SIZE || chunk_size > PAGE_SIZE) {
/* Strictly speaking we could support this, if:
@@ -314,7 +355,11 @@ static int xdp_umem_reg(struct xdp_umem *umem, struct xdp_umem_reg *mr)
return -EINVAL;
}
- if (!is_power_of_2(chunk_size))
+ if (mr->flags & ~(XDP_UMEM_UNALIGNED_CHUNK_FLAG |
+ XDP_UMEM_USES_NEED_WAKEUP))
+ return -EINVAL;
+
+ if (!unaligned_chunks && !is_power_of_2(chunk_size))
return -EINVAL;
if (!PAGE_ALIGNED(addr)) {
@@ -331,24 +376,26 @@ static int xdp_umem_reg(struct xdp_umem *umem, struct xdp_umem_reg *mr)
if (chunks == 0)
return -EINVAL;
- chunks_per_page = PAGE_SIZE / chunk_size;
- if (chunks < chunks_per_page || chunks % chunks_per_page)
- return -EINVAL;
-
- headroom = ALIGN(headroom, 64);
+ if (!unaligned_chunks) {
+ chunks_per_page = PAGE_SIZE / chunk_size;
+ if (chunks < chunks_per_page || chunks % chunks_per_page)
+ return -EINVAL;
+ }
size_chk = chunk_size - headroom - XDP_PACKET_HEADROOM;
if (size_chk < 0)
return -EINVAL;
umem->address = (unsigned long)addr;
- umem->chunk_mask = ~((u64)chunk_size - 1);
+ umem->chunk_mask = unaligned_chunks ? XSK_UNALIGNED_BUF_ADDR_MASK
+ : ~((u64)chunk_size - 1);
umem->size = size;
umem->headroom = headroom;
umem->chunk_size_nohr = chunk_size - headroom;
umem->npgs = size / PAGE_SIZE;
umem->pgs = NULL;
umem->user = NULL;
+ umem->flags = mr->flags;
INIT_LIST_HEAD(&umem->xsk_list);
spin_lock_init(&umem->xsk_list_lock);
@@ -362,17 +409,21 @@ static int xdp_umem_reg(struct xdp_umem *umem, struct xdp_umem_reg *mr)
if (err)
goto out_account;
- umem->pages = kcalloc(umem->npgs, sizeof(*umem->pages), GFP_KERNEL);
+ umem->pages = kvcalloc(umem->npgs, sizeof(*umem->pages),
+ GFP_KERNEL_ACCOUNT);
if (!umem->pages) {
err = -ENOMEM;
- goto out_account;
+ goto out_pin;
}
- for (i = 0; i < umem->npgs; i++)
- umem->pages[i].addr = page_address(umem->pgs[i]);
+ err = xdp_umem_map_pages(umem);
+ if (!err)
+ return 0;
- return 0;
+ kvfree(umem->pages);
+out_pin:
+ xdp_umem_unpin_pages(umem);
out_account:
xdp_umem_unaccount_pages(umem);
return err;
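The reworked xdp_umem_reg() above also accepts a flags word from user space. A hedged sketch of registering a UMEM with unaligned chunks (buffer allocation and error handling elided; only the uapi names from linux/if_xdp.h are assumed, the helper itself is illustrative):

#include <stdint.h>
#include <sys/socket.h>
#include <linux/if_xdp.h>

static int reg_umem_unaligned(int xsk_fd, void *buf, __u64 len)
{
	struct xdp_umem_reg mr = {
		.addr = (__u64)(uintptr_t)buf,	/* page-aligned buffer */
		.len = len,
		.chunk_size = 3072,	/* non-power-of-2 is allowed with the flag */
		.headroom = 0,
		.flags = XDP_UMEM_UNALIGNED_CHUNK_FLAG,
	};

	return setsockopt(xsk_fd, SOL_XDP, XDP_UMEM_REG, &mr, sizeof(mr));
}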
diff --git a/net/xdp/xsk.c b/net/xdp/xsk.c
index 59b57d708697..df600487a68d 100644
--- a/net/xdp/xsk.c
+++ b/net/xdp/xsk.c
@@ -31,6 +31,8 @@
#define TX_BATCH_SIZE 16
+static DEFINE_PER_CPU(struct list_head, xskmap_flush_list);
+
bool xsk_is_setup_for_bpf_map(struct xdp_sock *xs)
{
return READ_ONCE(xs->rx) && READ_ONCE(xs->umem) &&
@@ -39,37 +41,119 @@ bool xsk_is_setup_for_bpf_map(struct xdp_sock *xs)
bool xsk_umem_has_addrs(struct xdp_umem *umem, u32 cnt)
{
- return xskq_has_addrs(umem->fq, cnt);
+ return xskq_cons_has_entries(umem->fq, cnt);
}
EXPORT_SYMBOL(xsk_umem_has_addrs);
-u64 *xsk_umem_peek_addr(struct xdp_umem *umem, u64 *addr)
+bool xsk_umem_peek_addr(struct xdp_umem *umem, u64 *addr)
{
- return xskq_peek_addr(umem->fq, addr);
+ return xskq_cons_peek_addr(umem->fq, addr, umem);
}
EXPORT_SYMBOL(xsk_umem_peek_addr);
-void xsk_umem_discard_addr(struct xdp_umem *umem)
+void xsk_umem_release_addr(struct xdp_umem *umem)
{
- xskq_discard_addr(umem->fq);
+ xskq_cons_release(umem->fq);
+}
+EXPORT_SYMBOL(xsk_umem_release_addr);
+
+void xsk_set_rx_need_wakeup(struct xdp_umem *umem)
+{
+ if (umem->need_wakeup & XDP_WAKEUP_RX)
+ return;
+
+ umem->fq->ring->flags |= XDP_RING_NEED_WAKEUP;
+ umem->need_wakeup |= XDP_WAKEUP_RX;
+}
+EXPORT_SYMBOL(xsk_set_rx_need_wakeup);
+
+void xsk_set_tx_need_wakeup(struct xdp_umem *umem)
+{
+ struct xdp_sock *xs;
+
+ if (umem->need_wakeup & XDP_WAKEUP_TX)
+ return;
+
+ rcu_read_lock();
+ list_for_each_entry_rcu(xs, &umem->xsk_list, list) {
+ xs->tx->ring->flags |= XDP_RING_NEED_WAKEUP;
+ }
+ rcu_read_unlock();
+
+ umem->need_wakeup |= XDP_WAKEUP_TX;
+}
+EXPORT_SYMBOL(xsk_set_tx_need_wakeup);
+
+void xsk_clear_rx_need_wakeup(struct xdp_umem *umem)
+{
+ if (!(umem->need_wakeup & XDP_WAKEUP_RX))
+ return;
+
+ umem->fq->ring->flags &= ~XDP_RING_NEED_WAKEUP;
+ umem->need_wakeup &= ~XDP_WAKEUP_RX;
+}
+EXPORT_SYMBOL(xsk_clear_rx_need_wakeup);
+
+void xsk_clear_tx_need_wakeup(struct xdp_umem *umem)
+{
+ struct xdp_sock *xs;
+
+ if (!(umem->need_wakeup & XDP_WAKEUP_TX))
+ return;
+
+ rcu_read_lock();
+ list_for_each_entry_rcu(xs, &umem->xsk_list, list) {
+ xs->tx->ring->flags &= ~XDP_RING_NEED_WAKEUP;
+ }
+ rcu_read_unlock();
+
+ umem->need_wakeup &= ~XDP_WAKEUP_TX;
+}
+EXPORT_SYMBOL(xsk_clear_tx_need_wakeup);
+
+bool xsk_umem_uses_need_wakeup(struct xdp_umem *umem)
+{
+ return umem->flags & XDP_UMEM_USES_NEED_WAKEUP;
+}
+EXPORT_SYMBOL(xsk_umem_uses_need_wakeup);
+
+/* If a buffer crosses a page boundary, we need to do two memcpys, one for
+ * each page. This is only required in copy mode.
+ */
+static void __xsk_rcv_memcpy(struct xdp_umem *umem, u64 addr, void *from_buf,
+ u32 len, u32 metalen)
+{
+ void *to_buf = xdp_umem_get_data(umem, addr);
+
+ addr = xsk_umem_add_offset_to_addr(addr);
+ if (xskq_cons_crosses_non_contig_pg(umem, addr, len + metalen)) {
+ void *next_pg_addr = umem->pages[(addr >> PAGE_SHIFT) + 1].addr;
+ u64 page_start = addr & ~(PAGE_SIZE - 1);
+ u64 first_len = PAGE_SIZE - (addr - page_start);
+
+ memcpy(to_buf, from_buf, first_len + metalen);
+ memcpy(next_pg_addr, from_buf + first_len, len - first_len);
+
+ return;
+ }
+
+ memcpy(to_buf, from_buf, len + metalen);
}
-EXPORT_SYMBOL(xsk_umem_discard_addr);
static int __xsk_rcv(struct xdp_sock *xs, struct xdp_buff *xdp, u32 len)
{
- void *to_buf, *from_buf;
+ u64 offset = xs->umem->headroom;
+ u64 addr, memcpy_addr;
+ void *from_buf;
u32 metalen;
- u64 addr;
int err;
- if (!xskq_peek_addr(xs->umem->fq, &addr) ||
+ if (!xskq_cons_peek_addr(xs->umem->fq, &addr, xs->umem) ||
len > xs->umem->chunk_size_nohr - XDP_PACKET_HEADROOM) {
xs->rx_dropped++;
return -ENOSPC;
}
- addr += xs->umem->headroom;
-
if (unlikely(xdp_data_meta_unsupported(xdp))) {
from_buf = xdp->data;
metalen = 0;
@@ -78,12 +162,14 @@ static int __xsk_rcv(struct xdp_sock *xs, struct xdp_buff *xdp, u32 len)
metalen = xdp->data - xdp->data_meta;
}
- to_buf = xdp_umem_get_data(xs->umem, addr);
- memcpy(to_buf, from_buf, len + metalen);
- addr += metalen;
- err = xskq_produce_batch_desc(xs->rx, addr, len);
+ memcpy_addr = xsk_umem_adjust_offset(xs->umem, addr, offset);
+ __xsk_rcv_memcpy(xs->umem, memcpy_addr, from_buf, len, metalen);
+
+ offset += metalen;
+ addr = xsk_umem_adjust_offset(xs->umem, addr, offset);
+ err = xskq_prod_reserve_desc(xs->rx, addr, len);
if (!err) {
- xskq_discard_addr(xs->umem->fq);
+ xskq_cons_release(xs->umem->fq);
xdp_return_buff(xdp);
return 0;
}
@@ -94,7 +180,7 @@ static int __xsk_rcv(struct xdp_sock *xs, struct xdp_buff *xdp, u32 len)
static int __xsk_rcv_zc(struct xdp_sock *xs, struct xdp_buff *xdp, u32 len)
{
- int err = xskq_produce_batch_desc(xs->rx, (u64)xdp->handle, len);
+ int err = xskq_prod_reserve_desc(xs->rx, xdp->handle, len);
if (err)
xs->rx_dropped++;
@@ -102,10 +188,23 @@ static int __xsk_rcv_zc(struct xdp_sock *xs, struct xdp_buff *xdp, u32 len)
return err;
}
-int xsk_rcv(struct xdp_sock *xs, struct xdp_buff *xdp)
+static bool xsk_is_bound(struct xdp_sock *xs)
+{
+ if (READ_ONCE(xs->state) == XSK_BOUND) {
+ /* Matches smp_wmb() in bind(). */
+ smp_rmb();
+ return true;
+ }
+ return false;
+}
+
+static int xsk_rcv(struct xdp_sock *xs, struct xdp_buff *xdp)
{
u32 len;
+ if (!xsk_is_bound(xs))
+ return -EINVAL;
+
if (xs->dev != xdp->rxq->dev || xs->queue_id != xdp->rxq->queue_index)
return -EINVAL;
@@ -115,16 +214,17 @@ int xsk_rcv(struct xdp_sock *xs, struct xdp_buff *xdp)
__xsk_rcv_zc(xs, xdp, len) : __xsk_rcv(xs, xdp, len);
}
-void xsk_flush(struct xdp_sock *xs)
+static void xsk_flush(struct xdp_sock *xs)
{
- xskq_produce_flush_desc(xs->rx);
- xs->sk.sk_data_ready(&xs->sk);
+ xskq_prod_submit(xs->rx);
+ sock_def_readable(&xs->sk);
}
int xsk_generic_rcv(struct xdp_sock *xs, struct xdp_buff *xdp)
{
u32 metalen = xdp->data - xdp->data_meta;
u32 len = xdp->data_end - xdp->data;
+ u64 offset = xs->umem->headroom;
void *buffer;
u64 addr;
int err;
@@ -136,23 +236,23 @@ int xsk_generic_rcv(struct xdp_sock *xs, struct xdp_buff *xdp)
goto out_unlock;
}
- if (!xskq_peek_addr(xs->umem->fq, &addr) ||
+ if (!xskq_cons_peek_addr(xs->umem->fq, &addr, xs->umem) ||
len > xs->umem->chunk_size_nohr - XDP_PACKET_HEADROOM) {
err = -ENOSPC;
goto out_drop;
}
- addr += xs->umem->headroom;
-
+ addr = xsk_umem_adjust_offset(xs->umem, addr, offset);
buffer = xdp_umem_get_data(xs->umem, addr);
memcpy(buffer, xdp->data_meta, len + metalen);
- addr += metalen;
- err = xskq_produce_batch_desc(xs->rx, addr, len);
+
+ addr = xsk_umem_adjust_offset(xs->umem, addr, metalen);
+ err = xskq_prod_reserve_desc(xs->rx, addr, len);
if (err)
goto out_drop;
- xskq_discard_addr(xs->umem->fq);
- xskq_produce_flush_desc(xs->rx);
+ xskq_cons_release(xs->umem->fq);
+ xskq_prod_submit(xs->rx);
spin_unlock_bh(&xs->rx_lock);
@@ -166,9 +266,35 @@ out_unlock:
return err;
}
+int __xsk_map_redirect(struct xdp_sock *xs, struct xdp_buff *xdp)
+{
+ struct list_head *flush_list = this_cpu_ptr(&xskmap_flush_list);
+ int err;
+
+ err = xsk_rcv(xs, xdp);
+ if (err)
+ return err;
+
+ if (!xs->flush_node.prev)
+ list_add(&xs->flush_node, flush_list);
+
+ return 0;
+}
+
+void __xsk_map_flush(void)
+{
+ struct list_head *flush_list = this_cpu_ptr(&xskmap_flush_list);
+ struct xdp_sock *xs, *tmp;
+
+ list_for_each_entry_safe(xs, tmp, flush_list, flush_node) {
+ xsk_flush(xs);
+ __list_del_clearprev(&xs->flush_node);
+ }
+}
+
void xsk_umem_complete_tx(struct xdp_umem *umem, u32 nb_entries)
{
- xskq_produce_flush_addr_n(umem->cq, nb_entries);
+ xskq_prod_submit_n(umem->cq, nb_entries);
}
EXPORT_SYMBOL(xsk_umem_complete_tx);
@@ -190,13 +316,18 @@ bool xsk_umem_consume_tx(struct xdp_umem *umem, struct xdp_desc *desc)
rcu_read_lock();
list_for_each_entry_rcu(xs, &umem->xsk_list, list) {
- if (!xskq_peek_desc(xs->tx, desc))
+ if (!xskq_cons_peek_desc(xs->tx, desc, umem))
continue;
- if (xskq_produce_addr_lazy(umem->cq, desc->addr))
+ /* This is the backpressure mechanism for the Tx path.
+ * Reserve space in the completion queue and only proceed
+ * if there is space in it. This avoids having to implement
+ * any buffering in the Tx path.
+ */
+ if (xskq_prod_reserve_addr(umem->cq, desc->addr))
goto out;
- xskq_discard_desc(xs->tx);
+ xskq_cons_release(xs->tx);
rcu_read_unlock();
return true;
}
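For context, a zero-copy driver pairs xsk_umem_consume_tx() with xsk_umem_complete_tx(): the completion-queue slot is already reserved inside xsk_umem_consume_tx(), so completing the entries later cannot fail. A hedged sketch of that calling pattern (drv_hw_xmit() is hypothetical, and a real driver would complete entries from its Tx clean path rather than immediately):

static void drv_xsk_tx_sketch(struct xdp_umem *umem, unsigned int budget)
{
	struct xdp_desc desc;
	unsigned int sent = 0;

	while (sent < budget && xsk_umem_consume_tx(umem, &desc)) {
		/* Hypothetical: post desc.addr/desc.len to the HW Tx ring. */
		drv_hw_xmit(umem, desc.addr, desc.len);
		sent++;
	}

	/* Release the reserved completion-queue entries to user space. */
	if (sent)
		xsk_umem_complete_tx(umem, sent);
}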
@@ -207,12 +338,21 @@ out:
}
EXPORT_SYMBOL(xsk_umem_consume_tx);
-static int xsk_zc_xmit(struct sock *sk)
+static int xsk_wakeup(struct xdp_sock *xs, u8 flags)
{
- struct xdp_sock *xs = xdp_sk(sk);
struct net_device *dev = xs->dev;
+ int err;
- return dev->netdev_ops->ndo_xsk_async_xmit(dev, xs->queue_id);
+ rcu_read_lock();
+ err = dev->netdev_ops->ndo_xsk_wakeup(dev, xs->queue_id, flags);
+ rcu_read_unlock();
+
+ return err;
+}
+
+static int xsk_zc_xmit(struct xdp_sock *xs)
+{
+ return xsk_wakeup(xs, XDP_WAKEUP_TX);
}
static void xsk_destruct_skb(struct sk_buff *skb)
@@ -222,17 +362,16 @@ static void xsk_destruct_skb(struct sk_buff *skb)
unsigned long flags;
spin_lock_irqsave(&xs->tx_completion_lock, flags);
- WARN_ON_ONCE(xskq_produce_addr(xs->umem->cq, addr));
+ xskq_prod_submit_addr(xs->umem->cq, addr);
spin_unlock_irqrestore(&xs->tx_completion_lock, flags);
sock_wfree(skb);
}
-static int xsk_generic_xmit(struct sock *sk, struct msghdr *m,
- size_t total_len)
+static int xsk_generic_xmit(struct sock *sk)
{
- u32 max_batch = TX_BATCH_SIZE;
struct xdp_sock *xs = xdp_sk(sk);
+ u32 max_batch = TX_BATCH_SIZE;
bool sent_frame = false;
struct xdp_desc desc;
struct sk_buff *skb;
@@ -243,7 +382,7 @@ static int xsk_generic_xmit(struct sock *sk, struct msghdr *m,
if (xs->queue_id >= xs->dev->real_num_tx_queues)
goto out;
- while (xskq_peek_desc(xs->tx, &desc)) {
+ while (xskq_cons_peek_desc(xs->tx, &desc, xs->umem)) {
char *buffer;
u64 addr;
u32 len;
@@ -264,7 +403,12 @@ static int xsk_generic_xmit(struct sock *sk, struct msghdr *m,
addr = desc.addr;
buffer = xdp_umem_get_data(xs->umem, addr);
err = skb_store_bits(skb, 0, buffer, len);
- if (unlikely(err) || xskq_reserve_addr(xs->umem->cq)) {
+ /* This is the backpressure mechanism for the Tx path.
+ * Reserve space in the completion queue and only proceed
+ * if there is space in it. This avoids having to implement
+ * any buffering in the Tx path.
+ */
+ if (unlikely(err) || xskq_prod_reserve(xs->umem->cq)) {
kfree_skb(skb);
goto out;
}
@@ -272,11 +416,11 @@ static int xsk_generic_xmit(struct sock *sk, struct msghdr *m,
skb->dev = xs->dev;
skb->priority = sk->sk_priority;
skb->mark = sk->sk_mark;
- skb_shinfo(skb)->destructor_arg = (void *)(long)addr;
+ skb_shinfo(skb)->destructor_arg = (void *)(long)desc.addr;
skb->destructor = xsk_destruct_skb;
err = dev_direct_xmit(skb, xs->queue_id);
- xskq_discard_desc(xs->tx);
+ xskq_cons_release(xs->tx);
/* Ignore NET_XMIT_CN as packet might have been sent */
if (err == NET_XMIT_DROP || err == NETDEV_TX_BUSY) {
/* SKB completed but not sent */
@@ -295,35 +439,57 @@ out:
return err;
}
-static int xsk_sendmsg(struct socket *sock, struct msghdr *m, size_t total_len)
+static int __xsk_sendmsg(struct sock *sk)
{
- bool need_wait = !(m->msg_flags & MSG_DONTWAIT);
- struct sock *sk = sock->sk;
struct xdp_sock *xs = xdp_sk(sk);
- if (unlikely(!xs->dev))
- return -ENXIO;
if (unlikely(!(xs->dev->flags & IFF_UP)))
return -ENETDOWN;
if (unlikely(!xs->tx))
return -ENOBUFS;
- if (need_wait)
+
+ return xs->zc ? xsk_zc_xmit(xs) : xsk_generic_xmit(sk);
+}
+
+static int xsk_sendmsg(struct socket *sock, struct msghdr *m, size_t total_len)
+{
+ bool need_wait = !(m->msg_flags & MSG_DONTWAIT);
+ struct sock *sk = sock->sk;
+ struct xdp_sock *xs = xdp_sk(sk);
+
+ if (unlikely(!xsk_is_bound(xs)))
+ return -ENXIO;
+ if (unlikely(need_wait))
return -EOPNOTSUPP;
- return (xs->zc) ? xsk_zc_xmit(sk) : xsk_generic_xmit(sk, m, total_len);
+ return __xsk_sendmsg(sk);
}
-static unsigned int xsk_poll(struct file *file, struct socket *sock,
+static __poll_t xsk_poll(struct file *file, struct socket *sock,
struct poll_table_struct *wait)
{
- unsigned int mask = datagram_poll(file, sock, wait);
+ __poll_t mask = datagram_poll(file, sock, wait);
struct sock *sk = sock->sk;
struct xdp_sock *xs = xdp_sk(sk);
+ struct xdp_umem *umem;
- if (xs->rx && !xskq_empty_desc(xs->rx))
- mask |= POLLIN | POLLRDNORM;
- if (xs->tx && !xskq_full_desc(xs->tx))
- mask |= POLLOUT | POLLWRNORM;
+ if (unlikely(!xsk_is_bound(xs)))
+ return mask;
+
+ umem = xs->umem;
+
+ if (umem->need_wakeup) {
+ if (xs->zc)
+ xsk_wakeup(xs, umem->need_wakeup);
+ else
+ /* Poll needs to drive Tx also in copy mode */
+ __xsk_sendmsg(sk);
+ }
+
+ if (xs->rx && !xskq_prod_is_empty(xs->rx))
+ mask |= EPOLLIN | EPOLLRDNORM;
+ if (xs->tx && !xskq_cons_is_full(xs->tx))
+ mask |= EPOLLOUT | EPOLLWRNORM;
return mask;
}
@@ -342,7 +508,7 @@ static int xsk_init_queue(u32 entries, struct xsk_queue **queue,
/* Make sure queue is ready before it can be seen by others */
smp_wmb();
- *queue = q;
+ WRITE_ONCE(*queue, q);
return 0;
}
@@ -350,10 +516,9 @@ static void xsk_unbind_dev(struct xdp_sock *xs)
{
struct net_device *dev = xs->dev;
- if (!dev || xs->state != XSK_BOUND)
+ if (xs->state != XSK_BOUND)
return;
-
- xs->state = XSK_UNBOUND;
+ WRITE_ONCE(xs->state, XSK_UNBOUND);
/* Wait for driver to stop using the xdp socket. */
xdp_del_sk_umem(xs->umem, xs);
@@ -362,6 +527,52 @@ static void xsk_unbind_dev(struct xdp_sock *xs)
dev_put(dev);
}
+static struct xsk_map *xsk_get_map_list_entry(struct xdp_sock *xs,
+ struct xdp_sock ***map_entry)
+{
+ struct xsk_map *map = NULL;
+ struct xsk_map_node *node;
+
+ *map_entry = NULL;
+
+ spin_lock_bh(&xs->map_list_lock);
+ node = list_first_entry_or_null(&xs->map_list, struct xsk_map_node,
+ node);
+ if (node) {
+ WARN_ON(xsk_map_inc(node->map));
+ map = node->map;
+ *map_entry = node->map_entry;
+ }
+ spin_unlock_bh(&xs->map_list_lock);
+ return map;
+}
+
+static void xsk_delete_from_maps(struct xdp_sock *xs)
+{
+ /* This function removes the current XDP socket from all the
+ * maps it resides in. We need to take extra care here, due to
+ * the two locks involved. Each map has a lock synchronizing
+ * updates to the entries, and each socket has a lock that
+ * synchronizes access to the list of maps (map_list). For
+ * deadlock avoidance the locks need to be taken in the order
+ * "map lock"->"socket map list lock". We start off by
+ * accessing the socket map list, and take a reference to the
+ * map to guarantee existence between the
+ * xsk_get_map_list_entry() and xsk_map_try_sock_delete()
+ * calls. Then we ask the map to remove the socket, which
+ * tries to remove the socket from the map. Note that there
+ * might be updates to the map between
+ * xsk_get_map_list_entry() and xsk_map_try_sock_delete().
+ */
+ struct xdp_sock **map_entry = NULL;
+ struct xsk_map *map;
+
+ while ((map = xsk_get_map_list_entry(xs, &map_entry))) {
+ xsk_map_try_sock_delete(map, xs, map_entry);
+ xsk_map_put(map);
+ }
+}
+
static int xsk_release(struct socket *sock)
{
struct sock *sk = sock->sk;
@@ -381,7 +592,10 @@ static int xsk_release(struct socket *sock)
sock_prot_inuse_add(net, sk->sk_prot, -1);
local_bh_enable();
+ xsk_delete_from_maps(xs);
+ mutex_lock(&xs->mutex);
xsk_unbind_dev(xs);
+ mutex_unlock(&xs->mutex);
xskq_destroy(xs->rx);
xskq_destroy(xs->tx);
@@ -412,6 +626,24 @@ static struct socket *xsk_lookup_xsk_from_fd(int fd)
return sock;
}
+/* Check if umem pages are contiguous.
+ * If zero-copy mode, use the DMA address to do the page contiguity check
+ * For all other modes we use addr (kernel virtual address)
+ * Store the result in the low bits of addr.
+ */
+static void xsk_check_page_contiguity(struct xdp_umem *umem, u32 flags)
+{
+ struct xdp_umem_page *pgs = umem->pages;
+ int i, is_contig;
+
+ for (i = 0; i < umem->npgs - 1; i++) {
+ is_contig = (flags & XDP_ZEROCOPY) ?
+ (pgs[i].dma + PAGE_SIZE == pgs[i + 1].dma) :
+ (pgs[i].addr + PAGE_SIZE == pgs[i + 1].addr);
+ pgs[i].addr += is_contig << XSK_NEXT_PG_CONTIG_SHIFT;
+ }
+}
+
static int xsk_bind(struct socket *sock, struct sockaddr *addr, int addr_len)
{
struct sockaddr_xdp *sxdp = (struct sockaddr_xdp *)addr;
@@ -427,7 +659,8 @@ static int xsk_bind(struct socket *sock, struct sockaddr *addr, int addr_len)
return -EINVAL;
flags = sxdp->sxdp_flags;
- if (flags & ~(XDP_SHARED_UMEM | XDP_COPY | XDP_ZEROCOPY))
+ if (flags & ~(XDP_SHARED_UMEM | XDP_COPY | XDP_ZEROCOPY |
+ XDP_USE_NEED_WAKEUP))
return -EINVAL;
rtnl_lock();
@@ -454,7 +687,8 @@ static int xsk_bind(struct socket *sock, struct sockaddr *addr, int addr_len)
struct xdp_sock *umem_xs;
struct socket *sock;
- if ((flags & XDP_COPY) || (flags & XDP_ZEROCOPY)) {
+ if ((flags & XDP_COPY) || (flags & XDP_ZEROCOPY) ||
+ (flags & XDP_USE_NEED_WAKEUP)) {
/* Cannot specify flags for shared sockets. */
err = -EINVAL;
goto out_unlock;
@@ -473,19 +707,19 @@ static int xsk_bind(struct socket *sock, struct sockaddr *addr, int addr_len)
}
umem_xs = xdp_sk(sock->sk);
- if (!umem_xs->umem) {
- /* No umem to inherit. */
+ if (!xsk_is_bound(umem_xs)) {
err = -EBADF;
sockfd_put(sock);
goto out_unlock;
- } else if (umem_xs->dev != dev || umem_xs->queue_id != qid) {
+ }
+ if (umem_xs->dev != dev || umem_xs->queue_id != qid) {
err = -EINVAL;
sockfd_put(sock);
goto out_unlock;
}
xdp_get_umem(umem_xs->umem);
- xs->umem = umem_xs->umem;
+ WRITE_ONCE(xs->umem, umem_xs->umem);
sockfd_put(sock);
} else if (!xs->umem || !xdp_umem_validate_queues(xs->umem)) {
err = -EINVAL;
@@ -500,6 +734,8 @@ static int xsk_bind(struct socket *sock, struct sockaddr *addr, int addr_len)
err = xdp_umem_assign_dev(xs->umem, dev, qid, flags);
if (err)
goto out_unlock;
+
+ xsk_check_page_contiguity(xs->umem, flags);
}
xs->dev = dev;
@@ -510,16 +746,28 @@ static int xsk_bind(struct socket *sock, struct sockaddr *addr, int addr_len)
xdp_add_sk_umem(xs->umem, xs);
out_unlock:
- if (err)
+ if (err) {
dev_put(dev);
- else
- xs->state = XSK_BOUND;
+ } else {
+ /* Matches smp_rmb() in bind() for shared umem
+ * sockets, and xsk_is_bound().
+ */
+ smp_wmb();
+ WRITE_ONCE(xs->state, XSK_BOUND);
+ }
out_release:
mutex_unlock(&xs->mutex);
rtnl_unlock();
return err;
}
+struct xdp_umem_reg_v1 {
+ __u64 addr; /* Start of packet data area */
+ __u64 len; /* Length of packet data area */
+ __u32 chunk_size;
+ __u32 headroom;
+};
+
static int xsk_setsockopt(struct socket *sock, int level, int optname,
char __user *optval, unsigned int optlen)
{
@@ -549,15 +797,24 @@ static int xsk_setsockopt(struct socket *sock, int level, int optname,
}
q = (optname == XDP_TX_RING) ? &xs->tx : &xs->rx;
err = xsk_init_queue(entries, q, false);
+ if (!err && optname == XDP_TX_RING)
+ /* Tx needs to be explicitly woken up the first time */
+ xs->tx->ring->flags |= XDP_RING_NEED_WAKEUP;
mutex_unlock(&xs->mutex);
return err;
}
case XDP_UMEM_REG:
{
- struct xdp_umem_reg mr;
+ size_t mr_size = sizeof(struct xdp_umem_reg);
+ struct xdp_umem_reg mr = {};
struct xdp_umem *umem;
- if (copy_from_user(&mr, optval, sizeof(mr)))
+ if (optlen < sizeof(struct xdp_umem_reg_v1))
+ return -EINVAL;
+ else if (optlen < sizeof(mr))
+ mr_size = sizeof(struct xdp_umem_reg_v1);
+
+ if (copy_from_user(&mr, optval, mr_size))
return -EFAULT;
mutex_lock(&xs->mutex);
@@ -574,7 +831,7 @@ static int xsk_setsockopt(struct socket *sock, int level, int optname,
/* Make sure umem is ready before it can be seen by others */
smp_wmb();
- xs->umem = umem;
+ WRITE_ONCE(xs->umem, umem);
mutex_unlock(&xs->mutex);
return 0;
}
@@ -610,6 +867,20 @@ static int xsk_setsockopt(struct socket *sock, int level, int optname,
return -ENOPROTOOPT;
}
+static void xsk_enter_rxtx_offsets(struct xdp_ring_offset_v1 *ring)
+{
+ ring->producer = offsetof(struct xdp_rxtx_ring, ptrs.producer);
+ ring->consumer = offsetof(struct xdp_rxtx_ring, ptrs.consumer);
+ ring->desc = offsetof(struct xdp_rxtx_ring, desc);
+}
+
+static void xsk_enter_umem_offsets(struct xdp_ring_offset_v1 *ring)
+{
+ ring->producer = offsetof(struct xdp_umem_ring, ptrs.producer);
+ ring->consumer = offsetof(struct xdp_umem_ring, ptrs.consumer);
+ ring->desc = offsetof(struct xdp_umem_ring, desc);
+}
+
static int xsk_getsockopt(struct socket *sock, int level, int optname,
char __user *optval, int __user *optlen)
{
@@ -649,26 +920,49 @@ static int xsk_getsockopt(struct socket *sock, int level, int optname,
case XDP_MMAP_OFFSETS:
{
struct xdp_mmap_offsets off;
+ struct xdp_mmap_offsets_v1 off_v1;
+ bool flags_supported = true;
+ void *to_copy;
- if (len < sizeof(off))
+ if (len < sizeof(off_v1))
return -EINVAL;
+ else if (len < sizeof(off))
+ flags_supported = false;
+
+ if (flags_supported) {
+ /* xdp_ring_offset is identical to xdp_ring_offset_v1
+ * except for the flags field added to the end.
+ */
+ xsk_enter_rxtx_offsets((struct xdp_ring_offset_v1 *)
+ &off.rx);
+ xsk_enter_rxtx_offsets((struct xdp_ring_offset_v1 *)
+ &off.tx);
+ xsk_enter_umem_offsets((struct xdp_ring_offset_v1 *)
+ &off.fr);
+ xsk_enter_umem_offsets((struct xdp_ring_offset_v1 *)
+ &off.cr);
+ off.rx.flags = offsetof(struct xdp_rxtx_ring,
+ ptrs.flags);
+ off.tx.flags = offsetof(struct xdp_rxtx_ring,
+ ptrs.flags);
+ off.fr.flags = offsetof(struct xdp_umem_ring,
+ ptrs.flags);
+ off.cr.flags = offsetof(struct xdp_umem_ring,
+ ptrs.flags);
+
+ len = sizeof(off);
+ to_copy = &off;
+ } else {
+ xsk_enter_rxtx_offsets(&off_v1.rx);
+ xsk_enter_rxtx_offsets(&off_v1.tx);
+ xsk_enter_umem_offsets(&off_v1.fr);
+ xsk_enter_umem_offsets(&off_v1.cr);
+
+ len = sizeof(off_v1);
+ to_copy = &off_v1;
+ }
- off.rx.producer = offsetof(struct xdp_rxtx_ring, ptrs.producer);
- off.rx.consumer = offsetof(struct xdp_rxtx_ring, ptrs.consumer);
- off.rx.desc = offsetof(struct xdp_rxtx_ring, desc);
- off.tx.producer = offsetof(struct xdp_rxtx_ring, ptrs.producer);
- off.tx.consumer = offsetof(struct xdp_rxtx_ring, ptrs.consumer);
- off.tx.desc = offsetof(struct xdp_rxtx_ring, desc);
-
- off.fr.producer = offsetof(struct xdp_umem_ring, ptrs.producer);
- off.fr.consumer = offsetof(struct xdp_umem_ring, ptrs.consumer);
- off.fr.desc = offsetof(struct xdp_umem_ring, desc);
- off.cr.producer = offsetof(struct xdp_umem_ring, ptrs.producer);
- off.cr.consumer = offsetof(struct xdp_umem_ring, ptrs.consumer);
- off.cr.desc = offsetof(struct xdp_umem_ring, desc);
-
- len = sizeof(off);
- if (copy_to_user(optval, &off, len))
+ if (copy_to_user(optval, to_copy, len))
return -EFAULT;
if (put_user(len, optlen))
return -EFAULT;
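The optlen-based versioning above lets a binary built against the new headers detect whether the running kernel exposes the per-ring flags offsets. A hedged user-space sketch of that probe (only names from linux/if_xdp.h are assumed; the helper is illustrative):

#include <string.h>
#include <sys/socket.h>
#include <linux/if_xdp.h>

/* Returns 0 on success; *has_flags is set when the kernel filled in the
 * full struct xdp_mmap_offsets including the flags offsets.
 */
static int get_ring_offsets(int xsk_fd, struct xdp_mmap_offsets *off,
			    int *has_flags)
{
	socklen_t optlen = sizeof(*off);

	memset(off, 0, sizeof(*off));
	if (getsockopt(xsk_fd, SOL_XDP, XDP_MMAP_OFFSETS, off, &optlen))
		return -1;

	/* Older kernels copy back only the v1 layout and report its size. */
	*has_flags = (optlen == sizeof(*off));
	return 0;
}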
@@ -713,7 +1007,7 @@ static int xsk_mmap(struct file *file, struct socket *sock,
unsigned long pfn;
struct page *qpg;
- if (xs->state != XSK_READY)
+ if (READ_ONCE(xs->state) != XSK_READY)
return -EBUSY;
if (offset == XDP_PGOFF_RX_RING) {
@@ -739,7 +1033,7 @@ static int xsk_mmap(struct file *file, struct socket *sock,
/* Matches the smp_wmb() in xsk_init_queue */
smp_rmb();
qpg = virt_to_head_page(q->ring);
- if (size > (PAGE_SIZE << compound_order(qpg)))
+ if (size > page_size(qpg))
return -EINVAL;
pfn = virt_to_phys(q->ring) >> PAGE_SHIFT;
@@ -855,6 +1149,9 @@ static int xsk_create(struct net *net, struct socket *sock, int protocol,
spin_lock_init(&xs->rx_lock);
spin_lock_init(&xs->tx_completion_lock);
+ INIT_LIST_HEAD(&xs->map_list);
+ spin_lock_init(&xs->map_list_lock);
+
mutex_lock(&net->xdp.lock);
sk_add_node_rcu(sk, &net->xdp.list);
mutex_unlock(&net->xdp.lock);
@@ -895,7 +1192,7 @@ static struct pernet_operations xsk_net_ops = {
static int __init xsk_init(void)
{
- int err;
+ int err, cpu;
err = proto_register(&xsk_proto, 0 /* no slab */);
if (err)
@@ -913,6 +1210,8 @@ static int __init xsk_init(void)
if (err)
goto out_pernet;
+ for_each_possible_cpu(cpu)
+ INIT_LIST_HEAD(&per_cpu(xskmap_flush_list, cpu));
return 0;
out_pernet:
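The per-cpu xskmap_flush_list added above batches socket wake-ups per NAPI cycle: each successful redirect queues the socket on the current CPU's list, and a single flush at the end of the poll submits the Rx rings and wakes the sockets. A hedged sketch of the calling pattern (the surrounding driver/BPF plumbing is illustrative only):

/* Illustrative caller: redirect a batch of buffers, then flush once. */
static void xdp_poll_sketch(struct xdp_sock **socks, struct xdp_buff **bufs,
			    int n)
{
	int i;

	for (i = 0; i < n; i++)
		__xsk_map_redirect(socks[i], bufs[i]);	/* errors ignored here */

	__xsk_map_flush();	/* flushes each queued socket exactly once */
}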
diff --git a/net/xdp/xsk.h b/net/xdp/xsk.h
index ba8120610426..4cfd106bdb53 100644
--- a/net/xdp/xsk.h
+++ b/net/xdp/xsk.h
@@ -4,6 +4,19 @@
#ifndef XSK_H_
#define XSK_H_
+struct xdp_ring_offset_v1 {
+ __u64 producer;
+ __u64 consumer;
+ __u64 desc;
+};
+
+struct xdp_mmap_offsets_v1 {
+ struct xdp_ring_offset_v1 rx;
+ struct xdp_ring_offset_v1 tx;
+ struct xdp_ring_offset_v1 fr;
+ struct xdp_ring_offset_v1 cr;
+};
+
static inline struct xdp_sock *xdp_sk(struct sock *sk)
{
return (struct xdp_sock *)sk;
diff --git a/net/xdp/xsk_diag.c b/net/xdp/xsk_diag.c
index d5e06c8e0cbf..f59791ba43a0 100644
--- a/net/xdp/xsk_diag.c
+++ b/net/xdp/xsk_diag.c
@@ -56,7 +56,7 @@ static int xsk_diag_put_umem(const struct xdp_sock *xs, struct sk_buff *nlskb)
du.id = umem->id;
du.size = umem->size;
du.num_pages = umem->npgs;
- du.chunk_size = (__u32)(~umem->chunk_mask + 1);
+ du.chunk_size = umem->chunk_size_nohr + umem->headroom;
du.headroom = umem->headroom;
du.ifindex = umem->dev ? umem->dev->ifindex : 0;
du.queue_id = umem->queue_id;
@@ -97,6 +97,7 @@ static int xsk_diag_fill(struct sock *sk, struct sk_buff *nlskb,
msg->xdiag_ino = sk_ino;
sock_diag_save_cookie(sk, msg->xdiag_cookie);
+ mutex_lock(&xs->mutex);
if ((req->xdiag_show & XDP_SHOW_INFO) && xsk_diag_put_info(xs, nlskb))
goto out_nlmsg_trim;
@@ -117,10 +118,12 @@ static int xsk_diag_fill(struct sock *sk, struct sk_buff *nlskb,
sock_diag_put_meminfo(sk, nlskb, XDP_DIAG_MEMINFO))
goto out_nlmsg_trim;
+ mutex_unlock(&xs->mutex);
nlmsg_end(nlskb, nlh);
return 0;
out_nlmsg_trim:
+ mutex_unlock(&xs->mutex);
nlmsg_cancel(nlskb, nlh);
return -EMSGSIZE;
}
diff --git a/net/xdp/xsk_queue.c b/net/xdp/xsk_queue.c
index b66504592d9b..c90e9c1e3c63 100644
--- a/net/xdp/xsk_queue.c
+++ b/net/xdp/xsk_queue.c
@@ -18,14 +18,14 @@ void xskq_set_umem(struct xsk_queue *q, u64 size, u64 chunk_mask)
q->chunk_mask = chunk_mask;
}
-static u32 xskq_umem_get_ring_size(struct xsk_queue *q)
+static size_t xskq_get_ring_size(struct xsk_queue *q, bool umem_queue)
{
- return sizeof(struct xdp_umem_ring) + q->nentries * sizeof(u64);
-}
+ struct xdp_umem_ring *umem_ring;
+ struct xdp_rxtx_ring *rxtx_ring;
-static u32 xskq_rxtx_get_ring_size(struct xsk_queue *q)
-{
- return sizeof(struct xdp_ring) + q->nentries * sizeof(struct xdp_desc);
+ if (umem_queue)
+ return struct_size(umem_ring, desc, q->nentries);
+ return struct_size(rxtx_ring, desc, q->nentries);
}
struct xsk_queue *xskq_create(u32 nentries, bool umem_queue)
@@ -43,8 +43,7 @@ struct xsk_queue *xskq_create(u32 nentries, bool umem_queue)
gfp_flags = GFP_KERNEL | __GFP_ZERO | __GFP_NOWARN |
__GFP_COMP | __GFP_NORETRY;
- size = umem_queue ? xskq_umem_get_ring_size(q) :
- xskq_rxtx_get_ring_size(q);
+ size = xskq_get_ring_size(q, umem_queue);
q->ring = (struct xdp_ring *)__get_free_pages(gfp_flags,
get_order(size));
diff --git a/net/xdp/xsk_queue.h b/net/xdp/xsk_queue.h
index 909c5168ed0f..bec2af11853a 100644
--- a/net/xdp/xsk_queue.h
+++ b/net/xdp/xsk_queue.h
@@ -10,12 +10,10 @@
#include <linux/if_xdp.h>
#include <net/xdp_sock.h>
-#define RX_BATCH_SIZE 16
-#define LAZY_UPDATE_THRESHOLD 128
-
struct xdp_ring {
u32 producer ____cacheline_aligned_in_smp;
u32 consumer ____cacheline_aligned_in_smp;
+ u32 flags;
};
/* Used for the RX and TX queues for packets */
@@ -35,10 +33,8 @@ struct xsk_queue {
u64 size;
u32 ring_mask;
u32 nentries;
- u32 prod_head;
- u32 prod_tail;
- u32 cons_head;
- u32 cons_tail;
+ u32 cached_prod;
+ u32 cached_cons;
struct xdp_ring *ring;
u64 invalid_descs;
};
@@ -85,57 +81,116 @@ struct xsk_queue {
* now and again after circling through the ring.
*/
-/* Common functions operating for both RXTX and umem queues */
+/* The operations on the rings are the following:
+ *
+ * producer consumer
+ *
+ * RESERVE entries PEEK in the ring for entries
+ * WRITE data into the ring READ data from the ring
+ * SUBMIT entries RELEASE entries
+ *
+ * The producer reserves one or more entries in the ring. It can then
+ * fill in these entries and finally submit them so that they can be
+ * seen and read by the consumer.
+ *
+ * The consumer peeks into the ring to see if the producer has written
+ * any new entries. If so, the consumer can then read these entries
+ * and, when it is done reading them, release them back to the producer
+ * so that the producer can use these slots to fill in new entries.
+ *
+ * The function names below reflect these operations.
+ */
+
+/* Functions that read and validate content from consumer rings. */
-static inline u64 xskq_nb_invalid_descs(struct xsk_queue *q)
+static inline bool xskq_cons_crosses_non_contig_pg(struct xdp_umem *umem,
+ u64 addr,
+ u64 length)
{
- return q ? q->invalid_descs : 0;
+ bool cross_pg = (addr & (PAGE_SIZE - 1)) + length > PAGE_SIZE;
+ bool next_pg_contig =
+ (unsigned long)umem->pages[(addr >> PAGE_SHIFT)].addr &
+ XSK_NEXT_PG_CONTIG_MASK;
+
+ return cross_pg && !next_pg_contig;
}
-static inline u32 xskq_nb_avail(struct xsk_queue *q, u32 dcnt)
+static inline bool xskq_cons_is_valid_unaligned(struct xsk_queue *q,
+ u64 addr,
+ u64 length,
+ struct xdp_umem *umem)
{
- u32 entries = q->prod_tail - q->cons_tail;
+ u64 base_addr = xsk_umem_extract_addr(addr);
- if (entries == 0) {
- /* Refresh the local pointer */
- q->prod_tail = READ_ONCE(q->ring->producer);
- entries = q->prod_tail - q->cons_tail;
+ addr = xsk_umem_add_offset_to_addr(addr);
+ if (base_addr >= q->size || addr >= q->size ||
+ xskq_cons_crosses_non_contig_pg(umem, addr, length)) {
+ q->invalid_descs++;
+ return false;
}
- return (entries > dcnt) ? dcnt : entries;
+ return true;
}
-static inline u32 xskq_nb_free(struct xsk_queue *q, u32 producer, u32 dcnt)
+static inline bool xskq_cons_is_valid_addr(struct xsk_queue *q, u64 addr)
{
- u32 free_entries = q->nentries - (producer - q->cons_tail);
-
- if (free_entries >= dcnt)
- return free_entries;
+ if (addr >= q->size) {
+ q->invalid_descs++;
+ return false;
+ }
- /* Refresh the local tail pointer */
- q->cons_tail = READ_ONCE(q->ring->consumer);
- return q->nentries - (producer - q->cons_tail);
+ return true;
}
-static inline bool xskq_has_addrs(struct xsk_queue *q, u32 cnt)
+static inline bool xskq_cons_read_addr(struct xsk_queue *q, u64 *addr,
+ struct xdp_umem *umem)
{
- u32 entries = q->prod_tail - q->cons_tail;
+ struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
- if (entries >= cnt)
- return true;
+ while (q->cached_cons != q->cached_prod) {
+ u32 idx = q->cached_cons & q->ring_mask;
- /* Refresh the local pointer. */
- q->prod_tail = READ_ONCE(q->ring->producer);
- entries = q->prod_tail - q->cons_tail;
+ *addr = ring->desc[idx] & q->chunk_mask;
- return entries >= cnt;
-}
+ if (umem->flags & XDP_UMEM_UNALIGNED_CHUNK_FLAG) {
+ if (xskq_cons_is_valid_unaligned(q, *addr,
+ umem->chunk_size_nohr,
+ umem))
+ return true;
+ goto out;
+ }
-/* UMEM queue */
+ if (xskq_cons_is_valid_addr(q, *addr))
+ return true;
-static inline bool xskq_is_valid_addr(struct xsk_queue *q, u64 addr)
+out:
+ q->cached_cons++;
+ }
+
+ return false;
+}
+
+static inline bool xskq_cons_is_valid_desc(struct xsk_queue *q,
+ struct xdp_desc *d,
+ struct xdp_umem *umem)
{
- if (addr >= q->size) {
+ if (umem->flags & XDP_UMEM_UNALIGNED_CHUNK_FLAG) {
+ if (!xskq_cons_is_valid_unaligned(q, d->addr, d->len, umem))
+ return false;
+
+ if (d->len > umem->chunk_size_nohr || d->options) {
+ q->invalid_descs++;
+ return false;
+ }
+
+ return true;
+ }
+
+ if (!xskq_cons_is_valid_addr(q, d->addr))
+ return false;
+
+ if (((d->addr + d->len) & q->chunk_mask) != (d->addr & q->chunk_mask) ||
+ d->options) {
q->invalid_descs++;
return false;
}
@@ -143,177 +198,184 @@ static inline bool xskq_is_valid_addr(struct xsk_queue *q, u64 addr)
return true;
}
-static inline u64 *xskq_validate_addr(struct xsk_queue *q, u64 *addr)
+static inline bool xskq_cons_read_desc(struct xsk_queue *q,
+ struct xdp_desc *desc,
+ struct xdp_umem *umem)
{
- while (q->cons_tail != q->cons_head) {
- struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
- unsigned int idx = q->cons_tail & q->ring_mask;
+ while (q->cached_cons != q->cached_prod) {
+ struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
+ u32 idx = q->cached_cons & q->ring_mask;
- *addr = READ_ONCE(ring->desc[idx]) & q->chunk_mask;
- if (xskq_is_valid_addr(q, *addr))
- return addr;
+ *desc = ring->desc[idx];
+ if (xskq_cons_is_valid_desc(q, desc, umem))
+ return true;
- q->cons_tail++;
+ q->cached_cons++;
}
- return NULL;
+ return false;
}
-static inline u64 *xskq_peek_addr(struct xsk_queue *q, u64 *addr)
-{
- if (q->cons_tail == q->cons_head) {
- smp_mb(); /* D, matches A */
- WRITE_ONCE(q->ring->consumer, q->cons_tail);
- q->cons_head = q->cons_tail + xskq_nb_avail(q, RX_BATCH_SIZE);
-
- /* Order consumer and data */
- smp_rmb();
- }
+/* Functions for consumers */
- return xskq_validate_addr(q, addr);
+static inline void __xskq_cons_release(struct xsk_queue *q)
+{
+ smp_mb(); /* D, matches A */
+ WRITE_ONCE(q->ring->consumer, q->cached_cons);
}
-static inline void xskq_discard_addr(struct xsk_queue *q)
+static inline void __xskq_cons_peek(struct xsk_queue *q)
{
- q->cons_tail++;
+ /* Refresh the local pointer */
+ q->cached_prod = READ_ONCE(q->ring->producer);
+ smp_rmb(); /* C, matches B */
}
-static inline int xskq_produce_addr(struct xsk_queue *q, u64 addr)
+static inline void xskq_cons_get_entries(struct xsk_queue *q)
{
- struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
+ __xskq_cons_release(q);
+ __xskq_cons_peek(q);
+}
- if (xskq_nb_free(q, q->prod_tail, 1) == 0)
- return -ENOSPC;
+static inline bool xskq_cons_has_entries(struct xsk_queue *q, u32 cnt)
+{
+ u32 entries = q->cached_prod - q->cached_cons;
- /* A, matches D */
- ring->desc[q->prod_tail++ & q->ring_mask] = addr;
+ if (entries >= cnt)
+ return true;
- /* Order producer and data */
- smp_wmb(); /* B, matches C */
+ __xskq_cons_peek(q);
+ entries = q->cached_prod - q->cached_cons;
- WRITE_ONCE(q->ring->producer, q->prod_tail);
- return 0;
+ return entries >= cnt;
}
-static inline int xskq_produce_addr_lazy(struct xsk_queue *q, u64 addr)
+static inline bool xskq_cons_peek_addr(struct xsk_queue *q, u64 *addr,
+ struct xdp_umem *umem)
{
- struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
-
- if (xskq_nb_free(q, q->prod_head, LAZY_UPDATE_THRESHOLD) == 0)
- return -ENOSPC;
-
- /* A, matches D */
- ring->desc[q->prod_head++ & q->ring_mask] = addr;
- return 0;
+ if (q->cached_prod == q->cached_cons)
+ xskq_cons_get_entries(q);
+ return xskq_cons_read_addr(q, addr, umem);
}
-static inline void xskq_produce_flush_addr_n(struct xsk_queue *q,
- u32 nb_entries)
+static inline bool xskq_cons_peek_desc(struct xsk_queue *q,
+ struct xdp_desc *desc,
+ struct xdp_umem *umem)
{
- /* Order producer and data */
- smp_wmb(); /* B, matches C */
-
- q->prod_tail += nb_entries;
- WRITE_ONCE(q->ring->producer, q->prod_tail);
+ if (q->cached_prod == q->cached_cons)
+ xskq_cons_get_entries(q);
+ return xskq_cons_read_desc(q, desc, umem);
}
-static inline int xskq_reserve_addr(struct xsk_queue *q)
+static inline void xskq_cons_release(struct xsk_queue *q)
{
- if (xskq_nb_free(q, q->prod_head, 1) == 0)
- return -ENOSPC;
+ /* To improve performance, only update local state here.
+ * Reflect this to global state when we get new entries
+ * from the ring in xskq_cons_get_entries().
+ */
+ q->cached_cons++;
+}
- /* A, matches D */
- q->prod_head++;
- return 0;
+static inline bool xskq_cons_is_full(struct xsk_queue *q)
+{
+ /* No barriers needed since data is not accessed */
+ return READ_ONCE(q->ring->producer) - READ_ONCE(q->ring->consumer) ==
+ q->nentries;
}
-/* Rx/Tx queue */
+/* Functions for producers */
-static inline bool xskq_is_valid_desc(struct xsk_queue *q, struct xdp_desc *d)
+static inline bool xskq_prod_is_full(struct xsk_queue *q)
{
- if (!xskq_is_valid_addr(q, d->addr))
- return false;
+ u32 free_entries = q->nentries - (q->cached_prod - q->cached_cons);
- if (((d->addr + d->len) & q->chunk_mask) != (d->addr & q->chunk_mask) ||
- d->options) {
- q->invalid_descs++;
+ if (free_entries)
return false;
- }
- return true;
+ /* Refresh the local tail pointer */
+ q->cached_cons = READ_ONCE(q->ring->consumer);
+ free_entries = q->nentries - (q->cached_prod - q->cached_cons);
+
+ return !free_entries;
}
-static inline struct xdp_desc *xskq_validate_desc(struct xsk_queue *q,
- struct xdp_desc *desc)
+static inline int xskq_prod_reserve(struct xsk_queue *q)
{
- while (q->cons_tail != q->cons_head) {
- struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
- unsigned int idx = q->cons_tail & q->ring_mask;
-
- *desc = READ_ONCE(ring->desc[idx]);
- if (xskq_is_valid_desc(q, desc))
- return desc;
-
- q->cons_tail++;
- }
+ if (xskq_prod_is_full(q))
+ return -ENOSPC;
- return NULL;
+ /* A, matches D */
+ q->cached_prod++;
+ return 0;
}
-static inline struct xdp_desc *xskq_peek_desc(struct xsk_queue *q,
- struct xdp_desc *desc)
+static inline int xskq_prod_reserve_addr(struct xsk_queue *q, u64 addr)
{
- if (q->cons_tail == q->cons_head) {
- smp_mb(); /* D, matches A */
- WRITE_ONCE(q->ring->consumer, q->cons_tail);
- q->cons_head = q->cons_tail + xskq_nb_avail(q, RX_BATCH_SIZE);
-
- /* Order consumer and data */
- smp_rmb(); /* C, matches B */
- }
+ struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
- return xskq_validate_desc(q, desc);
-}
+ if (xskq_prod_is_full(q))
+ return -ENOSPC;
-static inline void xskq_discard_desc(struct xsk_queue *q)
-{
- q->cons_tail++;
+ /* A, matches D */
+ ring->desc[q->cached_prod++ & q->ring_mask] = addr;
+ return 0;
}
-static inline int xskq_produce_batch_desc(struct xsk_queue *q,
- u64 addr, u32 len)
+static inline int xskq_prod_reserve_desc(struct xsk_queue *q,
+ u64 addr, u32 len)
{
struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
- unsigned int idx;
+ u32 idx;
- if (xskq_nb_free(q, q->prod_head, 1) == 0)
+ if (xskq_prod_is_full(q))
return -ENOSPC;
/* A, matches D */
- idx = (q->prod_head++) & q->ring_mask;
+ idx = q->cached_prod++ & q->ring_mask;
ring->desc[idx].addr = addr;
ring->desc[idx].len = len;
return 0;
}
-static inline void xskq_produce_flush_desc(struct xsk_queue *q)
+static inline void __xskq_prod_submit(struct xsk_queue *q, u32 idx)
{
- /* Order producer and data */
smp_wmb(); /* B, matches C */
- q->prod_tail = q->prod_head;
- WRITE_ONCE(q->ring->producer, q->prod_tail);
+ WRITE_ONCE(q->ring->producer, idx);
+}
+
+static inline void xskq_prod_submit(struct xsk_queue *q)
+{
+ __xskq_prod_submit(q, q->cached_prod);
+}
+
+static inline void xskq_prod_submit_addr(struct xsk_queue *q, u64 addr)
+{
+ struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
+ u32 idx = q->ring->producer;
+
+ ring->desc[idx++ & q->ring_mask] = addr;
+
+ __xskq_prod_submit(q, idx);
}
-static inline bool xskq_full_desc(struct xsk_queue *q)
+static inline void xskq_prod_submit_n(struct xsk_queue *q, u32 nb_entries)
{
- return xskq_nb_avail(q, q->nentries) == q->nentries;
+ __xskq_prod_submit(q, q->ring->producer + nb_entries);
}
-static inline bool xskq_empty_desc(struct xsk_queue *q)
+static inline bool xskq_prod_is_empty(struct xsk_queue *q)
{
- return xskq_nb_free(q, q->prod_tail, q->nentries) == q->nentries;
+ /* No barriers needed since data is not accessed */
+ return READ_ONCE(q->ring->consumer) == READ_ONCE(q->ring->producer);
+}
+
+/* For both producers and consumers */
+
+static inline u64 xskq_nb_invalid_descs(struct xsk_queue *q)
+{
+ return q ? q->invalid_descs : 0;
}
void xskq_set_umem(struct xsk_queue *q, u64 size, u64 chunk_mask);
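Putting the renamed helpers together: the copy-mode Rx path in xsk.c above strings them into the peek/release (fill queue) and reserve/submit (Rx ring) sequence described in the header comment. A condensed, hedged sketch of that sequence (headroom/offset handling and unaligned-chunk validation are omitted):

static int rx_one_sketch(struct xdp_sock *xs, void *data, u32 len)
{
	u64 addr;

	/* Consumer side of the fill queue: peek -> read -> release. */
	if (!xskq_cons_peek_addr(xs->umem->fq, &addr, xs->umem))
		return -ENOSPC;		/* no free frame from user space */

	memcpy(xdp_umem_get_data(xs->umem, addr), data, len);

	/* Producer side of the Rx ring: reserve+write -> submit. */
	if (xskq_prod_reserve_desc(xs->rx, addr, len))
		return -ENOSPC;		/* Rx ring full; frame stays peeked */

	xskq_cons_release(xs->umem->fq);
	xskq_prod_submit(xs->rx);
	return 0;
}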