Diffstat (limited to 'kernel/bpf/ringbuf.c')
-rw-r--r--	kernel/bpf/ringbuf.c	253
1 file changed, 237 insertions, 16 deletions
diff --git a/kernel/bpf/ringbuf.c b/kernel/bpf/ringbuf.c
index ded4faeca192..9e832acf4692 100644
--- a/kernel/bpf/ringbuf.c
+++ b/kernel/bpf/ringbuf.c
@@ -38,10 +38,43 @@ struct bpf_ringbuf {
 	struct page **pages;
 	int nr_pages;
 	spinlock_t spinlock ____cacheline_aligned_in_smp;
-	/* Consumer and producer counters are put into separate pages to allow
-	 * mapping consumer page as r/w, but restrict producer page to r/o.
-	 * This protects producer position from being modified by user-space
-	 * application and ruining in-kernel position tracking.
+	/* For user-space producer ring buffers, an atomic_t busy bit is used
+	 * to synchronize access to the ring buffers in the kernel, rather than
+	 * the spinlock that is used for kernel-producer ring buffers. This is
+	 * done because the ring buffer must hold a lock across a BPF program's
+	 * callback:
+	 *
+	 *    __bpf_user_ringbuf_peek() // lock acquired
+	 * -> program callback_fn()
+	 * -> __bpf_user_ringbuf_sample_release() // lock released
+	 *
+	 * It is unsafe and incorrect to hold an IRQ spinlock across what could
+	 * be a long execution window, so we instead simply disallow concurrent
+	 * access to the ring buffer by kernel consumers, and return -EBUSY from
+	 * __bpf_user_ringbuf_peek() if the busy bit is held by another task.
+	 */
+	atomic_t busy ____cacheline_aligned_in_smp;
+	/* Consumer and producer counters are put into separate pages to
+	 * allow each position to be mapped with different permissions.
+	 * This prevents a user-space application from modifying the
+	 * position and ruining in-kernel tracking. The permissions of the
+	 * pages depend on who is producing samples: user-space or the
+	 * kernel.
+	 *
+	 * Kernel-producer
+	 * ---------------
+	 * The producer position and data pages are mapped as r/o in
+	 * userspace. For this approach, bits in the header of samples are
+	 * used to signal to user-space, and to other producers, whether a
+	 * sample is currently being written.
+	 *
+	 * User-space producer
+	 * -------------------
+	 * Only the page containing the consumer position is mapped r/o in
+	 * user-space. User-space producers also use bits of the header to
+	 * communicate to the kernel, but the kernel must carefully check and
+	 * validate each sample to ensure that they're correctly formatted, and
+	 * fully contained within the ring buffer.
 	 */
 	unsigned long consumer_pos __aligned(PAGE_SIZE);
 	unsigned long producer_pos __aligned(PAGE_SIZE);
@@ -116,7 +149,7 @@ static struct bpf_ringbuf *bpf_ringbuf_area_alloc(size_t data_sz, int numa_node)
 err_free_pages:
 	for (i = 0; i < nr_pages; i++)
 		__free_page(pages[i]);
-	kvfree(pages);
+	bpf_map_area_free(pages);
 	return NULL;
 }
 
@@ -136,6 +169,7 @@ static struct bpf_ringbuf *bpf_ringbuf_alloc(size_t data_sz, int numa_node)
 		return NULL;
 
 	spin_lock_init(&rb->spinlock);
+	atomic_set(&rb->busy, 0);
 	init_waitqueue_head(&rb->waitq);
 	init_irq_work(&rb->work, bpf_ringbuf_notify);
 
@@ -164,7 +198,7 @@ static struct bpf_map *ringbuf_map_alloc(union bpf_attr *attr)
 		return ERR_PTR(-E2BIG);
 #endif
 
-	rb_map = kzalloc(sizeof(*rb_map), GFP_USER | __GFP_ACCOUNT);
+	rb_map = bpf_map_area_alloc(sizeof(*rb_map), NUMA_NO_NODE);
 	if (!rb_map)
 		return ERR_PTR(-ENOMEM);
 
@@ -172,7 +206,7 @@ static struct bpf_map *ringbuf_map_alloc(union bpf_attr *attr)
 
 	rb_map->rb = bpf_ringbuf_alloc(attr->max_entries, rb_map->map.numa_node);
 	if (!rb_map->rb) {
-		kfree(rb_map);
+		bpf_map_area_free(rb_map);
 		return ERR_PTR(-ENOMEM);
 	}
 
@@ -190,7 +224,7 @@ static void bpf_ringbuf_free(struct bpf_ringbuf *rb)
 	vunmap(rb);
 	for (i = 0; i < nr_pages; i++)
 		__free_page(pages[i]);
-	kvfree(pages);
+	bpf_map_area_free(pages);
 }
 
 static void ringbuf_map_free(struct bpf_map *map)
@@ -199,7 +233,7 @@ static void ringbuf_map_free(struct bpf_map *map)
 
 	rb_map = container_of(map, struct bpf_ringbuf_map, map);
 	bpf_ringbuf_free(rb_map->rb);
-	kfree(rb_map);
+	bpf_map_area_free(rb_map);
 }
 
 static void *ringbuf_map_lookup_elem(struct bpf_map *map, void *key)
@@ -224,7 +258,7 @@ static int ringbuf_map_get_next_key(struct bpf_map *map, void *key,
 	return -ENOTSUPP;
 }
 
-static int ringbuf_map_mmap(struct bpf_map *map, struct vm_area_struct *vma)
+static int ringbuf_map_mmap_kern(struct bpf_map *map, struct vm_area_struct *vma)
 {
 	struct bpf_ringbuf_map *rb_map;
 
@@ -242,6 +276,26 @@ static int ringbuf_map_mmap(struct bpf_map *map, struct vm_area_struct *vma)
 				   vma->vm_pgoff + RINGBUF_PGOFF);
 }
 
+static int ringbuf_map_mmap_user(struct bpf_map *map, struct vm_area_struct *vma)
+{
+	struct bpf_ringbuf_map *rb_map;
+
+	rb_map = container_of(map, struct bpf_ringbuf_map, map);
+
+	if (vma->vm_flags & VM_WRITE) {
+		if (vma->vm_pgoff == 0)
+			/* Disallow writable mappings to the consumer pointer,
+			 * and allow writable mappings to both the producer
+			 * position, and the ring buffer data itself.
+			 */
+			return -EPERM;
+	} else {
+		vma->vm_flags &= ~VM_MAYWRITE;
+	}
+	/* remap_vmalloc_range() checks size and offset constraints */
+	return remap_vmalloc_range(vma, rb_map->rb, vma->vm_pgoff + RINGBUF_PGOFF);
+}
+
 static unsigned long ringbuf_avail_data_sz(struct bpf_ringbuf *rb)
 {
 	unsigned long cons_pos, prod_pos;
@@ -251,8 +305,13 @@ static unsigned long ringbuf_avail_data_sz(struct bpf_ringbuf *rb)
 	return prod_pos - cons_pos;
 }
 
-static __poll_t ringbuf_map_poll(struct bpf_map *map, struct file *filp,
-				 struct poll_table_struct *pts)
+static u32 ringbuf_total_data_sz(const struct bpf_ringbuf *rb)
+{
+	return rb->mask + 1;
+}
+
+static __poll_t ringbuf_map_poll_kern(struct bpf_map *map, struct file *filp,
+				      struct poll_table_struct *pts)
 {
 	struct bpf_ringbuf_map *rb_map;
 
@@ -264,13 +323,26 @@ static __poll_t ringbuf_map_poll(struct bpf_map *map, struct file *filp,
 	return 0;
 }
 
+static __poll_t ringbuf_map_poll_user(struct bpf_map *map, struct file *filp,
+				      struct poll_table_struct *pts)
+{
+	struct bpf_ringbuf_map *rb_map;
+
+	rb_map = container_of(map, struct bpf_ringbuf_map, map);
+	poll_wait(filp, &rb_map->rb->waitq, pts);
+
+	if (ringbuf_avail_data_sz(rb_map->rb) < ringbuf_total_data_sz(rb_map->rb))
+		return EPOLLOUT | EPOLLWRNORM;
+	return 0;
+}
+
 BTF_ID_LIST_SINGLE(ringbuf_map_btf_ids, struct, bpf_ringbuf_map)
 const struct bpf_map_ops ringbuf_map_ops = {
 	.map_meta_equal = bpf_map_meta_equal,
 	.map_alloc = ringbuf_map_alloc,
 	.map_free = ringbuf_map_free,
-	.map_mmap = ringbuf_map_mmap,
-	.map_poll = ringbuf_map_poll,
+	.map_mmap = ringbuf_map_mmap_kern,
+	.map_poll = ringbuf_map_poll_kern,
 	.map_lookup_elem = ringbuf_map_lookup_elem,
 	.map_update_elem = ringbuf_map_update_elem,
 	.map_delete_elem = ringbuf_map_delete_elem,
@@ -278,6 +350,20 @@ const struct bpf_map_ops ringbuf_map_ops = {
 	.map_btf_id = &ringbuf_map_btf_ids[0],
 };
 
+BTF_ID_LIST_SINGLE(user_ringbuf_map_btf_ids, struct, bpf_ringbuf_map)
+const struct bpf_map_ops user_ringbuf_map_ops = {
+	.map_meta_equal = bpf_map_meta_equal,
+	.map_alloc = ringbuf_map_alloc,
+	.map_free = ringbuf_map_free,
+	.map_mmap = ringbuf_map_mmap_user,
+	.map_poll = ringbuf_map_poll_user,
+	.map_lookup_elem = ringbuf_map_lookup_elem,
+	.map_update_elem = ringbuf_map_update_elem,
+	.map_delete_elem = ringbuf_map_delete_elem,
+	.map_get_next_key = ringbuf_map_get_next_key,
+	.map_btf_id = &user_ringbuf_map_btf_ids[0],
+};
+
 /* Given pointer to ring buffer record metadata and struct bpf_ringbuf itself,
  * calculate offset from record metadata to ring buffer in pages, rounded
  * down. This page offset is stored as part of record metadata and allows to
@@ -312,7 +398,7 @@ static void *__bpf_ringbuf_reserve(struct bpf_ringbuf *rb, u64 size)
 		return NULL;
 
 	len = round_up(size + BPF_RINGBUF_HDR_SZ, 8);
-	if (len > rb->mask + 1)
+	if (len > ringbuf_total_data_sz(rb))
 		return NULL;
 
 	cons_pos = smp_load_acquire(&rb->consumer_pos);
@@ -459,7 +545,7 @@ BPF_CALL_2(bpf_ringbuf_query, struct bpf_map *, map, u64, flags)
 	case BPF_RB_AVAIL_DATA:
 		return ringbuf_avail_data_sz(rb);
 	case BPF_RB_RING_SIZE:
-		return rb->mask + 1;
+		return ringbuf_total_data_sz(rb);
 	case BPF_RB_CONS_POS:
 		return smp_load_acquire(&rb->consumer_pos);
 	case BPF_RB_PROD_POS:
@@ -553,3 +639,138 @@ const struct bpf_func_proto bpf_ringbuf_discard_dynptr_proto = {
 	.arg1_type	= ARG_PTR_TO_DYNPTR | DYNPTR_TYPE_RINGBUF | OBJ_RELEASE,
 	.arg2_type	= ARG_ANYTHING,
 };
+
+static int __bpf_user_ringbuf_peek(struct bpf_ringbuf *rb, void **sample, u32 *size)
+{
+	int err;
+	u32 hdr_len, sample_len, total_len, flags, *hdr;
+	u64 cons_pos, prod_pos;
+
+	/* Synchronizes with smp_store_release() in user-space producer. */
+	prod_pos = smp_load_acquire(&rb->producer_pos);
+	if (prod_pos % 8)
+		return -EINVAL;
+
+	/* Synchronizes with smp_store_release() in __bpf_user_ringbuf_sample_release() */
+	cons_pos = smp_load_acquire(&rb->consumer_pos);
+	if (cons_pos >= prod_pos)
+		return -ENODATA;
+
+	hdr = (u32 *)((uintptr_t)rb->data + (uintptr_t)(cons_pos & rb->mask));
+	/* Synchronizes with smp_store_release() in user-space producer. */
+	hdr_len = smp_load_acquire(hdr);
+	flags = hdr_len & (BPF_RINGBUF_BUSY_BIT | BPF_RINGBUF_DISCARD_BIT);
+	sample_len = hdr_len & ~flags;
+	total_len = round_up(sample_len + BPF_RINGBUF_HDR_SZ, 8);
+
+	/* The sample must fit within the region advertised by the producer position. */
+	if (total_len > prod_pos - cons_pos)
+		return -EINVAL;
+
+	/* The sample must fit within the data region of the ring buffer. */
+	if (total_len > ringbuf_total_data_sz(rb))
+		return -E2BIG;
+
+	/* The sample must fit into a struct bpf_dynptr. */
+	err = bpf_dynptr_check_size(sample_len);
+	if (err)
+		return -E2BIG;
+
+	if (flags & BPF_RINGBUF_DISCARD_BIT) {
+		/* If the discard bit is set, the sample should be skipped.
+		 *
+		 * Update the consumer pos, and return -EAGAIN so the caller
+		 * knows to skip this sample and try to read the next one.
+		 */
+		smp_store_release(&rb->consumer_pos, cons_pos + total_len);
+		return -EAGAIN;
+	}
+
+	if (flags & BPF_RINGBUF_BUSY_BIT)
+		return -ENODATA;
+
+	*sample = (void *)((uintptr_t)rb->data +
+			   (uintptr_t)((cons_pos + BPF_RINGBUF_HDR_SZ) & rb->mask));
+	*size = sample_len;
+	return 0;
+}
+
+static void __bpf_user_ringbuf_sample_release(struct bpf_ringbuf *rb, size_t size, u64 flags)
+{
+	u64 consumer_pos;
+	u32 rounded_size = round_up(size + BPF_RINGBUF_HDR_SZ, 8);
+
+	/* Using smp_load_acquire() is unnecessary here, as the busy-bit
+	 * prevents another task from writing to consumer_pos after it was read
+	 * by this task with smp_load_acquire() in __bpf_user_ringbuf_peek().
+	 */
+	consumer_pos = rb->consumer_pos;
+	 /* Synchronizes with smp_load_acquire() in user-space producer. */
+	smp_store_release(&rb->consumer_pos, consumer_pos + rounded_size);
+}
+
+BPF_CALL_4(bpf_user_ringbuf_drain, struct bpf_map *, map,
+	   void *, callback_fn, void *, callback_ctx, u64, flags)
+{
+	struct bpf_ringbuf *rb;
+	long samples, discarded_samples = 0, ret = 0;
+	bpf_callback_t callback = (bpf_callback_t)callback_fn;
+	u64 wakeup_flags = BPF_RB_NO_WAKEUP | BPF_RB_FORCE_WAKEUP;
+	int busy = 0;
+
+	if (unlikely(flags & ~wakeup_flags))
+		return -EINVAL;
+
+	rb = container_of(map, struct bpf_ringbuf_map, map)->rb;
+
+	/* If another consumer is already consuming a sample, wait for them to finish. */
+	if (!atomic_try_cmpxchg(&rb->busy, &busy, 1))
+		return -EBUSY;
+
+	for (samples = 0; samples < BPF_MAX_USER_RINGBUF_SAMPLES && ret == 0; samples++) {
+		int err;
+		u32 size;
+		void *sample;
+		struct bpf_dynptr_kern dynptr;
+
+		err = __bpf_user_ringbuf_peek(rb, &sample, &size);
+		if (err) {
+			if (err == -ENODATA) {
+				break;
+			} else if (err == -EAGAIN) {
+				discarded_samples++;
+				continue;
+			} else {
+				ret = err;
+				goto schedule_work_return;
+			}
+		}
+
+		bpf_dynptr_init(&dynptr, sample, BPF_DYNPTR_TYPE_LOCAL, 0, size);
+		ret = callback((uintptr_t)&dynptr, (uintptr_t)callback_ctx, 0, 0, 0);
+		__bpf_user_ringbuf_sample_release(rb, size, flags);
+	}
+	ret = samples - discarded_samples;
+
+schedule_work_return:
+	/* Prevent the clearing of the busy-bit from being reordered before the
+	 * storing of any rb consumer or producer positions.
+	 */
+	smp_mb__before_atomic();
+	atomic_set(&rb->busy, 0);
+
+	if (flags & BPF_RB_FORCE_WAKEUP)
+		irq_work_queue(&rb->work);
+	else if (!(flags & BPF_RB_NO_WAKEUP) && samples > 0)
+		irq_work_queue(&rb->work);
+	return ret;
+}
+
+const struct bpf_func_proto bpf_user_ringbuf_drain_proto = {
+	.func		= bpf_user_ringbuf_drain,
+	.ret_type	= RET_INTEGER,
+	.arg1_type	= ARG_CONST_MAP_PTR,
+	.arg2_type	= ARG_PTR_TO_FUNC,
+	.arg3_type	= ARG_PTR_TO_STACK_OR_NULL,
+	.arg4_type	= ARG_ANYTHING,
+};
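The new bpf_user_ringbuf_drain() helper above is the only way a BPF program consumes samples that user-space produced into a BPF_MAP_TYPE_USER_RINGBUF map. The following is a minimal sketch of the BPF side: the map definition, the struct sample layout, the attach point, and the handle_sample()/drain_user_samples() names are illustrative placeholders, not part of this patch; bpf_user_ringbuf_drain(), bpf_dynptr_data(), and the USER_RINGBUF map type are the interfaces the patch set adds.

#include <vmlinux.h>
#include <bpf/bpf_helpers.h>

struct {
	__uint(type, BPF_MAP_TYPE_USER_RINGBUF);
	__uint(max_entries, 256 * 1024);	/* power-of-2 multiple of PAGE_SIZE */
} user_ringbuf SEC(".maps");

struct sample {
	int pid;
	char comm[16];
};

static long handle_sample(struct bpf_dynptr *dynptr, void *ctx)
{
	const struct sample *s;

	/* The dynptr wraps one already-validated sample; NULL means the
	 * sample is smaller than we expected, so just skip it.
	 */
	s = bpf_dynptr_data(dynptr, 0, sizeof(*s));
	if (!s)
		return 0;

	bpf_printk("sample from pid %d", s->pid);
	return 0;	/* a non-zero return stops the drain early */
}

SEC("tracepoint/syscalls/sys_enter_getpgid")
int drain_user_samples(void *ctx)
{
	/* Invokes handle_sample() on up to BPF_MAX_USER_RINGBUF_SAMPLES
	 * samples and returns the number of samples consumed.
	 */
	bpf_user_ringbuf_drain(&user_ringbuf, handle_sample, NULL, 0);
	return 0;
}

char _license[] SEC("license") = "GPL";

Each sample handed to the callback has already passed the alignment and bounds checks performed by __bpf_user_ringbuf_peek() above.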
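On the user-space side, samples are written through the writable producer-position and data pages described in the mapping comment at the top of the diff. Most applications would go through libbpf's user_ring_buffer API rather than touching the mapping directly; the sketch below assumes a libbpf build recent enough to ship that API (it was added alongside this map type), and produce_one(), map_fd, and struct sample are placeholders for whatever the application actually loads.

#include <errno.h>
#include <string.h>
#include <unistd.h>
#include <bpf/libbpf.h>

struct sample {
	int pid;
	char comm[16];
};

static int produce_one(int map_fd)
{
	struct user_ring_buffer *rb;
	struct sample *s;
	int err = 0;

	rb = user_ring_buffer__new(map_fd, NULL);
	if (!rb)
		return -errno;

	/* Reserve space for one sample. The blocking variant waits on the
	 * map (see ringbuf_map_poll_user() above, which reports EPOLLOUT
	 * while free space remains) until room is available or the timeout
	 * expires.
	 */
	s = user_ring_buffer__reserve_blocking(rb, sizeof(*s), 1000 /* ms */);
	if (!s) {
		err = -errno;
		goto out;
	}

	s->pid = getpid();
	strncpy(s->comm, "producer", sizeof(s->comm));

	/* Commit the sample: the busy bit in its header is cleared, so the
	 * next bpf_user_ringbuf_drain() call in the kernel can consume it.
	 * user_ring_buffer__discard() would set the discard bit instead.
	 */
	user_ring_buffer__submit(rb, s);
out:
	user_ring_buffer__free(rb);
	return err;
}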
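For reference, the header protocol that __bpf_user_ringbuf_peek() validates can also be followed by hand against the raw mapping. The sketch below is not code from this patch or from libbpf; it is a hand-written illustration of the commit sequence, assuming producer_pos, data, and mask were obtained by mmap()ing the map as the mapping comment describes (consumer page read-only at offset 0, producer page and double-mapped data writable from the following page), and it omits free-space checks and error handling.

#include <stdint.h>
#include <string.h>
#include <linux/bpf.h>	/* BPF_RINGBUF_HDR_SZ, BPF_RINGBUF_BUSY_BIT */

static void publish_sample(uint64_t *producer_pos, void *data, uint64_t mask,
			   const void *payload, uint32_t len)
{
	uint64_t prod = __atomic_load_n(producer_pos, __ATOMIC_ACQUIRE);
	uint32_t total = (len + BPF_RINGBUF_HDR_SZ + 7) & ~7u;
	uint32_t *hdr = (uint32_t *)((char *)data + (prod & mask));

	/* 1. Announce the reservation: store the length with the busy bit
	 *    set, so __bpf_user_ringbuf_peek() returns -ENODATA until the
	 *    sample is committed.
	 */
	*hdr = len | BPF_RINGBUF_BUSY_BIT;

	/* 2. Publish the new producer position; pairs with the kernel's
	 *    smp_load_acquire() of producer_pos. Positions stay multiples
	 *    of 8, which the kernel checks.
	 */
	__atomic_store_n(producer_pos, prod + total, __ATOMIC_RELEASE);

	/* 3. Write the payload behind the 8-byte header. The data area is
	 *    mapped twice back to back, so the copy may run past the
	 *    nominal end of the buffer without special casing.
	 */
	memcpy((char *)data + ((prod + BPF_RINGBUF_HDR_SZ) & mask), payload, len);

	/* 4. Commit: clear the busy bit with release semantics; pairs with
	 *    the kernel's smp_load_acquire() of the header. Setting
	 *    BPF_RINGBUF_DISCARD_BIT here instead would make the kernel
	 *    skip the sample.
	 */
	__atomic_store_n(hdr, len, __ATOMIC_RELEASE);
}

libbpf's user_ring_buffer__reserve()/__submit() pair implements essentially this sequence, plus the space accounting and wakeup handling omitted here.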