可以看出struct bufferevent内置了两个event(读/写)和对应的缓冲区。当有数据被读入(input)的时候,readcb被调用,当output被输出完成的时候,writecb被调用,当网络I/O出现错误,如链接中断,超时或其他错误时,errorcb被调用。
使用bufferevent的过程:
1. 设置sock为非阻塞的
eg: evutil_make_socket_nonblocking(fd);2. 使用bufferevent_socket_new创建一个structbufferevent *bev,关联该sockfd,托管给event_base
函数原型为:
struct bufferevent * bufferevent_socket_new(struct event_base *base, evutil_socket_t fd, int options)eg: struct bufferevent *bev;bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);3. 设置读写对应的回调函数
函数原型:
int bufferevent_enable(struct bufferevent *bufev, short event)eg. bufferevent_enable(bev, EV_READ|EV_WRITE);5. 进入bufferevent_setcb回调函数:
在readcb里面从input中读取数据,处理完毕后填充到output中;
writecb对于服务端程序,只需要readcb就可以了,可以置为NULL;
errorcb用于处理一些错误信息。
需要C/C++ Linux服务器架构师学习资料私信“资料”(资料包括C/C++,Linux,golang技术,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK,ffmpeg等),免费分享
针对这些使用过程进入源码进行分析:
1. bufferevent_socket_new
(1)在bufferevent_init_common中调用evbuffer_new()初始化input和output
(2)在event_assign中初始化bufferevent中的ev_read和ev_write事件。
(3)在evbuffer_add_cb中给output添加了一个callback bufferevent_socket_outbuf_cb
2. bufferevent_setcb
该函数的作用主要是赋值,把该函数后面的参数,赋值给第一个参数struct bufferevent *bufev定义的变量
3. bufferevent_enable
调用event_add将读写事件加入到事件监听队列中。
对bufferevent常用的几个函数进行分析:
char *evbuffer_readln(struct evbuffer*buffer, size_t *n_read_out,enum evbuffer_eol_style eol_style);含义:Read a single line from an evbuffer.
返回值:读到的一行内容
int evbuffer_add(struct evbuffer *buf,const void *data, size_t datlen);含义:将数据添加到evbuffer的结尾
返回值:成功返回0,失败返回-1
int evbuffer_remove(struct evbuffer*buf, void *data, size_t datlen);含义:从evbuffer读取数据到data
返回值:成功返回0,失败返回-1
size_t evbuffer_get_length(const structevbuffer *buf);含义:返回evbuffer中存储的字节长度
暂时先分析到这里,下面是代码,客户端发送消息:HTTP/1.0, Client 0 send Message:
Request: Hello Server! over,服务端一条消息收完成后,会回复:Response ok! Hello Client!
服务端从bufferevent中取出消息是按行取的。代码可能有不完善的地方,希望高手提出宝贵意见。
libevent_eventbuffer_server.c
#include <netinet/in.h>#include <sys/socket.h>#include <fcntl.h> #include <event2/event.h>#include <event2/buffer.h>#include <event2/bufferevent.h> #include <assert.h>#include <unistd.h>#include <string.h>#include <stdlib.h>#include <stdio.h>#include <errno.h> void do_read(evutil_socket_t fd, short events, void *arg); //struct bufferevent内建了两个event(read/write)和对应的缓冲区(struct evbuffer *input, *output),并提供相应的函数用来操作>缓冲区(或者直接操作bufferevent)//接收到数据后,判断是不一样一条消息的结束,结束标志为"over"字符串void readcb(struct bufferevent *bev, void *ctx){ printf("called readcb!\n"); struct evbuffer *input, *output; char *request_line; size_t len; input = bufferevent_get_input(bev);//其实就是取出bufferevent中的input output = bufferevent_get_output(bev);//其实就是取出bufferevent中的output size_t input_len = evbuffer_get_length(input); printf("input_len: %d\n", input_len); size_t output_len = evbuffer_get_length(output); printf("output_len: %d\n", output_len); while(1) { request_line = evbuffer_readln(input, &len, EVBUFFER_EOL_CRLF);//从evbuffer前面取出一行,用一个新分配的空字符结束的字符串返回这一行,EVBUFFER_EOL_CRLF表示行尾是一个可选的回车,后随一个换行符 if(NULL == request_line) { printf("The first line has not arrived yet.\n"); free(request_line);//之所以要进行free是因为 line = mm_malloc(n_to_copy+1)),在这里进行了malloc break; } else { printf("Get one line date: %s\n", request_line); if(strstr(request_line, "over") != NULL)//用于判断是不是一条消息的结束 { char *response = "Response ok! Hello Client!\r\n"; evbuffer_add(output, response, strlen(response));//Adds data to an event buffer printf("服务端接收一条数据完成,回复客户端一条消息: %s\n", response); free(request_line); break; } } free(request_line); } size_t input_len1 = evbuffer_get_length(input); printf("input_len1: %d\n", input_len1); size_t output_len1 = evbuffer_get_length(output); printf("output_len1: %d\n\n", output_len1);} void errorcb(struct bufferevent *bev, short error, void *ctx){ if (error & BEV_EVENT_EOF) { printf("connection closed\n"); } else if (error & BEV_EVENT_ERROR) { printf("some other error\n"); } else if (error & BEV_EVENT_TIMEOUT) { printf("Timed out\n"); } bufferevent_free(bev);} void do_accept(evutil_socket_t listener, short event, void *arg){ struct event_base *base = arg; struct sockaddr_storage ss; socklen_t slen = sizeof(ss); int fd = accept(listener, (struct sockaddr*)&ss, &slen); if (fd < 0) { perror("accept"); } else if (fd > FD_SETSIZE) { close(fd); } else { struct bufferevent *bev; evutil_make_socket_nonblocking(fd); //使用bufferevent_socket_new创建一个struct bufferevent *bev,关联该sockfd,托管给event_base BEV_OPT_CLOSE_ON_FREE表示释放bufferevent时关闭底层传输端口。这将关闭底层套接字,释放底层bufferevent等。 bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE); //设置读写对应的回调函数 bufferevent_setcb(bev, readcb, NULL, errorcb, NULL);// bufferevent_setwatermark(bev, EV_READ, 0, MAX_LINE); //启用读写事件,其实是调用了event_add将相应读写事件加入事件监听队列poll。正如文档所说,如果相应事件不置为true,bufferevent是不会读写数据的 bufferevent_enable(bev, EV_READ|EV_WRITE); }} void run(void){ evutil_socket_t listener; struct sockaddr_in sin; struct event_base *base; struct event *listener_event; base = event_base_new(); if (!base) return; sin.sin_family = AF_INET; sin.sin_addr.s_addr = 0; sin.sin_port = htons(8000); listener = socket(AF_INET, SOCK_STREAM, 0); evutil_make_socket_nonblocking(listener); #ifndef WIN32 { int one = 1; setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)); }#endif if (bind(listener, (struct sockaddr*)&sin, sizeof(sin)) < 0) { perror("bind"); return; }if (listen(listener, 16)<0) { perror("listen"); return; } listener_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base); event_add(listener_event, NULL); event_base_dispatch(base);} int main(int argc, char **argv){ setvbuf(stdout, NULL, _IONBF, 0); run(); return 0;}编译:gcc -I/usr/include-o test libevent_eventbuffer_server.c -L/usr/local/lib –levent
运行:
服务端:
客户端:
