From 658d9e98eec02f92e3cf263a1bb27beb3d395b2f Mon Sep 17 00:00:00 2001 From: Jeremy Kerr Date: Tue, 21 May 2013 11:52:00 +0800 Subject: lib/waiter: Add timeout waiters Signed-off-by: Jeremy Kerr --- lib/waiter/waiter.c | 175 ++++++++++++++++++++++++++++++++++++++++++---------- lib/waiter/waiter.h | 5 +- 2 files changed, 146 insertions(+), 34 deletions(-) (limited to 'lib') diff --git a/lib/waiter/waiter.c b/lib/waiter/waiter.c index 78ba045..513ab60 100644 --- a/lib/waiter/waiter.c +++ b/lib/waiter/waiter.c @@ -3,15 +3,21 @@ #include #include #include +#include #include #include "waiter.h" struct waiter { + enum { + WAITER_IO, + WAITER_TIME, + } type; struct waitset *set; int fd; int events; + struct timeval timeout; waiter_cb callback; void *arg; }; @@ -21,12 +27,17 @@ struct waitset { int n_waiters; bool waiters_changed; + struct timeval next_timeout; + /* These are kept consistent over each call to waiter_poll, as * set->waiters may be updated (by waiters' callbacks calling * waiter_register or waiter_remove) during iteration. */ struct pollfd *pollfds; - struct waiter **cur_waiters; - int cur_n_waiters; + int n_pollfds; + struct waiter **io_waiters; + int n_io_waiters; + struct waiter **time_waiters; + int n_time_waiters; }; struct waitset *waitset_create(void *ctx) @@ -40,8 +51,7 @@ void waitset_destroy(struct waitset *set) talloc_free(set); } -struct waiter *waiter_register(struct waitset *set, int fd, int events, - waiter_cb callback, void *arg) +static struct waiter *waiter_new(struct waitset *set) { struct waiter **waiters, *waiter; @@ -62,7 +72,15 @@ struct waiter *waiter_register(struct waitset *set, int fd, int events, set->n_waiters++; set->waiters[set->n_waiters - 1] = waiter; + return waiter; +} + +struct waiter *waiter_register_io(struct waitset *set, int fd, int events, + waiter_cb callback, void *arg) +{ + struct waiter *waiter = waiter_new(set); + waiter->type = WAITER_IO; waiter->set = set; waiter->fd = fd; waiter->events = events; @@ -72,6 +90,27 @@ struct waiter *waiter_register(struct waitset *set, int fd, int events, return waiter; } +struct waiter *waiter_register_timeout(struct waitset *set, int delay_ms, + waiter_cb callback, void *arg) +{ + struct waiter *waiter = waiter_new(set); + struct timeval now, delay; + + delay.tv_sec = delay_ms / 1000; + delay.tv_usec = 1000 * (delay_ms % 1000); + + gettimeofday(&now, NULL); + + timeradd(&now, &delay, &waiter->timeout); + + waiter->type = WAITER_TIME; + waiter->set = set; + waiter->callback = callback; + waiter->arg = arg; + + return waiter; +} + void waiter_remove(struct waiter *waiter) { struct waitset *set = waiter->set; @@ -94,47 +133,117 @@ void waiter_remove(struct waiter *waiter) talloc_free(waiter); } +static void update_waiters(struct waitset *set) +{ + int n_io, n_time, i_io, i_time, i; + + if (!set->waiters_changed) + return; + + n_io = n_time = 0; + + for (i = 0; i < set->n_waiters; i++) { + if (set->waiters[i]->type == WAITER_IO) + n_io++; + else if (set->waiters[i]->type == WAITER_TIME) + n_time++; + } + + /* realloc if counts have changed */ + if (set->n_io_waiters != n_io) { + set->io_waiters = talloc_realloc(set, set->io_waiters, + struct waiter *, n_io); + set->pollfds = talloc_realloc(set, set->pollfds, + struct pollfd, n_io); + set->n_io_waiters = n_io; + } + if (set->n_time_waiters != n_time) { + set->time_waiters = talloc_realloc(set, set->time_waiters, + struct waiter *, n_time); + set->n_time_waiters = n_time; + } + + /* IO waiters: copy to io_waiters, populate pollfds */ + for (i = i_io = 0; i < set->n_waiters; i++) { + struct waiter *waiter = set->waiters[i]; + + if (waiter->type != WAITER_IO) + continue; + + set->pollfds[i_io].fd = waiter->fd; + set->pollfds[i_io].events = waiter->events; + set->io_waiters[i_io] = waiter; + i_io++; + } + + /* time waiters: copy to time_waiters, calculate next expiry */ + timerclear(&set->next_timeout); + for (i = i_time = 0; i < set->n_waiters; i++) { + struct waiter *waiter = set->waiters[i]; + + if (waiter->type != WAITER_TIME) + continue; + + if (!timerisset(&set->next_timeout) || + timercmp(&waiter->timeout, + &set->next_timeout, <)) + set->next_timeout = waiter->timeout; + + set->time_waiters[i_time] = waiter; + i_time++; + } +} + int waiter_poll(struct waitset *set) { + struct timeval now, timeout; + int timeout_ms; int i, rc; /* If the waiters have been updated, we need to update our * consistent copy */ - if (set->waiters_changed) { - - /* We need to reallocate if the count has changes */ - if (set->cur_n_waiters != set->n_waiters) { - set->cur_waiters = talloc_realloc(set, set->cur_waiters, - struct waiter *, set->n_waiters); - set->pollfds = talloc_realloc(set, set->pollfds, - struct pollfd, set->n_waiters); - set->cur_n_waiters = set->n_waiters; - } - - /* Populate cur_waiters and pollfds from ->waiters data */ - for (i = 0; i < set->n_waiters; i++) { - set->pollfds[i].fd = set->waiters[i]->fd; - set->pollfds[i].events = set->waiters[i]->events; - set->pollfds[i].revents = 0; - set->cur_waiters[i] = set->waiters[i]; - } - - set->waiters_changed = false; + update_waiters(set); + + if (timerisset(&set->next_timeout)) { + gettimeofday(&now, NULL); + timersub(&set->next_timeout, &now, &timeout); + timeout_ms = timeout.tv_sec * 1000 + + timeout.tv_usec / 1000; + if (timeout_ms < 0) + timeout_ms = 0; + } else { + timeout_ms = -1; } - rc = poll(set->pollfds, set->cur_n_waiters, -1); - if (rc <= 0) + rc = poll(set->pollfds, set->n_io_waiters, timeout_ms); + + if (rc < 0) return rc; - for (i = 0; i < set->cur_n_waiters; i++) { - if (set->pollfds[i].revents) { - rc = set->cur_waiters[i]->callback( - set->cur_waiters[i]->arg); + for (i = 0; i < set->n_io_waiters; i++) { + struct waiter *waiter = set->io_waiters[i]; + + if (!set->pollfds[i].revents) + continue; + rc = waiter->callback(waiter->arg); + + if (rc) + waiter_remove(waiter); + } + + if (set->n_time_waiters > 0) + gettimeofday(&now, NULL); + + for (i = 0; i < set->n_time_waiters; i++) { + struct waiter *waiter = set->time_waiters[i]; + + if (timercmp(&waiter->timeout, &now, >)) + continue; + + waiter->callback(waiter->arg); - if (rc) - waiter_remove(set->cur_waiters[i]); - } + waiter_remove(waiter); } return 0; diff --git a/lib/waiter/waiter.h b/lib/waiter/waiter.h index ed7f6bb..58ea04c 100644 --- a/lib/waiter/waiter.h +++ b/lib/waiter/waiter.h @@ -16,7 +16,10 @@ typedef int (*waiter_cb)(void *); struct waitset *waitset_create(void *ctx); void waitset_destroy(struct waitset *waitset); -struct waiter *waiter_register(struct waitset *waitset, int fd, int events, +struct waiter *waiter_register_io(struct waitset *waitset, int fd, int events, + waiter_cb callback, void *arg); + +struct waiter *waiter_register_timeout(struct waitset *set, int delay_ms, waiter_cb callback, void *arg); void waiter_remove(struct waiter *waiter); -- cgit v1.2.1