summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
author: Rich Felker <dalias@aerifal.cx> 2014-08-17 22:09:47 -0400
committer: Rich Felker <dalias@aerifal.cx> 2014-08-17 22:09:47 -0400
commit: 37195db8ec31300a87bc7ec09d2adcf299e9203d (patch)
tree: f3d2bbb1ced5199a695a03bfb4d42ba06b7bc8a7 /src
parent: 4220d298ef7a2226b14fe4b481f7f7699eab6e3f (diff)
download: musl-37195db8ec31300a87bc7ec09d2adcf299e9203d.tar.gz
redesign cond var implementation to fix multiple issues
the immediate issue that was reported by Jens Gustedt and needed to be fixed was corruption of the cv/mutex waiter states when switching to using a new mutex with the cv after all waiters were unblocked but before they finished returning from the wait function. self-synchronized destruction was also handled poorly and may have had race conditions. and the use of sequence numbers for waking waiters admitted a theoretical missed-wakeup if the sequence number wrapped through the full 32-bit space. the new implementation is largely documented in the comments in the source. the basic principle is to use linked lists initially attached to the cv object, but detachable on signal/broadcast, made up of nodes residing in automatic storage (stack) on the threads that are waiting. this eliminates the need for waiters to access the cv object after they are signaled, and allows us to limit wakeup to one waiter at a time during broadcasts even when futex requeue cannot be used. performance is also greatly improved, roughly doubling in some tests. basically nothing is changed in the process-shared cond var case, where this implementation does not work, since processes do not have access to one another's local storage.
Diffstat (limited to 'src')
-rw-r--r-- src/internal/pthread_impl.h 9
-rw-r--r-- src/thread/pthread_cond_broadcast.c 39
-rw-r--r-- src/thread/pthread_cond_destroy.c 15
-rw-r--r-- src/thread/pthread_cond_init.c 2
-rw-r--r-- src/thread/pthread_cond_signal.c 5
-rw-r--r-- src/thread/pthread_cond_timedwait.c 236
6 files changed, 213 insertions, 93 deletions
diff --git a/src/internal/pthread_impl.h b/src/internal/pthread_impl.h
index 848ff668..2d090f8f 100644
--- a/src/internal/pthread_impl.h
+++ b/src/internal/pthread_impl.h
@@ -66,14 +66,13 @@ struct __timer {
#define _m_prev __u.__p[3]
#define _m_next __u.__p[4]
#define _m_count __u.__i[5]
-#define _c_mutex __u.__p[0]
+#define _c_shared __u.__p[0]
#define _c_seq __u.__i[2]
#define _c_waiters __u.__i[3]
#define _c_clock __u.__i[4]
-#define _c_lock __u.__i[5]
-#define _c_lockwait __u.__i[6]
-#define _c_waiters2 __u.__i[7]
-#define _c_destroy __u.__i[8]
+#define _c_lock __u.__i[8]
+#define _c_head __u.__p[1]
+#define _c_tail __u.__p[5]
#define _rw_lock __u.__i[0]
#define _rw_waiters __u.__i[1]
#define _rw_shared __u.__i[2]
diff --git a/src/thread/pthread_cond_broadcast.c b/src/thread/pthread_cond_broadcast.c
index 18e778f3..69f840fb 100644
--- a/src/thread/pthread_cond_broadcast.c
+++ b/src/thread/pthread_cond_broadcast.c
@@ -1,43 +1,12 @@
#include "pthread_impl.h"
+int __private_cond_signal(pthread_cond_t *, int);
+
int pthread_cond_broadcast(pthread_cond_t *c)
{
- pthread_mutex_t *m;
-
+ if (!c->_c_shared) return __private_cond_signal(c, -1);
if (!c->_c_waiters) return 0;
-
a_inc(&c->_c_seq);
-
- /* If cond var is process-shared, simply wake all waiters. */
- if (c->_c_mutex == (void *)-1) {
- __wake(&c->_c_seq, -1, 0);
- return 0;
- }
-
- /* Block waiters from returning so we can use the mutex. */
- while (a_swap(&c->_c_lock, 1))
- __wait(&c->_c_lock, &c->_c_lockwait, 1, 1);
- if (!c->_c_waiters)
- goto out;
- m = c->_c_mutex;
-
- /* Move waiter count to the mutex */
- a_fetch_add(&m->_m_waiters, c->_c_waiters2);
- c->_c_waiters2 = 0;
-
- /* Perform the futex requeue, waking one waiter unless we know
- * that the calling thread holds the mutex. */
- int wake_cnt = !(m->_m_type & 3)
- || (m->_m_lock&INT_MAX)!=__pthread_self()->tid;
- if (m->_m_type & 128) wake_cnt = INT_MAX;
- __syscall(SYS_futex, &c->_c_seq, FUTEX_REQUEUE | 128,
- wake_cnt, INT_MAX, &m->_m_lock) != -EINVAL ||
- __syscall(SYS_futex, &c->_c_seq, FUTEX_REQUEUE,
- wake_cnt, INT_MAX, &m->_m_lock);
-
-out:
- a_store(&c->_c_lock, 0);
- if (c->_c_lockwait) __wake(&c->_c_lock, 1, 1);
-
+ __wake(&c->_c_seq, -1, 0);
return 0;
}
diff --git a/src/thread/pthread_cond_destroy.c b/src/thread/pthread_cond_destroy.c
index a096c554..8c555160 100644
--- a/src/thread/pthread_cond_destroy.c
+++ b/src/thread/pthread_cond_destroy.c
@@ -2,12 +2,13 @@
int pthread_cond_destroy(pthread_cond_t *c)
{
- int priv = c->_c_mutex != (void *)-1;
- int cnt;
- c->_c_destroy = 1;
- if (c->_c_waiters)
- __wake(&c->_c_seq, -1, priv);
- while ((cnt = c->_c_waiters))
- __wait(&c->_c_waiters, 0, cnt, priv);
+ if (c->_c_shared && c->_c_waiters) {
+ int cnt;
+ a_or(&c->_c_waiters, 0x80000000);
+ a_inc(&c->_c_seq);
+ __wake(&c->_c_seq, -1, 0);
+ while ((cnt = c->_c_waiters) & 0x7fffffff)
+ __wait(&c->_c_waiters, 0, cnt, 0);
+ }
return 0;
}
diff --git a/src/thread/pthread_cond_init.c b/src/thread/pthread_cond_init.c
index 357ecd55..8c484ddc 100644
--- a/src/thread/pthread_cond_init.c
+++ b/src/thread/pthread_cond_init.c
@@ -5,7 +5,7 @@ int pthread_cond_init(pthread_cond_t *restrict c, const pthread_condattr_t *rest
*c = (pthread_cond_t){0};
if (a) {
c->_c_clock = a->__attr & 0x7fffffff;
- if (a->__attr>>31) c->_c_mutex = (void *)-1;
+ if (a->__attr>>31) c->_c_shared = (void *)-1;
}
return 0;
}
diff --git a/src/thread/pthread_cond_signal.c b/src/thread/pthread_cond_signal.c
index 5fd72f90..119c00ab 100644
--- a/src/thread/pthread_cond_signal.c
+++ b/src/thread/pthread_cond_signal.c
@@ -1,9 +1,12 @@
#include "pthread_impl.h"
+int __private_cond_signal(pthread_cond_t *, int);
+
int pthread_cond_signal(pthread_cond_t *c)
{
+ if (!c->_c_shared) return __private_cond_signal(c, 1);
if (!c->_c_waiters) return 0;
a_inc(&c->_c_seq);
- if (c->_c_waiters) __wake(&c->_c_seq, 1, c->_c_mutex!=(void*)-1);
+ __wake(&c->_c_seq, 1, 0);
return 0;
}
diff --git a/src/thread/pthread_cond_timedwait.c b/src/thread/pthread_cond_timedwait.c
index 44e89567..7aaba954 100644
--- a/src/thread/pthread_cond_timedwait.c
+++ b/src/thread/pthread_cond_timedwait.c
@@ -1,45 +1,145 @@
#include "pthread_impl.h"
-struct cm {
- pthread_cond_t *c;
- pthread_mutex_t *m;
+/*
+ * struct waiter
+ *
+ * Waiter objects have automatic storage on the waiting thread, and
+ * are used in building a linked list representing waiters currently
+ * waiting on the condition variable or a group of waiters woken
+ * together by a broadcast or signal; in the case of signal, this is a
+ * degenerate list of one member.
+ *
+ * Waiter lists attached to the condition variable itself are
+ * protected by the lock on the cv. Detached waiter lists are
+ * protected by the associated mutex. The hand-off between protections
+ * is handled by a "barrier" lock in each node, which disallows
+ * signaled waiters from making forward progress to the code that will
+ * access the list using the mutex until the list is in a consistent
+ * state and the cv lock has been released.
+ *
+ * Since process-shared cond var semantics do not necessarily allow
+ * one thread to see another's automatic storage (they may be in
+ * different processes), the waiter list is not used for the
+ * process-shared case, but the structure is still used to store data
+ * needed by the cancellation cleanup handler.
+ */
+
+struct waiter {
+ struct waiter *prev, *next;
+ int state, barrier, requeued, mutex_ret;
+ int *notify;
+ pthread_mutex_t *mutex;
+ pthread_cond_t *cond;
+ int shared;
};
-static void unwait(pthread_cond_t *c, pthread_mutex_t *m)
-{
- /* Removing a waiter is non-trivial if we could be using requeue
- * based broadcast signals, due to mutex access issues, etc. */
+/* Self-synchronized-destruction-safe lock functions */
- if (c->_c_mutex == (void *)-1) {
- a_dec(&c->_c_waiters);
- if (c->_c_destroy) __wake(&c->_c_waiters, 1, 0);
- return;
+static inline void lock(volatile int *l)
+{
+ if (a_cas(l, 0, 1)) {
+ a_cas(l, 1, 2);
+ do __wait(l, 0, 2, 1);
+ while (a_cas(l, 0, 2));
}
+}
- while (a_swap(&c->_c_lock, 1))
- __wait(&c->_c_lock, &c->_c_lockwait, 1, 1);
+static inline void unlock(volatile int *l)
+{
+ if (a_swap(l, 0)==2)
+ __wake(l, 1, 1);
+}
- if (c->_c_waiters2) c->_c_waiters2--;
- else a_dec(&m->_m_waiters);
+enum {
+ WAITING,
+ SIGNALED,
+ LEAVING,
+};
- a_store(&c->_c_lock, 0);
- if (c->_c_lockwait) __wake(&c->_c_lock, 1, 1);
+static void unwait(void *arg)
+{
+ struct waiter *node = arg, *p;
+
+ if (node->shared) {
+ pthread_cond_t *c = node->cond;
+ pthread_mutex_t *m = node->mutex;
+ if (a_fetch_add(&c->_c_waiters, -1) == -0x7fffffff)
+ __wake(&c->_c_waiters, 1, 0);
+ node->mutex_ret = pthread_mutex_lock(m);
+ return;
+ }
- a_dec(&c->_c_waiters);
- if (c->_c_destroy) __wake(&c->_c_waiters, 1, 1);
-}
+ int oldstate = a_cas(&node->state, WAITING, LEAVING);
+
+ if (oldstate == WAITING) {
+ /* Access to cv object is valid because this waiter was not
+ * yet signaled and a new signal/broadcast cannot return
+ * after seeing a LEAVING waiter without getting notified
+ * via the futex notify below. */
+
+ pthread_cond_t *c = node->cond;
+ lock(&c->_c_lock);
+
+ if (c->_c_head == node) c->_c_head = node->next;
+ else if (node->prev) node->prev->next = node->next;
+ if (c->_c_tail == node) c->_c_tail = node->prev;
+ else if (node->next) node->next->prev = node->prev;
+
+ unlock(&c->_c_lock);
+
+ if (node->notify) {
+ if (a_fetch_add(node->notify, -1)==1)
+ __wake(node->notify, 1, 1);
+ }
+ }
-static void cleanup(void *p)
-{
- struct cm *cm = p;
- unwait(cm->c, cm->m);
- pthread_mutex_lock(cm->m);
+ node->mutex_ret = pthread_mutex_lock(node->mutex);
+
+ if (oldstate == WAITING) return;
+
+ /* If the mutex can't be locked, we're in big trouble because
+ * it's all that protects access to the shared list state.
+ * In order to prevent catastrophic stack corruption from
+ * unsynchronized access, simply deadlock. */
+ if (node->mutex_ret && node->mutex_ret != EOWNERDEAD)
+ for (;;) lock(&(int){0});
+
+ /* Wait until control of the list has been handed over from
+ * the cv lock (signaling thread) to the mutex (waiters). */
+ lock(&node->barrier);
+
+ /* If this thread was requeued to the mutex, undo the extra
+ * waiter count that was added to the mutex. */
+ if (node->requeued) a_dec(&node->mutex->_m_waiters);
+
+ /* Find a thread to requeue to the mutex, starting from the
+ * end of the list (oldest waiters). */
+ for (p=node; p->next; p=p->next);
+ if (p==node) p=node->prev;
+ for (; p && p->requeued; p=p->prev);
+ if (p==node) p=node->prev;
+ if (p) {
+ p->requeued = 1;
+ a_inc(&node->mutex->_m_waiters);
+ /* The futex requeue command cannot requeue from
+ * private to shared, so for process-shared mutexes,
+ * simply wake the target. */
+ int wake = node->mutex->_m_type & 128;
+ __syscall(SYS_futex, &p->state, FUTEX_REQUEUE|128,
+ wake, 1, &node->mutex->_m_lock) != -EINVAL
+ || __syscall(SYS_futex, &p->state, FUTEX_REQUEUE,
+ 0, 1, &node->mutex->_m_lock);
+ }
+
+ /* Remove this thread from the list. */
+ if (node->next) node->next->prev = node->prev;
+ if (node->prev) node->prev->next = node->next;
}
int pthread_cond_timedwait(pthread_cond_t *restrict c, pthread_mutex_t *restrict m, const struct timespec *restrict ts)
{
- struct cm cm = { .c=c, .m=m };
- int r, e=0, seq;
+ struct waiter node = { .cond = c, .mutex = m };
+ int e, seq, *fut, clock = c->_c_clock;
if ((m->_m_type&15) && (m->_m_lock&INT_MAX) != __pthread_self()->tid)
return EPERM;
@@ -49,29 +149,77 @@ int pthread_cond_timedwait(pthread_cond_t *restrict c, pthread_mutex_t *restrict
pthread_testcancel();
- a_inc(&c->_c_waiters);
-
- if (c->_c_mutex != (void *)-1) {
- c->_c_mutex = m;
- while (a_swap(&c->_c_lock, 1))
- __wait(&c->_c_lock, &c->_c_lockwait, 1, 1);
- c->_c_waiters2++;
- a_store(&c->_c_lock, 0);
- if (c->_c_lockwait) __wake(&c->_c_lock, 1, 1);
+ if (c->_c_shared) {
+ node.shared = 1;
+ fut = &c->_c_seq;
+ seq = c->_c_seq;
+ a_inc(&c->_c_waiters);
+ } else {
+ lock(&c->_c_lock);
+
+ node.barrier = 1;
+ fut = &node.state;
+ seq = node.state = WAITING;
+ node.next = c->_c_head;
+ c->_c_head = &node;
+ if (!c->_c_tail) c->_c_tail = &node;
+ else node.next->prev = &node;
+
+ unlock(&c->_c_lock);
}
- seq = c->_c_seq;
-
pthread_mutex_unlock(m);
- do e = __timedwait(&c->_c_seq, seq, c->_c_clock, ts, cleanup, &cm,
- c->_c_mutex != (void *)-1);
- while (c->_c_seq == seq && (!e || e==EINTR));
+ do e = __timedwait(fut, seq, clock, ts, unwait, &node, !node.shared);
+ while (*fut==seq && (!e || e==EINTR));
if (e == EINTR) e = 0;
- unwait(c, m);
+ unwait(&node);
- if ((r=pthread_mutex_lock(m))) return r;
+ return node.mutex_ret ? node.mutex_ret : e;
+}
- return e;
+int __private_cond_signal(pthread_cond_t *c, int n)
+{
+ struct waiter *p, *q=0;
+ int ref = 0, cur;
+
+ lock(&c->_c_lock);
+ for (p=c->_c_tail; n && p; p=p->prev) {
+ /* The per-waiter-node barrier lock is held at this
+ * point, so while the following CAS may allow forward
+ * progress in the target thread, it doesn't allow
+ * access to the waiter list yet. Ideally the target
+ * does not run until the futex wake anyway. */
+ if (a_cas(&p->state, WAITING, SIGNALED) != WAITING) {
+ ref++;
+ p->notify = &ref;
+ } else {
+ n--;
+ if (!q) q=p;
+ }
+ }
+ /* Split the list, leaving any remainder on the cv. */
+ if (p) {
+ if (p->next) p->next->prev = 0;
+ p->next = 0;
+ } else {
+ c->_c_head = 0;
+ }
+ c->_c_tail = p;
+ unlock(&c->_c_lock);
+
+ /* Wait for any waiters in the LEAVING state to remove
+ * themselves from the list before returning or allowing
+ * signaled threads to proceed. */
+ while ((cur = ref)) __wait(&ref, 0, cur, 1);
+
+ /* Wake the first signaled thread and unlock the per-waiter
+ * barriers preventing their forward progress. */
+ for (p=q; p; p=q) {
+ q = p->prev;
+ if (!p->next) __wake(&p->state, 1, 1);
+ unlock(&p->barrier);
+ }
+ return 0;
}