diff options
Diffstat (limited to 'src/aio')
-rw-r--r-- | src/aio/aio.c | 108 | ||||
-rw-r--r-- | src/aio/aio_suspend.c | 5 | ||||
-rw-r--r-- | src/aio/lio_listio.c | 8 |
3 files changed, 86 insertions, 35 deletions
diff --git a/src/aio/aio.c b/src/aio/aio.c index 628e8420..d7e063bf 100644 --- a/src/aio/aio.c +++ b/src/aio/aio.c @@ -5,9 +5,16 @@ #include <errno.h> #include <unistd.h> #include <stdlib.h> +#include <sys/auxv.h> #include "syscall.h" #include "atomic.h" #include "pthread_impl.h" +#include "aio_impl.h" + +#define malloc __libc_malloc +#define calloc __libc_calloc +#define realloc __libc_realloc +#define free __libc_free /* The following is a threads-based implementation of AIO with minimal * dependence on implementation details. Most synchronization is @@ -40,13 +47,6 @@ * blocked permanently. */ -struct aio_args { - struct aiocb *cb; - int op; - int err; - sem_t sem; -}; - struct aio_thread { pthread_t td; struct aiocb *cb; @@ -64,21 +64,45 @@ struct aio_queue { struct aio_thread *head; }; +struct aio_args { + struct aiocb *cb; + struct aio_queue *q; + int op; + sem_t sem; +}; + static pthread_rwlock_t maplock = PTHREAD_RWLOCK_INITIALIZER; static struct aio_queue *****map; static volatile int aio_fd_cnt; volatile int __aio_fut; +static size_t io_thread_stack_size; + +#define MAX(a,b) ((a)>(b) ? (a) : (b)) + static struct aio_queue *__aio_get_queue(int fd, int need) { - if (fd < 0) return 0; + sigset_t allmask, origmask; + int masked = 0; + if (fd < 0) { + errno = EBADF; + return 0; + } int a=fd>>24; unsigned char b=fd>>16, c=fd>>8, d=fd; struct aio_queue *q = 0; pthread_rwlock_rdlock(&maplock); if ((!map || !map[a] || !map[a][b] || !map[a][b][c] || !(q=map[a][b][c][d])) && need) { pthread_rwlock_unlock(&maplock); + if (fcntl(fd, F_GETFD) < 0) return 0; + sigfillset(&allmask); + masked = 1; + pthread_sigmask(SIG_BLOCK, &allmask, &origmask); pthread_rwlock_wrlock(&maplock); + if (!io_thread_stack_size) { + unsigned long val = __getauxval(AT_MINSIGSTKSZ); + io_thread_stack_size = MAX(MINSIGSTKSZ+2048, val+512); + } if (!map) map = calloc(sizeof *map, (-1U/2+1)>>24); if (!map) goto out; if (!map[a]) map[a] = calloc(sizeof **map, 256); @@ -100,6 +124,7 @@ static struct aio_queue *__aio_get_queue(int fd, int need) if (q) pthread_mutex_lock(&q->lock); out: pthread_rwlock_unlock(&maplock); + if (masked) pthread_sigmask(SIG_SETMASK, &origmask, 0); return q; } @@ -195,12 +220,11 @@ static void *io_thread_func(void *ctx) size_t len = cb->aio_nbytes; off_t off = cb->aio_offset; - struct aio_queue *q = __aio_get_queue(fd, 1); + struct aio_queue *q = args->q; ssize_t ret; - args->err = q ? 0 : EAGAIN; + pthread_mutex_lock(&q->lock); sem_post(&args->sem); - if (!q) return 0; at.op = op; at.running = 1; @@ -212,7 +236,6 @@ static void *io_thread_func(void *ctx) at.prev = 0; if ((at.next = q->head)) at.next->prev = &at; q->head = &at; - q->ref++; if (!q->init) { int seekable = lseek(fd, 0, SEEK_CUR) >= 0; @@ -262,9 +285,19 @@ static int submit(struct aiocb *cb, int op) pthread_attr_t a; sigset_t allmask, origmask; pthread_t td; - struct aio_args args = { .cb = cb, .op = op }; + struct aio_queue *q = __aio_get_queue(cb->aio_fildes, 1); + struct aio_args args = { .cb = cb, .op = op, .q = q }; sem_init(&args.sem, 0, 0); + if (!q) { + if (errno != EBADF) errno = EAGAIN; + cb->__ret = -1; + cb->__err = errno; + return -1; + } + q->ref++; + pthread_mutex_unlock(&q->lock); + if (cb->aio_sigevent.sigev_notify == SIGEV_THREAD) { if (cb->aio_sigevent.sigev_notify_attributes) a = *cb->aio_sigevent.sigev_notify_attributes; @@ -272,7 +305,7 @@ static int submit(struct aiocb *cb, int op) pthread_attr_init(&a); } else { pthread_attr_init(&a); - pthread_attr_setstacksize(&a, PTHREAD_STACK_MIN); + pthread_attr_setstacksize(&a, io_thread_stack_size); pthread_attr_setguardsize(&a, 0); } pthread_attr_setdetachstate(&a, PTHREAD_CREATE_DETACHED); @@ -280,17 +313,15 @@ static int submit(struct aiocb *cb, int op) pthread_sigmask(SIG_BLOCK, &allmask, &origmask); cb->__err = EINPROGRESS; if (pthread_create(&td, &a, io_thread_func, &args)) { - errno = EAGAIN; - ret = -1; + pthread_mutex_lock(&q->lock); + __aio_unref_queue(q); + cb->__err = errno = EAGAIN; + cb->__ret = ret = -1; } pthread_sigmask(SIG_SETMASK, &origmask, 0); if (!ret) { while (sem_wait(&args.sem)); - if (args.err) { - errno = args.err; - ret = -1; - } } return ret; @@ -342,8 +373,9 @@ int aio_cancel(int fd, struct aiocb *cb) sigfillset(&allmask); pthread_sigmask(SIG_BLOCK, &allmask, &origmask); + errno = ENOENT; if (!(q = __aio_get_queue(fd, 0))) { - if (fcntl(fd, F_GETFD) < 0) ret = -1; + if (errno == EBADF) ret = -1; goto done; } @@ -370,9 +402,31 @@ int __aio_close(int fd) return fd; } -weak_alias(aio_cancel, aio_cancel64); -weak_alias(aio_error, aio_error64); -weak_alias(aio_fsync, aio_fsync64); -weak_alias(aio_read, aio_read64); -weak_alias(aio_write, aio_write64); -weak_alias(aio_return, aio_return64); +void __aio_atfork(int who) +{ + if (who<0) { + pthread_rwlock_rdlock(&maplock); + return; + } else if (!who) { + pthread_rwlock_unlock(&maplock); + return; + } + aio_fd_cnt = 0; + if (pthread_rwlock_tryrdlock(&maplock)) { + /* Obtaining lock may fail if _Fork was called nor via + * fork. In this case, no further aio is possible from + * child and we can just null out map so __aio_close + * does not attempt to do anything. */ + map = 0; + return; + } + if (map) for (int a=0; a<(-1U/2+1)>>24; a++) + if (map[a]) for (int b=0; b<256; b++) + if (map[a][b]) for (int c=0; c<256; c++) + if (map[a][b][c]) for (int d=0; d<256; d++) + map[a][b][c][d] = 0; + /* Re-initialize the rwlock rather than unlocking since there + * may have been more than one reference on it in the parent. + * We are not a lock holder anyway; the thread in the parent was. */ + pthread_rwlock_init(&maplock, 0); +} diff --git a/src/aio/aio_suspend.c b/src/aio/aio_suspend.c index 9b24b6af..1f0c9aaa 100644 --- a/src/aio/aio_suspend.c +++ b/src/aio/aio_suspend.c @@ -3,12 +3,13 @@ #include <time.h> #include "atomic.h" #include "pthread_impl.h" +#include "aio_impl.h" int aio_suspend(const struct aiocb *const cbs[], int cnt, const struct timespec *ts) { int i, tid = 0, ret, expect = 0; struct timespec at; - volatile int dummy_fut, *pfut; + volatile int dummy_fut = 0, *pfut; int nzcnt = 0; const struct aiocb *cb = 0; @@ -72,5 +73,3 @@ int aio_suspend(const struct aiocb *const cbs[], int cnt, const struct timespec } } } - -weak_alias(aio_suspend, aio_suspend64); diff --git a/src/aio/lio_listio.c b/src/aio/lio_listio.c index 7b6a03d3..a672812f 100644 --- a/src/aio/lio_listio.c +++ b/src/aio/lio_listio.c @@ -113,7 +113,7 @@ int lio_listio(int mode, struct aiocb *restrict const *restrict cbs, int cnt, st if (st) { pthread_attr_t a; - sigset_t set; + sigset_t set, set_old; pthread_t td; if (sev->sigev_notify == SIGEV_THREAD) { @@ -128,16 +128,14 @@ int lio_listio(int mode, struct aiocb *restrict const *restrict cbs, int cnt, st } pthread_attr_setdetachstate(&a, PTHREAD_CREATE_DETACHED); sigfillset(&set); - pthread_sigmask(SIG_BLOCK, &set, &set); + pthread_sigmask(SIG_BLOCK, &set, &set_old); if (pthread_create(&td, &a, wait_thread, st)) { free(st); errno = EAGAIN; return -1; } - pthread_sigmask(SIG_SETMASK, &set, 0); + pthread_sigmask(SIG_SETMASK, &set_old, 0); } return 0; } - -weak_alias(lio_listio, lio_listio64); |