libevent是事件驱动的网络库,事件驱动是他的核心,所以理解事件驱动对于理解整个网络库有很重要的意义。
本着从简入繁,今天分析下单线程最简单的事件触发。通过sample下的event-test来理解libevent的事件驱动。
代码版本为1.4.14。
libevent事件机制:当事件发生, libevent就会根据用户设定的方式自动执行指定的回调函数,来处理事件。
这是一种reactor方式的事件通知方式,由事件驱动。reactor的优点:响应快,编程简单等等。。。
首先看下几个重要的结构。等全部分析完libevent,再把全部注释过的代码上传到github上。如果有错误及时告诉我,谢谢。
我的理解是当前线程中所有事件的一个管理者。位于event-internal.h中。
1 //事件基础管理 2 struct event_base { 3 //I/O复用类型,select、epoll...linux默认是epoll 4 const struct eventop *evsel; 5 //具体的I/O复用,是epollop类型,通过eventop中的init函数返回,包含了具体的I/O复用各种信息 6 void *evbase; 7 //总共的事件个数 8 int event_count; /* counts number of total events */ 9 //总共的活动事件个数10 int event_count_active; /* counts number of active events */11 12 //退出13 int event_gotterm; /* Set to terminate loop */14 //立即退出15 int event_break; /* Set to terminate loop immediately */16 17 /* active event management */18 //活动事件队列,二维链表。第一维是根据优先级,第二维是每个优先级中对应加入的事件19 struct event_list **activequeues;20 //优先级队列数量。数组第一维必须告诉大小。因为如果是数组,参入函数,第一维肯定退化为指针,无法知道长度21 int nactivequeues;22 23 //信号信息24 /* signal handling info */25 struct evsignal_info sig;26 27 //所有事件队列28 struct event_list eventqueue;29 30 //event_base创建时间31 struct timeval event_tv;32 33 //event_base时间小根堆34 struct min_heap timeheap;35 36 //event_base缓存时间37 struct timeval tv_cache;38 };
当前选用的I/O复用模型的封装。位于event-internal.h中。
1 //I/O复用封装 2 struct eventop { 3 const char *name; 4 void *(*init)(struct event_base *); //初始化 5 int (*add)(void *, struct event *); //注册 6 int (*del)(void *, struct event *); //删除 7 int (*dispatch)(struct event_base *, void *, struct timeval *); //事件分发 8 void (*dealloc)(struct event_base *, void *);//释放资源 9 /* set if we need to reinitialize the event base */10 int need_reinit;11 };
事件信息的封装
1 struct event { 2 //事件在队列中的节点(下次分析此队列的实现) 3 TAILQ_ENTRY (event) ev_next; 4 TAILQ_ENTRY (event) ev_active_next; 5 TAILQ_ENTRY (event) ev_signal_next; 6 //事件在最小时间堆中位置 7 unsigned int min_heap_idx; /* for managing timeouts */ 8 9 //事件的当前管理类10 struct event_base *ev_base;11 //事件对应的文件描述符,一切皆文件12 int ev_fd;13 //事件类型14 short ev_events;15 //发送到活动队列后要执行的次数16 short ev_ncalls;17 //ev_pncalls指向ev_ncalls,允许在回调中将自己的事件执行次数置为0,然后退出18 short *ev_pncalls; /* Allows deletes in callback */19 20 //事件触发的时间21 struct timeval ev_timeout;22 23 //事件优先级24 int ev_pri; /* smaller numbers are higher priority */25 26 //事件到来回调27 void (*ev_callback)(int, short, void *arg);28 //事件到来回调的参数29 void *ev_arg;30 31 //事件在活动队列中的事件类型,发送给回调函数,让回调函数知道发生事件的原因32 int ev_res; /* result passed to event callback */33 34 //标识该事件在哪个队列中,插入的是哪个队列35 int ev_flags;36 };
1 //队列标记 2 //定时器队列,与时间有关的事件加入此队列 3 #define EVLIST_TIMEOUT 0x01 4 //总队列,代表已经插入过 5 #define EVLIST_INSERTED 0x02 6 //信号队列 7 #define EVLIST_SIGNAL 0x04 8 //活动队列 9 #define EVLIST_ACTIVE 0x0810 //内部队列11 #define EVLIST_INTERNAL 0x1012 //初始化队列13 #define EVLIST_INIT 0x8014 15 /* EVLIST_X_ Private space: 0x1000-0xf000 */16 #define EVLIST_ALL (0xf000 | 0x9f)17 //事件类型,发生了什么事件18 19 //定时超时,表明事件超时,如果在活动队列中,需要执行20 #define EV_TIMEOUT 0x0121 //I/O事件22 #define EV_READ 0x0223 #define EV_WRITE 0x0424 //信号25 #define EV_SIGNAL 0x0826 //持续事件27 #define EV_PERSIST 0x10 /* Persistant event */
通过流程图可以更加清晰的理解。
test.c
1 #include <sys/types.h> 2 #include <sys/stat.h> 3 #include <sys/queue.h> 4 #include <sys/time.h> 5 #include <fcntl.h> 6 #include <stdlib.h> 7 #include <stdio.h> 8 #include <string.h> 9 #include <unistd.h>10 #include <errno.h>11 int main(int argc,char **argv){12 char *input = argv[1];13 if(argc !=2){14 input = "hello";15 }16 int fd;17 fd = open("event.fifo",O_WRONLY);18 if(fd == -1){19 perror("open error");20 exit(EXIT_FAILURE);21 }22 write(fd,input,strlen(input));23 close(fd);24 printf("write success\n");25 return 0;26 }
可以看到linux默认的I/O复用是epoll。加入队列,并且不是定时函数。最后进入循环到达epoll_dispatch分发。
此时上面一个终端可以收到如下信息:事件触发,调用event_del,从相应队列中删除,然后执行,event-test.c中再次加入了事件,进入事件循环。
1 //创建event_base 2 struct event_base * 3 event_base_new(void) 4 { 5 int i; 6 struct event_base *base; 7 //申请空间 8 if ((base = calloc(1, sizeof(struct event_base))) == NULL) 9 event_err(1, "%s: calloc", __func__);10 11 event_sigcb = NULL;12 event_gotsig = 0;13 //是否使用绝对时间14 detect_monotonic();15 //获取event_base创建时间16 gettime(base, &base->event_tv);17 18 //初始化小根堆19 min_heap_ctor(&base->timeheap);20 //初始化队列21 TAILQ_INIT(&base->eventqueue);22 //信号相关23 base->sig.ev_signal_pair[0] = -1;24 base->sig.ev_signal_pair[1] = -1;25 26 base->evbase = NULL;27 //获得I/O复用,选到合适的就往下执行。linux默认是epoll28 for (i = 0; eventops[i] && !base->evbase; i++) {29 //获得I/O复用30 base->evsel = eventops[i];31 //获得具体的I/O复用信息32 base->evbase = base->evsel->init(base);33 }34 //没有I/O复用,报错退出35 if (base->evbase == NULL)36 event_errx(1, "%s: no event mechanism available", __func__);37 //如果设置了EVENT_SHOW_METHOD,输出IO复用名字38 if (evutil_getenv("EVENT_SHOW_METHOD")) 39 event_msgx("libevent using: %s\n",40 base->evsel->name);41 42 /* allocate a single active event queue */43 //初始化活动队列的优先级,默认优先级为144 event_base_priority_init(base, 1);45 46 return (base);47 }
1 //初始化优先队列 2 int 3 event_base_priority_init(struct event_base *base, int npriorities) 4 { 5 int i; 6 //如果base中有活动事件,返回,不处理优先级的初始化 7 if (base->event_count_active) 8 return (-1); 9 //如果优先级数量未变,没有必要执行10 if (npriorities == base->nactivequeues)11 return (0);12 //释放所有优先级队列13 if (base->nactivequeues) {14 for (i = 0; i < base->nactivequeues; ++i) {15 free(base->activequeues[i]);16 }17 free(base->activequeues);18 }19 20 /* Allocate our priority queues */21 //分配优先级队列22 base->nactivequeues = npriorities;23 base->activequeues = (struct event_list **)24 calloc(base->nactivequeues, sizeof(struct event_list *));25 if (base->activequeues == NULL)26 event_err(1, "%s: calloc", __func__);27 //默认每个优先级分配一个节点,作为事件队列的队列的头结点28 for (i = 0; i < base->nactivequeues; ++i) {29 base->activequeues[i] = malloc(sizeof(struct event_list));30 if (base->activequeues[i] == NULL)31 event_err(1, "%s: malloc", __func__);32 //每个事件都初始化为队列的头结点33 TAILQ_INIT(base->activequeues[i]);34 }35 36 return (0);37 }
1 //设置与注册event 2 //ev: 需要注册的事件 3 //fd: 文件描述符 4 //events: 注册事件的类型 5 //callback: 注册事件的回调函数 6 //arg: 注册事件回调函数的参数 7 //事件类型有: 8 //#define EV_TIMEOUT 0x01 9 //#define EV_READ 0x0210 //#define EV_WRITE 0x0411 //#define EV_SIGNAL 0x0812 //定时事件event_set(ev, -1, 0, cb, arg)13 void14 event_set(struct event *ev, int fd, short events,15 void (*callback)(int, short, void *), void *arg)16 {17 /* Take the current base - caller needs to set the real base later */18 //默认为全局ev_base进行事件的注册19 ev->ev_base = current_base;20 //事件回调21 ev->ev_callback = callback;22 //事件回调参数23 ev->ev_arg = arg;24 //对应文件描述符25 ev->ev_fd = fd;26 //事件类型27 ev->ev_events = events;28 //事件在活动队列中的类型29 ev->ev_res = 0;30 //标识事件加入了哪个队列31 ev->ev_flags = EVLIST_INIT;32 //加入活动队列后调试的次数33 ev->ev_ncalls = 0;34 //Allows deletes in callback,允许在回调中删除自己35 ev->ev_pncalls = NULL;36 //初始化事件在堆中的位置。刚开始为-137 min_heap_elem_init(ev);38 39 /* by default, we put new events into the middle priority */40 //默认事件的优先级为中间41 if(current_base)42 ev->ev_pri = current_base->nactivequeues/2;43 }
1 //事件加入队列 2 int 3 event_add(struct event *ev, const struct timeval *tv) 4 { 5 //事件的基础管理,事件中有一个event_base指针,指向了他所属于的管理类 6 struct event_base *base = ev->ev_base; 7 //当前I/O复用管理,包括初始化,注册,回调等。。。 8 const struct eventop *evsel = base->evsel; 9 //具体的I/O复用10 void *evbase = base->evbase;11 int res = 0;12 13 event_debug((14 "event_add: event: %p, %s%s%scall %p",15 ev,16 ev->ev_events & EV_READ ? "EV_READ " : " ",17 ev->ev_events & EV_WRITE ? "EV_WRITE " : " ",18 tv ? "EV_TIMEOUT " : " ",19 ev->ev_callback));20 21 assert(!(ev->ev_flags & ~EVLIST_ALL));22 23 /*24 * prepare for timeout insertion further below, if we get a25 * failure on any step, we should not change any state.26 */27 //事件的时间tv不为null并且现在事件还不在定时队列中,我们先在小根堆中申请一个位置,以便后面加入28 //event_set后事件的ev_flags为EVLIST_INIT29 if (tv != NULL && !(ev->ev_flags & EVLIST_TIMEOUT)) {30 if (min_heap_reserve(&base->timeheap,31 1 + min_heap_size(&base->timeheap)) == -1)32 return (-1); /* ENOMEM == errno */33 }34 //如果事件类型是EV_READ,EV_WRITE,EV_SIGNAL并且事件状态不是EVLIST_INSERTED(已加入)与EVLIST_ACTIVE(已活动)35 if ((ev->ev_events & (EV_READ|EV_WRITE|EV_SIGNAL)) &&36 !(ev->ev_flags & (EVLIST_INSERTED|EVLIST_ACTIVE))) {37 //将事件加入到对应的I/O复用中38 res = evsel->add(evbase, ev);39 if (res != -1)40 //加入对应的I/O复用成功后,插入EVLIST_INSERTED队列41 event_queue_insert(base, ev, EVLIST_INSERTED);42 }43 44 /* 45 * we should change the timout state only if the previous event46 * addition succeeded.47 */48 //定时执行事件处理(tv不为零,表示有超时时间)49 if (res != -1 && tv != NULL) {50 struct timeval now;51 52 /* 53 * we already reserved memory above for the case where we54 * are not replacing an exisiting timeout.55 */56 //定时事件已经在定时队列中了,先从中删除57 if (ev->ev_flags & EVLIST_TIMEOUT)58 event_queue_remove(base, ev, EVLIST_TIMEOUT);59 60 /* Check if it is active due to a timeout. Rescheduling61 * this timeout before the callback can be executed62 * removes it from the active list. */63 //定时事件是否在活动队列中,并且是定时事件,如果是,从活动队列中删除64 if ((ev->ev_flags & EVLIST_ACTIVE) &&65 (ev->ev_res & EV_TIMEOUT)) {66 /* See if we are just active executing this67 * event in a loop68 */69 //调用次数置零70 if (ev->ev_ncalls && ev->ev_pncalls) {71 /* Abort loop */72 *ev->ev_pncalls = 0;73 }74 //从活动队列中删除75 event_queue_remove(base, ev, EVLIST_ACTIVE);76 }77 78 //得到当前时间79 gettime(base, &now);80 //更新时间81 //当前时间点+定时事件每隔多少秒触发时间=触发时间点。ev->ev_timeout为事件触发时间点82 evutil_timeradd(&now, tv, &ev->ev_timeout);83 84 event_debug((85 "event_add: timeout in %ld seconds, call %p",86 tv->tv_sec, ev->ev_callback));87 //加入定时队列88 event_queue_insert(base, ev, EVLIST_TIMEOUT);89 }90 91 return (res);92 }
1 /* not thread safe */ 2 //默认进入全局事件管理的事件循环 3 int 4 event_loop(int flags) 5 { 6 return event_base_loop(current_base, flags); 7 } 8 //事件分发,进入事件循环,默认进入全局事件管理的事件循环 9 int 10 event_base_loop(struct event_base *base, int flags) 11 { 12 //I/O复用管理 13 const struct eventop *evsel = base->evsel; 14 //具体I/O复用 15 void *evbase = base->evbase; 16 struct timeval tv; 17 struct timeval *tv_p; 18 int res, done; 19 20 /* clear time cache */ 21 base->tv_cache.tv_sec = 0; 22 //信号处理 23 if (base->sig.ev_signal_added) 24 evsignal_base = base; 25 done = 0; 26 //事件循环 27 while (!done) { 28 /* Terminate the loop if we have been asked to */ 29 //退出 30 if (base->event_gotterm) { 31 base->event_gotterm = 0; 32 break; 33 } 34 //立即退出 35 if (base->event_break) { 36 base->event_break = 0; 37 break; 38 } 39 40 /* You cannot use this interface for multi-threaded apps */ 41 //信号处理 42 while (event_gotsig) { 43 event_gotsig = 0; 44 if (event_sigcb) { 45 res = (*event_sigcb)(); 46 if (res == -1) { 47 errno = EINTR; 48 return (-1); 49 } 50 } 51 } 52 53 //检测时间对不对,不对的话要校准 54 timeout_correct(base, &tv); 55 //tv为当前时间 56 tv_p = &tv; 57 //如果当前事件活动队列为0,并且事件是阻塞的,立马到时间堆中去查找定时时间 58 if (!base->event_count_active && !(flags & EVLOOP_NONBLOCK)) { 59 timeout_next(base, &tv_p); 60 } else { 61 /* 62 * if we have active events, we just poll new events 63 * without waiting. 64 */ 65 //活动队列不为空,或者此事件是非阻塞事件,将超时时间置为零,意味着没有超时时间 66 evutil_timerclear(&tv); 67 } 68 //没有可以执行的事件,退出 69 /* If we have no events, we just exit */ 70 if (!event_haveevents(base)) { 71 event_debug(("%s: no events registered.", __func__)); 72 return (1); 73 } 74 75 /* update last old time */ 76 //更新base的创建时间 77 gettime(base, &base->event_tv); 78 79 /* clear time cache */ 80 //清缓存 81 base->tv_cache.tv_sec = 0; 82 83 //进行对应事件的分发,将tv_p也传入进去,tv_p为超时时间 84 res = evsel->dispatch(base, evbase, tv_p); 85 86 if (res == -1) 87 return (-1); 88 //来事件了 89 //更新缓存时间 90 gettime(base, &base->tv_cache); 91 92 //进行超时处理,处理目前时间已经到达需要执行的事件,加入活动队列等操作 93 timeout_process(base); 94 95 //有活动队列 96 if (base->event_count_active) { 97 //调用 98 event_process_active(base); 99 //全部执行完,并且只要执行一次,就可以跳出循环了100 if (!base->event_count_active && (flags & EVLOOP_ONCE))101 done = 1;102 } else if (flags & EVLOOP_NONBLOCK)103 //活动队列没有事件,而且是非阻塞,跳出循环104 done = 1;105 }106 107 /* clear time cache */108 base->tv_cache.tv_sec = 0;109 110 event_debug(("%s: asked to terminate loop.", __func__));111 return (0);112 }
1 //查找下一个需要处理的事件,这边需要指针的指针,因为假如小根堆中压根没有事件,将指针置为空 2 static int 3 timeout_next(struct event_base *base, struct timeval **tv_p) 4 { 5 struct timeval now; 6 struct event *ev; 7 struct timeval *tv = *tv_p; 8 //查找小根堆里面的事件最小的事件,没有就退出 9 if ((ev = min_heap_top(&base->timeheap)) == NULL) {10 /* if no time-based events are active wait for I/O */11 //没有事件了,超时时间置为空,退出,时间指针置为空,所以需要指针的指针12 *tv_p = NULL;13 return (0);14 }15 16 if (gettime(base, &now) == -1)17 return (-1);18 //事件已经超时,需要立即执行,清空tv_p,超时时间为0,返回19 if (evutil_timercmp(&ev->ev_timeout, &now, <=)) {20 evutil_timerclear(tv);21 return (0);22 }23 //事件还没有到执行的时间,计算出相差的时间,返回24 evutil_timersub(&ev->ev_timeout, &now, tv);25 26 assert(tv->tv_sec >= 0);27 assert(tv->tv_usec >= 0);28 29 event_debug(("timeout_next: in %ld seconds", tv->tv_sec));30 return (0);31 }
1 //进行时间处理 2 void 3 timeout_process(struct event_base *base) 4 { 5 struct timeval now; 6 struct event *ev; 7 //时间堆为空退出 8 if (min_heap_empty(&base->timeheap)) 9 return;10 11 gettime(base, &now);12 13 //事件执行时间比现在大时,需要执行,将此事件从event队列中删除14 while ((ev = min_heap_top(&base->timeheap))) {15 if (evutil_timercmp(&ev->ev_timeout, &now, >))16 break;17 18 /* delete this event from the I/O queues */19 //从ev对应的队列中删除此事件20 event_del(ev);21 22 event_debug(("timeout_process: call %p",23 ev->ev_callback));24 //发送到活动队列,激活此事件,事件的状态变更为EV_TIMEOUT,事件的执行次数改为125 event_active(ev, EV_TIMEOUT, 1);26 }27 }
1 //对在活动队列中的事件调用他对应的回调 2 static void 3 event_process_active(struct event_base *base) 4 { 5 struct event *ev; 6 struct event_list *activeq = NULL; 7 int i; 8 short ncalls; 9 10 //取得第一个非空的优先级队列,nactivequeues越小,优先级越高11 for (i = 0; i < base->nactivequeues; ++i) {12 if (TAILQ_FIRST(base->activequeues[i]) != NULL) {13 activeq = base->activequeues[i];14 break;15 }16 }17 18 assert(activeq != NULL);19 20 for (ev = TAILQ_FIRST(activeq); ev; ev = TAILQ_FIRST(activeq)) {21 //如果是持续事件,只从EVLIST_ACTIVE队列中删除事件即可22 if (ev->ev_events & EV_PERSIST)23 event_queue_remove(base, ev, EVLIST_ACTIVE);24 else25 event_del(ev);26 27 /* Allows deletes to work */28 //允许删除自己29 ncalls = ev->ev_ncalls;30 ev->ev_pncalls = &ncalls;31 while (ncalls) {32 //持续调用,直到调用次数为033 ncalls--;34 ev->ev_ncalls = ncalls;35 (*ev->ev_callback)((int)ev->ev_fd, ev->ev_res, ev->ev_arg);36 if (event_gotsig || base->event_break)37 return;38 }39 }40 }
联系客服