diff options
Diffstat (limited to 'ipc')
-rw-r--r-- | ipc/mqueue.c | 18
-rw-r--r-- | ipc/msg.c | 204
-rw-r--r-- | ipc/namespace.c | 51
-rw-r--r-- | ipc/sem.c | 140
4 files changed, 231 insertions, 182 deletions
diff --git a/ipc/mqueue.c b/ipc/mqueue.c index 0b13ace266f2..8cbd6e6894d5 100644 --- a/ipc/mqueue.c +++ b/ipc/mqueue.c @@ -225,7 +225,7 @@ static struct inode *mqueue_get_inode(struct super_block *sb, inode->i_mode = mode; inode->i_uid = current_fsuid(); inode->i_gid = current_fsgid(); - inode->i_mtime = inode->i_ctime = inode->i_atime = CURRENT_TIME; + inode->i_mtime = inode->i_ctime = inode->i_atime = current_time(inode); if (S_ISREG(mode)) { struct mqueue_inode_info *info; @@ -446,7 +446,7 @@ static int mqueue_create(struct inode *dir, struct dentry *dentry, put_ipc_ns(ipc_ns); dir->i_size += DIRENT_SIZE; - dir->i_ctime = dir->i_mtime = dir->i_atime = CURRENT_TIME; + dir->i_ctime = dir->i_mtime = dir->i_atime = current_time(dir); d_instantiate(dentry, inode); dget(dentry); @@ -462,7 +462,7 @@ static int mqueue_unlink(struct inode *dir, struct dentry *dentry) { struct inode *inode = d_inode(dentry); - dir->i_ctime = dir->i_mtime = dir->i_atime = CURRENT_TIME; + dir->i_ctime = dir->i_mtime = dir->i_atime = current_time(dir); dir->i_size -= DIRENT_SIZE; drop_nlink(inode); dput(dentry); @@ -500,7 +500,7 @@ static ssize_t mqueue_read_file(struct file *filp, char __user *u_data, if (ret <= 0) return ret; - file_inode(filp)->i_atime = file_inode(filp)->i_ctime = CURRENT_TIME; + file_inode(filp)->i_atime = file_inode(filp)->i_ctime = current_time(file_inode(filp)); return ret; } @@ -1060,7 +1060,7 @@ SYSCALL_DEFINE5(mq_timedsend, mqd_t, mqdes, const char __user *, u_msg_ptr, __do_notify(info); } inode->i_atime = inode->i_mtime = inode->i_ctime = - CURRENT_TIME; + current_time(inode); } out_unlock: spin_unlock(&info->lock); @@ -1156,7 +1156,7 @@ SYSCALL_DEFINE5(mq_timedreceive, mqd_t, mqdes, char __user *, u_msg_ptr, msg_ptr = msg_get(info); inode->i_atime = inode->i_mtime = inode->i_ctime = - CURRENT_TIME; + current_time(inode); /* There is now free space in queue. 
*/ pipelined_receive(&wake_q, info); @@ -1277,7 +1277,7 @@ retry: if (u_notification == NULL) { if (info->notify_owner == task_tgid(current)) { remove_notification(info); - inode->i_atime = inode->i_ctime = CURRENT_TIME; + inode->i_atime = inode->i_ctime = current_time(inode); } } else if (info->notify_owner != NULL) { ret = -EBUSY; @@ -1302,7 +1302,7 @@ retry: info->notify_owner = get_pid(task_tgid(current)); info->notify_user_ns = get_user_ns(current_user_ns()); - inode->i_atime = inode->i_ctime = CURRENT_TIME; + inode->i_atime = inode->i_ctime = current_time(inode); } spin_unlock(&info->lock); out_fput: @@ -1359,7 +1359,7 @@ SYSCALL_DEFINE3(mq_getsetattr, mqd_t, mqdes, f.file->f_flags &= ~O_NONBLOCK; spin_unlock(&f.file->f_lock); - inode->i_atime = inode->i_ctime = CURRENT_TIME; + inode->i_atime = inode->i_ctime = current_time(inode); } spin_unlock(&info->lock); diff --git a/ipc/msg.c b/ipc/msg.c index c6521c205cb4..e12307d0c920 100644 --- a/ipc/msg.c +++ b/ipc/msg.c @@ -51,19 +51,14 @@ struct msg_receiver { long r_msgtype; long r_maxsize; - /* - * Mark r_msg volatile so that the compiler - * does not try to get smart and optimize - * it. We rely on this for the lockless - * receive algorithm. 
- */ - struct msg_msg *volatile r_msg; + struct msg_msg *r_msg; }; /* one msg_sender for each sleeping sender */ struct msg_sender { struct list_head list; struct task_struct *tsk; + size_t msgsz; }; #define SEARCH_ANY 1 @@ -159,45 +154,72 @@ static int newque(struct ipc_namespace *ns, struct ipc_params *params) return msq->q_perm.id; } -static inline void ss_add(struct msg_queue *msq, struct msg_sender *mss) +static inline bool msg_fits_inqueue(struct msg_queue *msq, size_t msgsz) +{ + return msgsz + msq->q_cbytes <= msq->q_qbytes && + 1 + msq->q_qnum <= msq->q_qbytes; +} + +static inline void ss_add(struct msg_queue *msq, + struct msg_sender *mss, size_t msgsz) { mss->tsk = current; + mss->msgsz = msgsz; __set_current_state(TASK_INTERRUPTIBLE); list_add_tail(&mss->list, &msq->q_senders); } static inline void ss_del(struct msg_sender *mss) { - if (mss->list.next != NULL) + if (mss->list.next) list_del(&mss->list); } -static void ss_wakeup(struct list_head *h, int kill) +static void ss_wakeup(struct msg_queue *msq, + struct wake_q_head *wake_q, bool kill) { struct msg_sender *mss, *t; + struct task_struct *stop_tsk = NULL; + struct list_head *h = &msq->q_senders; list_for_each_entry_safe(mss, t, h, list) { if (kill) mss->list.next = NULL; - wake_up_process(mss->tsk); + + /* + * Stop at the first task we don't wakeup, + * we've already iterated the original + * sender queue. + */ + else if (stop_tsk == mss->tsk) + break; + /* + * We are not in an EIDRM scenario here, therefore + * verify that we really need to wakeup the task. + * To maintain current semantics and wakeup order, + * move the sender to the tail on behalf of the + * blocked task. 
+ */ + else if (!msg_fits_inqueue(msq, mss->msgsz)) { + if (!stop_tsk) + stop_tsk = mss->tsk; + + list_move_tail(&mss->list, &msq->q_senders); + continue; + } + + wake_q_add(wake_q, mss->tsk); } } -static void expunge_all(struct msg_queue *msq, int res) +static void expunge_all(struct msg_queue *msq, int res, + struct wake_q_head *wake_q) { struct msg_receiver *msr, *t; list_for_each_entry_safe(msr, t, &msq->q_receivers, r_list) { - msr->r_msg = NULL; /* initialize expunge ordering */ - wake_up_process(msr->r_tsk); - /* - * Ensure that the wakeup is visible before setting r_msg as - * the receiving end depends on it: either spinning on a nil, - * or dealing with -EAGAIN cases. See lockless receive part 1 - * and 2 in do_msgrcv(). - */ - smp_wmb(); /* barrier (B) */ - msr->r_msg = ERR_PTR(res); + wake_q_add(wake_q, msr->r_tsk); + WRITE_ONCE(msr->r_msg, ERR_PTR(res)); } } @@ -213,11 +235,13 @@ static void freeque(struct ipc_namespace *ns, struct kern_ipc_perm *ipcp) { struct msg_msg *msg, *t; struct msg_queue *msq = container_of(ipcp, struct msg_queue, q_perm); + WAKE_Q(wake_q); - expunge_all(msq, -EIDRM); - ss_wakeup(&msq->q_senders, 1); + expunge_all(msq, -EIDRM, &wake_q); + ss_wakeup(msq, &wake_q, true); msg_rmid(ns, msq); ipc_unlock_object(&msq->q_perm); + wake_up_q(&wake_q); rcu_read_unlock(); list_for_each_entry_safe(msg, t, &msq->q_messages, m_list) { @@ -372,6 +396,9 @@ static int msgctl_down(struct ipc_namespace *ns, int msqid, int cmd, freeque(ns, ipcp); goto out_up; case IPC_SET: + { + WAKE_Q(wake_q); + if (msqid64.msg_qbytes > ns->msg_ctlmnb && !capable(CAP_SYS_RESOURCE)) { err = -EPERM; @@ -386,15 +413,21 @@ static int msgctl_down(struct ipc_namespace *ns, int msqid, int cmd, msq->q_qbytes = msqid64.msg_qbytes; msq->q_ctime = get_seconds(); - /* sleeping receivers might be excluded by + /* + * Sleeping receivers might be excluded by * stricter permissions. 
*/ - expunge_all(msq, -EAGAIN); - /* sleeping senders might be able to send + expunge_all(msq, -EAGAIN, &wake_q); + /* + * Sleeping senders might be able to send * due to a larger queue size. */ - ss_wakeup(&msq->q_senders, 0); - break; + ss_wakeup(msq, &wake_q, false); + ipc_unlock_object(&msq->q_perm); + wake_up_q(&wake_q); + + goto out_unlock1; + } default: err = -EINVAL; goto out_unlock1; @@ -566,7 +599,8 @@ static int testmsg(struct msg_msg *msg, long type, int mode) return 0; } -static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg) +static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg, + struct wake_q_head *wake_q) { struct msg_receiver *msr, *t; @@ -577,27 +611,14 @@ static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg) list_del(&msr->r_list); if (msr->r_maxsize < msg->m_ts) { - /* initialize pipelined send ordering */ - msr->r_msg = NULL; - wake_up_process(msr->r_tsk); - /* barrier (B) see barrier comment below */ - smp_wmb(); - msr->r_msg = ERR_PTR(-E2BIG); + wake_q_add(wake_q, msr->r_tsk); + WRITE_ONCE(msr->r_msg, ERR_PTR(-E2BIG)); } else { - msr->r_msg = NULL; msq->q_lrpid = task_pid_vnr(msr->r_tsk); msq->q_rtime = get_seconds(); - wake_up_process(msr->r_tsk); - /* - * Ensure that the wakeup is visible before - * setting r_msg, as the receiving can otherwise - * exit - once r_msg is set, the receiver can - * continue. See lockless receive part 1 and 2 - * in do_msgrcv(). Barrier (B). 
- */ - smp_wmb(); - msr->r_msg = msg; + wake_q_add(wake_q, msr->r_tsk); + WRITE_ONCE(msr->r_msg, msg); return 1; } } @@ -613,6 +634,7 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext, struct msg_msg *msg; int err; struct ipc_namespace *ns; + WAKE_Q(wake_q); ns = current->nsproxy->ipc_ns; @@ -654,10 +676,8 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext, if (err) goto out_unlock0; - if (msgsz + msq->q_cbytes <= msq->q_qbytes && - 1 + msq->q_qnum <= msq->q_qbytes) { + if (msg_fits_inqueue(msq, msgsz)) break; - } /* queue full, wait: */ if (msgflg & IPC_NOWAIT) { @@ -666,7 +686,7 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext, } /* enqueue the sender and prepare to block */ - ss_add(msq, &s); + ss_add(msq, &s, msgsz); if (!ipc_rcu_getref(msq)) { err = -EIDRM; @@ -686,7 +706,6 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext, err = -EIDRM; goto out_unlock0; } - ss_del(&s); if (signal_pending(current)) { @@ -695,10 +714,11 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext, } } + msq->q_lspid = task_tgid_vnr(current); msq->q_stime = get_seconds(); - if (!pipelined_send(msq, msg)) { + if (!pipelined_send(msq, msg, &wake_q)) { /* no one is waiting for this message, enqueue it */ list_add_tail(&msg->m_list, &msq->q_messages); msq->q_cbytes += msgsz; @@ -712,6 +732,7 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext, out_unlock0: ipc_unlock_object(&msq->q_perm); + wake_up_q(&wake_q); out_unlock1: rcu_read_unlock(); if (msg != NULL) @@ -829,6 +850,7 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp, int msgfl struct msg_queue *msq; struct ipc_namespace *ns; struct msg_msg *msg, *copy = NULL; + WAKE_Q(wake_q); ns = current->nsproxy->ipc_ns; @@ -893,7 +915,7 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp, int msgfl msq->q_cbytes -= msg->m_ts; atomic_sub(msg->m_ts, &ns->msg_bytes); atomic_dec(&ns->msg_hdrs); - ss_wakeup(&msq->q_senders, 0); + ss_wakeup(msq, 
&wake_q, false); goto out_unlock0; } @@ -919,71 +941,38 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp, int msgfl rcu_read_unlock(); schedule(); - /* Lockless receive, part 1: - * Disable preemption. We don't hold a reference to the queue - * and getting a reference would defeat the idea of a lockless - * operation, thus the code relies on rcu to guarantee the - * existence of msq: + /* + * Lockless receive, part 1: + * We don't hold a reference to the queue and getting a + * reference would defeat the idea of a lockless operation, + * thus the code relies on rcu to guarantee the existence of + * msq: * Prior to destruction, expunge_all(-EIRDM) changes r_msg. * Thus if r_msg is -EAGAIN, then the queue not yet destroyed. - * rcu_read_lock() prevents preemption between reading r_msg - * and acquiring the q_perm.lock in ipc_lock_object(). */ rcu_read_lock(); - /* Lockless receive, part 2: - * Wait until pipelined_send or expunge_all are outside of - * wake_up_process(). There is a race with exit(), see - * ipc/mqueue.c for the details. The correct serialization - * ensures that a receiver cannot continue without the wakeup - * being visibible _before_ setting r_msg: - * - * CPU 0 CPU 1 - * <loop receiver> - * smp_rmb(); (A) <-- pair -. <waker thread> - * <load ->r_msg> | msr->r_msg = NULL; - * | wake_up_process(); - * <continue> `------> smp_wmb(); (B) - * msr->r_msg = msg; + /* + * Lockless receive, part 2: + * The work in pipelined_send() and expunge_all(): + * - Set pointer to message + * - Queue the receiver task for later wakeup + * - Wake up the process after the lock is dropped. * - * Where (A) orders the message value read and where (B) orders - * the write to the r_msg -- done in both pipelined_send and - * expunge_all. - */ - for (;;) { - /* - * Pairs with writer barrier in pipelined_send - * or expunge_all. 
- */ - smp_rmb(); /* barrier (A) */ - msg = (struct msg_msg *)msr_d.r_msg; - if (msg) - break; - - /* - * The cpu_relax() call is a compiler barrier - * which forces everything in this loop to be - * re-loaded. - */ - cpu_relax(); - } - - /* Lockless receive, part 3: - * If there is a message or an error then accept it without - * locking. + * Should the process wake up before this wakeup (due to a + * signal) it will either see the message and continue ... */ + msg = READ_ONCE(msr_d.r_msg); if (msg != ERR_PTR(-EAGAIN)) goto out_unlock1; - /* Lockless receive, part 3: - * Acquire the queue spinlock. - */ + /* + * ... or see -EAGAIN, acquire the lock to check the message + * again. + */ ipc_lock_object(&msq->q_perm); - /* Lockless receive, part 4: - * Repeat test after acquiring the spinlock. - */ - msg = (struct msg_msg *)msr_d.r_msg; + msg = msr_d.r_msg; if (msg != ERR_PTR(-EAGAIN)) goto out_unlock0; @@ -998,6 +987,7 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp, int msgfl out_unlock0: ipc_unlock_object(&msq->q_perm); + wake_up_q(&wake_q); out_unlock1: rcu_read_unlock(); if (IS_ERR(msg)) { diff --git a/ipc/namespace.c b/ipc/namespace.c index d87e6baa1323..0abdea496493 100644 --- a/ipc/namespace.c +++ b/ipc/namespace.c @@ -16,39 +16,61 @@ #include "util.h" +static struct ucounts *inc_ipc_namespaces(struct user_namespace *ns) +{ + return inc_ucount(ns, current_euid(), UCOUNT_IPC_NAMESPACES); +} + +static void dec_ipc_namespaces(struct ucounts *ucounts) +{ + dec_ucount(ucounts, UCOUNT_IPC_NAMESPACES); +} + static struct ipc_namespace *create_ipc_ns(struct user_namespace *user_ns, struct ipc_namespace *old_ns) { struct ipc_namespace *ns; + struct ucounts *ucounts; int err; + err = -ENOSPC; + ucounts = inc_ipc_namespaces(user_ns); + if (!ucounts) + goto fail; + + err = -ENOMEM; ns = kmalloc(sizeof(struct ipc_namespace), GFP_KERNEL); if (ns == NULL) - return ERR_PTR(-ENOMEM); + goto fail_dec; err = ns_alloc_inum(&ns->ns); - if (err) { - 
kfree(ns); - return ERR_PTR(err); - } + if (err) + goto fail_free; ns->ns.ops = &ipcns_operations; atomic_set(&ns->count, 1); ns->user_ns = get_user_ns(user_ns); + ns->ucounts = ucounts; err = mq_init_ns(ns); - if (err) { - put_user_ns(ns->user_ns); - ns_free_inum(&ns->ns); - kfree(ns); - return ERR_PTR(err); - } + if (err) + goto fail_put; sem_init_ns(ns); msg_init_ns(ns); shm_init_ns(ns); return ns; + +fail_put: + put_user_ns(ns->user_ns); + ns_free_inum(&ns->ns); +fail_free: + kfree(ns); +fail_dec: + dec_ipc_namespaces(ucounts); +fail: + return ERR_PTR(err); } struct ipc_namespace *copy_ipcs(unsigned long flags, @@ -96,6 +118,7 @@ static void free_ipc_ns(struct ipc_namespace *ns) msg_exit_ns(ns); shm_exit_ns(ns); + dec_ipc_namespaces(ns->ucounts); put_user_ns(ns->user_ns); ns_free_inum(&ns->ns); kfree(ns); @@ -165,10 +188,16 @@ static int ipcns_install(struct nsproxy *nsproxy, struct ns_common *new) return 0; } +static struct user_namespace *ipcns_owner(struct ns_common *ns) +{ + return to_ipc_ns(ns)->user_ns; +} + const struct proc_ns_operations ipcns_operations = { .name = "ipc", .type = CLONE_NEWIPC, .get = ipcns_get, .put = ipcns_put, .install = ipcns_install, + .owner = ipcns_owner, }; diff --git a/ipc/sem.c b/ipc/sem.c index 7c9d4f7683c0..10b94bc59d4a 100644 --- a/ipc/sem.c +++ b/ipc/sem.c @@ -162,14 +162,21 @@ static int sysvipc_sem_proc_show(struct seq_file *s, void *it); /* * Locking: + * a) global sem_lock() for read/write * sem_undo.id_next, * sem_array.complex_count, - * sem_array.pending{_alter,_cont}, - * sem_array.sem_undo: global sem_lock() for read/write - * sem_undo.proc_next: only "current" is allowed to read/write that field. 
+ * sem_array.complex_mode + * sem_array.pending{_alter,_const}, + * sem_array.sem_undo * + * b) global or semaphore sem_lock() for read/write: * sem_array.sem_base[i].pending_{const,alter}: - * global or semaphore sem_lock() for read/write + * sem_array.complex_mode (for read) + * + * c) special: + * sem_undo_list.list_proc: + * * undo_list->lock for write + * * rcu for read */ #define sc_semmsl sem_ctls[0] @@ -260,31 +267,62 @@ static void sem_rcu_free(struct rcu_head *head) } /* - * Wait until all currently ongoing simple ops have completed. + * Enter the mode suitable for non-simple operations: * Caller must own sem_perm.lock. - * New simple ops cannot start, because simple ops first check - * that sem_perm.lock is free. - * that a) sem_perm.lock is free and b) complex_count is 0. */ -static void sem_wait_array(struct sem_array *sma) +static void complexmode_enter(struct sem_array *sma) { int i; struct sem *sem; - if (sma->complex_count) { - /* The thread that increased sma->complex_count waited on - * all sem->lock locks. Thus we don't need to wait again. - */ + if (sma->complex_mode) { + /* We are already in complex_mode. Nothing to do */ return; } + /* We need a full barrier after seting complex_mode: + * The write to complex_mode must be visible + * before we read the first sem->lock spinlock state. + */ + smp_store_mb(sma->complex_mode, true); + for (i = 0; i < sma->sem_nsems; i++) { sem = sma->sem_base + i; spin_unlock_wait(&sem->lock); } + /* + * spin_unlock_wait() is not a memory barriers, it is only a + * control barrier. The code must pair with spin_unlock(&sem->lock), + * thus just the control barrier is insufficient. + * + * smp_rmb() is sufficient, as writes cannot pass the control barrier. + */ + smp_rmb(); } /* + * Try to leave the mode that disallows simple operations: + * Caller must own sem_perm.lock. + */ +static void complexmode_tryleave(struct sem_array *sma) +{ + if (sma->complex_count) { + /* Complex ops are sleeping. 
+ * We must stay in complex mode + */ + return; + } + /* + * Immediately after setting complex_mode to false, + * a simple op can start. Thus: all memory writes + * performed by the current operation must be visible + * before we set complex_mode to false. + */ + smp_store_release(&sma->complex_mode, false); +} + +#define SEM_GLOBAL_LOCK (-1) +/* * If the request contains only one semaphore operation, and there are * no complex transactions pending, lock only the semaphore involved. * Otherwise, lock the entire semaphore array, since we either have @@ -300,56 +338,42 @@ static inline int sem_lock(struct sem_array *sma, struct sembuf *sops, /* Complex operation - acquire a full lock */ ipc_lock_object(&sma->sem_perm); - /* And wait until all simple ops that are processed - * right now have dropped their locks. - */ - sem_wait_array(sma); - return -1; + /* Prevent parallel simple ops */ + complexmode_enter(sma); + return SEM_GLOBAL_LOCK; } /* * Only one semaphore affected - try to optimize locking. - * The rules are: - * - optimized locking is possible if no complex operation - * is either enqueued or processed right now. - * - The test for enqueued complex ops is simple: - * sma->complex_count != 0 - * - Testing for complex ops that are processed right now is - * a bit more difficult. Complex ops acquire the full lock - * and first wait that the running simple ops have completed. - * (see above) - * Thus: If we own a simple lock and the global lock is free - * and complex_count is now 0, then it will stay 0 and - * thus just locking sem->lock is sufficient. + * Optimized locking is possible if no complex operation + * is either enqueued or processed right now. + * + * Both facts are tracked by complex_mode. */ sem = sma->sem_base + sops->sem_num; - if (sma->complex_count == 0) { + /* + * Initial check for complex_mode. Just an optimization, + * no locking, no memory barrier. + */ + if (!sma->complex_mode) { /* * It appears that no complex operation is around. 
* Acquire the per-semaphore lock. */ spin_lock(&sem->lock); - /* Then check that the global lock is free */ - if (!spin_is_locked(&sma->sem_perm.lock)) { - /* - * We need a memory barrier with acquire semantics, - * otherwise we can race with another thread that does: - * complex_count++; - * spin_unlock(sem_perm.lock); - */ - smp_acquire__after_ctrl_dep(); + /* + * See 51d7d5205d33 + * ("powerpc: Add smp_mb() to arch_spin_is_locked()"): + * A full barrier is required: the write of sem->lock + * must be visible before the read is executed + */ + smp_mb(); - /* - * Now repeat the test of complex_count: - * It can't change anymore until we drop sem->lock. - * Thus: if is now 0, then it will stay 0. - */ - if (sma->complex_count == 0) { - /* fast path successful! */ - return sops->sem_num; - } + if (!smp_load_acquire(&sma->complex_mode)) { + /* fast path successful! */ + return sops->sem_num; } spin_unlock(&sem->lock); } @@ -369,15 +393,16 @@ static inline int sem_lock(struct sem_array *sma, struct sembuf *sops, /* Not a false alarm, thus complete the sequence for a * full lock. 
*/ - sem_wait_array(sma); - return -1; + complexmode_enter(sma); + return SEM_GLOBAL_LOCK; } } static inline void sem_unlock(struct sem_array *sma, int locknum) { - if (locknum == -1) { + if (locknum == SEM_GLOBAL_LOCK) { unmerge_queues(sma); + complexmode_tryleave(sma); ipc_unlock_object(&sma->sem_perm); } else { struct sem *sem = sma->sem_base + locknum; @@ -529,6 +554,7 @@ static int newary(struct ipc_namespace *ns, struct ipc_params *params) } sma->complex_count = 0; + sma->complex_mode = true; /* dropped by sem_unlock below */ INIT_LIST_HEAD(&sma->pending_alter); INIT_LIST_HEAD(&sma->pending_const); INIT_LIST_HEAD(&sma->list_id); @@ -2079,6 +2105,8 @@ void exit_sem(struct task_struct *tsk) struct list_head tasks; int semid, i; + cond_resched(); + rcu_read_lock(); un = list_entry_rcu(ulp->list_proc.next, struct sem_undo, list_proc); @@ -2184,10 +2212,10 @@ static int sysvipc_sem_proc_show(struct seq_file *s, void *it) /* * The proc interface isn't aware of sem_lock(), it calls * ipc_lock_object() directly (in sysvipc_find_ipc). - * In order to stay compatible with sem_lock(), we must wait until - * all simple semop() calls have left their critical regions. + * In order to stay compatible with sem_lock(), we must + * enter / leave complex_mode. */ - sem_wait_array(sma); + complexmode_enter(sma); sem_otime = get_semotime(sma); @@ -2204,6 +2232,8 @@ static int sysvipc_sem_proc_show(struct seq_file *s, void *it) sem_otime, sma->sem_ctime); + complexmode_tryleave(sma); + return 0; } #endif |