diff options
Diffstat (limited to 'ipc')
| -rw-r--r-- | ipc/mqueue.c | 18 | ||||
| -rw-r--r-- | ipc/msg.c | 204 | ||||
| -rw-r--r-- | ipc/msgutil.c | 4 | ||||
| -rw-r--r-- | ipc/namespace.c | 51 | ||||
| -rw-r--r-- | ipc/sem.c | 140 | 
5 files changed, 233 insertions, 184 deletions
| diff --git a/ipc/mqueue.c b/ipc/mqueue.c index 0b13ace266f2..8cbd6e6894d5 100644 --- a/ipc/mqueue.c +++ b/ipc/mqueue.c @@ -225,7 +225,7 @@ static struct inode *mqueue_get_inode(struct super_block *sb,  	inode->i_mode = mode;  	inode->i_uid = current_fsuid();  	inode->i_gid = current_fsgid(); -	inode->i_mtime = inode->i_ctime = inode->i_atime = CURRENT_TIME; +	inode->i_mtime = inode->i_ctime = inode->i_atime = current_time(inode);  	if (S_ISREG(mode)) {  		struct mqueue_inode_info *info; @@ -446,7 +446,7 @@ static int mqueue_create(struct inode *dir, struct dentry *dentry,  	put_ipc_ns(ipc_ns);  	dir->i_size += DIRENT_SIZE; -	dir->i_ctime = dir->i_mtime = dir->i_atime = CURRENT_TIME; +	dir->i_ctime = dir->i_mtime = dir->i_atime = current_time(dir);  	d_instantiate(dentry, inode);  	dget(dentry); @@ -462,7 +462,7 @@ static int mqueue_unlink(struct inode *dir, struct dentry *dentry)  {  	struct inode *inode = d_inode(dentry); -	dir->i_ctime = dir->i_mtime = dir->i_atime = CURRENT_TIME; +	dir->i_ctime = dir->i_mtime = dir->i_atime = current_time(dir);  	dir->i_size -= DIRENT_SIZE;  	drop_nlink(inode);  	dput(dentry); @@ -500,7 +500,7 @@ static ssize_t mqueue_read_file(struct file *filp, char __user *u_data,  	if (ret <= 0)  		return ret; -	file_inode(filp)->i_atime = file_inode(filp)->i_ctime = CURRENT_TIME; +	file_inode(filp)->i_atime = file_inode(filp)->i_ctime = current_time(file_inode(filp));  	return ret;  } @@ -1060,7 +1060,7 @@ SYSCALL_DEFINE5(mq_timedsend, mqd_t, mqdes, const char __user *, u_msg_ptr,  			__do_notify(info);  		}  		inode->i_atime = inode->i_mtime = inode->i_ctime = -				CURRENT_TIME; +				current_time(inode);  	}  out_unlock:  	spin_unlock(&info->lock); @@ -1156,7 +1156,7 @@ SYSCALL_DEFINE5(mq_timedreceive, mqd_t, mqdes, char __user *, u_msg_ptr,  		msg_ptr = msg_get(info);  		inode->i_atime = inode->i_mtime = inode->i_ctime = -				CURRENT_TIME; +				current_time(inode);  		/* There is now free space in queue. 
*/  		pipelined_receive(&wake_q, info); @@ -1277,7 +1277,7 @@ retry:  	if (u_notification == NULL) {  		if (info->notify_owner == task_tgid(current)) {  			remove_notification(info); -			inode->i_atime = inode->i_ctime = CURRENT_TIME; +			inode->i_atime = inode->i_ctime = current_time(inode);  		}  	} else if (info->notify_owner != NULL) {  		ret = -EBUSY; @@ -1302,7 +1302,7 @@ retry:  		info->notify_owner = get_pid(task_tgid(current));  		info->notify_user_ns = get_user_ns(current_user_ns()); -		inode->i_atime = inode->i_ctime = CURRENT_TIME; +		inode->i_atime = inode->i_ctime = current_time(inode);  	}  	spin_unlock(&info->lock);  out_fput: @@ -1359,7 +1359,7 @@ SYSCALL_DEFINE3(mq_getsetattr, mqd_t, mqdes,  			f.file->f_flags &= ~O_NONBLOCK;  		spin_unlock(&f.file->f_lock); -		inode->i_atime = inode->i_ctime = CURRENT_TIME; +		inode->i_atime = inode->i_ctime = current_time(inode);  	}  	spin_unlock(&info->lock); diff --git a/ipc/msg.c b/ipc/msg.c index c6521c205cb4..e12307d0c920 100644 --- a/ipc/msg.c +++ b/ipc/msg.c @@ -51,19 +51,14 @@ struct msg_receiver {  	long			r_msgtype;  	long			r_maxsize; -	/* -	 * Mark r_msg volatile so that the compiler -	 * does not try to get smart and optimize -	 * it. We rely on this for the lockless -	 * receive algorithm. 
-	 */ -	struct msg_msg		*volatile r_msg; +	struct msg_msg		*r_msg;  };  /* one msg_sender for each sleeping sender */  struct msg_sender {  	struct list_head	list;  	struct task_struct	*tsk; +	size_t                  msgsz;  };  #define SEARCH_ANY		1 @@ -159,45 +154,72 @@ static int newque(struct ipc_namespace *ns, struct ipc_params *params)  	return msq->q_perm.id;  } -static inline void ss_add(struct msg_queue *msq, struct msg_sender *mss) +static inline bool msg_fits_inqueue(struct msg_queue *msq, size_t msgsz) +{ +	return msgsz + msq->q_cbytes <= msq->q_qbytes && +		1 + msq->q_qnum <= msq->q_qbytes; +} + +static inline void ss_add(struct msg_queue *msq, +			  struct msg_sender *mss, size_t msgsz)  {  	mss->tsk = current; +	mss->msgsz = msgsz;  	__set_current_state(TASK_INTERRUPTIBLE);  	list_add_tail(&mss->list, &msq->q_senders);  }  static inline void ss_del(struct msg_sender *mss)  { -	if (mss->list.next != NULL) +	if (mss->list.next)  		list_del(&mss->list);  } -static void ss_wakeup(struct list_head *h, int kill) +static void ss_wakeup(struct msg_queue *msq, +		      struct wake_q_head *wake_q, bool kill)  {  	struct msg_sender *mss, *t; +	struct task_struct *stop_tsk = NULL; +	struct list_head *h = &msq->q_senders;  	list_for_each_entry_safe(mss, t, h, list) {  		if (kill)  			mss->list.next = NULL; -		wake_up_process(mss->tsk); + +		/* +		 * Stop at the first task we don't wakeup, +		 * we've already iterated the original +		 * sender queue. +		 */ +		else if (stop_tsk == mss->tsk) +			break; +		/* +		 * We are not in an EIDRM scenario here, therefore +		 * verify that we really need to wakeup the task. +		 * To maintain current semantics and wakeup order, +		 * move the sender to the tail on behalf of the +		 * blocked task. 
+		 */ +		else if (!msg_fits_inqueue(msq, mss->msgsz)) { +			if (!stop_tsk) +				stop_tsk = mss->tsk; + +			list_move_tail(&mss->list, &msq->q_senders); +			continue; +		} + +		wake_q_add(wake_q, mss->tsk);  	}  } -static void expunge_all(struct msg_queue *msq, int res) +static void expunge_all(struct msg_queue *msq, int res, +			struct wake_q_head *wake_q)  {  	struct msg_receiver *msr, *t;  	list_for_each_entry_safe(msr, t, &msq->q_receivers, r_list) { -		msr->r_msg = NULL; /* initialize expunge ordering */ -		wake_up_process(msr->r_tsk); -		/* -		 * Ensure that the wakeup is visible before setting r_msg as -		 * the receiving end depends on it: either spinning on a nil, -		 * or dealing with -EAGAIN cases. See lockless receive part 1 -		 * and 2 in do_msgrcv(). -		 */ -		smp_wmb(); /* barrier (B) */ -		msr->r_msg = ERR_PTR(res); +		wake_q_add(wake_q, msr->r_tsk); +		WRITE_ONCE(msr->r_msg, ERR_PTR(res));  	}  } @@ -213,11 +235,13 @@ static void freeque(struct ipc_namespace *ns, struct kern_ipc_perm *ipcp)  {  	struct msg_msg *msg, *t;  	struct msg_queue *msq = container_of(ipcp, struct msg_queue, q_perm); +	WAKE_Q(wake_q); -	expunge_all(msq, -EIDRM); -	ss_wakeup(&msq->q_senders, 1); +	expunge_all(msq, -EIDRM, &wake_q); +	ss_wakeup(msq, &wake_q, true);  	msg_rmid(ns, msq);  	ipc_unlock_object(&msq->q_perm); +	wake_up_q(&wake_q);  	rcu_read_unlock();  	list_for_each_entry_safe(msg, t, &msq->q_messages, m_list) { @@ -372,6 +396,9 @@ static int msgctl_down(struct ipc_namespace *ns, int msqid, int cmd,  		freeque(ns, ipcp);  		goto out_up;  	case IPC_SET: +	{ +		WAKE_Q(wake_q); +  		if (msqid64.msg_qbytes > ns->msg_ctlmnb &&  		    !capable(CAP_SYS_RESOURCE)) {  			err = -EPERM; @@ -386,15 +413,21 @@ static int msgctl_down(struct ipc_namespace *ns, int msqid, int cmd,  		msq->q_qbytes = msqid64.msg_qbytes;  		msq->q_ctime = get_seconds(); -		/* sleeping receivers might be excluded by +		/* +		 * Sleeping receivers might be excluded by  		 * stricter permissions.  		 
*/ -		expunge_all(msq, -EAGAIN); -		/* sleeping senders might be able to send +		expunge_all(msq, -EAGAIN, &wake_q); +		/* +		 * Sleeping senders might be able to send  		 * due to a larger queue size.  		 */ -		ss_wakeup(&msq->q_senders, 0); -		break; +		ss_wakeup(msq, &wake_q, false); +		ipc_unlock_object(&msq->q_perm); +		wake_up_q(&wake_q); + +		goto out_unlock1; +	}  	default:  		err = -EINVAL;  		goto out_unlock1; @@ -566,7 +599,8 @@ static int testmsg(struct msg_msg *msg, long type, int mode)  	return 0;  } -static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg) +static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg, +				 struct wake_q_head *wake_q)  {  	struct msg_receiver *msr, *t; @@ -577,27 +611,14 @@ static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg)  			list_del(&msr->r_list);  			if (msr->r_maxsize < msg->m_ts) { -				/* initialize pipelined send ordering */ -				msr->r_msg = NULL; -				wake_up_process(msr->r_tsk); -				/* barrier (B) see barrier comment below */ -				smp_wmb(); -				msr->r_msg = ERR_PTR(-E2BIG); +				wake_q_add(wake_q, msr->r_tsk); +				WRITE_ONCE(msr->r_msg, ERR_PTR(-E2BIG));  			} else { -				msr->r_msg = NULL;  				msq->q_lrpid = task_pid_vnr(msr->r_tsk);  				msq->q_rtime = get_seconds(); -				wake_up_process(msr->r_tsk); -				/* -				 * Ensure that the wakeup is visible before -				 * setting r_msg, as the receiving can otherwise -				 * exit - once r_msg is set, the receiver can -				 * continue. See lockless receive part 1 and 2 -				 * in do_msgrcv(). Barrier (B). 
-				 */ -				smp_wmb(); -				msr->r_msg = msg; +				wake_q_add(wake_q, msr->r_tsk); +				WRITE_ONCE(msr->r_msg, msg);  				return 1;  			}  		} @@ -613,6 +634,7 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext,  	struct msg_msg *msg;  	int err;  	struct ipc_namespace *ns; +	WAKE_Q(wake_q);  	ns = current->nsproxy->ipc_ns; @@ -654,10 +676,8 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext,  		if (err)  			goto out_unlock0; -		if (msgsz + msq->q_cbytes <= msq->q_qbytes && -				1 + msq->q_qnum <= msq->q_qbytes) { +		if (msg_fits_inqueue(msq, msgsz))  			break; -		}  		/* queue full, wait: */  		if (msgflg & IPC_NOWAIT) { @@ -666,7 +686,7 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext,  		}  		/* enqueue the sender and prepare to block */ -		ss_add(msq, &s); +		ss_add(msq, &s, msgsz);  		if (!ipc_rcu_getref(msq)) {  			err = -EIDRM; @@ -686,7 +706,6 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext,  			err = -EIDRM;  			goto out_unlock0;  		} -  		ss_del(&s);  		if (signal_pending(current)) { @@ -695,10 +714,11 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext,  		}  	} +  	msq->q_lspid = task_tgid_vnr(current);  	msq->q_stime = get_seconds(); -	if (!pipelined_send(msq, msg)) { +	if (!pipelined_send(msq, msg, &wake_q)) {  		/* no one is waiting for this message, enqueue it */  		list_add_tail(&msg->m_list, &msq->q_messages);  		msq->q_cbytes += msgsz; @@ -712,6 +732,7 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext,  out_unlock0:  	ipc_unlock_object(&msq->q_perm); +	wake_up_q(&wake_q);  out_unlock1:  	rcu_read_unlock();  	if (msg != NULL) @@ -829,6 +850,7 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp, int msgfl  	struct msg_queue *msq;  	struct ipc_namespace *ns;  	struct msg_msg *msg, *copy = NULL; +	WAKE_Q(wake_q);  	ns = current->nsproxy->ipc_ns; @@ -893,7 +915,7 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp, int msgfl  			msq->q_cbytes -= 
msg->m_ts;  			atomic_sub(msg->m_ts, &ns->msg_bytes);  			atomic_dec(&ns->msg_hdrs); -			ss_wakeup(&msq->q_senders, 0); +			ss_wakeup(msq, &wake_q, false);  			goto out_unlock0;  		} @@ -919,71 +941,38 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp, int msgfl  		rcu_read_unlock();  		schedule(); -		/* Lockless receive, part 1: -		 * Disable preemption.  We don't hold a reference to the queue -		 * and getting a reference would defeat the idea of a lockless -		 * operation, thus the code relies on rcu to guarantee the -		 * existence of msq: +		/* +		 * Lockless receive, part 1: +		 * We don't hold a reference to the queue and getting a +		 * reference would defeat the idea of a lockless operation, +		 * thus the code relies on rcu to guarantee the existence of +		 * msq:  		 * Prior to destruction, expunge_all(-EIRDM) changes r_msg.  		 * Thus if r_msg is -EAGAIN, then the queue not yet destroyed. -		 * rcu_read_lock() prevents preemption between reading r_msg -		 * and acquiring the q_perm.lock in ipc_lock_object().  		 */  		rcu_read_lock(); -		/* Lockless receive, part 2: -		 * Wait until pipelined_send or expunge_all are outside of -		 * wake_up_process(). There is a race with exit(), see -		 * ipc/mqueue.c for the details. The correct serialization -		 * ensures that a receiver cannot continue without the wakeup -		 * being visibible _before_ setting r_msg: -		 * -		 * CPU 0                             CPU 1 -		 * <loop receiver> -		 *   smp_rmb(); (A) <-- pair -.      
<waker thread> -		 *   <load ->r_msg>           |        msr->r_msg = NULL; -		 *                            |        wake_up_process(); -		 * <continue>                 `------> smp_wmb(); (B) -		 *                                     msr->r_msg = msg; +		/* +		 * Lockless receive, part 2: +		 * The work in pipelined_send() and expunge_all(): +		 * - Set pointer to message +		 * - Queue the receiver task for later wakeup +		 * - Wake up the process after the lock is dropped.  		 * -		 * Where (A) orders the message value read and where (B) orders -		 * the write to the r_msg -- done in both pipelined_send and -		 * expunge_all. -		 */ -		for (;;) { -			/* -			 * Pairs with writer barrier in pipelined_send -			 * or expunge_all. -			 */ -			smp_rmb(); /* barrier (A) */ -			msg = (struct msg_msg *)msr_d.r_msg; -			if (msg) -				break; - -			/* -			 * The cpu_relax() call is a compiler barrier -			 * which forces everything in this loop to be -			 * re-loaded. -			 */ -			cpu_relax(); -		} - -		/* Lockless receive, part 3: -		 * If there is a message or an error then accept it without -		 * locking. +		 * Should the process wake up before this wakeup (due to a +		 * signal) it will either see the message and continue ...  		 */ +		msg = READ_ONCE(msr_d.r_msg);  		if (msg != ERR_PTR(-EAGAIN))  			goto out_unlock1; -		/* Lockless receive, part 3: -		 * Acquire the queue spinlock. -		 */ +		 /* +		  * ... or see -EAGAIN, acquire the lock to check the message +		  * again. +		  */  		ipc_lock_object(&msq->q_perm); -		/* Lockless receive, part 4: -		 * Repeat test after acquiring the spinlock. 
-		 */ -		msg = (struct msg_msg *)msr_d.r_msg; +		msg = msr_d.r_msg;  		if (msg != ERR_PTR(-EAGAIN))  			goto out_unlock0; @@ -998,6 +987,7 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp, int msgfl  out_unlock0:  	ipc_unlock_object(&msq->q_perm); +	wake_up_q(&wake_q);  out_unlock1:  	rcu_read_unlock();  	if (IS_ERR(msg)) { diff --git a/ipc/msgutil.c b/ipc/msgutil.c index a521999de4f1..bf74eaa5c39f 100644 --- a/ipc/msgutil.c +++ b/ipc/msgutil.c @@ -53,7 +53,7 @@ static struct msg_msg *alloc_msg(size_t len)  	size_t alen;  	alen = min(len, DATALEN_MSG); -	msg = kmalloc(sizeof(*msg) + alen, GFP_KERNEL); +	msg = kmalloc(sizeof(*msg) + alen, GFP_KERNEL_ACCOUNT);  	if (msg == NULL)  		return NULL; @@ -65,7 +65,7 @@ static struct msg_msg *alloc_msg(size_t len)  	while (len > 0) {  		struct msg_msgseg *seg;  		alen = min(len, DATALEN_SEG); -		seg = kmalloc(sizeof(*seg) + alen, GFP_KERNEL); +		seg = kmalloc(sizeof(*seg) + alen, GFP_KERNEL_ACCOUNT);  		if (seg == NULL)  			goto out_err;  		*pseg = seg; diff --git a/ipc/namespace.c b/ipc/namespace.c index d87e6baa1323..0abdea496493 100644 --- a/ipc/namespace.c +++ b/ipc/namespace.c @@ -16,39 +16,61 @@  #include "util.h" +static struct ucounts *inc_ipc_namespaces(struct user_namespace *ns) +{ +	return inc_ucount(ns, current_euid(), UCOUNT_IPC_NAMESPACES); +} + +static void dec_ipc_namespaces(struct ucounts *ucounts) +{ +	dec_ucount(ucounts, UCOUNT_IPC_NAMESPACES); +} +  static struct ipc_namespace *create_ipc_ns(struct user_namespace *user_ns,  					   struct ipc_namespace *old_ns)  {  	struct ipc_namespace *ns; +	struct ucounts *ucounts;  	int err; +	err = -ENOSPC; +	ucounts = inc_ipc_namespaces(user_ns); +	if (!ucounts) +		goto fail; + +	err = -ENOMEM;  	ns = kmalloc(sizeof(struct ipc_namespace), GFP_KERNEL);  	if (ns == NULL) -		return ERR_PTR(-ENOMEM); +		goto fail_dec;  	err = ns_alloc_inum(&ns->ns); -	if (err) { -		kfree(ns); -		return ERR_PTR(err); -	} +	if (err) +		goto fail_free;  	ns->ns.ops 
= &ipcns_operations;  	atomic_set(&ns->count, 1);  	ns->user_ns = get_user_ns(user_ns); +	ns->ucounts = ucounts;  	err = mq_init_ns(ns); -	if (err) { -		put_user_ns(ns->user_ns); -		ns_free_inum(&ns->ns); -		kfree(ns); -		return ERR_PTR(err); -	} +	if (err) +		goto fail_put;  	sem_init_ns(ns);  	msg_init_ns(ns);  	shm_init_ns(ns);  	return ns; + +fail_put: +	put_user_ns(ns->user_ns); +	ns_free_inum(&ns->ns); +fail_free: +	kfree(ns); +fail_dec: +	dec_ipc_namespaces(ucounts); +fail: +	return ERR_PTR(err);  }  struct ipc_namespace *copy_ipcs(unsigned long flags, @@ -96,6 +118,7 @@ static void free_ipc_ns(struct ipc_namespace *ns)  	msg_exit_ns(ns);  	shm_exit_ns(ns); +	dec_ipc_namespaces(ns->ucounts);  	put_user_ns(ns->user_ns);  	ns_free_inum(&ns->ns);  	kfree(ns); @@ -165,10 +188,16 @@ static int ipcns_install(struct nsproxy *nsproxy, struct ns_common *new)  	return 0;  } +static struct user_namespace *ipcns_owner(struct ns_common *ns) +{ +	return to_ipc_ns(ns)->user_ns; +} +  const struct proc_ns_operations ipcns_operations = {  	.name		= "ipc",  	.type		= CLONE_NEWIPC,  	.get		= ipcns_get,  	.put		= ipcns_put,  	.install	= ipcns_install, +	.owner		= ipcns_owner,  }; diff --git a/ipc/sem.c b/ipc/sem.c index 7c9d4f7683c0..10b94bc59d4a 100644 --- a/ipc/sem.c +++ b/ipc/sem.c @@ -162,14 +162,21 @@ static int sysvipc_sem_proc_show(struct seq_file *s, void *it);  /*   * Locking: + * a) global sem_lock() for read/write   *	sem_undo.id_next,   *	sem_array.complex_count, - *	sem_array.pending{_alter,_cont}, - *	sem_array.sem_undo: global sem_lock() for read/write - *	sem_undo.proc_next: only "current" is allowed to read/write that field. 
+ *	sem_array.complex_mode + *	sem_array.pending{_alter,_const}, + *	sem_array.sem_undo   * + * b) global or semaphore sem_lock() for read/write:   *	sem_array.sem_base[i].pending_{const,alter}: - *		global or semaphore sem_lock() for read/write + *	sem_array.complex_mode (for read) + * + * c) special: + *	sem_undo_list.list_proc: + *	* undo_list->lock for write + *	* rcu for read   */  #define sc_semmsl	sem_ctls[0] @@ -260,31 +267,62 @@ static void sem_rcu_free(struct rcu_head *head)  }  /* - * Wait until all currently ongoing simple ops have completed. + * Enter the mode suitable for non-simple operations:   * Caller must own sem_perm.lock. - * New simple ops cannot start, because simple ops first check - * that sem_perm.lock is free. - * that a) sem_perm.lock is free and b) complex_count is 0.   */ -static void sem_wait_array(struct sem_array *sma) +static void complexmode_enter(struct sem_array *sma)  {  	int i;  	struct sem *sem; -	if (sma->complex_count)  { -		/* The thread that increased sma->complex_count waited on -		 * all sem->lock locks. Thus we don't need to wait again. -		 */ +	if (sma->complex_mode)  { +		/* We are already in complex_mode. Nothing to do */  		return;  	} +	/* We need a full barrier after setting complex_mode: +	 * The write to complex_mode must be visible +	 * before we read the first sem->lock spinlock state. +	 */ +	smp_store_mb(sma->complex_mode, true); +  	for (i = 0; i < sma->sem_nsems; i++) {  		sem = sma->sem_base + i;  		spin_unlock_wait(&sem->lock);  	} +	/* +	 * spin_unlock_wait() is not a memory barrier, it is only a +	 * control barrier. The code must pair with spin_unlock(&sem->lock), +	 * thus just the control barrier is insufficient. +	 * +	 * smp_rmb() is sufficient, as writes cannot pass the control barrier. +	 */ +	smp_rmb();  }  /* + * Try to leave the mode that disallows simple operations: + * Caller must own sem_perm.lock. 
+ */ +static void complexmode_tryleave(struct sem_array *sma) +{ +	if (sma->complex_count)  { +		/* Complex ops are sleeping. +		 * We must stay in complex mode +		 */ +		return; +	} +	/* +	 * Immediately after setting complex_mode to false, +	 * a simple op can start. Thus: all memory writes +	 * performed by the current operation must be visible +	 * before we set complex_mode to false. +	 */ +	smp_store_release(&sma->complex_mode, false); +} + +#define SEM_GLOBAL_LOCK	(-1) +/*   * If the request contains only one semaphore operation, and there are   * no complex transactions pending, lock only the semaphore involved.   * Otherwise, lock the entire semaphore array, since we either have @@ -300,56 +338,42 @@ static inline int sem_lock(struct sem_array *sma, struct sembuf *sops,  		/* Complex operation - acquire a full lock */  		ipc_lock_object(&sma->sem_perm); -		/* And wait until all simple ops that are processed -		 * right now have dropped their locks. -		 */ -		sem_wait_array(sma); -		return -1; +		/* Prevent parallel simple ops */ +		complexmode_enter(sma); +		return SEM_GLOBAL_LOCK;  	}  	/*  	 * Only one semaphore affected - try to optimize locking. -	 * The rules are: -	 * - optimized locking is possible if no complex operation -	 *   is either enqueued or processed right now. -	 * - The test for enqueued complex ops is simple: -	 *      sma->complex_count != 0 -	 * - Testing for complex ops that are processed right now is -	 *   a bit more difficult. Complex ops acquire the full lock -	 *   and first wait that the running simple ops have completed. -	 *   (see above) -	 *   Thus: If we own a simple lock and the global lock is free -	 *	and complex_count is now 0, then it will stay 0 and -	 *	thus just locking sem->lock is sufficient. +	 * Optimized locking is possible if no complex operation +	 * is either enqueued or processed right now. +	 * +	 * Both facts are tracked by complex_mode.  	 
*/  	sem = sma->sem_base + sops->sem_num; -	if (sma->complex_count == 0) { +	/* +	 * Initial check for complex_mode. Just an optimization, +	 * no locking, no memory barrier. +	 */ +	if (!sma->complex_mode) {  		/*  		 * It appears that no complex operation is around.  		 * Acquire the per-semaphore lock.  		 */  		spin_lock(&sem->lock); -		/* Then check that the global lock is free */ -		if (!spin_is_locked(&sma->sem_perm.lock)) { -			/* -			 * We need a memory barrier with acquire semantics, -			 * otherwise we can race with another thread that does: -			 *	complex_count++; -			 *	spin_unlock(sem_perm.lock); -			 */ -			smp_acquire__after_ctrl_dep(); +		/* +		 * See 51d7d5205d33 +		 * ("powerpc: Add smp_mb() to arch_spin_is_locked()"): +		 * A full barrier is required: the write of sem->lock +		 * must be visible before the read is executed +		 */ +		smp_mb(); -			/* -			 * Now repeat the test of complex_count: -			 * It can't change anymore until we drop sem->lock. -			 * Thus: if is now 0, then it will stay 0. -			 */ -			if (sma->complex_count == 0) { -				/* fast path successful! */ -				return sops->sem_num; -			} +		if (!smp_load_acquire(&sma->complex_mode)) { +			/* fast path successful! */ +			return sops->sem_num;  		}  		spin_unlock(&sem->lock);  	} @@ -369,15 +393,16 @@ static inline int sem_lock(struct sem_array *sma, struct sembuf *sops,  		/* Not a false alarm, thus complete the sequence for a  		 * full lock.  		 
*/ -		sem_wait_array(sma); -		return -1; +		complexmode_enter(sma); +		return SEM_GLOBAL_LOCK;  	}  }  static inline void sem_unlock(struct sem_array *sma, int locknum)  { -	if (locknum == -1) { +	if (locknum == SEM_GLOBAL_LOCK) {  		unmerge_queues(sma); +		complexmode_tryleave(sma);  		ipc_unlock_object(&sma->sem_perm);  	} else {  		struct sem *sem = sma->sem_base + locknum; @@ -529,6 +554,7 @@ static int newary(struct ipc_namespace *ns, struct ipc_params *params)  	}  	sma->complex_count = 0; +	sma->complex_mode = true; /* dropped by sem_unlock below */  	INIT_LIST_HEAD(&sma->pending_alter);  	INIT_LIST_HEAD(&sma->pending_const);  	INIT_LIST_HEAD(&sma->list_id); @@ -2079,6 +2105,8 @@ void exit_sem(struct task_struct *tsk)  		struct list_head tasks;  		int semid, i; +		cond_resched(); +  		rcu_read_lock();  		un = list_entry_rcu(ulp->list_proc.next,  				    struct sem_undo, list_proc); @@ -2184,10 +2212,10 @@ static int sysvipc_sem_proc_show(struct seq_file *s, void *it)  	/*  	 * The proc interface isn't aware of sem_lock(), it calls  	 * ipc_lock_object() directly (in sysvipc_find_ipc). -	 * In order to stay compatible with sem_lock(), we must wait until -	 * all simple semop() calls have left their critical regions. +	 * In order to stay compatible with sem_lock(), we must +	 * enter / leave complex_mode.  	 */ -	sem_wait_array(sma); +	complexmode_enter(sma);  	sem_otime = get_semotime(sma); @@ -2204,6 +2232,8 @@ static int sysvipc_sem_proc_show(struct seq_file *s, void *it)  		   sem_otime,  		   sma->sem_ctime); +	complexmode_tryleave(sma); +  	return 0;  }  #endif |