diff options
| -rw-r--r-- | src/internal/pthread_impl.h | 9 | ||||
| -rw-r--r-- | src/thread/pthread_cond_broadcast.c | 39 | ||||
| -rw-r--r-- | src/thread/pthread_cond_destroy.c | 15 | ||||
| -rw-r--r-- | src/thread/pthread_cond_init.c | 2 | ||||
| -rw-r--r-- | src/thread/pthread_cond_signal.c | 5 | ||||
| -rw-r--r-- | src/thread/pthread_cond_timedwait.c | 236 | 
6 files changed, 213 insertions, 93 deletions
| diff --git a/src/internal/pthread_impl.h b/src/internal/pthread_impl.h index 848ff668..2d090f8f 100644 --- a/src/internal/pthread_impl.h +++ b/src/internal/pthread_impl.h @@ -66,14 +66,13 @@ struct __timer {  #define _m_prev __u.__p[3]  #define _m_next __u.__p[4]  #define _m_count __u.__i[5] -#define _c_mutex __u.__p[0] +#define _c_shared __u.__p[0]  #define _c_seq __u.__i[2]  #define _c_waiters __u.__i[3]  #define _c_clock __u.__i[4] -#define _c_lock __u.__i[5] -#define _c_lockwait __u.__i[6] -#define _c_waiters2 __u.__i[7] -#define _c_destroy __u.__i[8] +#define _c_lock __u.__i[8] +#define _c_head __u.__p[1] +#define _c_tail __u.__p[5]  #define _rw_lock __u.__i[0]  #define _rw_waiters __u.__i[1]  #define _rw_shared __u.__i[2] diff --git a/src/thread/pthread_cond_broadcast.c b/src/thread/pthread_cond_broadcast.c index 18e778f3..69f840fb 100644 --- a/src/thread/pthread_cond_broadcast.c +++ b/src/thread/pthread_cond_broadcast.c @@ -1,43 +1,12 @@  #include "pthread_impl.h" +int __private_cond_signal(pthread_cond_t *, int); +  int pthread_cond_broadcast(pthread_cond_t *c)  { -	pthread_mutex_t *m; - +	if (!c->_c_shared) return __private_cond_signal(c, -1);  	if (!c->_c_waiters) return 0; -  	a_inc(&c->_c_seq); - -	/* If cond var is process-shared, simply wake all waiters. */ -	if (c->_c_mutex == (void *)-1) { -		__wake(&c->_c_seq, -1, 0); -		return 0; -	} - -	/* Block waiters from returning so we can use the mutex. */ -	while (a_swap(&c->_c_lock, 1)) -		__wait(&c->_c_lock, &c->_c_lockwait, 1, 1); -	if (!c->_c_waiters) -		goto out; -	m = c->_c_mutex; - -	/* Move waiter count to the mutex */ -	a_fetch_add(&m->_m_waiters, c->_c_waiters2); -	c->_c_waiters2 = 0; - -	/* Perform the futex requeue, waking one waiter unless we know -	 * that the calling thread holds the mutex. */ -	int wake_cnt = !(m->_m_type & 3) -		|| (m->_m_lock&INT_MAX)!=__pthread_self()->tid; -	if (m->_m_type & 128) wake_cnt = INT_MAX; -	__syscall(SYS_futex, &c->_c_seq, FUTEX_REQUEUE | 128, -		wake_cnt, INT_MAX, &m->_m_lock) != -EINVAL || -	__syscall(SYS_futex, &c->_c_seq, FUTEX_REQUEUE, -		wake_cnt, INT_MAX, &m->_m_lock); - -out: -	a_store(&c->_c_lock, 0); -	if (c->_c_lockwait) __wake(&c->_c_lock, 1, 1); - +	__wake(&c->_c_seq, -1, 0);  	return 0;  } diff --git a/src/thread/pthread_cond_destroy.c b/src/thread/pthread_cond_destroy.c index a096c554..8c555160 100644 --- a/src/thread/pthread_cond_destroy.c +++ b/src/thread/pthread_cond_destroy.c @@ -2,12 +2,13 @@  int pthread_cond_destroy(pthread_cond_t *c)  { -	int priv = c->_c_mutex != (void *)-1; -	int cnt; -	c->_c_destroy = 1; -	if (c->_c_waiters) -		__wake(&c->_c_seq, -1, priv); -	while ((cnt = c->_c_waiters)) -		__wait(&c->_c_waiters, 0, cnt, priv); +	if (c->_c_shared && c->_c_waiters) { +		int cnt; +		a_or(&c->_c_waiters, 0x80000000); +		a_inc(&c->_c_seq); +		__wake(&c->_c_seq, -1, 0); +		while ((cnt = c->_c_waiters) & 0x7fffffff) +			__wait(&c->_c_waiters, 0, cnt, 0); +	}  	return 0;  } diff --git a/src/thread/pthread_cond_init.c b/src/thread/pthread_cond_init.c index 357ecd55..8c484ddc 100644 --- a/src/thread/pthread_cond_init.c +++ b/src/thread/pthread_cond_init.c @@ -5,7 +5,7 @@ int pthread_cond_init(pthread_cond_t *restrict c, const pthread_condattr_t *rest  	*c = (pthread_cond_t){0};  	if (a) {  		c->_c_clock = a->__attr & 0x7fffffff; -		if (a->__attr>>31) c->_c_mutex = (void *)-1; +		if (a->__attr>>31) c->_c_shared = (void *)-1;  	}  	return 0;  } diff --git a/src/thread/pthread_cond_signal.c b/src/thread/pthread_cond_signal.c index 5fd72f90..119c00ab 100644 --- a/src/thread/pthread_cond_signal.c +++ b/src/thread/pthread_cond_signal.c @@ -1,9 +1,12 @@  #include "pthread_impl.h" +int __private_cond_signal(pthread_cond_t *, int); +  int pthread_cond_signal(pthread_cond_t *c)  { +	if (!c->_c_shared) return __private_cond_signal(c, 1);  	if (!c->_c_waiters) return 0;  	a_inc(&c->_c_seq); -	if (c->_c_waiters) __wake(&c->_c_seq, 1, c->_c_mutex!=(void*)-1); +	__wake(&c->_c_seq, 1, 0);  	return 0;  } diff --git a/src/thread/pthread_cond_timedwait.c b/src/thread/pthread_cond_timedwait.c index 44e89567..7aaba954 100644 --- a/src/thread/pthread_cond_timedwait.c +++ b/src/thread/pthread_cond_timedwait.c @@ -1,45 +1,145 @@  #include "pthread_impl.h" -struct cm { -	pthread_cond_t *c; -	pthread_mutex_t *m; +/* + * struct waiter + * + * Waiter objects have automatic storage on the waiting thread, and + * are used in building a linked list representing waiters currently + * waiting on the condition variable or a group of waiters woken + * together by a broadcast or signal; in the case of signal, this is a + * degenerate list of one member. + * + * Waiter lists attached to the condition variable itself are + * protected by the lock on the cv. Detached waiter lists are + * protected by the associated mutex. The hand-off between protections + * is handled by a "barrier" lock in each node, which disallows + * signaled waiters from making forward progress to the code that will + * access the list using the mutex until the list is in a consistent + * state and the cv lock as been released. + * + * Since process-shared cond var semantics do not necessarily allow + * one thread to see another's automatic storage (they may be in + * different processes), the waiter list is not used for the + * process-shared case, but the structure is still used to store data + * needed by the cancellation cleanup handler. + */ + +struct waiter { +	struct waiter *prev, *next; +	int state, barrier, requeued, mutex_ret; +	int *notify; +	pthread_mutex_t *mutex; +	pthread_cond_t *cond; +	int shared;  }; -static void unwait(pthread_cond_t *c, pthread_mutex_t *m) -{ -	/* Removing a waiter is non-trivial if we could be using requeue -	 * based broadcast signals, due to mutex access issues, etc. */ +/* Self-synchronized-destruction-safe lock functions */ -	if (c->_c_mutex == (void *)-1) { -		a_dec(&c->_c_waiters); -		if (c->_c_destroy) __wake(&c->_c_waiters, 1, 0); -		return; +static inline void lock(volatile int *l) +{ +	if (a_cas(l, 0, 1)) { +		a_cas(l, 1, 2); +		do __wait(l, 0, 2, 1); +		while (a_cas(l, 0, 2));  	} +} -	while (a_swap(&c->_c_lock, 1)) -		__wait(&c->_c_lock, &c->_c_lockwait, 1, 1); +static inline void unlock(volatile int *l) +{ +	if (a_swap(l, 0)==2) +		__wake(l, 1, 1); +} -	if (c->_c_waiters2) c->_c_waiters2--; -	else a_dec(&m->_m_waiters); +enum { +	WAITING, +	SIGNALED, +	LEAVING, +}; -	a_store(&c->_c_lock, 0); -	if (c->_c_lockwait) __wake(&c->_c_lock, 1, 1); +static void unwait(void *arg) +{ +	struct waiter *node = arg, *p; + +	if (node->shared) { +		pthread_cond_t *c = node->cond; +		pthread_mutex_t *m = node->mutex; +		if (a_fetch_add(&c->_c_waiters, -1) == -0x7fffffff) +			__wake(&c->_c_waiters, 1, 0); +		node->mutex_ret = pthread_mutex_lock(m); +		return; +	} -	a_dec(&c->_c_waiters); -	if (c->_c_destroy) __wake(&c->_c_waiters, 1, 1); -} +	int oldstate = a_cas(&node->state, WAITING, LEAVING); + +	if (oldstate == WAITING) { +		/* Access to cv object is valid because this waiter was not +		 * yet signaled and a new signal/broadcast cannot return +		 * after seeing a LEAVING waiter without getting notified +		 * via the futex notify below. */ + +		pthread_cond_t *c = node->cond; +		lock(&c->_c_lock); +		 +		if (c->_c_head == node) c->_c_head = node->next; +		else if (node->prev) node->prev->next = node->next; +		if (c->_c_tail == node) c->_c_tail = node->prev; +		else if (node->next) node->next->prev = node->prev; +		 +		unlock(&c->_c_lock); + +		if (node->notify) { +			if (a_fetch_add(node->notify, -1)==1) +				__wake(node->notify, 1, 1); +		} +	} -static void cleanup(void *p) -{ -	struct cm *cm = p; -	unwait(cm->c, cm->m); -	pthread_mutex_lock(cm->m); +	node->mutex_ret = pthread_mutex_lock(node->mutex); + +	if (oldstate == WAITING) return; + +	/* If the mutex can't be locked, we're in big trouble because +	 * it's all that protects access to the shared list state. +	 * In order to prevent catastrophic stack corruption from +	 * unsynchronized access, simply deadlock. */ +	if (node->mutex_ret && node->mutex_ret != EOWNERDEAD) +		for (;;) lock(&(int){0}); + +	/* Wait until control of the list has been handed over from +	 * the cv lock (signaling thread) to the mutex (waiters). */ +	lock(&node->barrier); + +	/* If this thread was requeued to the mutex, undo the extra +	 * waiter count that was added to the mutex. */ +	if (node->requeued) a_dec(&node->mutex->_m_waiters); + +	/* Find a thread to requeue to the mutex, starting from the +	 * end of the list (oldest waiters). */ +	for (p=node; p->next; p=p->next); +	if (p==node) p=node->prev; +	for (; p && p->requeued; p=p->prev); +	if (p==node) p=node->prev; +	if (p) { +		p->requeued = 1; +		a_inc(&node->mutex->_m_waiters); +		/* The futex requeue command cannot requeue from +		 * private to shared, so for process-shared mutexes, +		 * simply wake the target. */ +		int wake = node->mutex->_m_type & 128; +		__syscall(SYS_futex, &p->state, FUTEX_REQUEUE|128, +			wake, 1, &node->mutex->_m_lock) != -EINVAL +		|| __syscall(SYS_futex, &p->state, FUTEX_REQUEUE, +			0, 1, &node->mutex->_m_lock); +	} + +	/* Remove this thread from the list. */ +	if (node->next) node->next->prev = node->prev; +	if (node->prev) node->prev->next = node->next;  }  int pthread_cond_timedwait(pthread_cond_t *restrict c, pthread_mutex_t *restrict m, const struct timespec *restrict ts)  { -	struct cm cm = { .c=c, .m=m }; -	int r, e=0, seq; +	struct waiter node = { .cond = c, .mutex = m }; +	int e, seq, *fut, clock = c->_c_clock;  	if ((m->_m_type&15) && (m->_m_lock&INT_MAX) != __pthread_self()->tid)  		return EPERM; @@ -49,29 +149,77 @@ int pthread_cond_timedwait(pthread_cond_t *restrict c, pthread_mutex_t *restrict  	pthread_testcancel(); -	a_inc(&c->_c_waiters); - -	if (c->_c_mutex != (void *)-1) { -		c->_c_mutex = m; -		while (a_swap(&c->_c_lock, 1)) -			__wait(&c->_c_lock, &c->_c_lockwait, 1, 1); -		c->_c_waiters2++; -		a_store(&c->_c_lock, 0); -		if (c->_c_lockwait) __wake(&c->_c_lock, 1, 1); +	if (c->_c_shared) { +		node.shared = 1; +		fut = &c->_c_seq; +		seq = c->_c_seq; +		a_inc(&c->_c_waiters); +	} else { +		lock(&c->_c_lock); + +		node.barrier = 1; +		fut = &node.state; +		seq = node.state = WAITING; +		node.next = c->_c_head; +		c->_c_head = &node; +		if (!c->_c_tail) c->_c_tail = &node; +		else node.next->prev = &node; + +		unlock(&c->_c_lock);  	} -	seq = c->_c_seq; -  	pthread_mutex_unlock(m); -	do e = __timedwait(&c->_c_seq, seq, c->_c_clock, ts, cleanup, &cm, -		c->_c_mutex != (void *)-1); -	while (c->_c_seq == seq && (!e || e==EINTR)); +	do e = __timedwait(fut, seq, clock, ts, unwait, &node, !node.shared); +	while (*fut==seq && (!e || e==EINTR));  	if (e == EINTR) e = 0; -	unwait(c, m); +	unwait(&node); -	if ((r=pthread_mutex_lock(m))) return r; +	return node.mutex_ret ? node.mutex_ret : e; +} -	return e; +int __private_cond_signal(pthread_cond_t *c, int n) +{ +	struct waiter *p, *q=0; +	int ref = 0, cur; + +	lock(&c->_c_lock); +	for (p=c->_c_tail; n && p; p=p->prev) { +		/* The per-waiter-node barrier lock is held at this +		 * point, so while the following CAS may allow forward +		 * progress in the target thread, it doesn't allow +		 * access to the waiter list yet. Ideally the target +		 * does not run until the futex wake anyway. */ +		if (a_cas(&p->state, WAITING, SIGNALED) != WAITING) { +			ref++; +			p->notify = &ref; +		} else { +			n--; +			if (!q) q=p; +		} +	} +	/* Split the list, leaving any remainder on the cv. */ +	if (p) { +		if (p->next) p->next->prev = 0; +		p->next = 0; +	} else { +		c->_c_head = 0; +	} +	c->_c_tail = p; +	unlock(&c->_c_lock); + +	/* Wait for any waiters in the LEAVING state to remove +	 * themselves from the list before returning or allowing +	 * signaled threads to proceed. */ +	while ((cur = ref)) __wait(&ref, 0, cur, 1); + +	/* Wake the first signaled thread and unlock the per-waiter +	 * barriers preventing their forward progress. */ +	for (p=q; p; p=q) { +		q = p->prev; +		if (!p->next) __wake(&p->state, 1, 1); +		unlock(&p->barrier); +	} +	return 0;  } | 
