[Libevent-users] [PATCH] relative timer with EV_PERSIST support + move event_del logic into event_active + regression tests

Christopher Layne clayne at anodized.com
Tue Nov 13 21:15:00 EST 2007


1. Make EV_PERSIST reschedule timeouts automatically.
2. New function: timeout_schedule: (nothing really new within it, just
   modular wrapping of timeout rescheduling).
3. New macro: evutil_timercpy: tv_dst = tv_src in one line type deal;
3. Regression tests for persistent timeouts, include read/write, signals,
   and timers.
4. Another regression test for signal handler restores (no problem, just
   added another one).

So what this means is that if you do the following:

	event_set(ev, fd, EV_READ | EV_PERSIST, read_cb, obj);
	event_add(ev, &timeout);

read_cb() will be called whenever a read event happens, and it's timeout
as passed to event_add() will be reset to the original value you passed.
You do not have to call event_add() within the handler.

	event_set(ev, -1, EV_TIMEOUT | EV_PERSIST, timer_cb, obj);
	event_add(ev, &cycle);

timer_cb() will be called when timeout (as passed via cycle) expires. It
will then reschedule itself with it's original timeout, e.g. periodic timer.
You do not have to call event_add() within the handler.

For the event_del() changes, it's just moving event_del() into event_active(),
when an event occurs. There is a feature request on sourceforge for this,
and this couples nicely with the EV_PERSIST change. It also allows us to
rexamine the logic tests within the various event dispatchers themselves, as
event_active() will only delete the same event once.

-cl

$ test/regress
Testing Priorities 1: OK
Testing Priorities 2: OK
Testing Priorities 3: OK
Testing Evbuffer: OK
Testing evbuffer_find 1: OK
Testing evbuffer_find 2: OK
Testing evbuffer_find 3: OK
Bufferevent: OK
Free active base: OK
Testing HTTP Server Event Base: OK
Testing HTTP Header filtering: OK
Testing Basic HTTP Server: OK
Testing Request Connection Pipeline : OK
Testing Request Connection Pipeline (persistent): OK
Testing Connection Close Detection: OK
Testing HTTP POST Request: OK
Testing Bad HTTP Request: OK
Testing HTTP Server with high port: OK
Testing HTTP Dispatcher: OK
Testing Basic RPC Support: OK
Testing Good RPC Post: OK
Testing RPC Client: OK
Testing RPC (Queued) Client: OK
Testing RPC Client Timeout: OK
DNS server support: OK
Simple DNS resolve: type: 1, count: 1, ttl: 300: 152.160.49.201 OK
IPv6 DNS resolve: type: 3, count: 1, ttl: 922: 2610:a0:c779:b::d1ad:35b4 OK
Simple read: OK
Simple write: OK
Multiple read/write: OK
Persist read/write: OK
Combined read/write: OK
Simple timeout: OK
Persistent timeout: OK
Persistent read/write timeout: OK
Persistent signal timeout: OK
Simple signal: OK
Immediate signal: OK
Loop exit: OK
Multiple events for same fd: OK
Want read only once: OK
Testing Tagging:
                encoded 0x00000af0 with 2 bytes
                encoded 0x00001000 with 3 bytes
                encoded 0x00000001 with 1 bytes
                encoded 0xdeadbeef with 5 bytes
                encoded 0x00000000 with 1 bytes
                encoded 0x00bef000 with 4 bytes
        evtag_int_test: OK
        evtag_fuzz: OK
OK
Testing RPC: (1.9 us/add) OK
Signal dealloc: OK
Signal pipeloss: OK
Signal switchbase: OK
Signal handler restore: OK
Signal handler spread restore: OK
Signal handler assert: OK

$ make verify
cd test && make verify
make[1]: Entering directory `/home/clayne/project/libevent.build/test'
Running tests:
KQUEUE
Skipping test
DEVPOLL
Skipping test
POLL
 test-eof: OKAY
 test-weof: OKAY
 test-time: OKAY
 regress: type: 1, count: 1, ttl: 300: 152.160.49.201 type: 3, count: 1, ttl: 238: 2610:a0:c779:b::d1ad:35b4 (1.9 us/add) OKAY
SELECT
 test-eof: OKAY
 test-weof: OKAY
 test-time: OKAY
 regress: type: 1, count: 1, ttl: 300: 152.160.49.201 type: 3, count: 1, ttl: 1800: 2610:a0:c779:b::d1ad:35b4 (1.9 us/add) OKAY
EPOLL
 test-eof: OKAY
 test-weof: OKAY
 test-time: OKAY
 regress: type: 1, count: 1, ttl: 300: 152.160.49.201 type: 3, count: 1, ttl: 1800: 2610:a0:c779:b::d1ad:35b4 (1.8 us/add) OKAY
EVPORT
Skipping test

-------------- next part --------------
Index: event.c
===================================================================
--- event.c	(revision 526)
+++ event.c	(working copy)
@@ -125,4 +125,6 @@ static int	timeout_next(struct event_bas
 static void	timeout_process(struct event_base *);
 static void	timeout_correct(struct event_base *, struct timeval *);
+static int	timeout_schedule(struct event_base *, struct event *,
+				 struct timeval *);
 
 static void
@@ -615,4 +617,5 @@ event_add(struct event *ev, struct timev
 	const struct eventop *evsel = base->evsel;
 	void *evbase = base->evbase;
+	min_heap_t *mh = &base->timeheap;
 
 	event_debug((
@@ -627,12 +630,4 @@ event_add(struct event *ev, struct timev
 
 	if (tv != NULL) {
-		struct timeval now;
-
-		if (ev->ev_flags & EVLIST_TIMEOUT)
-			event_queue_remove(base, ev, EVLIST_TIMEOUT);
-		else if (min_heap_reserve(&base->timeheap,
-			1 + min_heap_size(&base->timeheap)) == -1)
-		    return (-1);  /* ENOMEM == errno */
-
 		/* Check if it is active due to a timeout.  Rescheduling
 		 * this timeout before the callback can be executed
@@ -651,12 +646,14 @@ event_add(struct event *ev, struct timev
 		}
 
-		gettime(&now);
-		evutil_timeradd(&now, tv, &ev->ev_timeout);
-
-		event_debug((
-			 "event_add: timeout in %d seconds, call %p",
-			 tv->tv_sec, ev->ev_callback));
-
-		event_queue_insert(base, ev, EVLIST_TIMEOUT);
+		/*
+		 * If it's a persistent event, the timeout is considered to
+		 * be relative to the last time the event was active. To do
+		 * this, store the relative tv in addition to the absolute one.
+		 */
+		if (ev->ev_events & EV_PERSIST)
+			evutil_timercpy(&ev->ev_timeout_rel, tv);
+		if (min_heap_reserve(mh, 1 + min_heap_size(mh)) == -1)
+			return (-1);  /* ENOMEM == errno */
+		timeout_schedule(base, ev, tv);
 	}
 
@@ -727,4 +724,9 @@ event_active(struct event *ev, int res, 
 		return;
 	}
+	/* Do not delete persistent events. Reschedule timeouts if persist. */
+	if (~ev->ev_events & EV_PERSIST)
+		event_del(ev);
+	else if (ev->ev_flags & EVLIST_TIMEOUT)
+		timeout_schedule(ev->ev_base, ev, &ev->ev_timeout_rel);
 
 	ev->ev_res = res;
@@ -734,4 +736,23 @@ event_active(struct event *ev, int res, 
 }
 
+int
+timeout_schedule(struct event_base *base, struct event *ev, struct timeval *tv)
+{
+	struct timeval now;
+
+	/* This could probably make use of min_heap_shift_down() somehow */
+	if (ev->ev_flags & EVLIST_TIMEOUT)
+		event_queue_remove(base, ev, EVLIST_TIMEOUT);
+
+	gettime(&now);
+	evutil_timeradd(&now, tv, &ev->ev_timeout);
+	event_queue_insert(base, ev, EVLIST_TIMEOUT);
+
+	event_debug(("%s: %p: timeout in %d seconds, call %p",
+		__func__, ev, tv->tv_sec, ev->ev_callback));
+
+	return (0);
+}
+
 static int
 timeout_next(struct event_base *base, struct timeval **tv_p)
@@ -818,9 +839,5 @@ timeout_process(struct event_base *base)
 			break;
 
-		/* delete this event from the I/O queues */
-		event_del(ev);
-
-		event_debug(("timeout_process: call %p",
-			 ev->ev_callback));
+		event_debug(("timeout_process: call %p", ev->ev_callback));
 		event_active(ev, EV_TIMEOUT, 1);
 	}
Index: event.h
===================================================================
--- event.h	(revision 524)
+++ event.h	(working copy)
@@ -232,4 +232,5 @@ struct event {
 	int ev_res;		/* result passed to event callback */
 	int ev_flags;
+	struct timeval ev_timeout_rel; /* relative timeout */
 };
 
Index: evutil.h
===================================================================
--- evutil.h	(revision 524)
+++ evutil.h	(working copy)
@@ -110,4 +110,10 @@ int evutil_make_socket_nonblocking(int s
 #endif
 
+#define evutil_timercpy(dtv, stv)			\
+	do {						\
+		(dtv)->tv_sec = (stv)->tv_sec;		\
+		(dtv)->tv_usec = (stv)->tv_usec;	\
+	} while(0)
+
 #ifdef _EVENT_HAVE_STDINT_H
 #include <stdint.h>
Index: signal.c
===================================================================
--- signal.c	(revision 528)
+++ signal.c	(working copy)
@@ -260,6 +260,4 @@ evsignal_process(struct event_base *base
 		ncalls = base->sig.evsigcaught[EVENT_SIGNAL(ev)];
 		if (ncalls) {
-			if (!(ev->ev_events & EV_PERSIST))
-				event_del(ev);
 			event_active(ev, EV_SIGNAL, ncalls);
 			base->sig.evsigcaught[EVENT_SIGNAL(ev)] = 0;
Index: epoll.c
===================================================================
--- epoll.c	(revision 524)
+++ epoll.c	(working copy)
@@ -230,10 +230,4 @@ epoll_dispatch(struct event_base *base, 
 			continue;
 
-		if (evread != NULL && !(evread->ev_events & EV_PERSIST))
-			event_del(evread);
-		if (evwrite != NULL && evwrite != evread &&
-			!(evwrite->ev_events & EV_PERSIST))
-			event_del(evwrite);
-
 		if (evread != NULL)
 			event_active(evread, EV_READ, 1);
Index: select.c
===================================================================
--- select.c	(revision 524)
+++ select.c	(working copy)
@@ -195,14 +195,8 @@ select_dispatch(struct event_base *base,
 			res |= EV_WRITE;
 		}
-		if (r_ev && (res & r_ev->ev_events)) {
-			if (!(r_ev->ev_events & EV_PERSIST))
-				event_del(r_ev);
+		if (r_ev && (res & r_ev->ev_events))
 			event_active(r_ev, res & r_ev->ev_events, 1);
-		}
-		if (w_ev && w_ev != r_ev && (res & w_ev->ev_events)) {
-			if (!(w_ev->ev_events & EV_PERSIST))
-				event_del(w_ev);
+		if (w_ev && w_ev != r_ev && (res & w_ev->ev_events))
 			event_active(w_ev, res & w_ev->ev_events, 1);
-		}
 	}
 	check_selectop(sop);
Index: devpoll.c
===================================================================
--- devpoll.c	(revision 524)
+++ devpoll.c	(working copy)
@@ -272,10 +272,4 @@ devpoll_dispatch(struct event_base *base
 			continue;
 
-		if (evread != NULL && !(evread->ev_events & EV_PERSIST))
-			event_del(evread);
-		if (evwrite != NULL && evwrite != evread &&
-		    !(evwrite->ev_events & EV_PERSIST))
-			event_del(evwrite);
-
 		if (evread != NULL)
 			event_active(evread, EV_READ, 1);
Index: poll.c
===================================================================
--- poll.c	(revision 524)
+++ poll.c	(working copy)
@@ -198,14 +198,8 @@ poll_dispatch(struct event_base *base, v
 			continue;
 
-		if (r_ev && (res & r_ev->ev_events)) {
-			if (!(r_ev->ev_events & EV_PERSIST))
-				event_del(r_ev);
+		if (r_ev && (res & r_ev->ev_events))
 			event_active(r_ev, res & r_ev->ev_events, 1);
-		}
-		if (w_ev && w_ev != r_ev && (res & w_ev->ev_events)) {
-			if (!(w_ev->ev_events & EV_PERSIST))
-				event_del(w_ev);
+		if (w_ev && w_ev != r_ev && (res & w_ev->ev_events))
 			event_active(w_ev, res & w_ev->ev_events, 1);
-		}
 	}
 
Index: kqueue.c
===================================================================
--- kqueue.c	(revision 524)
+++ kqueue.c	(working copy)
@@ -275,7 +275,4 @@ kq_dispatch(struct event_base *base, voi
 			continue;
 
-		if (!(ev->ev_events & EV_PERSIST))
-			event_del(ev);
-
 		event_active(ev, which,
 		    ev->ev_events & EV_SIGNAL ? events[i].data : 1);
Index: test/regress.c
===================================================================
--- test/regress.c	(revision 524)
+++ test/regress.c	(working copy)
@@ -70,4 +70,5 @@ static int woff;
 static int roff;
 static int usepersist;
+static int persist_cnt;
 static struct timeval tset;
 static struct timeval tcalled;
@@ -173,4 +174,100 @@ multiple_read_cb(int fd, short event, vo
 
 void
+persist_write_cb(int fd, short event, void *arg)
+{
+	struct event *ev = arg;
+	ssize_t n;
+
+	test_ok = 0;
+	if (event & EV_TIMEOUT && persist_cnt) {
+		/* Write until done - at which point the read will timeout */
+		if ((n = write(fd, "a", 1)) != -1)
+			return;
+	}
+	event_del(ev);
+
+	return;
+}
+
+void
+persist_read_cb(int fd, short event, void *arg)
+{
+	struct event *ev = arg;
+	char c;
+	ssize_t n;
+
+	test_ok = 0;
+	if (event & EV_TIMEOUT) {
+		/* If timeout has not fired early, we're good. */
+		if (persist_cnt == 0)
+			test_ok = 1;
+	}
+	if (event & EV_READ) {
+		/* With EV_PERSIST, an active event resets the timeout */
+		persist_cnt--;
+		if ((n = read(fd, &c, sizeof c)) != -1)
+			return;
+	}
+	event_del(ev);
+
+	return;
+}
+
+void
+persist_timeout_cb(int fd, short event, void *arg)
+{
+	struct event *ev = arg;
+	(void) fd;
+
+	test_ok = 0;
+	if (event & EV_TIMEOUT) {
+		/* Don't succeed until this cb has repeated persist_cnt times */
+		if (persist_cnt--)
+			return;
+		test_ok = 1;
+	}
+	event_del(ev);
+
+	return;
+}
+
+void
+persist_signal_cb(int fd, short event, void *arg)
+{
+	struct event *ev = arg;
+	(void) fd;
+
+	test_ok = 0;
+	if (event & EV_TIMEOUT && persist_cnt) {
+		raise(SIGUSR1);
+		return;
+	}
+	event_del(ev);
+
+	return;
+}
+
+void
+persist_signal_handler_cb(int fd, short event, void *arg)
+{
+	struct event *ev = arg;
+	(void) fd;
+
+	test_ok = 0;
+	if (event & EV_TIMEOUT) {
+		/* If timeout has not fired early, we're good. */
+		if (persist_cnt == 0)
+			test_ok = 1;
+	}
+	if (event & EV_SIGNAL) {
+		if (persist_cnt--)
+			return;
+	}
+	event_del(ev);
+
+	return;
+}
+		
+void
 timeout_cb(int fd, short event, void *arg)
 {
@@ -446,4 +543,90 @@ test_simpletimeout(void)
 }
 
+void
+test_persistent_timeout(void)
+{
+	struct timeval tv;
+	struct event evt;
+	struct event_base *eb = event_init();
+
+	setup_test("Persistent timeout: ");
+	test_ok = 0;
+	persist_cnt = 8;
+
+	/* Cycle every 250 msec, for persist_cnt cycles */
+	tv.tv_sec = 0;
+	tv.tv_usec = 250000;
+	event_set(&evt, -1, EV_TIMEOUT | EV_PERSIST, persist_timeout_cb, &evt);
+	event_add(&evt, &tv);
+
+	event_dispatch();
+	event_base_free(eb);
+
+	cleanup_test();
+}
+
+void
+test_persistent_timeout_rw(void)
+{
+	struct timeval tv;
+	struct event evw, evr;
+	struct event_base *eb = event_init();
+	int pfd[2];
+
+	setup_test("Persistent read/write timeout: ");
+	test_ok = 0;
+	persist_cnt = 8;
+
+	pipe(pfd);
+	evutil_make_socket_nonblocking(pfd[0]);
+	evutil_make_socket_nonblocking(pfd[1]);
+
+	/* Write every 250 msec, repeated persist_cnt times */
+	tv.tv_sec = 0;
+	tv.tv_usec = 250000;
+	event_set(&evw, pfd[1], EV_TIMEOUT | EV_PERSIST, persist_write_cb, &evw);
+	event_add(&evw, &tv);
+	/* Read until timeout (which will be relative to last active event) */
+	tv.tv_sec = 0;
+	tv.tv_usec = 500000;
+	event_set(&evr, pfd[0], EV_READ | EV_PERSIST, persist_read_cb, &evr);
+	event_add(&evr, &tv);
+
+	event_dispatch();
+	event_base_free(eb);
+
+	close(pfd[1]);
+	close(pfd[0]);
+	cleanup_test();
+}
+
+void
+test_persistent_timeout_signal(void)
+{
+	struct timeval tv;
+	struct event evs, evh;
+	struct event_base *eb = event_init();
+
+	setup_test("Persistent signal timeout: ");
+	test_ok = 0;
+	persist_cnt = 8;
+
+	/* Signal every 250 msec, repeated persist_cnt times */
+	tv.tv_sec = 0;
+	tv.tv_usec = 250000;
+	event_set(&evs, -1, EV_TIMEOUT | EV_PERSIST, persist_signal_cb, &evs);
+	event_add(&evs, &tv);
+	/* Handle until timeout (which will be relative to last active event) */
+	tv.tv_sec = 0;
+	tv.tv_usec = 500000;
+	event_set(&evh, SIGUSR1, EV_SIGNAL | EV_PERSIST, persist_signal_handler_cb, &evh);
+	event_add(&evh, &tv);
+
+	event_dispatch();
+	event_base_free(eb);
+
+	cleanup_test();
+}
+
 #ifndef WIN32
 void
@@ -454,4 +637,6 @@ test_simplesignal(void)
 
 	setup_test("Simple signal: ");
+
+	event_init();
 	signal_set(&ev, SIGALRM, signal_cb, &ev);
 	signal_add(&ev, NULL);
@@ -633,4 +818,106 @@ out:
 
 void
+sig_tab_timer_cb(int sig, short event, void *a)
+{
+	raise(sig);
+	return;
+}
+
+void
+sig_tab_dec_cb(int sig, short event, void *a)
+{
+	test_ok--;
+	return;
+}
+
+void
+sig_tab_inc_cb(int sig)
+{
+	test_ok += 2;
+	return;
+}
+
+int sig_tab[] = {
+	SIGABRT,
+	SIGALRM,
+	SIGCHLD,
+	SIGCONT,
+	SIGFPE,
+	SIGHUP,
+	SIGILL,
+	SIGINT,
+	SIGPIPE,
+	SIGQUIT,
+	SIGSEGV,
+	SIGTERM,
+	SIGTSTP,
+	SIGTTIN,
+	SIGTTOU,
+	SIGUSR1,
+	SIGUSR2
+};
+
+/*
+ * assert that we can handle a number of signals and then restore properly.
+ */
+void
+test_signal_spread_restore()
+{
+	struct event *ev, *evt;
+	struct event_base *eb = event_init();
+	struct timeval tv_zero = { 0, 0 };
+#ifdef HAVE_SIGACTION
+	struct sigaction sa;
+#endif
+	size_t i, n = sizeof sig_tab / sizeof *sig_tab;
+
+	printf("Signal handler spread restore: ");
+
+	test_ok = 0;
+	ev = calloc(n, sizeof *ev);
+	evt = calloc(n, sizeof *evt);
+	if (ev == NULL || evt == NULL)
+		goto out;
+	for (i = n; i--; ) {
+		/* standard signal handlers */
+#ifdef HAVE_SIGACTION
+		sa.sa_handler = sig_tab_inc_cb;
+		sa.sa_flags = 0x0;
+		sigemptyset(&sa.sa_mask);
+		if (sigaction(sig_tab[i], &sa, NULL) == -1)
+			goto out;
+#else
+		if (signal(sig_tab[i], sig_tab_inc_cb) == SIG_ERR)
+			goto out;
+#endif
+		/* event handlers */
+		event_set(&ev[i], sig_tab[i], EV_SIGNAL, sig_tab_dec_cb, &ev[i]);
+		event_add(&ev[i], NULL);
+		/* bootstrappers */
+		event_set(&evt[i], sig_tab[i], EV_TIMEOUT, sig_tab_timer_cb, &evt[i]);
+		event_add(&evt[i], &tv_zero);
+	}
+
+	event_dispatch();
+
+	/* at this point test_ok should == -n; */
+	for (i = n; i--; )
+		raise(sig_tab[i]);
+	/* at this point test_ok should == n; */
+
+	/* event handlers decrement, standard signal handlers increment by 2 */
+	if (test_ok == n)
+		test_ok = 1;
+	else
+		test_ok = 0;
+out:
+	event_base_free(eb);
+	free(evt);
+	free(ev);
+	cleanup_test();
+	return;
+}
+
+void
 test_free_active_base(void)
 {
@@ -1173,4 +1460,7 @@ main (int argc, char **argv)
 
 	test_simpletimeout();
+	test_persistent_timeout();
+	test_persistent_timeout_rw();
+	test_persistent_timeout_signal();
 #ifndef WIN32
 	test_simplesignal();
@@ -1192,4 +1482,5 @@ main (int argc, char **argv)
 	test_signal_switchbase();
 	test_signal_restore();
+	test_signal_spread_restore();
 	test_signal_assert();
 #endif


More information about the Libevent-users mailing list