diff options
Diffstat (limited to 'ipc')
| -rw-r--r-- | ipc/mqueue.c | 54 | 
1 file changed, 33 insertions, 21 deletions
diff --git a/ipc/mqueue.c b/ipc/mqueue.c index 3aaea7ffd077..a24ba9fe5bb8 100644 --- a/ipc/mqueue.c +++ b/ipc/mqueue.c @@ -47,8 +47,7 @@  #define RECV		1  #define STATE_NONE	0 -#define STATE_PENDING	1 -#define STATE_READY	2 +#define STATE_READY	1  struct posix_msg_tree_node {  	struct rb_node		rb_node; @@ -571,15 +570,12 @@ static int wq_sleep(struct mqueue_inode_info *info, int sr,  	wq_add(info, sr, ewp);  	for (;;) { -		set_current_state(TASK_INTERRUPTIBLE); +		__set_current_state(TASK_INTERRUPTIBLE);  		spin_unlock(&info->lock);  		time = schedule_hrtimeout_range_clock(timeout, 0,  			HRTIMER_MODE_ABS, CLOCK_REALTIME); -		while (ewp->state == STATE_PENDING) -			cpu_relax(); -  		if (ewp->state == STATE_READY) {  			retval = 0;  			goto out; @@ -907,11 +903,15 @@ out_name:   * list of waiting receivers. A sender checks that list before adding the new   * message into the message array. If there is a waiting receiver, then it   * bypasses the message array and directly hands the message over to the - * receiver. - * The receiver accepts the message and returns without grabbing the queue - * spinlock. Therefore an intermediate STATE_PENDING state and memory barriers - * are necessary. The same algorithm is used for sysv semaphores, see - * ipc/sem.c for more details. + * receiver. The receiver accepts the message and returns without grabbing the + * queue spinlock: + * + * - Set pointer to message. + * - Queue the receiver task for later wakeup (without the info->lock). + * - Update its state to STATE_READY. Now the receiver can continue. + * - Wake up the process after the lock is dropped. Should the process wake up + *   before this wakeup (due to a timeout or a signal) it will either see + *   STATE_READY and continue or acquire the lock to check the state again.   *   * The same algorithm is used for senders.   
*/ @@ -919,21 +919,29 @@ out_name:  /* pipelined_send() - send a message directly to the task waiting in   * sys_mq_timedreceive() (without inserting message into a queue).   */ -static inline void pipelined_send(struct mqueue_inode_info *info, +static inline void pipelined_send(struct wake_q_head *wake_q, +				  struct mqueue_inode_info *info,  				  struct msg_msg *message,  				  struct ext_wait_queue *receiver)  {  	receiver->msg = message;  	list_del(&receiver->list); -	receiver->state = STATE_PENDING; -	wake_up_process(receiver->task); -	smp_wmb(); +	wake_q_add(wake_q, receiver->task); +	/* +	 * Rely on the implicit cmpxchg barrier from wake_q_add such +	 * that we can ensure that updating receiver->state is the last +	 * write operation: As once set, the receiver can continue, +	 * and if we don't have the reference count from the wake_q, +	 * yet, at that point we can later have a use-after-free +	 * condition and bogus wakeup. +	 */  	receiver->state = STATE_READY;  }  /* pipelined_receive() - if there is task waiting in sys_mq_timedsend()   * gets its message and put to the queue (we have one free place for sure). 
*/ -static inline void pipelined_receive(struct mqueue_inode_info *info) +static inline void pipelined_receive(struct wake_q_head *wake_q, +				     struct mqueue_inode_info *info)  {  	struct ext_wait_queue *sender = wq_get_first_waiter(info, SEND); @@ -944,10 +952,9 @@ static inline void pipelined_receive(struct mqueue_inode_info *info)  	}  	if (msg_insert(sender->msg, info))  		return; +  	list_del(&sender->list); -	sender->state = STATE_PENDING; -	wake_up_process(sender->task); -	smp_wmb(); +	wake_q_add(wake_q, sender->task);  	sender->state = STATE_READY;  } @@ -965,6 +972,7 @@ SYSCALL_DEFINE5(mq_timedsend, mqd_t, mqdes, const char __user *, u_msg_ptr,  	struct timespec ts;  	struct posix_msg_tree_node *new_leaf = NULL;  	int ret = 0; +	WAKE_Q(wake_q);  	if (u_abs_timeout) {  		int res = prepare_timeout(u_abs_timeout, &expires, &ts); @@ -1049,7 +1057,7 @@ SYSCALL_DEFINE5(mq_timedsend, mqd_t, mqdes, const char __user *, u_msg_ptr,  	} else {  		receiver = wq_get_first_waiter(info, RECV);  		if (receiver) { -			pipelined_send(info, msg_ptr, receiver); +			pipelined_send(&wake_q, info, msg_ptr, receiver);  		} else {  			/* adds message to the queue */  			ret = msg_insert(msg_ptr, info); @@ -1062,6 +1070,7 @@ SYSCALL_DEFINE5(mq_timedsend, mqd_t, mqdes, const char __user *, u_msg_ptr,  	}  out_unlock:  	spin_unlock(&info->lock); +	wake_up_q(&wake_q);  out_free:  	if (ret)  		free_msg(msg_ptr); @@ -1149,14 +1158,17 @@ SYSCALL_DEFINE5(mq_timedreceive, mqd_t, mqdes, char __user *, u_msg_ptr,  			msg_ptr = wait.msg;  		}  	} else { +		WAKE_Q(wake_q); +  		msg_ptr = msg_get(info);  		inode->i_atime = inode->i_mtime = inode->i_ctime =  				CURRENT_TIME;  		/* There is now free space in queue. */ -		pipelined_receive(info); +		pipelined_receive(&wake_q, info);  		spin_unlock(&info->lock); +		wake_up_q(&wake_q);  		ret = 0;  	}  	if (ret == 0) {  |