aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--net/xdp/xsk_queue.h56
-rw-r--r--tools/lib/bpf/libbpf_util.h30
-rw-r--r--tools/lib/bpf/xsk.h22
3 files changed, 98 insertions, 10 deletions
diff --git a/net/xdp/xsk_queue.h b/net/xdp/xsk_queue.h
index 610c0bdc0c2b..88b9ae24658d 100644
--- a/net/xdp/xsk_queue.h
+++ b/net/xdp/xsk_queue.h
@@ -43,6 +43,48 @@ struct xsk_queue {
u64 invalid_descs;
};
+/* The structure of the shared state of the rings are the same as the
+ * ring buffer in kernel/events/ring_buffer.c. For the Rx and completion
+ * ring, the kernel is the producer and user space is the consumer. For
+ * the Tx and fill rings, the kernel is the consumer and user space is
+ * the producer.
+ *
+ * producer consumer
+ *
+ * if (LOAD ->consumer) { LOAD ->producer
+ * (A) smp_rmb() (C)
+ * STORE $data LOAD $data
+ * smp_wmb() (B) smp_mb() (D)
+ * STORE ->producer STORE ->consumer
+ * }
+ *
+ * (A) pairs with (D), and (B) pairs with (C).
+ *
+ * Starting with (B), it protects the data from being written after
+ * the producer pointer. If this barrier was missing, the consumer
+ * could observe the producer pointer being set and thus load the data
+ * before the producer has written the new data. The consumer would in
+ * this case load the old data.
+ *
+ * (C) protects the consumer from speculatively loading the data before
+ * the producer pointer actually has been read. If we do not have this
+ * barrier, some architectures could load old data as speculative loads
+ * are not discarded as the CPU does not know there is a dependency
+ * between ->producer and data.
+ *
+ * (A) is a control dependency that separates the load of ->consumer
+ * from the stores of $data. In case ->consumer indicates there is no
+ * room in the buffer to store $data we do not. So no barrier is needed.
+ *
+ * (D) protects the load of the data to be observed to happen after the
+ * store of the consumer pointer. If we did not have this memory
+ * barrier, the producer could observe the consumer pointer being set
+ * and overwrite the data with a new value before the consumer got the
+ * chance to read the old value. The consumer would thus miss reading
+ * the old entry and very likely read the new entry twice, once right
+ * now and again after circling through the ring.
+ */
+
/* Common functions operating for both RXTX and umem queues */
static inline u64 xskq_nb_invalid_descs(struct xsk_queue *q)
@@ -106,6 +148,7 @@ static inline u64 *xskq_validate_addr(struct xsk_queue *q, u64 *addr)
static inline u64 *xskq_peek_addr(struct xsk_queue *q, u64 *addr)
{
if (q->cons_tail == q->cons_head) {
+ smp_mb(); /* D, matches A */
WRITE_ONCE(q->ring->consumer, q->cons_tail);
q->cons_head = q->cons_tail + xskq_nb_avail(q, RX_BATCH_SIZE);
@@ -128,10 +171,11 @@ static inline int xskq_produce_addr(struct xsk_queue *q, u64 addr)
if (xskq_nb_free(q, q->prod_tail, 1) == 0)
return -ENOSPC;
+ /* A, matches D */
ring->desc[q->prod_tail++ & q->ring_mask] = addr;
/* Order producer and data */
- smp_wmb();
+ smp_wmb(); /* B, matches C */
WRITE_ONCE(q->ring->producer, q->prod_tail);
return 0;
@@ -144,6 +188,7 @@ static inline int xskq_produce_addr_lazy(struct xsk_queue *q, u64 addr)
if (xskq_nb_free(q, q->prod_head, LAZY_UPDATE_THRESHOLD) == 0)
return -ENOSPC;
+ /* A, matches D */
ring->desc[q->prod_head++ & q->ring_mask] = addr;
return 0;
}
@@ -152,7 +197,7 @@ static inline void xskq_produce_flush_addr_n(struct xsk_queue *q,
u32 nb_entries)
{
/* Order producer and data */
- smp_wmb();
+ smp_wmb(); /* B, matches C */
q->prod_tail += nb_entries;
WRITE_ONCE(q->ring->producer, q->prod_tail);
@@ -163,6 +208,7 @@ static inline int xskq_reserve_addr(struct xsk_queue *q)
if (xskq_nb_free(q, q->prod_head, 1) == 0)
return -ENOSPC;
+ /* A, matches D */
q->prod_head++;
return 0;
}
@@ -204,11 +250,12 @@ static inline struct xdp_desc *xskq_peek_desc(struct xsk_queue *q,
struct xdp_desc *desc)
{
if (q->cons_tail == q->cons_head) {
+ smp_mb(); /* D, matches A */
WRITE_ONCE(q->ring->consumer, q->cons_tail);
q->cons_head = q->cons_tail + xskq_nb_avail(q, RX_BATCH_SIZE);
/* Order consumer and data */
- smp_rmb();
+ smp_rmb(); /* C, matches B */
}
return xskq_validate_desc(q, desc);
@@ -228,6 +275,7 @@ static inline int xskq_produce_batch_desc(struct xsk_queue *q,
if (xskq_nb_free(q, q->prod_head, 1) == 0)
return -ENOSPC;
+ /* A, matches D */
idx = (q->prod_head++) & q->ring_mask;
ring->desc[idx].addr = addr;
ring->desc[idx].len = len;
@@ -238,7 +286,7 @@ static inline int xskq_produce_batch_desc(struct xsk_queue *q,
static inline void xskq_produce_flush_desc(struct xsk_queue *q)
{
/* Order producer and data */
- smp_wmb();
+ smp_wmb(); /* B, matches C */
q->prod_tail = q->prod_head,
WRITE_ONCE(q->ring->producer, q->prod_tail);
diff --git a/tools/lib/bpf/libbpf_util.h b/tools/lib/bpf/libbpf_util.h
index 81ecda0cb9c9..172b707e007b 100644
--- a/tools/lib/bpf/libbpf_util.h
+++ b/tools/lib/bpf/libbpf_util.h
@@ -23,6 +23,36 @@ do { \
#define pr_info(fmt, ...) __pr(LIBBPF_INFO, fmt, ##__VA_ARGS__)
#define pr_debug(fmt, ...) __pr(LIBBPF_DEBUG, fmt, ##__VA_ARGS__)
+/* Use these barrier functions instead of smp_[rw]mb() when they are
+ * used in a libbpf header file. That way they can be built into the
+ * application that uses libbpf.
+ */
+#if defined(__i386__) || defined(__x86_64__)
+# define libbpf_smp_rmb() asm volatile("" : : : "memory")
+# define libbpf_smp_wmb() asm volatile("" : : : "memory")
+# define libbpf_smp_mb() \
+ asm volatile("lock; addl $0,-4(%%rsp)" : : : "memory", "cc")
+/* Hinders stores to be observed before older loads. */
+# define libbpf_smp_rwmb() asm volatile("" : : : "memory")
+#elif defined(__aarch64__)
+# define libbpf_smp_rmb() asm volatile("dmb ishld" : : : "memory")
+# define libbpf_smp_wmb() asm volatile("dmb ishst" : : : "memory")
+# define libbpf_smp_mb() asm volatile("dmb ish" : : : "memory")
+# define libbpf_smp_rwmb() libbpf_smp_mb()
+#elif defined(__arm__)
+/* These are only valid for armv7 and above */
+# define libbpf_smp_rmb() asm volatile("dmb ish" : : : "memory")
+# define libbpf_smp_wmb() asm volatile("dmb ishst" : : : "memory")
+# define libbpf_smp_mb() asm volatile("dmb ish" : : : "memory")
+# define libbpf_smp_rwmb() libbpf_smp_mb()
+#else
+# warning Architecture missing native barrier functions in libbpf_util.h.
+# define libbpf_smp_rmb() __sync_synchronize()
+# define libbpf_smp_wmb() __sync_synchronize()
+# define libbpf_smp_mb() __sync_synchronize()
+# define libbpf_smp_rwmb() __sync_synchronize()
+#endif
+
#ifdef __cplusplus
} /* extern "C" */
#endif
diff --git a/tools/lib/bpf/xsk.h b/tools/lib/bpf/xsk.h
index a497f00e2962..82ea71a0f3ec 100644
--- a/tools/lib/bpf/xsk.h
+++ b/tools/lib/bpf/xsk.h
@@ -16,6 +16,7 @@
#include <linux/if_xdp.h>
#include "libbpf.h"
+#include "libbpf_util.h"
#ifdef __cplusplus
extern "C" {
@@ -36,6 +37,10 @@ struct name { \
DEFINE_XSK_RING(xsk_ring_prod);
DEFINE_XSK_RING(xsk_ring_cons);
+/* For a detailed explanation on the memory barriers associated with the
+ * ring, please take a look at net/xdp/xsk_queue.h.
+ */
+
struct xsk_umem;
struct xsk_socket;
@@ -105,7 +110,7 @@ static inline __u32 xsk_cons_nb_avail(struct xsk_ring_cons *r, __u32 nb)
static inline size_t xsk_ring_prod__reserve(struct xsk_ring_prod *prod,
size_t nb, __u32 *idx)
{
- if (unlikely(xsk_prod_nb_free(prod, nb) < nb))
+ if (xsk_prod_nb_free(prod, nb) < nb)
return 0;
*idx = prod->cached_prod;
@@ -116,10 +121,10 @@ static inline size_t xsk_ring_prod__reserve(struct xsk_ring_prod *prod,
static inline void xsk_ring_prod__submit(struct xsk_ring_prod *prod, size_t nb)
{
- /* Make sure everything has been written to the ring before signalling
- * this to the kernel.
+ /* Make sure everything has been written to the ring before indicating
+ * this to the kernel by writing the producer pointer.
*/
- smp_wmb();
+ libbpf_smp_wmb();
*prod->producer += nb;
}
@@ -129,11 +134,11 @@ static inline size_t xsk_ring_cons__peek(struct xsk_ring_cons *cons,
{
size_t entries = xsk_cons_nb_avail(cons, nb);
- if (likely(entries > 0)) {
+ if (entries > 0) {
/* Make sure we do not speculatively read the data before
* we have received the packet buffers from the ring.
*/
- smp_rmb();
+ libbpf_smp_rmb();
*idx = cons->cached_cons;
cons->cached_cons += entries;
@@ -144,6 +149,11 @@ static inline size_t xsk_ring_cons__peek(struct xsk_ring_cons *cons,
static inline void xsk_ring_cons__release(struct xsk_ring_cons *cons, size_t nb)
{
+ /* Make sure data has been read before indicating we are done
+ * with the entries by updating the consumer pointer.
+ */
+ libbpf_smp_rwmb();
+
*cons->consumer += nb;
}