summaryrefslogtreecommitdiffstats
path: root/ipc
diff options
context:
space:
mode:
Diffstat (limited to 'ipc')
-rw-r--r--ipc/mqueue.c54
-rw-r--r--ipc/msg.c50
-rw-r--r--ipc/sem.c4
-rw-r--r--ipc/shm.c12
-rw-r--r--ipc/util.c28
-rw-r--r--ipc/util.h2
6 files changed, 95 insertions, 55 deletions
diff --git a/ipc/mqueue.c b/ipc/mqueue.c
index 3aaea7ffd077..a24ba9fe5bb8 100644
--- a/ipc/mqueue.c
+++ b/ipc/mqueue.c
@@ -47,8 +47,7 @@
#define RECV 1
#define STATE_NONE 0
-#define STATE_PENDING 1
-#define STATE_READY 2
+#define STATE_READY 1
struct posix_msg_tree_node {
struct rb_node rb_node;
@@ -571,15 +570,12 @@ static int wq_sleep(struct mqueue_inode_info *info, int sr,
wq_add(info, sr, ewp);
for (;;) {
- set_current_state(TASK_INTERRUPTIBLE);
+ __set_current_state(TASK_INTERRUPTIBLE);
spin_unlock(&info->lock);
time = schedule_hrtimeout_range_clock(timeout, 0,
HRTIMER_MODE_ABS, CLOCK_REALTIME);
- while (ewp->state == STATE_PENDING)
- cpu_relax();
-
if (ewp->state == STATE_READY) {
retval = 0;
goto out;
@@ -907,11 +903,15 @@ out_name:
* list of waiting receivers. A sender checks that list before adding the new
* message into the message array. If there is a waiting receiver, then it
* bypasses the message array and directly hands the message over to the
- * receiver.
- * The receiver accepts the message and returns without grabbing the queue
- * spinlock. Therefore an intermediate STATE_PENDING state and memory barriers
- * are necessary. The same algorithm is used for sysv semaphores, see
- * ipc/sem.c for more details.
+ * receiver. The receiver accepts the message and returns without grabbing the
+ * queue spinlock:
+ *
+ * - Set pointer to message.
+ * - Queue the receiver task for later wakeup (without the info->lock).
+ * - Update its state to STATE_READY. Now the receiver can continue.
+ * - Wake up the process after the lock is dropped. Should the process wake up
+ * before this wakeup (due to a timeout or a signal) it will either see
+ * STATE_READY and continue or acquire the lock to check the state again.
*
* The same algorithm is used for senders.
*/
@@ -919,21 +919,29 @@ out_name:
/* pipelined_send() - send a message directly to the task waiting in
* sys_mq_timedreceive() (without inserting message into a queue).
*/
-static inline void pipelined_send(struct mqueue_inode_info *info,
+static inline void pipelined_send(struct wake_q_head *wake_q,
+ struct mqueue_inode_info *info,
struct msg_msg *message,
struct ext_wait_queue *receiver)
{
receiver->msg = message;
list_del(&receiver->list);
- receiver->state = STATE_PENDING;
- wake_up_process(receiver->task);
- smp_wmb();
+ wake_q_add(wake_q, receiver->task);
+ /*
+ * Rely on the implicit cmpxchg barrier from wake_q_add such
+ * that we can ensure that updating receiver->state is the last
+ * write operation: As once set, the receiver can continue,
+ * and if we don't have the reference count from the wake_q,
+ * yet, at that point we can later have a use-after-free
+ * condition and bogus wakeup.
+ */
receiver->state = STATE_READY;
}
/* pipelined_receive() - if there is task waiting in sys_mq_timedsend()
* gets its message and put to the queue (we have one free place for sure). */
-static inline void pipelined_receive(struct mqueue_inode_info *info)
+static inline void pipelined_receive(struct wake_q_head *wake_q,
+ struct mqueue_inode_info *info)
{
struct ext_wait_queue *sender = wq_get_first_waiter(info, SEND);
@@ -944,10 +952,9 @@ static inline void pipelined_receive(struct mqueue_inode_info *info)
}
if (msg_insert(sender->msg, info))
return;
+
list_del(&sender->list);
- sender->state = STATE_PENDING;
- wake_up_process(sender->task);
- smp_wmb();
+ wake_q_add(wake_q, sender->task);
sender->state = STATE_READY;
}
@@ -965,6 +972,7 @@ SYSCALL_DEFINE5(mq_timedsend, mqd_t, mqdes, const char __user *, u_msg_ptr,
struct timespec ts;
struct posix_msg_tree_node *new_leaf = NULL;
int ret = 0;
+ WAKE_Q(wake_q);
if (u_abs_timeout) {
int res = prepare_timeout(u_abs_timeout, &expires, &ts);
@@ -1049,7 +1057,7 @@ SYSCALL_DEFINE5(mq_timedsend, mqd_t, mqdes, const char __user *, u_msg_ptr,
} else {
receiver = wq_get_first_waiter(info, RECV);
if (receiver) {
- pipelined_send(info, msg_ptr, receiver);
+ pipelined_send(&wake_q, info, msg_ptr, receiver);
} else {
/* adds message to the queue */
ret = msg_insert(msg_ptr, info);
@@ -1062,6 +1070,7 @@ SYSCALL_DEFINE5(mq_timedsend, mqd_t, mqdes, const char __user *, u_msg_ptr,
}
out_unlock:
spin_unlock(&info->lock);
+ wake_up_q(&wake_q);
out_free:
if (ret)
free_msg(msg_ptr);
@@ -1149,14 +1158,17 @@ SYSCALL_DEFINE5(mq_timedreceive, mqd_t, mqdes, char __user *, u_msg_ptr,
msg_ptr = wait.msg;
}
} else {
+ WAKE_Q(wake_q);
+
msg_ptr = msg_get(info);
inode->i_atime = inode->i_mtime = inode->i_ctime =
CURRENT_TIME;
/* There is now free space in queue. */
- pipelined_receive(info);
+ pipelined_receive(&wake_q, info);
spin_unlock(&info->lock);
+ wake_up_q(&wake_q);
ret = 0;
}
if (ret == 0) {
diff --git a/ipc/msg.c b/ipc/msg.c
index 2b6fdbb9e0e9..66c4f567eb73 100644
--- a/ipc/msg.c
+++ b/ipc/msg.c
@@ -76,7 +76,7 @@ struct msg_sender {
static inline struct msg_queue *msq_obtain_object(struct ipc_namespace *ns, int id)
{
- struct kern_ipc_perm *ipcp = ipc_obtain_object(&msg_ids(ns), id);
+ struct kern_ipc_perm *ipcp = ipc_obtain_object_idr(&msg_ids(ns), id);
if (IS_ERR(ipcp))
return ERR_CAST(ipcp);
@@ -196,7 +196,7 @@ static void expunge_all(struct msg_queue *msq, int res)
* or dealing with -EAGAIN cases. See lockless receive part 1
* and 2 in do_msgrcv().
*/
- smp_mb();
+ smp_wmb(); /* barrier (B) */
msr->r_msg = ERR_PTR(res);
}
}
@@ -580,7 +580,8 @@ static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg)
/* initialize pipelined send ordering */
msr->r_msg = NULL;
wake_up_process(msr->r_tsk);
- smp_mb(); /* see barrier comment below */
+ /* barrier (B) see barrier comment below */
+ smp_wmb();
msr->r_msg = ERR_PTR(-E2BIG);
} else {
msr->r_msg = NULL;
@@ -589,11 +590,12 @@ static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg)
wake_up_process(msr->r_tsk);
/*
* Ensure that the wakeup is visible before
- * setting r_msg, as the receiving end depends
- * on it. See lockless receive part 1 and 2 in
- * do_msgrcv().
+ * setting r_msg, as the receiving can otherwise
+ * exit - once r_msg is set, the receiver can
+ * continue. See lockless receive part 1 and 2
+ * in do_msgrcv(). Barrier (B).
*/
- smp_mb();
+ smp_wmb();
msr->r_msg = msg;
return 1;
@@ -932,12 +934,38 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp, int msgfl
/* Lockless receive, part 2:
* Wait until pipelined_send or expunge_all are outside of
* wake_up_process(). There is a race with exit(), see
- * ipc/mqueue.c for the details.
+ * ipc/mqueue.c for the details. The correct serialization
+ * ensures that a receiver cannot continue without the wakeup
+ * being visibible _before_ setting r_msg:
+ *
+ * CPU 0 CPU 1
+ * <loop receiver>
+ * smp_rmb(); (A) <-- pair -. <waker thread>
+ * <load ->r_msg> | msr->r_msg = NULL;
+ * | wake_up_process();
+ * <continue> `------> smp_wmb(); (B)
+ * msr->r_msg = msg;
+ *
+ * Where (A) orders the message value read and where (B) orders
+ * the write to the r_msg -- done in both pipelined_send and
+ * expunge_all.
*/
- msg = (struct msg_msg *)msr_d.r_msg;
- while (msg == NULL) {
- cpu_relax();
+ for (;;) {
+ /*
+ * Pairs with writer barrier in pipelined_send
+ * or expunge_all.
+ */
+ smp_rmb(); /* barrier (A) */
msg = (struct msg_msg *)msr_d.r_msg;
+ if (msg)
+ break;
+
+ /*
+ * The cpu_relax() call is a compiler barrier
+ * which forces everything in this loop to be
+ * re-loaded.
+ */
+ cpu_relax();
}
/* Lockless receive, part 3:
diff --git a/ipc/sem.c b/ipc/sem.c
index d1a6edd17eba..bc3d530cb23e 100644
--- a/ipc/sem.c
+++ b/ipc/sem.c
@@ -391,7 +391,7 @@ static inline struct sem_array *sem_obtain_lock(struct ipc_namespace *ns,
struct kern_ipc_perm *ipcp;
struct sem_array *sma;
- ipcp = ipc_obtain_object(&sem_ids(ns), id);
+ ipcp = ipc_obtain_object_idr(&sem_ids(ns), id);
if (IS_ERR(ipcp))
return ERR_CAST(ipcp);
@@ -410,7 +410,7 @@ static inline struct sem_array *sem_obtain_lock(struct ipc_namespace *ns,
static inline struct sem_array *sem_obtain_object(struct ipc_namespace *ns, int id)
{
- struct kern_ipc_perm *ipcp = ipc_obtain_object(&sem_ids(ns), id);
+ struct kern_ipc_perm *ipcp = ipc_obtain_object_idr(&sem_ids(ns), id);
if (IS_ERR(ipcp))
return ERR_CAST(ipcp);
diff --git a/ipc/shm.c b/ipc/shm.c
index 6d767071c367..06e5cf2fe019 100644
--- a/ipc/shm.c
+++ b/ipc/shm.c
@@ -129,7 +129,7 @@ void __init shm_init(void)
static inline struct shmid_kernel *shm_obtain_object(struct ipc_namespace *ns, int id)
{
- struct kern_ipc_perm *ipcp = ipc_obtain_object(&shm_ids(ns), id);
+ struct kern_ipc_perm *ipcp = ipc_obtain_object_idr(&shm_ids(ns), id);
if (IS_ERR(ipcp))
return ERR_CAST(ipcp);
@@ -155,8 +155,11 @@ static inline struct shmid_kernel *shm_lock(struct ipc_namespace *ns, int id)
{
struct kern_ipc_perm *ipcp = ipc_lock(&shm_ids(ns), id);
- if (IS_ERR(ipcp))
- return (struct shmid_kernel *)ipcp;
+ /*
+ * We raced in the idr lookup or with shm_destroy(). Either way, the
+ * ID is busted.
+ */
+ BUG_ON(IS_ERR(ipcp));
return container_of(ipcp, struct shmid_kernel, shm_perm);
}
@@ -191,7 +194,6 @@ static void shm_open(struct vm_area_struct *vma)
struct shmid_kernel *shp;
shp = shm_lock(sfd->ns, sfd->id);
- BUG_ON(IS_ERR(shp));
shp->shm_atim = get_seconds();
shp->shm_lprid = task_tgid_vnr(current);
shp->shm_nattch++;
@@ -258,7 +260,6 @@ static void shm_close(struct vm_area_struct *vma)
down_write(&shm_ids(ns).rwsem);
/* remove from the list of attaches of the shm segment */
shp = shm_lock(ns, sfd->id);
- BUG_ON(IS_ERR(shp));
shp->shm_lprid = task_tgid_vnr(current);
shp->shm_dtim = get_seconds();
shp->shm_nattch--;
@@ -1191,7 +1192,6 @@ out_fput:
out_nattch:
down_write(&shm_ids(ns).rwsem);
shp = shm_lock(ns, shmid);
- BUG_ON(IS_ERR(shp));
shp->shm_nattch--;
if (shm_may_destroy(ns, shp))
shm_destroy(ns, shp);
diff --git a/ipc/util.c b/ipc/util.c
index ff3323ef8d8b..be4230020a1f 100644
--- a/ipc/util.c
+++ b/ipc/util.c
@@ -467,10 +467,7 @@ void ipc_rcu_free(struct rcu_head *head)
{
struct ipc_rcu *p = container_of(head, struct ipc_rcu, rcu);
- if (is_vmalloc_addr(p))
- vfree(p);
- else
- kfree(p);
+ kvfree(p);
}
/**
@@ -558,7 +555,7 @@ void ipc64_perm_to_ipc_perm(struct ipc64_perm *in, struct ipc_perm *out)
* Call inside the RCU critical section.
* The ipc object is *not* locked on exit.
*/
-struct kern_ipc_perm *ipc_obtain_object(struct ipc_ids *ids, int id)
+struct kern_ipc_perm *ipc_obtain_object_idr(struct ipc_ids *ids, int id)
{
struct kern_ipc_perm *out;
int lid = ipcid_to_idx(id);
@@ -584,21 +581,24 @@ struct kern_ipc_perm *ipc_lock(struct ipc_ids *ids, int id)
struct kern_ipc_perm *out;
rcu_read_lock();
- out = ipc_obtain_object(ids, id);
+ out = ipc_obtain_object_idr(ids, id);
if (IS_ERR(out))
- goto err1;
+ goto err;
spin_lock(&out->lock);
- /* ipc_rmid() may have already freed the ID while ipc_lock
- * was spinning: here verify that the structure is still valid
+ /*
+ * ipc_rmid() may have already freed the ID while ipc_lock()
+ * was spinning: here verify that the structure is still valid.
+ * Upon races with RMID, return -EIDRM, thus indicating that
+ * the ID points to a removed identifier.
*/
if (ipc_valid_object(out))
return out;
spin_unlock(&out->lock);
- out = ERR_PTR(-EINVAL);
-err1:
+ out = ERR_PTR(-EIDRM);
+err:
rcu_read_unlock();
return out;
}
@@ -608,7 +608,7 @@ err1:
* @ids: ipc identifier set
* @id: ipc id to look for
*
- * Similar to ipc_obtain_object() but also checks
+ * Similar to ipc_obtain_object_idr() but also checks
* the ipc object reference counter.
*
* Call inside the RCU critical section.
@@ -616,13 +616,13 @@ err1:
*/
struct kern_ipc_perm *ipc_obtain_object_check(struct ipc_ids *ids, int id)
{
- struct kern_ipc_perm *out = ipc_obtain_object(ids, id);
+ struct kern_ipc_perm *out = ipc_obtain_object_idr(ids, id);
if (IS_ERR(out))
goto out;
if (ipc_checkid(out, id))
- return ERR_PTR(-EIDRM);
+ return ERR_PTR(-EINVAL);
out:
return out;
}
diff --git a/ipc/util.h b/ipc/util.h
index 1a5a0fcd099c..3a8a5a0eca62 100644
--- a/ipc/util.h
+++ b/ipc/util.h
@@ -132,7 +132,7 @@ void ipc_rcu_putref(void *ptr, void (*func)(struct rcu_head *head));
void ipc_rcu_free(struct rcu_head *head);
struct kern_ipc_perm *ipc_lock(struct ipc_ids *, int);
-struct kern_ipc_perm *ipc_obtain_object(struct ipc_ids *ids, int id);
+struct kern_ipc_perm *ipc_obtain_object_idr(struct ipc_ids *ids, int id);
void kernel_to_ipc64_perm(struct kern_ipc_perm *in, struct ipc64_perm *out);
void ipc64_perm_to_ipc_perm(struct ipc64_perm *in, struct ipc_perm *out);
OpenPOWER on IntegriCloud