Diffstat (limited to 'net/xdp/xsk_queue.h')
-rw-r--r-- | net/xdp/xsk_queue.h | 344
1 file changed, 203 insertions, 141 deletions
diff --git a/net/xdp/xsk_queue.h b/net/xdp/xsk_queue.h
index 909c5168ed0f..bec2af11853a 100644
--- a/net/xdp/xsk_queue.h
+++ b/net/xdp/xsk_queue.h
@@ -10,12 +10,10 @@
 #include <linux/if_xdp.h>
 #include <net/xdp_sock.h>
 
-#define RX_BATCH_SIZE 16
-#define LAZY_UPDATE_THRESHOLD 128
-
 struct xdp_ring {
 	u32 producer ____cacheline_aligned_in_smp;
 	u32 consumer ____cacheline_aligned_in_smp;
+	u32 flags;
 };
 
 /* Used for the RX and TX queues for packets */
@@ -35,10 +33,8 @@ struct xsk_queue {
 	u64 size;
 	u32 ring_mask;
 	u32 nentries;
-	u32 prod_head;
-	u32 prod_tail;
-	u32 cons_head;
-	u32 cons_tail;
+	u32 cached_prod;
+	u32 cached_cons;
 	struct xdp_ring *ring;
 	u64 invalid_descs;
 };
@@ -85,57 +81,116 @@ struct xsk_queue {
  * now and again after circling through the ring.
  */
 
-/* Common functions operating for both RXTX and umem queues */
+/* The operations on the rings are the following:
+ *
+ * producer                         consumer
+ *
+ * RESERVE entries                  PEEK in the ring for entries
+ * WRITE data into the ring         READ data from the ring
+ * SUBMIT entries                   RELEASE entries
+ *
+ * The producer reserves one or more entries in the ring. It can then
+ * fill in these entries and finally submit them so that they can be
+ * seen and read by the consumer.
+ *
+ * The consumer peeks into the ring to see if the producer has written
+ * any new entries. If so, the consumer can then read these entries
+ * and when it is done reading them release them back to the producer
+ * so that the producer can use these slots to fill in new entries.
+ *
+ * The function names below reflect these operations.
+ */
+
+/* Functions that read and validate content from consumer rings. */
 
-static inline u64 xskq_nb_invalid_descs(struct xsk_queue *q)
+static inline bool xskq_cons_crosses_non_contig_pg(struct xdp_umem *umem,
+						   u64 addr,
+						   u64 length)
 {
-	return q ? q->invalid_descs : 0;
+	bool cross_pg = (addr & (PAGE_SIZE - 1)) + length > PAGE_SIZE;
+	bool next_pg_contig =
+		(unsigned long)umem->pages[(addr >> PAGE_SHIFT)].addr &
+			XSK_NEXT_PG_CONTIG_MASK;
+
+	return cross_pg && !next_pg_contig;
 }
 
-static inline u32 xskq_nb_avail(struct xsk_queue *q, u32 dcnt)
+static inline bool xskq_cons_is_valid_unaligned(struct xsk_queue *q,
+						u64 addr,
+						u64 length,
+						struct xdp_umem *umem)
 {
-	u32 entries = q->prod_tail - q->cons_tail;
+	u64 base_addr = xsk_umem_extract_addr(addr);
 
-	if (entries == 0) {
-		/* Refresh the local pointer */
-		q->prod_tail = READ_ONCE(q->ring->producer);
-		entries = q->prod_tail - q->cons_tail;
+	addr = xsk_umem_add_offset_to_addr(addr);
+	if (base_addr >= q->size || addr >= q->size ||
+	    xskq_cons_crosses_non_contig_pg(umem, addr, length)) {
+		q->invalid_descs++;
+		return false;
 	}
 
-	return (entries > dcnt) ? dcnt : entries;
+	return true;
 }
 
-static inline u32 xskq_nb_free(struct xsk_queue *q, u32 producer, u32 dcnt)
+static inline bool xskq_cons_is_valid_addr(struct xsk_queue *q, u64 addr)
 {
-	u32 free_entries = q->nentries - (producer - q->cons_tail);
-
-	if (free_entries >= dcnt)
-		return free_entries;
+	if (addr >= q->size) {
+		q->invalid_descs++;
+		return false;
+	}
 
-	/* Refresh the local tail pointer */
-	q->cons_tail = READ_ONCE(q->ring->consumer);
-	return q->nentries - (producer - q->cons_tail);
+	return true;
 }
 
-static inline bool xskq_has_addrs(struct xsk_queue *q, u32 cnt)
+static inline bool xskq_cons_read_addr(struct xsk_queue *q, u64 *addr,
+				       struct xdp_umem *umem)
 {
-	u32 entries = q->prod_tail - q->cons_tail;
+	struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
 
-	if (entries >= cnt)
-		return true;
+	while (q->cached_cons != q->cached_prod) {
+		u32 idx = q->cached_cons & q->ring_mask;
 
-	/* Refresh the local pointer. */
-	q->prod_tail = READ_ONCE(q->ring->producer);
-	entries = q->prod_tail - q->cons_tail;
+		*addr = ring->desc[idx] & q->chunk_mask;
 
-	return entries >= cnt;
-}
+		if (umem->flags & XDP_UMEM_UNALIGNED_CHUNK_FLAG) {
+			if (xskq_cons_is_valid_unaligned(q, *addr,
+							 umem->chunk_size_nohr,
+							 umem))
+				return true;
+			goto out;
+		}
 
-/* UMEM queue */
+		if (xskq_cons_is_valid_addr(q, *addr))
+			return true;
 
-static inline bool xskq_is_valid_addr(struct xsk_queue *q, u64 addr)
+out:
+		q->cached_cons++;
+	}
+
+	return false;
+}
+
+static inline bool xskq_cons_is_valid_desc(struct xsk_queue *q,
+					   struct xdp_desc *d,
+					   struct xdp_umem *umem)
 {
-	if (addr >= q->size) {
+	if (umem->flags & XDP_UMEM_UNALIGNED_CHUNK_FLAG) {
+		if (!xskq_cons_is_valid_unaligned(q, d->addr, d->len, umem))
+			return false;
+
+		if (d->len > umem->chunk_size_nohr || d->options) {
+			q->invalid_descs++;
+			return false;
+		}
+
+		return true;
+	}
+
+	if (!xskq_cons_is_valid_addr(q, d->addr))
+		return false;
+
+	if (((d->addr + d->len) & q->chunk_mask) != (d->addr & q->chunk_mask) ||
+	    d->options) {
 		q->invalid_descs++;
 		return false;
 	}
@@ -143,177 +198,184 @@ static inline bool xskq_is_valid_addr(struct xsk_queue *q, u64 addr)
 	return true;
 }
 
-static inline u64 *xskq_validate_addr(struct xsk_queue *q, u64 *addr)
+static inline bool xskq_cons_read_desc(struct xsk_queue *q,
+				       struct xdp_desc *desc,
+				       struct xdp_umem *umem)
 {
-	while (q->cons_tail != q->cons_head) {
-		struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
-		unsigned int idx = q->cons_tail & q->ring_mask;
+	while (q->cached_cons != q->cached_prod) {
+		struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
+		u32 idx = q->cached_cons & q->ring_mask;
 
-		*addr = READ_ONCE(ring->desc[idx]) & q->chunk_mask;
-		if (xskq_is_valid_addr(q, *addr))
-			return addr;
+		*desc = ring->desc[idx];
+		if (xskq_cons_is_valid_desc(q, desc, umem))
+			return true;
 
-		q->cons_tail++;
+		q->cached_cons++;
 	}
 
-	return NULL;
+	return false;
 }
 
-static inline u64 *xskq_peek_addr(struct xsk_queue *q, u64 *addr)
-{
-	if (q->cons_tail == q->cons_head) {
-		smp_mb(); /* D, matches A */
-		WRITE_ONCE(q->ring->consumer, q->cons_tail);
-		q->cons_head = q->cons_tail + xskq_nb_avail(q, RX_BATCH_SIZE);
-
-		/* Order consumer and data */
-		smp_rmb();
-	}
+/* Functions for consumers */
 
-	return xskq_validate_addr(q, addr);
+static inline void __xskq_cons_release(struct xsk_queue *q)
+{
+	smp_mb(); /* D, matches A */
+	WRITE_ONCE(q->ring->consumer, q->cached_cons);
 }
 
-static inline void xskq_discard_addr(struct xsk_queue *q)
+static inline void __xskq_cons_peek(struct xsk_queue *q)
 {
-	q->cons_tail++;
+	/* Refresh the local pointer */
+	q->cached_prod = READ_ONCE(q->ring->producer);
+	smp_rmb(); /* C, matches B */
 }
 
-static inline int xskq_produce_addr(struct xsk_queue *q, u64 addr)
+static inline void xskq_cons_get_entries(struct xsk_queue *q)
 {
-	struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
+	__xskq_cons_release(q);
+	__xskq_cons_peek(q);
+}
 
-	if (xskq_nb_free(q, q->prod_tail, 1) == 0)
-		return -ENOSPC;
+static inline bool xskq_cons_has_entries(struct xsk_queue *q, u32 cnt)
+{
+	u32 entries = q->cached_prod - q->cached_cons;
 
-	/* A, matches D */
-	ring->desc[q->prod_tail++ & q->ring_mask] = addr;
+	if (entries >= cnt)
+		return true;
 
-	/* Order producer and data */
-	smp_wmb(); /* B, matches C */
+	__xskq_cons_peek(q);
+	entries = q->cached_prod - q->cached_cons;
 
-	WRITE_ONCE(q->ring->producer, q->prod_tail);
-	return 0;
+	return entries >= cnt;
 }
 
-static inline int xskq_produce_addr_lazy(struct xsk_queue *q, u64 addr)
+static inline bool xskq_cons_peek_addr(struct xsk_queue *q, u64 *addr,
+				       struct xdp_umem *umem)
 {
-	struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
-
-	if (xskq_nb_free(q, q->prod_head, LAZY_UPDATE_THRESHOLD) == 0)
-		return -ENOSPC;
-
-	/* A, matches D */
-	ring->desc[q->prod_head++ & q->ring_mask] = addr;
-	return 0;
+	if (q->cached_prod == q->cached_cons)
+		xskq_cons_get_entries(q);
+	return xskq_cons_read_addr(q, addr, umem);
 }
 
-static inline void xskq_produce_flush_addr_n(struct xsk_queue *q,
-					     u32 nb_entries)
+static inline bool xskq_cons_peek_desc(struct xsk_queue *q,
+				       struct xdp_desc *desc,
+				       struct xdp_umem *umem)
 {
-	/* Order producer and data */
-	smp_wmb(); /* B, matches C */
-
-	q->prod_tail += nb_entries;
-	WRITE_ONCE(q->ring->producer, q->prod_tail);
+	if (q->cached_prod == q->cached_cons)
+		xskq_cons_get_entries(q);
+	return xskq_cons_read_desc(q, desc, umem);
 }
 
-static inline int xskq_reserve_addr(struct xsk_queue *q)
+static inline void xskq_cons_release(struct xsk_queue *q)
 {
-	if (xskq_nb_free(q, q->prod_head, 1) == 0)
-		return -ENOSPC;
+	/* To improve performance, only update local state here.
+	 * Reflect this to global state when we get new entries
+	 * from the ring in xskq_cons_get_entries().
+	 */
+	q->cached_cons++;
+}
 
-	/* A, matches D */
-	q->prod_head++;
-	return 0;
+static inline bool xskq_cons_is_full(struct xsk_queue *q)
+{
+	/* No barriers needed since data is not accessed */
+	return READ_ONCE(q->ring->producer) - READ_ONCE(q->ring->consumer) ==
+		q->nentries;
 }
 
-/* Rx/Tx queue */
+/* Functions for producers */
 
-static inline bool xskq_is_valid_desc(struct xsk_queue *q, struct xdp_desc *d)
+static inline bool xskq_prod_is_full(struct xsk_queue *q)
 {
-	if (!xskq_is_valid_addr(q, d->addr))
-		return false;
+	u32 free_entries = q->nentries - (q->cached_prod - q->cached_cons);
 
-	if (((d->addr + d->len) & q->chunk_mask) != (d->addr & q->chunk_mask) ||
-	    d->options) {
-		q->invalid_descs++;
+	if (free_entries)
 		return false;
-	}
 
-	return true;
+	/* Refresh the local tail pointer */
+	q->cached_cons = READ_ONCE(q->ring->consumer);
+	free_entries = q->nentries - (q->cached_prod - q->cached_cons);
+
+	return !free_entries;
 }
 
-static inline struct xdp_desc *xskq_validate_desc(struct xsk_queue *q,
-						  struct xdp_desc *desc)
+static inline int xskq_prod_reserve(struct xsk_queue *q)
 {
-	while (q->cons_tail != q->cons_head) {
-		struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
-		unsigned int idx = q->cons_tail & q->ring_mask;
-
-		*desc = READ_ONCE(ring->desc[idx]);
-		if (xskq_is_valid_desc(q, desc))
-			return desc;
-
-		q->cons_tail++;
-	}
+	if (xskq_prod_is_full(q))
+		return -ENOSPC;
 
-	return NULL;
+	/* A, matches D */
+	q->cached_prod++;
+	return 0;
 }
 
-static inline struct xdp_desc *xskq_peek_desc(struct xsk_queue *q,
-					      struct xdp_desc *desc)
+static inline int xskq_prod_reserve_addr(struct xsk_queue *q, u64 addr)
 {
-	if (q->cons_tail == q->cons_head) {
-		smp_mb(); /* D, matches A */
-		WRITE_ONCE(q->ring->consumer, q->cons_tail);
-		q->cons_head = q->cons_tail + xskq_nb_avail(q, RX_BATCH_SIZE);
-
-		/* Order consumer and data */
-		smp_rmb(); /* C, matches B */
-	}
+	struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
 
-	return xskq_validate_desc(q, desc);
-}
+	if (xskq_prod_is_full(q))
+		return -ENOSPC;
 
-static inline void xskq_discard_desc(struct xsk_queue *q)
-{
-	q->cons_tail++;
+	/* A, matches D */
+	ring->desc[q->cached_prod++ & q->ring_mask] = addr;
+	return 0;
 }
 
-static inline int xskq_produce_batch_desc(struct xsk_queue *q,
-					  u64 addr, u32 len)
+static inline int xskq_prod_reserve_desc(struct xsk_queue *q,
+					 u64 addr, u32 len)
 {
 	struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
-	unsigned int idx;
+	u32 idx;
 
-	if (xskq_nb_free(q, q->prod_head, 1) == 0)
+	if (xskq_prod_is_full(q))
 		return -ENOSPC;
 
 	/* A, matches D */
-	idx = (q->prod_head++) & q->ring_mask;
+	idx = q->cached_prod++ & q->ring_mask;
 	ring->desc[idx].addr = addr;
 	ring->desc[idx].len = len;
 
 	return 0;
 }
 
-static inline void xskq_produce_flush_desc(struct xsk_queue *q)
+static inline void __xskq_prod_submit(struct xsk_queue *q, u32 idx)
{
-	/* Order producer and data */
 	smp_wmb(); /* B, matches C */
 
-	q->prod_tail = q->prod_head;
-	WRITE_ONCE(q->ring->producer, q->prod_tail);
+	WRITE_ONCE(q->ring->producer, idx);
+}
+
+static inline void xskq_prod_submit(struct xsk_queue *q)
+{
+	__xskq_prod_submit(q, q->cached_prod);
+}
+
+static inline void xskq_prod_submit_addr(struct xsk_queue *q, u64 addr)
+{
+	struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
+	u32 idx = q->ring->producer;
+
+	ring->desc[idx++ & q->ring_mask] = addr;
+
+	__xskq_prod_submit(q, idx);
 }
 
-static inline bool xskq_full_desc(struct xsk_queue *q)
+static inline void xskq_prod_submit_n(struct xsk_queue *q, u32 nb_entries)
 {
-	return xskq_nb_avail(q, q->nentries) == q->nentries;
+	__xskq_prod_submit(q, q->ring->producer + nb_entries);
 }
 
-static inline bool xskq_empty_desc(struct xsk_queue *q)
+static inline bool xskq_prod_is_empty(struct xsk_queue *q)
 {
-	return xskq_nb_free(q, q->prod_tail, q->nentries) == q->nentries;
+	/* No barriers needed since data is not accessed */
+	return READ_ONCE(q->ring->consumer) == READ_ONCE(q->ring->producer);
+}
+
+/* For both producers and consumers */
+
+static inline u64 xskq_nb_invalid_descs(struct xsk_queue *q)
+{
+	return q ? q->invalid_descs : 0;
 }
 
 void xskq_set_umem(struct xsk_queue *q, u64 size, u64 chunk_mask);
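The rationale for the new cached_prod and cached_cons fields is visible in the full/empty checks above: in the common case each side works only on its private copy of the ring indices. The producer re-reads the shared consumer index only when the ring looks full (xskq_prod_is_full()), and the consumer re-reads the shared producer index only when its local view is exhausted (__xskq_cons_peek()), so the two cacheline-aligned shared counters bounce between cores far less often than the four prod_head/prod_tail/cons_head/cons_tail pointers they replace. The paired barrier comments (A matches D, B matches C) mark the two orderings that still matter: the producer must publish ring data before advancing the producer index, and the consumer must finish reading an entry before releasing it back.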
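The RESERVE/WRITE/SUBMIT and PEEK/READ/RELEASE protocol described in the new comment block can be illustrated outside the kernel. The sketch below is a minimal userspace single-producer/single-consumer ring in C11; all names (spsc_ring, prod_reserve() and so on) are hypothetical and not kernel API, and acquire/release atomics stand in for the kernel's READ_ONCE()/WRITE_ONCE() plus smp_rmb()/smp_wmb()/smp_mb() pairs. Unlike xskq_cons_release() above, the sketch publishes the consumer index eagerly on every release rather than deferring it to the next refresh:

#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

#define RING_SIZE 8			/* must be a power of two */
#define RING_MASK (RING_SIZE - 1)

struct spsc_ring {
	_Atomic uint32_t producer;	/* shared, written by producer */
	_Atomic uint32_t consumer;	/* shared, written by consumer */
	uint32_t cached_prod;		/* each side's private view */
	uint32_t cached_cons;
	uint64_t desc[RING_SIZE];
};

/* RESERVE: check for a free slot, refreshing the cached consumer
 * index only when the ring looks full (cf. xskq_prod_is_full()).
 */
static bool prod_reserve(struct spsc_ring *r)
{
	if (r->cached_prod - r->cached_cons == RING_SIZE) {
		r->cached_cons = atomic_load_explicit(&r->consumer,
						      memory_order_acquire);
		if (r->cached_prod - r->cached_cons == RING_SIZE)
			return false;	/* really full */
	}
	return true;
}

/* WRITE + SUBMIT: fill the slot, then publish it with release
 * ordering (the userspace analog of smp_wmb(); "B, matches C").
 */
static void prod_write_submit(struct spsc_ring *r, uint64_t val)
{
	r->desc[r->cached_prod++ & RING_MASK] = val;
	atomic_store_explicit(&r->producer, r->cached_prod,
			      memory_order_release);
}

/* PEEK: refresh the cached producer index only when the local view
 * is exhausted; the acquire load pairs with the producer's release
 * store ("C, matches B").
 */
static bool cons_peek(struct spsc_ring *r, uint64_t *val)
{
	if (r->cached_cons == r->cached_prod)
		r->cached_prod = atomic_load_explicit(&r->producer,
						      memory_order_acquire);
	if (r->cached_cons == r->cached_prod)
		return false;		/* really empty */
	*val = r->desc[r->cached_cons & RING_MASK];
	return true;
}

/* RELEASE: hand the slot back so the producer may reuse it
 * ("D, matches A"); the kernel defers this store for performance.
 */
static void cons_release(struct spsc_ring *r)
{
	r->cached_cons++;
	atomic_store_explicit(&r->consumer, r->cached_cons,
			      memory_order_release);
}

int main(void)
{
	struct spsc_ring r = { 0 };
	uint64_t v;

	for (uint64_t i = 0; i < 5; i++)
		if (prod_reserve(&r))
			prod_write_submit(&r, i);

	while (cons_peek(&r, &v)) {
		printf("consumed %llu\n", (unsigned long long)v);
		cons_release(&r);
	}
	return 0;
}

Compile with e.g. cc -std=c11; a real test would run the two loops on separate threads, which is exactly the situation the acquire/release pairing is there for.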
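The cross-page test in xskq_cons_crosses_non_contig_pg() is plain address arithmetic: a buffer spills into the next page exactly when its offset within the page plus its length exceeds PAGE_SIZE. Below is a standalone sketch of just that expression (assuming 4 KiB pages; crosses_page() is a hypothetical stand-in, not a kernel function):

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

#define PAGE_SIZE 4096ULL	/* assumption: 4 KiB pages */

/* Mirrors the cross_pg expression in the diff above. */
static bool crosses_page(uint64_t addr, uint64_t len)
{
	return (addr & (PAGE_SIZE - 1)) + len > PAGE_SIZE;
}

int main(void)
{
	/* 100 bytes at page offset 4000 spill 4 bytes into the next page. */
	printf("%d\n", crosses_page(4000, 100));	/* prints 1 */
	/* 96 bytes at offset 4000 end exactly on the page boundary. */
	printf("%d\n", crosses_page(4000, 96));		/* prints 0 */
	return 0;
}

Such a crossing only invalidates the descriptor when the next umem page is not physically contiguous, which is what the !next_pg_contig term (via XSK_NEXT_PG_CONTIG_MASK) checks in the kernel code.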