diff options
Diffstat (limited to 'net/strparser/strparser.c')
| -rw-r--r-- | net/strparser/strparser.c | 316 | 
1 files changed, 187 insertions, 129 deletions
| diff --git a/net/strparser/strparser.c b/net/strparser/strparser.c index b5c279b22680..c5fda15ba319 100644 --- a/net/strparser/strparser.c +++ b/net/strparser/strparser.c @@ -29,44 +29,46 @@  static struct workqueue_struct *strp_wq; -struct _strp_rx_msg { -	/* Internal cb structure. struct strp_rx_msg must be first for passing +struct _strp_msg { +	/* Internal cb structure. struct strp_msg must be first for passing  	 * to upper layer.  	 */ -	struct strp_rx_msg strp; +	struct strp_msg strp;  	int accum_len;  	int early_eaten;  }; -static inline struct _strp_rx_msg *_strp_rx_msg(struct sk_buff *skb) +static inline struct _strp_msg *_strp_msg(struct sk_buff *skb)  { -	return (struct _strp_rx_msg *)((void *)skb->cb + +	return (struct _strp_msg *)((void *)skb->cb +  		offsetof(struct qdisc_skb_cb, data));  }  /* Lower lock held */ -static void strp_abort_rx_strp(struct strparser *strp, int err) +static void strp_abort_strp(struct strparser *strp, int err)  { -	struct sock *csk = strp->sk; -  	/* Unrecoverable error in receive */ -	del_timer(&strp->rx_msg_timer); +	cancel_delayed_work(&strp->msg_timer_work); -	if (strp->rx_stopped) +	if (strp->stopped)  		return; -	strp->rx_stopped = 1; +	strp->stopped = 1; + +	if (strp->sk) { +		struct sock *sk = strp->sk; -	/* Report an error on the lower socket */ -	csk->sk_err = err; -	csk->sk_error_report(csk); +		/* Report an error on the lower socket */ +		sk->sk_err = err; +		sk->sk_error_report(sk); +	}  } -static void strp_start_rx_timer(struct strparser *strp) +static void strp_start_timer(struct strparser *strp, long timeo)  { -	if (strp->sk->sk_rcvtimeo) -		mod_timer(&strp->rx_msg_timer, strp->sk->sk_rcvtimeo); +	if (timeo) +		mod_delayed_work(strp_wq, &strp->msg_timer_work, timeo);  }  /* Lower lock held */ @@ -74,46 +76,55 @@ static void strp_parser_err(struct strparser *strp, int err,  			    read_descriptor_t *desc)  {  	desc->error = err; -	kfree_skb(strp->rx_skb_head); -	strp->rx_skb_head = NULL; +	kfree_skb(strp->skb_head); +	strp->skb_head = NULL;  	strp->cb.abort_parser(strp, err);  }  static inline int strp_peek_len(struct strparser *strp)  { -	struct socket *sock = strp->sk->sk_socket; +	if (strp->sk) { +		struct socket *sock = strp->sk->sk_socket; + +		return sock->ops->peek_len(sock); +	} + +	/* If we don't have an associated socket there's nothing to peek. +	 * Return int max to avoid stopping the strparser. +	 */ -	return sock->ops->peek_len(sock); +	return INT_MAX;  }  /* Lower socket lock held */ -static int strp_recv(read_descriptor_t *desc, struct sk_buff *orig_skb, -		     unsigned int orig_offset, size_t orig_len) +static int __strp_recv(read_descriptor_t *desc, struct sk_buff *orig_skb, +		       unsigned int orig_offset, size_t orig_len, +		       size_t max_msg_size, long timeo)  {  	struct strparser *strp = (struct strparser *)desc->arg.data; -	struct _strp_rx_msg *rxm; +	struct _strp_msg *stm;  	struct sk_buff *head, *skb;  	size_t eaten = 0, cand_len;  	ssize_t extra;  	int err;  	bool cloned_orig = false; -	if (strp->rx_paused) +	if (strp->paused)  		return 0; -	head = strp->rx_skb_head; +	head = strp->skb_head;  	if (head) {  		/* Message already in progress */ -		rxm = _strp_rx_msg(head); -		if (unlikely(rxm->early_eaten)) { +		stm = _strp_msg(head); +		if (unlikely(stm->early_eaten)) {  			/* Already some number of bytes on the receive sock -			 * data saved in rx_skb_head, just indicate they +			 * data saved in skb_head, just indicate they  			 * are consumed.  			 */ -			eaten = orig_len <= rxm->early_eaten ? -				orig_len : rxm->early_eaten; -			rxm->early_eaten -= eaten; +			eaten = orig_len <= stm->early_eaten ? +				orig_len : stm->early_eaten; +			stm->early_eaten -= eaten;  			return eaten;  		} @@ -126,12 +137,12 @@ static int strp_recv(read_descriptor_t *desc, struct sk_buff *orig_skb,  			 */  			orig_skb = skb_clone(orig_skb, GFP_ATOMIC);  			if (!orig_skb) { -				STRP_STATS_INCR(strp->stats.rx_mem_fail); +				STRP_STATS_INCR(strp->stats.mem_fail);  				desc->error = -ENOMEM;  				return 0;  			}  			if (!pskb_pull(orig_skb, orig_offset)) { -				STRP_STATS_INCR(strp->stats.rx_mem_fail); +				STRP_STATS_INCR(strp->stats.mem_fail);  				kfree_skb(orig_skb);  				desc->error = -ENOMEM;  				return 0; @@ -140,13 +151,13 @@ static int strp_recv(read_descriptor_t *desc, struct sk_buff *orig_skb,  			orig_offset = 0;  		} -		if (!strp->rx_skb_nextp) { +		if (!strp->skb_nextp) {  			/* We are going to append to the frags_list of head.  			 * Need to unshare the frag_list.  			 */  			err = skb_unclone(head, GFP_ATOMIC);  			if (err) { -				STRP_STATS_INCR(strp->stats.rx_mem_fail); +				STRP_STATS_INCR(strp->stats.mem_fail);  				desc->error = err;  				return 0;  			} @@ -165,20 +176,20 @@ static int strp_recv(read_descriptor_t *desc, struct sk_buff *orig_skb,  				skb = alloc_skb(0, GFP_ATOMIC);  				if (!skb) { -					STRP_STATS_INCR(strp->stats.rx_mem_fail); +					STRP_STATS_INCR(strp->stats.mem_fail);  					desc->error = -ENOMEM;  					return 0;  				}  				skb->len = head->len;  				skb->data_len = head->len;  				skb->truesize = head->truesize; -				*_strp_rx_msg(skb) = *_strp_rx_msg(head); -				strp->rx_skb_nextp = &head->next; +				*_strp_msg(skb) = *_strp_msg(head); +				strp->skb_nextp = &head->next;  				skb_shinfo(skb)->frag_list = head; -				strp->rx_skb_head = skb; +				strp->skb_head = skb;  				head = skb;  			} else { -				strp->rx_skb_nextp = +				strp->skb_nextp =  				    &skb_shinfo(head)->frag_list;  			}  		} @@ -188,112 +199,112 @@ static int strp_recv(read_descriptor_t *desc, struct sk_buff *orig_skb,  		/* Always clone since we will consume something */  		skb = skb_clone(orig_skb, GFP_ATOMIC);  		if (!skb) { -			STRP_STATS_INCR(strp->stats.rx_mem_fail); +			STRP_STATS_INCR(strp->stats.mem_fail);  			desc->error = -ENOMEM;  			break;  		}  		cand_len = orig_len - eaten; -		head = strp->rx_skb_head; +		head = strp->skb_head;  		if (!head) {  			head = skb; -			strp->rx_skb_head = head; -			/* Will set rx_skb_nextp on next packet if needed */ -			strp->rx_skb_nextp = NULL; -			rxm = _strp_rx_msg(head); -			memset(rxm, 0, sizeof(*rxm)); -			rxm->strp.offset = orig_offset + eaten; +			strp->skb_head = head; +			/* Will set skb_nextp on next packet if needed */ +			strp->skb_nextp = NULL; +			stm = _strp_msg(head); +			memset(stm, 0, sizeof(*stm)); +			stm->strp.offset = orig_offset + eaten;  		} else {  			/* Unclone since we may be appending to an skb that we  			 * already share a frag_list with.  			 */  			err = skb_unclone(skb, GFP_ATOMIC);  			if (err) { -				STRP_STATS_INCR(strp->stats.rx_mem_fail); +				STRP_STATS_INCR(strp->stats.mem_fail);  				desc->error = err;  				break;  			} -			rxm = _strp_rx_msg(head); -			*strp->rx_skb_nextp = skb; -			strp->rx_skb_nextp = &skb->next; +			stm = _strp_msg(head); +			*strp->skb_nextp = skb; +			strp->skb_nextp = &skb->next;  			head->data_len += skb->len;  			head->len += skb->len;  			head->truesize += skb->truesize;  		} -		if (!rxm->strp.full_len) { +		if (!stm->strp.full_len) {  			ssize_t len;  			len = (*strp->cb.parse_msg)(strp, head);  			if (!len) {  				/* Need more header to determine length */ -				if (!rxm->accum_len) { +				if (!stm->accum_len) {  					/* Start RX timer for new message */ -					strp_start_rx_timer(strp); +					strp_start_timer(strp, timeo);  				} -				rxm->accum_len += cand_len; +				stm->accum_len += cand_len;  				eaten += cand_len; -				STRP_STATS_INCR(strp->stats.rx_need_more_hdr); +				STRP_STATS_INCR(strp->stats.need_more_hdr);  				WARN_ON(eaten != orig_len);  				break;  			} else if (len < 0) { -				if (len == -ESTRPIPE && rxm->accum_len) { +				if (len == -ESTRPIPE && stm->accum_len) {  					len = -ENODATA; -					strp->rx_unrecov_intr = 1; +					strp->unrecov_intr = 1;  				} else { -					strp->rx_interrupted = 1; +					strp->interrupted = 1;  				}  				strp_parser_err(strp, len, desc);  				break; -			} else if (len > strp->sk->sk_rcvbuf) { +			} else if (len > max_msg_size) {  				/* Message length exceeds maximum allowed */ -				STRP_STATS_INCR(strp->stats.rx_msg_too_big); +				STRP_STATS_INCR(strp->stats.msg_too_big);  				strp_parser_err(strp, -EMSGSIZE, desc);  				break;  			} else if (len <= (ssize_t)head->len - -					  skb->len - rxm->strp.offset) { +					  skb->len - stm->strp.offset) {  				/* Length must be into new skb (and also  				 * greater than zero)  				 */ -				STRP_STATS_INCR(strp->stats.rx_bad_hdr_len); +				STRP_STATS_INCR(strp->stats.bad_hdr_len);  				strp_parser_err(strp, -EPROTO, desc);  				break;  			} -			rxm->strp.full_len = len; +			stm->strp.full_len = len;  		} -		extra = (ssize_t)(rxm->accum_len + cand_len) - -			rxm->strp.full_len; +		extra = (ssize_t)(stm->accum_len + cand_len) - +			stm->strp.full_len;  		if (extra < 0) {  			/* Message not complete yet. */ -			if (rxm->strp.full_len - rxm->accum_len > +			if (stm->strp.full_len - stm->accum_len >  			    strp_peek_len(strp)) { -				/* Don't have the whole messages in the socket -				 * buffer. Set strp->rx_need_bytes to wait for +				/* Don't have the whole message in the socket +				 * buffer. Set strp->need_bytes to wait for  				 * the rest of the message. Also, set "early  				 * eaten" since we've already buffered the skb  				 * but don't consume yet per strp_read_sock.  				 */ -				if (!rxm->accum_len) { +				if (!stm->accum_len) {  					/* Start RX timer for new message */ -					strp_start_rx_timer(strp); +					strp_start_timer(strp, timeo);  				} -				strp->rx_need_bytes = rxm->strp.full_len - -						       rxm->accum_len; -				rxm->accum_len += cand_len; -				rxm->early_eaten = cand_len; -				STRP_STATS_ADD(strp->stats.rx_bytes, cand_len); +				strp->need_bytes = stm->strp.full_len - +						       stm->accum_len; +				stm->accum_len += cand_len; +				stm->early_eaten = cand_len; +				STRP_STATS_ADD(strp->stats.bytes, cand_len);  				desc->count = 0; /* Stop reading socket */  				break;  			} -			rxm->accum_len += cand_len; +			stm->accum_len += cand_len;  			eaten += cand_len;  			WARN_ON(eaten != orig_len);  			break; @@ -308,14 +319,14 @@ static int strp_recv(read_descriptor_t *desc, struct sk_buff *orig_skb,  		eaten += (cand_len - extra);  		/* Hurray, we have a new message! */ -		del_timer(&strp->rx_msg_timer); -		strp->rx_skb_head = NULL; -		STRP_STATS_INCR(strp->stats.rx_msgs); +		cancel_delayed_work(&strp->msg_timer_work); +		strp->skb_head = NULL; +		STRP_STATS_INCR(strp->stats.msgs);  		/* Give skb to upper layer */  		strp->cb.rcv_msg(strp, head); -		if (unlikely(strp->rx_paused)) { +		if (unlikely(strp->paused)) {  			/* Upper layer paused strp */  			break;  		} @@ -324,11 +335,33 @@ static int strp_recv(read_descriptor_t *desc, struct sk_buff *orig_skb,  	if (cloned_orig)  		kfree_skb(orig_skb); -	STRP_STATS_ADD(strp->stats.rx_bytes, eaten); +	STRP_STATS_ADD(strp->stats.bytes, eaten);  	return eaten;  } +int strp_process(struct strparser *strp, struct sk_buff *orig_skb, +		 unsigned int orig_offset, size_t orig_len, +		 size_t max_msg_size, long timeo) +{ +	read_descriptor_t desc; /* Dummy arg to strp_recv */ + +	desc.arg.data = strp; + +	return __strp_recv(&desc, orig_skb, orig_offset, orig_len, +			   max_msg_size, timeo); +} +EXPORT_SYMBOL_GPL(strp_process); + +static int strp_recv(read_descriptor_t *desc, struct sk_buff *orig_skb, +		     unsigned int orig_offset, size_t orig_len) +{ +	struct strparser *strp = (struct strparser *)desc->arg.data; + +	return __strp_recv(desc, orig_skb, orig_offset, orig_len, +			   strp->sk->sk_rcvbuf, strp->sk->sk_rcvtimeo); +} +  static int default_read_sock_done(struct strparser *strp, int err)  {  	return err; @@ -340,6 +373,9 @@ static int strp_read_sock(struct strparser *strp)  	struct socket *sock = strp->sk->sk_socket;  	read_descriptor_t desc; +	if (unlikely(!sock || !sock->ops || !sock->ops->read_sock)) +		return -EBUSY; +  	desc.arg.data = strp;  	desc.error = 0;  	desc.count = 1; /* give more than one skb per call */ @@ -355,101 +391,123 @@ static int strp_read_sock(struct strparser *strp)  /* Lower sock lock held */  void strp_data_ready(struct strparser *strp)  { -	if (unlikely(strp->rx_stopped)) +	if (unlikely(strp->stopped))  		return; -	/* This check is needed to synchronize with do_strp_rx_work. -	 * do_strp_rx_work acquires a process lock (lock_sock) whereas +	/* This check is needed to synchronize with do_strp_work. +	 * do_strp_work acquires a process lock (lock_sock) whereas  	 * the lock held here is bh_lock_sock. The two locks can be  	 * held by different threads at the same time, but bh_lock_sock  	 * allows a thread in BH context to safely check if the process  	 * lock is held. In this case, if the lock is held, queue work.  	 */  	if (sock_owned_by_user(strp->sk)) { -		queue_work(strp_wq, &strp->rx_work); +		queue_work(strp_wq, &strp->work);  		return;  	} -	if (strp->rx_paused) +	if (strp->paused)  		return; -	if (strp->rx_need_bytes) { -		if (strp_peek_len(strp) >= strp->rx_need_bytes) -			strp->rx_need_bytes = 0; +	if (strp->need_bytes) { +		if (strp_peek_len(strp) >= strp->need_bytes) +			strp->need_bytes = 0;  		else  			return;  	}  	if (strp_read_sock(strp) == -ENOMEM) -		queue_work(strp_wq, &strp->rx_work); +		queue_work(strp_wq, &strp->work);  }  EXPORT_SYMBOL_GPL(strp_data_ready); -static void do_strp_rx_work(struct strparser *strp) +static void do_strp_work(struct strparser *strp)  {  	read_descriptor_t rd_desc; -	struct sock *csk = strp->sk;  	/* We need the read lock to synchronize with strp_data_ready. We  	 * need the socket lock for calling strp_read_sock.  	 */ -	lock_sock(csk); +	strp->cb.lock(strp); -	if (unlikely(strp->rx_stopped)) +	if (unlikely(strp->stopped))  		goto out; -	if (strp->rx_paused) +	if (strp->paused)  		goto out;  	rd_desc.arg.data = strp;  	if (strp_read_sock(strp) == -ENOMEM) -		queue_work(strp_wq, &strp->rx_work); +		queue_work(strp_wq, &strp->work);  out: -	release_sock(csk); +	strp->cb.unlock(strp);  } -static void strp_rx_work(struct work_struct *w) +static void strp_work(struct work_struct *w)  { -	do_strp_rx_work(container_of(w, struct strparser, rx_work)); +	do_strp_work(container_of(w, struct strparser, work));  } -static void strp_rx_msg_timeout(unsigned long arg) +static void strp_msg_timeout(struct work_struct *w)  { -	struct strparser *strp = (struct strparser *)arg; +	struct strparser *strp = container_of(w, struct strparser, +					      msg_timer_work.work);  	/* Message assembly timed out */ -	STRP_STATS_INCR(strp->stats.rx_msg_timeouts); -	lock_sock(strp->sk); +	STRP_STATS_INCR(strp->stats.msg_timeouts); +	strp->cb.lock(strp);  	strp->cb.abort_parser(strp, ETIMEDOUT); +	strp->cb.unlock(strp); +} + +static void strp_sock_lock(struct strparser *strp) +{ +	lock_sock(strp->sk); +} + +static void strp_sock_unlock(struct strparser *strp) +{  	release_sock(strp->sk);  } -int strp_init(struct strparser *strp, struct sock *csk, -	      struct strp_callbacks *cb) +int strp_init(struct strparser *strp, struct sock *sk, +	      const struct strp_callbacks *cb)  { -	struct socket *sock = csk->sk_socket;  	if (!cb || !cb->rcv_msg || !cb->parse_msg)  		return -EINVAL; -	if (!sock->ops->read_sock || !sock->ops->peek_len) -		return -EAFNOSUPPORT; - -	memset(strp, 0, sizeof(*strp)); +	/* The sk (sock) arg determines the mode of the stream parser. +	 * +	 * If the sock is set then the strparser is in receive callback mode. +	 * The upper layer calls strp_data_ready to kick receive processing +	 * and strparser calls the read_sock function on the socket to +	 * get packets. +	 * +	 * If the sock is not set then the strparser is in general mode. +	 * The upper layer calls strp_process for each skb to be parsed. +	 */ -	strp->sk = csk; +	if (!sk) { +		if (!cb->lock || !cb->unlock) +			return -EINVAL; +	} -	setup_timer(&strp->rx_msg_timer, strp_rx_msg_timeout, -		    (unsigned long)strp); +	memset(strp, 0, sizeof(*strp)); -	INIT_WORK(&strp->rx_work, strp_rx_work); +	strp->sk = sk; +	strp->cb.lock = cb->lock ? : strp_sock_lock; +	strp->cb.unlock = cb->unlock ? : strp_sock_unlock;  	strp->cb.rcv_msg = cb->rcv_msg;  	strp->cb.parse_msg = cb->parse_msg;  	strp->cb.read_sock_done = cb->read_sock_done ? : default_read_sock_done; -	strp->cb.abort_parser = cb->abort_parser ? : strp_abort_rx_strp; +	strp->cb.abort_parser = cb->abort_parser ? : strp_abort_strp; + +	INIT_DELAYED_WORK(&strp->msg_timer_work, strp_msg_timeout); +	INIT_WORK(&strp->work, strp_work);  	return 0;  } @@ -457,12 +515,12 @@ EXPORT_SYMBOL_GPL(strp_init);  void strp_unpause(struct strparser *strp)  { -	strp->rx_paused = 0; +	strp->paused = 0; -	/* Sync setting rx_paused with RX work */ +	/* Sync setting paused with RX work */  	smp_mb(); -	queue_work(strp_wq, &strp->rx_work); +	queue_work(strp_wq, &strp->work);  }  EXPORT_SYMBOL_GPL(strp_unpause); @@ -471,27 +529,27 @@ EXPORT_SYMBOL_GPL(strp_unpause);   */  void strp_done(struct strparser *strp)  { -	WARN_ON(!strp->rx_stopped); +	WARN_ON(!strp->stopped); -	del_timer_sync(&strp->rx_msg_timer); -	cancel_work_sync(&strp->rx_work); +	cancel_delayed_work_sync(&strp->msg_timer_work); +	cancel_work_sync(&strp->work); -	if (strp->rx_skb_head) { -		kfree_skb(strp->rx_skb_head); -		strp->rx_skb_head = NULL; +	if (strp->skb_head) { +		kfree_skb(strp->skb_head); +		strp->skb_head = NULL;  	}  }  EXPORT_SYMBOL_GPL(strp_done);  void strp_stop(struct strparser *strp)  { -	strp->rx_stopped = 1; +	strp->stopped = 1;  }  EXPORT_SYMBOL_GPL(strp_stop);  void strp_check_rcv(struct strparser *strp)  { -	queue_work(strp_wq, &strp->rx_work); +	queue_work(strp_wq, &strp->work);  }  EXPORT_SYMBOL_GPL(strp_check_rcv); |