diff options
Diffstat (limited to 'net/rxrpc/sendmsg.c')
| -rw-r--r-- | net/rxrpc/sendmsg.c | 232 | 
1 files changed, 159 insertions, 73 deletions
| diff --git a/net/rxrpc/sendmsg.c b/net/rxrpc/sendmsg.c index 9ea6f972767e..09f2a3e05221 100644 --- a/net/rxrpc/sendmsg.c +++ b/net/rxrpc/sendmsg.c @@ -21,21 +21,78 @@  #include <net/af_rxrpc.h>  #include "ar-internal.h" -enum rxrpc_command { -	RXRPC_CMD_SEND_DATA,		/* send data message */ -	RXRPC_CMD_SEND_ABORT,		/* request abort generation */ -	RXRPC_CMD_ACCEPT,		/* [server] accept incoming call */ -	RXRPC_CMD_REJECT_BUSY,		/* [server] reject a call as busy */ -}; - -struct rxrpc_send_params { -	s64			tx_total_len;	/* Total Tx data length (if send data) */ -	unsigned long		user_call_ID;	/* User's call ID */ -	u32			abort_code;	/* Abort code to Tx (if abort) */ -	enum rxrpc_command	command : 8;	/* The command to implement */ -	bool			exclusive;	/* Shared or exclusive call */ -	bool			upgrade;	/* If the connection is upgradeable */ -}; +/* + * Wait for space to appear in the Tx queue or a signal to occur. + */ +static int rxrpc_wait_for_tx_window_intr(struct rxrpc_sock *rx, +					 struct rxrpc_call *call, +					 long *timeo) +{ +	for (;;) { +		set_current_state(TASK_INTERRUPTIBLE); +		if (call->tx_top - call->tx_hard_ack < +		    min_t(unsigned int, call->tx_winsize, +			  call->cong_cwnd + call->cong_extra)) +			return 0; + +		if (call->state >= RXRPC_CALL_COMPLETE) +			return call->error; + +		if (signal_pending(current)) +			return sock_intr_errno(*timeo); + +		trace_rxrpc_transmit(call, rxrpc_transmit_wait); +		mutex_unlock(&call->user_mutex); +		*timeo = schedule_timeout(*timeo); +		if (mutex_lock_interruptible(&call->user_mutex) < 0) +			return sock_intr_errno(*timeo); +	} +} + +/* + * Wait for space to appear in the Tx queue uninterruptibly, but with + * a timeout of 2*RTT if no progress was made and a signal occurred. + */ +static int rxrpc_wait_for_tx_window_nonintr(struct rxrpc_sock *rx, +					    struct rxrpc_call *call) +{ +	rxrpc_seq_t tx_start, tx_win; +	signed long rtt2, timeout; +	u64 rtt; + +	rtt = READ_ONCE(call->peer->rtt); +	rtt2 = nsecs_to_jiffies64(rtt) * 2; +	if (rtt2 < 1) +		rtt2 = 1; + +	timeout = rtt2; +	tx_start = READ_ONCE(call->tx_hard_ack); + +	for (;;) { +		set_current_state(TASK_UNINTERRUPTIBLE); + +		tx_win = READ_ONCE(call->tx_hard_ack); +		if (call->tx_top - tx_win < +		    min_t(unsigned int, call->tx_winsize, +			  call->cong_cwnd + call->cong_extra)) +			return 0; + +		if (call->state >= RXRPC_CALL_COMPLETE) +			return call->error; + +		if (timeout == 0 && +		    tx_win == tx_start && signal_pending(current)) +			return -EINTR; + +		if (tx_win != tx_start) { +			timeout = rtt2; +			tx_start = tx_win; +		} + +		trace_rxrpc_transmit(call, rxrpc_transmit_wait); +		timeout = schedule_timeout(timeout); +	} +}  /*   * wait for space to appear in the transmit/ACK window @@ -43,7 +100,8 @@ struct rxrpc_send_params {   */  static int rxrpc_wait_for_tx_window(struct rxrpc_sock *rx,  				    struct rxrpc_call *call, -				    long *timeo) +				    long *timeo, +				    bool waitall)  {  	DECLARE_WAITQUEUE(myself, current);  	int ret; @@ -53,30 +111,10 @@ static int rxrpc_wait_for_tx_window(struct rxrpc_sock *rx,  	add_wait_queue(&call->waitq, &myself); -	for (;;) { -		set_current_state(TASK_INTERRUPTIBLE); -		ret = 0; -		if (call->tx_top - call->tx_hard_ack < -		    min_t(unsigned int, call->tx_winsize, -			  call->cong_cwnd + call->cong_extra)) -			break; -		if (call->state >= RXRPC_CALL_COMPLETE) { -			ret = call->error; -			break; -		} -		if (signal_pending(current)) { -			ret = sock_intr_errno(*timeo); -			break; -		} - -		trace_rxrpc_transmit(call, rxrpc_transmit_wait); -		mutex_unlock(&call->user_mutex); -		*timeo = schedule_timeout(*timeo); -		if (mutex_lock_interruptible(&call->user_mutex) < 0) { -			ret = sock_intr_errno(*timeo); -			break; -		} -	} +	if (waitall) +		ret = rxrpc_wait_for_tx_window_nonintr(rx, call); +	else +		ret = rxrpc_wait_for_tx_window_intr(rx, call, timeo);  	remove_wait_queue(&call->waitq, &myself);  	set_current_state(TASK_RUNNING); @@ -120,6 +158,7 @@ static void rxrpc_queue_packet(struct rxrpc_sock *rx, struct rxrpc_call *call,  			       rxrpc_notify_end_tx_t notify_end_tx)  {  	struct rxrpc_skb_priv *sp = rxrpc_skb(skb); +	unsigned long now;  	rxrpc_seq_t seq = sp->hdr.seq;  	int ret, ix;  	u8 annotation = RXRPC_TX_ANNO_UNACK; @@ -159,13 +198,14 @@ static void rxrpc_queue_packet(struct rxrpc_sock *rx, struct rxrpc_call *call,  			break;  		case RXRPC_CALL_SERVER_ACK_REQUEST:  			call->state = RXRPC_CALL_SERVER_SEND_REPLY; -			call->ack_at = call->expire_at; +			now = jiffies; +			WRITE_ONCE(call->ack_at, now + MAX_JIFFY_OFFSET);  			if (call->ackr_reason == RXRPC_ACK_DELAY)  				call->ackr_reason = 0; -			__rxrpc_set_timer(call, rxrpc_timer_init_for_send_reply, -					  ktime_get_real()); +			trace_rxrpc_timer(call, rxrpc_timer_init_for_send_reply, now);  			if (!last)  				break; +			/* Fall through */  		case RXRPC_CALL_SERVER_SEND_REPLY:  			call->state = RXRPC_CALL_SERVER_AWAIT_ACK;  			rxrpc_notify_end_tx(rx, call, notify_end_tx); @@ -184,14 +224,19 @@ static void rxrpc_queue_packet(struct rxrpc_sock *rx, struct rxrpc_call *call,  		_debug("need instant resend %d", ret);  		rxrpc_instant_resend(call, ix);  	} else { -		ktime_t now = ktime_get_real(), resend_at; - -		resend_at = ktime_add_ms(now, rxrpc_resend_timeout); - -		if (ktime_before(resend_at, call->resend_at)) { -			call->resend_at = resend_at; -			rxrpc_set_timer(call, rxrpc_timer_set_for_send, now); -		} +		unsigned long now = jiffies, resend_at; + +		if (call->peer->rtt_usage > 1) +			resend_at = nsecs_to_jiffies(call->peer->rtt * 3 / 2); +		else +			resend_at = rxrpc_resend_timeout; +		if (resend_at < 1) +			resend_at = 1; + +		resend_at += now; +		WRITE_ONCE(call->resend_at, resend_at); +		rxrpc_reduce_call_timer(call, resend_at, now, +					rxrpc_timer_set_for_send);  	}  	rxrpc_free_skb(skb, rxrpc_skb_tx_freed); @@ -240,7 +285,7 @@ static int rxrpc_send_data(struct rxrpc_sock *rx,  	do {  		/* Check to see if there's a ping ACK to reply to. */  		if (call->ackr_reason == RXRPC_ACK_PING_RESPONSE) -			rxrpc_send_ack_packet(call, false); +			rxrpc_send_ack_packet(call, false, NULL);  		if (!skb) {  			size_t size, chunk, max, space; @@ -254,7 +299,8 @@ static int rxrpc_send_data(struct rxrpc_sock *rx,  				if (msg->msg_flags & MSG_DONTWAIT)  					goto maybe_error;  				ret = rxrpc_wait_for_tx_window(rx, call, -							       &timeo); +							       &timeo, +							       msg->msg_flags & MSG_WAITALL);  				if (ret < 0)  					goto maybe_error;  			} @@ -424,11 +470,11 @@ static int rxrpc_sendmsg_cmsg(struct msghdr *msg, struct rxrpc_send_params *p)  			if (msg->msg_flags & MSG_CMSG_COMPAT) {  				if (len != sizeof(u32))  					return -EINVAL; -				p->user_call_ID = *(u32 *)CMSG_DATA(cmsg); +				p->call.user_call_ID = *(u32 *)CMSG_DATA(cmsg);  			} else {  				if (len != sizeof(unsigned long))  					return -EINVAL; -				p->user_call_ID = *(unsigned long *) +				p->call.user_call_ID = *(unsigned long *)  					CMSG_DATA(cmsg);  			}  			got_user_ID = true; @@ -466,13 +512,26 @@ static int rxrpc_sendmsg_cmsg(struct msghdr *msg, struct rxrpc_send_params *p)  			break;  		case RXRPC_TX_LENGTH: -			if (p->tx_total_len != -1 || len != sizeof(__s64)) +			if (p->call.tx_total_len != -1 || len != sizeof(__s64))  				return -EINVAL; -			p->tx_total_len = *(__s64 *)CMSG_DATA(cmsg); -			if (p->tx_total_len < 0) +			p->call.tx_total_len = *(__s64 *)CMSG_DATA(cmsg); +			if (p->call.tx_total_len < 0)  				return -EINVAL;  			break; +		case RXRPC_SET_CALL_TIMEOUT: +			if (len & 3 || len < 4 || len > 12) +				return -EINVAL; +			memcpy(&p->call.timeouts, CMSG_DATA(cmsg), len); +			p->call.nr_timeouts = len / 4; +			if (p->call.timeouts.hard > INT_MAX / HZ) +				return -ERANGE; +			if (p->call.nr_timeouts >= 2 && p->call.timeouts.idle > 60 * 60 * 1000) +				return -ERANGE; +			if (p->call.nr_timeouts >= 3 && p->call.timeouts.normal > 60 * 60 * 1000) +				return -ERANGE; +			break; +  		default:  			return -EINVAL;  		} @@ -480,7 +539,7 @@ static int rxrpc_sendmsg_cmsg(struct msghdr *msg, struct rxrpc_send_params *p)  	if (!got_user_ID)  		return -EINVAL; -	if (p->tx_total_len != -1 && p->command != RXRPC_CMD_SEND_DATA) +	if (p->call.tx_total_len != -1 && p->command != RXRPC_CMD_SEND_DATA)  		return -EINVAL;  	_leave(" = 0");  	return 0; @@ -520,8 +579,7 @@ rxrpc_new_client_call_for_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg,  	cp.exclusive		= rx->exclusive | p->exclusive;  	cp.upgrade		= p->upgrade;  	cp.service_id		= srx->srx_service; -	call = rxrpc_new_client_call(rx, &cp, srx, p->user_call_ID, -				     p->tx_total_len, GFP_KERNEL); +	call = rxrpc_new_client_call(rx, &cp, srx, &p->call, GFP_KERNEL);  	/* The socket is now unlocked */  	_leave(" = %p\n", call); @@ -538,15 +596,17 @@ int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len)  {  	enum rxrpc_call_state state;  	struct rxrpc_call *call; +	unsigned long now, j;  	int ret;  	struct rxrpc_send_params p = { -		.tx_total_len	= -1, -		.user_call_ID	= 0, -		.abort_code	= 0, -		.command	= RXRPC_CMD_SEND_DATA, -		.exclusive	= false, -		.upgrade	= true, +		.call.tx_total_len	= -1, +		.call.user_call_ID	= 0, +		.call.nr_timeouts	= 0, +		.abort_code		= 0, +		.command		= RXRPC_CMD_SEND_DATA, +		.exclusive		= false, +		.upgrade		= false,  	};  	_enter(""); @@ -559,15 +619,15 @@ int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len)  		ret = -EINVAL;  		if (rx->sk.sk_state != RXRPC_SERVER_LISTENING)  			goto error_release_sock; -		call = rxrpc_accept_call(rx, p.user_call_ID, NULL); +		call = rxrpc_accept_call(rx, p.call.user_call_ID, NULL);  		/* The socket is now unlocked. */  		if (IS_ERR(call))  			return PTR_ERR(call); -		rxrpc_put_call(call, rxrpc_call_put); -		return 0; +		ret = 0; +		goto out_put_unlock;  	} -	call = rxrpc_find_call_by_user_ID(rx, p.user_call_ID); +	call = rxrpc_find_call_by_user_ID(rx, p.call.user_call_ID);  	if (!call) {  		ret = -EBADSLT;  		if (p.command != RXRPC_CMD_SEND_DATA) @@ -597,16 +657,41 @@ int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len)  			goto error_put;  		} -		if (p.tx_total_len != -1) { +		if (p.call.tx_total_len != -1) {  			ret = -EINVAL;  			if (call->tx_total_len != -1 ||  			    call->tx_pending ||  			    call->tx_top != 0)  				goto error_put; -			call->tx_total_len = p.tx_total_len; +			call->tx_total_len = p.call.tx_total_len;  		}  	} +	switch (p.call.nr_timeouts) { +	case 3: +		j = msecs_to_jiffies(p.call.timeouts.normal); +		if (p.call.timeouts.normal > 0 && j == 0) +			j = 1; +		WRITE_ONCE(call->next_rx_timo, j); +		/* Fall through */ +	case 2: +		j = msecs_to_jiffies(p.call.timeouts.idle); +		if (p.call.timeouts.idle > 0 && j == 0) +			j = 1; +		WRITE_ONCE(call->next_req_timo, j); +		/* Fall through */ +	case 1: +		if (p.call.timeouts.hard > 0) { +			j = msecs_to_jiffies(p.call.timeouts.hard); +			now = jiffies; +			j += now; +			WRITE_ONCE(call->expect_term_by, j); +			rxrpc_reduce_call_timer(call, j, now, +						rxrpc_timer_set_for_hard); +		} +		break; +	} +  	state = READ_ONCE(call->state);  	_debug("CALL %d USR %lx ST %d on CONN %p",  	       call->debug_id, call->user_call_ID, state, call->conn); @@ -633,6 +718,7 @@ int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len)  		ret = rxrpc_send_data(rx, call, msg, len, NULL);  	} +out_put_unlock:  	mutex_unlock(&call->user_mutex);  error_put:  	rxrpc_put_call(call, rxrpc_call_put); |