diff options
| -rw-r--r-- | include/aio.h | 60 | ||||
| -rw-r--r-- | src/aio/aio_cancel.c | 16 | ||||
| -rw-r--r-- | src/aio/aio_error.c | 6 | ||||
| -rw-r--r-- | src/aio/aio_fsync.c | 9 | ||||
| -rw-r--r-- | src/aio/aio_readwrite.c | 104 | ||||
| -rw-r--r-- | src/aio/aio_return.c | 6 | ||||
| -rw-r--r-- | src/aio/aio_suspend.c | 57 | ||||
| -rw-r--r-- | src/aio/lio_listio.c | 140 | 
8 files changed, 398 insertions, 0 deletions
| diff --git a/include/aio.h b/include/aio.h new file mode 100644 index 00000000..893c0e9e --- /dev/null +++ b/include/aio.h @@ -0,0 +1,60 @@ +#ifndef _AIO_H +#define _AIO_H + +#ifdef __cplusplus +extern "C" { +#endif + +#if defined(_POSIX_SOURCE) || defined(_POSIX_C_SOURCE) \ + || defined(_XOPEN_SOURCE) || defined(_GNU_SOURCE) + +#include <signal.h> +#include <time.h> + +#define __NEED_ssize_t +#define __NEED_off_t + +#include <bits/alltypes.h> + +struct aiocb { +	int aio_filedes, aio_lio_opcode, aio_reqprio; +	volatile void *aio_buf; +	size_t aio_nbytes; +	struct sigevent aio_sigevent; +	void *__td; +	int __lock[2]; +	int __err; +	ssize_t __ret; +	off_t aio_offset; +	void *__next, *__prev; +	char __dummy4[32-2*sizeof(void *)]; +}; + +#define AIO_CANCELED 0 +#define AIO_NOTCANCELED 1 +#define AIO_ALLDONE 2 + +#define LIO_READ 0 +#define LIO_WRITE 1 +#define LIO_NOP 2 + +#define LIO_WAIT 0 +#define LIO_NOWAIT 1 + +int aio_read(struct aiocb *); +int aio_write(struct aiocb *); +int aio_error(struct aiocb *); +ssize_t aio_return(struct aiocb *); +int aio_cancel(int, struct aiocb *); +int aio_suspend(struct aiocb *const [], int, const struct timespec *); +int aio_fsync(int, struct aiocb *); + +int lio_listio(int, struct aiocb *const [], int, struct sigevent *); + +#endif + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/aio/aio_cancel.c b/src/aio/aio_cancel.c new file mode 100644 index 00000000..5a753b1f --- /dev/null +++ b/src/aio/aio_cancel.c @@ -0,0 +1,16 @@ +#include <aio.h> +#include <pthread.h> +#include <errno.h> + +int aio_cancel(int fd, struct aiocb *cb) +{ +	if (!cb) { +		/* FIXME: for correctness, we should return AIO_ALLDONE +		 * if there are no outstanding aio operations on this +		 * file descriptor, but that would require making aio +		 * much slower, and seems to have little advantage since +		 * we don't support cancellation anyway. */ +		return AIO_NOTCANCELED; +	} +	return cb->__err==EINPROGRESS ? AIO_NOTCANCELED : AIO_ALLDONE; +} diff --git a/src/aio/aio_error.c b/src/aio/aio_error.c new file mode 100644 index 00000000..169a9a30 --- /dev/null +++ b/src/aio/aio_error.c @@ -0,0 +1,6 @@ +#include <aio.h> + +int aio_error(struct aiocb *cb) +{ +	return cb->__err; +} diff --git a/src/aio/aio_fsync.c b/src/aio/aio_fsync.c new file mode 100644 index 00000000..0ac6ea87 --- /dev/null +++ b/src/aio/aio_fsync.c @@ -0,0 +1,9 @@ +#include <aio.h> +#include <errno.h> + +int aio_fsync(int op, struct aiocb *cb) +{ +	/* FIXME: unsupported */ +	errno = EINVAL; +	return -1; +} diff --git a/src/aio/aio_readwrite.c b/src/aio/aio_readwrite.c new file mode 100644 index 00000000..27168f25 --- /dev/null +++ b/src/aio/aio_readwrite.c @@ -0,0 +1,104 @@ +#include <aio.h> +#include <fcntl.h> +#include "pthread_impl.h" + +static void dummy(void) +{ +} + +weak_alias(dummy, __aio_wake); + +static void notify_signal(struct sigevent *sev) +{ +	siginfo_t si = { +		.si_signo = sev->sigev_signo, +		.si_value = sev->sigev_value, +		.si_code = SI_ASYNCIO, +		.si_pid = __pthread_self()->pid, +		.si_uid = getuid() +	}; +	__syscall(SYS_rt_sigqueueinfo, si.si_pid, si.si_signo, &si); +} + +static void *io_thread(void *p) +{ +	struct aiocb *cb = p; +	int fd = cb->aio_filedes; +	void *buf = (void *)cb->aio_buf; +	size_t len = cb->aio_nbytes; +	off_t off = cb->aio_offset; +	int op = cb->aio_lio_opcode; +	struct sigevent sev = cb->aio_sigevent; +	ssize_t ret; + +	if (op == LIO_WRITE) { +		if (  (fcntl(fd, F_GETFL) & O_APPEND) +		    ||((ret = pwrite(fd, buf, len, off))<0 && errno==ESPIPE) ) +			ret = write(fd, buf, len); +	} else if (op == LIO_READ) { +		if ( (ret = pread(fd, buf, len, off))<0 && errno==ESPIPE ) +			ret = read(fd, buf, len); +	} else { +		ret = 0; +	} +	cb->__ret = ret; + +	if (ret < 0) a_store(&cb->__err, errno); +	else a_store(&cb->__err, 0); + +	__aio_wake(); + +	switch (cb->aio_sigevent.sigev_notify) { +	case SIGEV_SIGNAL: +		notify_signal(&sev); +		break; +	case SIGEV_THREAD: +		sev.sigev_notify_function(sev.sigev_value); +		break; +	} + +	return 0; +} + +static int new_req(struct aiocb *cb) +{ +	int ret = 0; +	pthread_attr_t a; +	sigset_t set; +	pthread_t td; + +	if (cb->aio_sigevent.sigev_notify == SIGEV_THREAD) { +		if (cb->aio_sigevent.sigev_notify_attributes) +			a = *cb->aio_sigevent.sigev_notify_attributes; +		else +			pthread_attr_init(&a); +	} else { +		pthread_attr_init(&a); +		pthread_attr_setstacksize(&a, PAGE_SIZE); +		pthread_attr_setguardsize(&a, 0); +	} +	pthread_attr_setdetachstate(&a, PTHREAD_CREATE_DETACHED); +	sigfillset(&set); +	pthread_sigmask(SIG_BLOCK, &set, &set); +	cb->__err = EINPROGRESS; +	if (pthread_create(&td, &a, io_thread, cb)) { +		errno = EAGAIN; +		ret = -1; +	} +	pthread_sigmask(SIG_SETMASK, &set, 0); +	cb->__td = td; + +	return ret; +} + +ssize_t aio_read(struct aiocb *cb) +{ +	cb->aio_lio_opcode = LIO_READ; +	return new_req(cb); +} + +ssize_t aio_write(struct aiocb *cb) +{ +	cb->aio_lio_opcode = LIO_WRITE; +	return new_req(cb); +} diff --git a/src/aio/aio_return.c b/src/aio/aio_return.c new file mode 100644 index 00000000..df10bdbe --- /dev/null +++ b/src/aio/aio_return.c @@ -0,0 +1,6 @@ +#include <aio.h> + +ssize_t aio_return(struct aiocb *cb) +{ +	return cb->__ret; +} diff --git a/src/aio/aio_suspend.c b/src/aio/aio_suspend.c new file mode 100644 index 00000000..cb2539e9 --- /dev/null +++ b/src/aio/aio_suspend.c @@ -0,0 +1,57 @@ +#include <aio.h> +#include <errno.h> +#include "pthread_impl.h" + +/* Due to the requirement that aio_suspend be async-signal-safe, we cannot + * use any locks, wait queues, etc. that would make it more efficient. The + * only obviously-correct algorithm is to generate a wakeup every time any + * aio operation finishes and have aio_suspend re-evaluate the completion + * status of each aiocb it was waiting on. */ + +static volatile int seq; + +void __aio_wake(void) +{ +	a_inc(&seq); +	__wake(&seq, -1, 1); +} + +int aio_suspend(struct aiocb *const cbs[], int cnt, const struct timespec *ts) +{ +	int i, last, first=1, ret=0; +	struct timespec at; + +	if (cnt<0) { +		errno = EINVAL; +		return -1; +	} + +	for (;;) { +		last = seq; + +		for (i=0; i<cnt; i++) { +			if (cbs[i] && cbs[i]->__err != EINPROGRESS) +				return 0; +		} + +		if (first && ts) { +			clock_gettime(CLOCK_MONOTONIC, &at); +			at.tv_sec += ts->tv_sec; +			if ((at.tv_nsec += ts->tv_nsec) >= 1000000000) { +				at.tv_nsec -= 1000000000; +				at.tv_sec++; +			} +			first = 0; +		} + +		ret = __timedwait(&seq, last, CLOCK_MONOTONIC, +			ts ? &at : 0, 0, 0, 1); + +		if (ret == ETIMEDOUT) ret = EAGAIN; + +		if (ret) { +			errno = ret; +			return -1; +		} +	} +} diff --git a/src/aio/lio_listio.c b/src/aio/lio_listio.c new file mode 100644 index 00000000..8865029a --- /dev/null +++ b/src/aio/lio_listio.c @@ -0,0 +1,140 @@ +#include <aio.h> +#include <errno.h> +#include "pthread_impl.h" + +struct lio_state { +	struct sigevent *sev; +	int cnt; +	struct aiocb *cbs[]; +}; + +static int lio_wait(struct lio_state *st) +{ +	int i, err, got_err; +	int cnt = st->cnt; +	struct aiocb **cbs = st->cbs; + +	for (;;) { +		for (i=0; i<cnt; i++) { +			if (!cbs[i]) continue; +			err = aio_error(cbs[i]); +			if (err==EINPROGRESS) +				break; +			if (err) got_err=1; +			cbs[i] = 0; +		} +		if (i==cnt) { +			if (got_err) { +				errno = EIO; +				return -1; +			} +			return 0; +		} +		if (aio_suspend(cbs, cnt, 0)) +			return -1; +	} +} + +static void notify_signal(struct sigevent *sev) +{ +	siginfo_t si = { +		.si_signo = sev->sigev_signo, +		.si_value = sev->sigev_value, +		.si_code = SI_ASYNCIO, +		.si_pid = __pthread_self()->pid, +		.si_uid = getuid() +	}; +	__syscall(SYS_rt_sigqueueinfo, si.si_pid, si.si_signo, &si); +} + +static void *wait_thread(void *p) +{ +	struct lio_state *st = p; +	struct sigevent *sev = st->sev; +	lio_wait(st); +	free(st); +	switch (sev->sigev_notify) { +	case SIGEV_SIGNAL: +		notify_signal(sev); +		break; +	case SIGEV_THREAD: +		sev->sigev_notify_function(sev->sigev_value); +		break; +	} +	return 0; +} + +int lio_listio(int mode, struct aiocb *const cbs[], int cnt, struct sigevent *sev) +{ +	int i, ret; +	struct lio_state *st=0; + +	if (cnt < 0) { +		errno = EINVAL; +		return -1; +	} + +	if (mode == LIO_WAIT || (sev && sev->sigev_notify != SIGEV_NONE)) { +		if (!(st = malloc(sizeof *st + cnt*sizeof *cbs))) { +			errno = EAGAIN; +			return -1; +		} +		st->cnt = cnt; +		st->sev = sev; +		memcpy(st->cbs, cbs, cnt*sizeof *cbs); +	} + +	for (i=0; i<cnt; i++) { +		if (!cbs[i]) continue; +		switch (cbs[i]->aio_lio_opcode) { +		case LIO_READ: +			ret = aio_read(cbs[i]); +			break; +		case LIO_WRITE: +			ret = aio_write(cbs[i]); +			break; +		default: +			continue; +		} +		if (ret) { +			free(st); +			errno = EAGAIN; +			return -1; +		} +	} + +	if (mode == LIO_WAIT) { +		ret = lio_wait(st); +		free(st); +		return 0; +	} + +	if (st) { +		pthread_attr_t a; +		sigset_t set; +		pthread_t td; + +		if (sev->sigev_notify == SIGEV_THREAD) { +			if (sev->sigev_notify_attributes) +				a = *sev->sigev_notify_attributes; +			else +				pthread_attr_init(&a); +		} else { +			pthread_attr_init(&a); +			pthread_attr_setstacksize(&a, PAGE_SIZE); +			pthread_attr_setguardsize(&a, 0); +		} +		pthread_attr_setdetachstate(&a, PTHREAD_CREATE_DETACHED); +		sigfillset(&set); +		pthread_sigmask(SIG_BLOCK, &set, &set); +		if (pthread_create(&td, &a, wait_thread, st)) { +			free(st); +			errno = EAGAIN; +			return -1; +		} +		pthread_sigmask(SIG_SETMASK, &set, 0); +	} + +	return 0; +} + | 
