0 Star 3 Fork 5

linxyruffy / ft_event

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
ft_event.c 47.42 KB
一键复制 编辑 原始数据 按行查看 历史
linxyruffy 提交于 2017-01-23 18:48 . add code
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
7177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149
// compile: gcc -o srv ft_event.c -lpthread -I ./
#include <stdio.h>
#include <stdlib.h>
#include <ctype.h>
#include <fcntl.h>
#include <errno.h>
#include <unistd.h>
#include <netdb.h>
#include <stdint.h>
#include <netinet/in.h>
#include <strings.h>
#include <string.h>
#include <assert.h>
#include <stddef.h>
#include <signal.h>
#include <stdarg.h>
#include <netinet/tcp.h>
#include <sys/time.h>
#include <sys/ioctl.h>
#include <sys/syscall.h>
#include <sys/types.h>
#include <sys/stat.h>
#include "ft_event.h"
/* File-local flag; it is not referenced anywhere in the visible part of this file. */
static int exit_main_loop;
//////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////// util //////////////////////////////////////////////////////////////////////
/*
 * Current wall-clock time in microseconds since the Unix epoch.
 * Returns -1 (after printing a diagnostic) if gettimeofday() fails.
 */
int64_t ev_usec_now()
{
    struct timeval tv;

    if (gettimeofday(&tv, NULL) < 0) {
        perror("gettimeofday failed");
        return -1;
    }
    /* Widen before multiplying so the seconds part cannot overflow. */
    return (int64_t)tv.tv_sec * 1000000LL + (int64_t)tv.tv_usec;
}
/* Current wall-clock time in milliseconds, derived from ev_usec_now(). */
int64_t ev_msec_now()
{
    const int64_t usec = ev_usec_now();

    return usec / 1000LL;
}
/*
 * Add O_NONBLOCK to the file status flags of fd.
 * Returns the (negative) fcntl() result on failure, otherwise the result
 * of the F_SETFL call.
 */
int ev_socket_nonblocing(int fd)
{
    const int current = fcntl(fd, F_GETFL, 0);

    if (current >= 0) {
        return fcntl(fd, F_SETFL, current | O_NONBLOCK);
    }
    return current;
}
/* Enable SO_REUSEADDR on fd; returns the setsockopt() result. */
int ev_socket_reuseaddr(int fd)
{
    int enabled = 1;

    return setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &enabled, (socklen_t)sizeof(enabled));
}
/* Enable TCP_NODELAY on fd; returns the setsockopt() result. */
int ev_socket_nodelay(int fd)
{
    int enabled = 1;

    return setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &enabled, (socklen_t)sizeof(enabled));
}
/* Enable SO_KEEPALIVE on fd; returns the setsockopt() result. */
int ev_socket_keepalive(int fd)
{
    int enabled;
    socklen_t optlen;

    enabled = 1;
    optlen = sizeof(enabled);
    return setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &enabled, optlen);
}
/*
 * Create a non-blocking IPv4 TCP socket with SO_REUSEADDR set, bind it to
 * ip:port and start listening with a backlog of SOMAXCONN.
 *
 * ip   - host name or dotted address to bind to; NULL binds to INADDR_ANY.
 * port - port in host byte order.
 *
 * Returns the listening descriptor. Every failure is fatal: a diagnostic is
 * printed and the process exits with status 1.
 */
int ev_socket_listen(const char* ip, uint16_t port)
{
    int fd;
    struct sockaddr_in sa;
    struct hostent* ent;

    fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0) {
        perror("srv_fd < 0");
        exit(1);
    }
    if (ev_socket_nonblocing(fd) < 0) {
        perror("fcntl(srv_fd, F_SETFL, flags | O_NONBLOCK) error");
        exit(1);
    }
    if (ev_socket_reuseaddr(fd) < 0) {
        perror("setsockopt(srv_fd, SOL_SOCKET, SO_REUSEADDR, &reuseaddr, sizeof(reuseaddr)) error");
        exit(1);
    }

    memset(&sa, 0, sizeof(sa));
    sa.sin_family = AF_INET;
    sa.sin_port = htons(port);
    if (ip == NULL) {
        sa.sin_addr.s_addr = htonl(INADDR_ANY);
    } else {
        ent = gethostbyname(ip);
        /* gethostbyname() reports through h_errno, not errno, so perror()
         * would print an unrelated message here. */
        if (ent == NULL || ent->h_addr_list == NULL || ent->h_addr_list[0] == NULL) {
            fprintf(stderr, "gethostbyname error: cannot resolve %s\n", ip);
            exit(1);
        }
        /* Only copy what fits: the destination is a 4-byte IPv4 address. */
        if (ent->h_addrtype != AF_INET || ent->h_length != (int)sizeof(sa.sin_addr)) {
            fprintf(stderr, "gethostbyname error: %s is not an IPv4 address\n", ip);
            exit(1);
        }
        memcpy(&sa.sin_addr, ent->h_addr_list[0], sizeof(sa.sin_addr));
    }

    if (bind(fd, (struct sockaddr*)&sa, sizeof(sa)) < 0) {
        perror("bind error");
        exit(1);
    }
    if (listen(fd, SOMAXCONN) < 0) {
        perror("listen error");
        exit(1);
    }
    return fd;
}
/*
 * Detach the calling process and run it as a daemon (double-fork scheme).
 *
 * Steps: fork and let the parent exit, start a new session, ignore SIGCHLD
 * and SIGHUP, fork again, reset the umask, chdir to "/", close inherited
 * descriptors above stderr and point stdin/stdout/stderr at /dev/null.
 *
 * Any fork/setsid/chdir failure terminates the process with EXIT_FAILURE.
 */
void ev_daemon()
{
    pid_t pid;
    long fd;
    int null_fd;

    /* First fork: the parent returns to the shell. */
    pid = fork();
    if (pid < 0)
        exit(EXIT_FAILURE);
    if (pid > 0)
        exit(EXIT_SUCCESS);

    /* The child becomes session leader. */
    if (setsid() < 0)
        exit(EXIT_FAILURE);

    /* TODO: implement a working signal handler. */
    signal(SIGCHLD, SIG_IGN);
    signal(SIGHUP, SIG_IGN);

    /* Second fork: the daemon can never reacquire a controlling terminal. */
    pid = fork();
    if (pid < 0)
        exit(EXIT_FAILURE);
    if (pid > 0)
        exit(EXIT_SUCCESS);

    umask(0);

    if (chdir("/") < 0)
        exit(EXIT_FAILURE);

    /* Close every inherited descriptor above stderr. */
    for (fd = sysconf(_SC_OPEN_MAX); fd > STDERR_FILENO; fd--) {
        close((int)fd);
    }

    /*
     * Keep descriptors 0..2 occupied. If they were simply closed, the next
     * socket or log file opened would reuse them and stray printf()/perror()
     * output would be written into it.
     */
    null_fd = open("/dev/null", O_RDWR);
    if (null_fd >= 0) {
        dup2(null_fd, STDIN_FILENO);
        dup2(null_fd, STDOUT_FILENO);
        dup2(null_fd, STDERR_FILENO);
        if (null_fd > STDERR_FILENO) {
            close(null_fd);
        }
    } else {
        /* No /dev/null available: fall back to just closing stdout/stderr. */
        close(STDERR_FILENO);
        close(STDOUT_FILENO);
    }
}
//////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////// log //////////////////////////////////////////////////////////////////////
/*
 * Open (creating if needed, append mode, 0644) the log file and wrap it in a
 * freshly allocated ev_log_t.
 *
 * level    - maximum level that ev_log_write() will emit for this log.
 * filename - path of the log file.
 *
 * Returns NULL if the file cannot be opened. Running out of memory is fatal:
 * the process exits with a failure status.
 */
ev_log_t* ev_log_init(int level, char *filename)
{
    int f = open(filename, O_WRONLY | O_APPEND | O_CREAT, 0644);
    if (f == -1) {
        return NULL;
    }
    ev_log_t* log = (ev_log_t*)malloc(sizeof(*log));
    if (log == NULL) {
        perror("malloc log error");
        close(f);
        /* Fatal error: report failure to the parent, not success. */
        exit(1);
    }
    memset(log, 0, sizeof(*log));
    log->fd = f;
    log->level = level;
    return log;
}
/*
 * Release a log created by ev_log_init().
 * Only a strictly positive descriptor is closed; the structure itself is
 * always freed.
 */
void ev_log_finit(ev_log_t* self)
{
    const int has_file = (self->fd > 0);

    if (has_file) {
        close(self->fd);
    }
    free(self);
}
/*
 * Format one log line ("date time.usec [LEVEL][pid] message\n") and emit it.
 *
 * self  - destination log; when NULL (or its fd is 0) the line goes to stderr.
 * level - index into the level-name table; messages whose level is above
 *         self->level are dropped. Out-of-range levels print as "UNKNOWN".
 * fmt   - printf-style format followed by its arguments.
 *
 * Messages longer than EV_LOG_MAX_LEN are truncated.
 */
void ev_log_write(ev_log_t* self, int level, const char* fmt, ...)
{
    static const char *levelstr[] = {"NONE", "ERROR" ,"INFO" ,"DEBUG"};
    const int nlevels = (int)(sizeof(levelstr) / sizeof(levelstr[0]));
    const char *level_name;
    va_list ap;
    char msg[EV_LOG_MAX_LEN];
    char buf[EV_LOG_MAX_LEN];
    struct timeval tv;
    struct tm fallback_tm;
    struct tm *t;
    time_t secs;
    int n;

    if (self && level > self->level) {
        return;
    }

    /* Never index the name table with an unchecked caller value. */
    level_name = (level >= 0 && level < nlevels) ? levelstr[level] : "UNKNOWN";

    va_start(ap, fmt);
    vsnprintf(msg, sizeof(msg), fmt, ap);
    va_end(ap);

    gettimeofday(&tv, NULL);
    secs = tv.tv_sec;
    /* NOTE(review): localtime() uses shared static storage; this file runs
     * several threads, so a reentrant variant may be preferable. */
    t = localtime(&secs);
    if (t == NULL) {
        memset(&fallback_tm, 0, sizeof(fallback_tm));
        t = &fallback_tm;
    }

    n = snprintf(buf, sizeof(buf), "%d-%02d-%02d %02d:%02d:%02d.%06ld [%s][%d] %s\n",
                 t->tm_year + 1900,
                 t->tm_mon + 1,
                 t->tm_mday,
                 t->tm_hour,
                 t->tm_min,
                 t->tm_sec,
                 (long)tv.tv_usec,
                 level_name,
                 (int)getpid(),
                 msg);
    if (n < 0) {
        return;
    }
    if ((size_t)n >= sizeof(buf)) {
        /* snprintf() returns the untruncated length; clamp it to what is
         * actually stored and keep the line newline-terminated. */
        n = (int)sizeof(buf) - 1;
        buf[n - 1] = '\n';
    }

    if (self && self->fd) {
        printf("%s", buf);
        if (write(self->fd, buf, (size_t)n) < 0) {
            /* Nothing sensible can be done about a failing log sink. */
        }
    } else {
        fprintf(stderr, "%s", buf);
    }
}
/*
 * Return the process-wide default log, creating it on first use from the
 * default configuration's log level and log file. While creation fails the
 * result stays NULL and is retried on the next call.
 */
ev_log_t* ev_default_log()
{
    static ev_log_t* instance = NULL;

    if (instance != NULL) {
        return instance;
    }
    instance = ev_log_init(ev_default_config()->log_level, ev_default_config()->logfile);
    return instance;
}
//////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////// array //////////////////////////////////////////////////////////////////////
/*
 * Allocate a dynamic array on the heap and give it room for alloc_size
 * elements of elem_size bytes each.
 *
 * Returns the new array, or NULL if either allocation fails.
 * Release it with ev_array_del().
 */
ev_array_t* ev_array_new(uint32_t alloc_size, size_t elem_size)
{
    ev_array_t* a;

    assert(alloc_size != 0 && elem_size != 0);
    /* Allocate the structure itself, not just the size of a pointer to it. */
    a = (ev_array_t*)malloc(sizeof(*a));
    if (NULL == a) {
        return NULL;
    }
    if (0 != ev_array_init(a, alloc_size, elem_size)) {
        free(a);
        return NULL;
    }
    return a;
}
/* Free the element storage and then the array structure itself. */
void ev_array_del(ev_array_t *a)
{
    /* free(NULL) is a no-op, so no separate NULL test is needed. */
    free(a->elem);
    free(a);
}
/*
 * Initialise an array structure in place with storage for alloc_size
 * elements of elem_size bytes.
 *
 * Returns 0 on success. On allocation failure returns -1 with a->elem set
 * to NULL and the other fields left untouched.
 */
int ev_array_init(ev_array_t *a, uint32_t alloc_size, size_t elem_size)
{
    void *storage;

    assert(alloc_size != 0 && elem_size != 0);

    storage = malloc(alloc_size * elem_size);
    a->elem = storage;
    if (storage == NULL) {
        return -1;
    }

    a->nalloc = alloc_size;
    a->size = elem_size;
    a->nelem = 0;
    return 0;
}
/* Release the element storage of an in-place array; the structure is kept. */
void ev_array_deinit(ev_array_t *a)
{
    /* free(NULL) is a no-op, so no separate NULL test is needed. */
    free(a->elem);
}
/*
 * Return a pointer to element idx.
 * The array must be non-empty and idx must be below nelem (asserted).
 */
void* ev_array_get(ev_array_t *a, uint32_t idx)
{
    uint8_t *base;

    assert(a->nelem != 0);
    assert(idx < a->nelem);

    base = (uint8_t *)a->elem;
    return base + (a->size * idx);
}
/*
 * Reserve the next element slot and return a pointer to it.
 * When the array is full its storage is doubled first; returns NULL (array
 * unchanged) if that reallocation fails.
 */
void* ev_array_push(ev_array_t* a)
{
    uint8_t *slot;

    if (a->nelem == a->nalloc) {
        size_t current_bytes = a->size * a->nalloc;
        void *grown = realloc(a->elem, 2 * current_bytes);

        if (grown == NULL) {
            return NULL;
        }
        a->elem = grown;
        a->nalloc *= 2;
    }

    slot = (uint8_t *)a->elem + a->size * a->nelem;
    a->nelem++;
    return slot;
}
/*
 * Remove the last element and return a pointer to its storage.
 * The pointer refers to a slot inside the array that the next push reuses.
 */
void* ev_array_pop(ev_array_t* a)
{
    uint8_t *base;

    assert(a->nelem != 0);

    a->nelem--;
    base = (uint8_t *)a->elem;
    return base + a->size * a->nelem;
}
/* Exchange the complete contents (storage pointer and counters) of a and b. */
void ev_array_swap(ev_array_t *a, ev_array_t *b)
{
    ev_array_t saved_a = *a;

    *a = *b;
    *b = saved_a;
}
/*
 * Return a pointer to element idx (same contract as ev_array_get()).
 * The array must be non-empty and idx must be below nelem (asserted).
 */
void* array_get(ev_array_t *a, uint32_t idx)
{
    uint8_t *base;

    assert(a->nelem != 0);
    assert(idx < a->nelem);

    base = (uint8_t *)a->elem;
    return base + (a->size * idx);
}
/*
 * Call func(elem, data) for each element in index order.
 * Stops at the first non-zero result and returns it; returns 0 if every
 * call returned 0. The array must be non-empty (asserted).
 */
int ev_array_each(ev_array_t* a, array_each_t func, void* data)
{
    uint32_t pos;
    uint32_t count;

    assert(a->nelem != 0);
    assert(func != NULL);

    /* The element count is sampled once, before the first callback runs. */
    count = a->nelem;
    pos = 0;
    while (pos < count) {
        int rc = func(array_get(a, pos), data);

        if (rc != 0) {
            return rc;
        }
        ++pos;
    }
    return 0;
}
//////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////// buffer //////////////////////////////////////////////////////////////////////
/*
 * Allocate a buffer and, when size is non-zero, fill it with a copy of the
 * first size bytes of data. Returns NULL if the structure cannot be
 * allocated.
 */
ev_buf_t* ev_buf_new(const char* data, size_t size)
{
    ev_buf_t* created = (ev_buf_t*)malloc(sizeof(ev_buf_t));

    if (created == NULL) {
        LOG_ERROR("(ev_buf_t*)malloc(sizeof(ev_buf_t)) error");
        return NULL;
    }
    memset(created, 0, sizeof(*created));

    if (size == 0) {
        return created;
    }
    ev_buf_resize(created, size);
    ev_buf_append(created, data, size);
    return created;
}
/*
 * Allocate an empty buffer, reserving size bytes of capacity when size is
 * non-zero. Returns NULL if the structure cannot be allocated.
 */
ev_buf_t* ev_buf_new2(size_t size)
{
    ev_buf_t* created = (ev_buf_t*)malloc(sizeof(ev_buf_t));

    if (created == NULL) {
        LOG_ERROR("(ev_buf_t*)malloc(sizeof(ev_buf_t)) error");
        return NULL;
    }
    memset(created, 0, sizeof(*created));

    if (size != 0) {
        ev_buf_resize(created, size);
    }
    return created;
}
/* Free the buffer's storage and then the buffer structure itself. */
void ev_buf_del(ev_buf_t* self)
{
    /* free(NULL) is a no-op, so no separate NULL test is needed. */
    free(self->buf);
    free(self);
}
/*
 * Grow the buffer's capacity to at least n bytes; it never shrinks.
 *
 * If the reallocation fails the buffer is left exactly as it was (same
 * storage, same capacity), so callers can detect the failure by checking
 * self->total afterwards.
 */
void ev_buf_resize(ev_buf_t* self, size_t n)
{
    void *grown;

    if (self->total >= n) {
        return;
    }
    /* Go through a temporary: assigning realloc()'s result directly would
     * lose the existing data on failure. */
    grown = realloc(self->buf, n);
    if (grown == NULL) {
        return;
    }
    self->buf = grown;
    self->total = n;
}
/*
 * Append len bytes of data to the readable region and keep it
 * NUL-terminated.
 *
 * Space is found, in order, by using the free tail, by sliding the readable
 * bytes back to the start of the storage, or by growing the storage.
 *
 * Returns the number of readable bytes after the call. If more memory is
 * needed and cannot be obtained, nothing is appended and the unchanged
 * length is returned.
 */
size_t ev_buf_append(ev_buf_t* self, const char* data, size_t len)
{
    size_t need;
    size_t tail_free;

    if (!len) {
        return self->len;
    }

    /* One extra byte is reserved for the terminating '\0'. */
    need = len + 1;
    tail_free = self->total - self->offset - self->len;
    if (tail_free < need) {
        if (tail_free + self->offset >= need) {
            /* Source and destination may overlap, so memmove is required. */
            memmove(self->buf, self->buf + self->offset, self->len);
            self->offset = 0;
        } else {
            ev_buf_resize(self, self->total + need);
        }
    }

    /* Growing may have failed; never write past the real capacity. */
    if (self->buf == NULL || self->total - self->offset - self->len < need) {
        return self->len;
    }

    memcpy(self->buf + self->offset + self->len, data, len);
    self->len += len;
    self->buf[self->offset + self->len] = '\0';
    return self->len;
}
/*
 * Consume n bytes from the front of the readable region.
 * Consuming everything (or more) resets the buffer to empty at offset 0.
 * Returns the number of readable bytes left.
 */
size_t ev_buf_seek(ev_buf_t* self, size_t n)
{
    if (n < self->len) {
        self->offset += n;
        self->len -= n;
    } else {
        self->len = 0;
        self->offset = 0;
    }
    return self->len;
}
/* Return 1 when the buffer holds no readable bytes, 0 otherwise. */
int ev_buf_empty(ev_buf_t* self)
{
    return (self->len == 0) ? 1 : 0;
}
/* Discard all readable bytes; the allocated capacity is kept. */
void ev_buf_clear(ev_buf_t* self)
{
    self->offset = 0;
    self->len = self->offset;
}
/* Number of readable bytes currently stored in the buffer. */
size_t ev_buf_size(ev_buf_t* self)
{
    const size_t readable = self->len;

    return readable;
}
/*
 * Pointer to the first readable byte (storage start plus read offset).
 * No locking is performed despite the name.
 */
const char* ev_buf_lock(ev_buf_t* self)
{
    const char* readable_start = self->buf + self->offset;

    return readable_start;
}
//////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////// timer manager //////////////////////////////////////////////////////////////////////
/*
 * Create a timer wheel with the given number of slots.
 *
 * slots       - number of wheel positions (must be > 0).
 * granularity - tick length, in the unit of ev_msec_now() (must be > 0).
 *
 * Every slot starts as an empty timer queue. Returns NULL on allocation
 * failure.
 */
ev_timer_mgr_t* ev_timer_mgr_new(size_t slots, int granularity)
{
    ev_timer_mgr_t* mgr;
    size_t made;

    assert(slots > 0 && granularity > 0);

    mgr = (ev_timer_mgr_t*)malloc(sizeof(*mgr));
    if (mgr == NULL) {
        LOG_ERROR("ev_timer_mgr_new: mgr = (ev_timer_mgr_t*)malloc(sizeof(*mgr)) error\n");
        return NULL;
    }
    if (ev_array_init(&mgr->timer_slots, slots, sizeof(ev_timer_head_t)) != 0) {
        LOG_ERROR("ev_timer_mgr_new: 0 != ev_array_init(mgr->timer_slots, slots, sizeof(ev_timer_t)) error\n");
        free(mgr);
        return NULL;
    }

    /* Claim every preallocated slot and turn it into an empty queue head. */
    made = 0;
    while (made < slots) {
        ev_timer_head_t* head = ev_array_push(&mgr->timer_slots);
        TAILQ_INIT(head);
        ++made;
    }

    mgr->granularity = granularity;
    mgr->index = 0;
    mgr->slots = slots;
    mgr->update_time = ev_msec_now();
    return mgr;
}
/*
 * Destroy a timer manager created by ev_timer_mgr_new().
 *
 * Releases the slot array and the manager itself. Timers still linked into
 * the wheel are not freed here; the manager never allocated them.
 * Passing NULL is a no-op.
 */
void ev_timer_mgr_del(ev_timer_mgr_t* mgr)
{
    if (mgr == NULL) {
        return;
    }
    ev_array_deinit(&mgr->timer_slots);
    free(mgr);
}
/*
 * Arm timer t on the wheel.
 *
 * expire  - delay before the timer fires, in the same unit as the manager's
 *           granularity; anything shorter than one tick is rounded up to
 *           exactly one tick.
 * repeate - stored in t->repeat; ev_timer_mgr_tick() re-arms the timer with
 *           the same expire when it is non-zero.
 *
 * The slot is chosen relative to the current wheel index and t->rotation
 * records how many full turns must pass first. Always returns 0.
 */
int ev_timer_mgr_start_timer(ev_timer_mgr_t* mgr, ev_timer_t* t, int64_t expire, int repeate)
{
    assert(mgr && t);
    uint32_t cursor;
    uint32_t ticks;
    uint32_t td;

    if (expire < mgr->granularity) {
        /* Shorter than one tick: fire on the next tick, not after
         * `granularity` ticks. */
        ticks = 1;
    } else {
        ticks = (uint32_t)(expire / mgr->granularity);
    }
    td = (ticks % mgr->slots);
    t->rotation = (ticks / mgr->slots);
    cursor = ((mgr->index + td) % mgr->slots);
    t->repeat = repeate;
    t->expire = expire;
    t->slot = cursor;
    TAILQ_INSERT_TAIL((ev_timer_head_t*)ev_array_get(&mgr->timer_slots, cursor), t, node);
    return 0;
}
/*
 * Unlink timer t from the wheel slot recorded in t->slot.
 * Always returns 0.
 */
int ev_tiemr_mgr_stop_timer(ev_timer_mgr_t* mgr, ev_timer_t* t)
{
    ev_timer_head_t* bucket = (ev_timer_head_t*)ev_array_get(&mgr->timer_slots, t->slot);

    TAILQ_REMOVE(bucket, t, node);
    return 0;
}
/*
 * Advance the timer wheel up to time `now`.
 *
 * One slot is processed for every whole granularity interval that has
 * passed since the last update. In each slot, timers with remaining
 * rotations are decremented and put back; the others have their callback
 * invoked. A timer is re-armed only if its repeat flag is set and the
 * callback (when present) returned 0.
 *
 * Always returns 0.
 */
int ev_timer_mgr_tick(ev_timer_mgr_t* mgr, int64_t now)
{
    int32_t span;
    int32_t i;

    span = (now - mgr->update_time) / mgr->granularity;
    if (span == 0) {
        return 0;
    }
    for (i = 0; i < span; ++i) {
        ev_timer_t *item;
        ev_timer_t *temp;
        ev_timer_head_t swap;
        ev_timer_head_t* head = (ev_timer_head_t*)ev_array_get(&mgr->timer_slots, mgr->index);

        /* Detach the slot's timers so callbacks may re-arm into the slot
         * while the detached list is being walked. */
        TAILQ_INIT(&swap);
        TAILQ_SWAP(head, &swap, ev_timer_s, node);
        TAILQ_FOREACH_SAFE(item, &swap, node, temp) {
            TAILQ_REMOVE(&swap, item, node);
            if (item->rotation > 0) {
                item->rotation--;
                TAILQ_INSERT_TAIL(head, item, node);
            } else {
                if (item->job.cb) {
                    if (item->job.cb(item->job.ptr) != 0) {
                        /* Non-zero result: do not re-arm this timer. */
                        continue;
                    }
                }
                if (item->repeat) {
                    ev_timer_mgr_start_timer(mgr, item, item->expire, item->repeat);
                }
            }
        }
        /* Written as a plain expression: `x = ++x % n` modifies x twice
         * without a sequence point, which is undefined behaviour. */
        mgr->index = (mgr->index + 1) % mgr->slots;
    }
    mgr->update_time = ev_msec_now();
    return 0;
}
//////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////// async //////////////////////////////////////////////////////////////////////
/*
 * Create an async wake-up channel for loop ev.
 *
 * A non-blocking, close-on-exec pipe is created; its read end is registered
 * with the loop for read events and its write end is used by
 * ev_async_post() to wake the loop.
 *
 * Returns NULL on allocation, pipe or fcntl failure. A failing mutex
 * initialisation terminates the process.
 */
ev_async_t* ev_async_new(ev_loop_t* ev)
{
    int fds[2];
    ev_async_t* async = NULL;

    async = (ev_async_t*)malloc(sizeof(ev_async_t));
    if (async == NULL) {
        return NULL;
    }
    bzero(async, sizeof(ev_async_t));
    if (0 != pthread_mutex_init(&async->watch_mtx, NULL)) {
        perror("ev_async_new: pthread_mutex_init error\n");
        exit(1);
    }
#ifdef USE_KQUEUE
    if (pipe(fds) < 0) {
        perror("ev_async_new: pipe2 error\n");
        pthread_mutex_destroy(&async->watch_mtx);
        free(async);
        return NULL;
    }
    {
        int i;

        for (i = 0; i < 2; ++i) {
            /* O_NONBLOCK is a file status flag (F_SETFL); close-on-exec is
             * the descriptor flag FD_CLOEXEC (F_SETFD). */
            int status_flags = fcntl(fds[i], F_GETFL, 0);
            int fd_flags = fcntl(fds[i], F_GETFD, 0);

            if (status_flags < 0 || fd_flags < 0 ||
                fcntl(fds[i], F_SETFL, status_flags | O_NONBLOCK) < 0 ||
                fcntl(fds[i], F_SETFD, fd_flags | FD_CLOEXEC) < 0) {
                perror("ev_async_new: fcntl error\n");
                pthread_mutex_destroy(&async->watch_mtx);
                close(fds[0]);
                close(fds[1]);
                free(async);
                return NULL;
            }
        }
    }
#else
    if (pipe2(fds, O_NONBLOCK | O_CLOEXEC) < 0) {
        perror("ev_async_new: pipe2 error\n");
        pthread_mutex_destroy(&async->watch_mtx);
        free(async);
        return NULL;
    }
#endif
    if (0 != ev_array_init(&async->watch_jobs, 1, sizeof(ev_job_t))) {
        LOG_ERROR("ev_async_new: ev_array_init(async->watch_jobs, 64, sizeof(ev_job_t)) error\n");
        pthread_mutex_destroy(&async->watch_mtx);
        close(fds[0]);
        close(fds[1]);
        free(async);
        return NULL;
    }
    async->fd = fds[0];
    async->wfd = fds[1];
    async->type = EV_ASYNC;
    async->ev = ev;
    async->refc = 1;
    ev_event_add_in(ev->evp, async->fd, (ev_stream_t*)async);
    return async;
}
/*
 * Read handler for an async channel.
 *
 * Drains the wake-up pipe, takes the whole queue of posted jobs under the
 * lock (leaving a fresh empty queue behind) and runs the jobs with the lock
 * released. Always returns 0.
 */
static int ev__async_read(ev_async_t* async)
{
    int size = 0;
    int i;
    ssize_t n;
    ev_array_t temp;
    char buf[1024];
    LOG_ENTER_FN;

    /* temp is swapped into the shared queue below, so it must never carry
     * indeterminate values. */
    memset(&temp, 0, sizeof(temp));

    /* Keep reading while the pipe fills the whole buffer; a short read,
     * EOF or an error means it is drained. */
    for (;;) {
        do {
            n = read(async->fd, buf, sizeof(buf));
        } while (n < 0 && errno == EINTR);
        if (n != (ssize_t)sizeof(buf)) {
            break;
        }
    }

    pthread_mutex_lock(&async->watch_mtx);
    if (!(ev_array_n(&async->watch_jobs) > 0)) {
        pthread_mutex_unlock(&async->watch_mtx);
        return 0;
    }
    ev_array_swap(&async->watch_jobs, &temp);
    if (ev_array_init(&async->watch_jobs, 1, sizeof(ev_job_t)) != 0) {
        /* No memory for a new queue: put the jobs back so the queue stays
         * usable; they run on a later wake-up. */
        perror("ev__async_read: ev_array_init error");
        ev_array_swap(&async->watch_jobs, &temp);
        pthread_mutex_unlock(&async->watch_mtx);
        return 0;
    }
    pthread_mutex_unlock(&async->watch_mtx);

    size = ev_array_n(&temp);
    for (i = 0; i < size; ++i) {
        ev_job_t* job = ev_array_get(&temp, i);
        if (job->cb) {
            job->cb(job->ptr);
        }
    }
    ev_array_deinit(&temp);
    return 0;
}
/*
 * Stop an async channel: unregister its read end from the loop and close
 * both pipe descriptors. Always returns 0.
 */
static int ev__async_stop(ev_async_t* async)
{
    LOG_ENTER_FN;
    ev_stream_t* as_stream = (ev_stream_t*)async;

    ev_event_del_in(async->ev->evp, async->fd, as_stream);
    close(async->fd);
    close(async->wfd);
    return 0;
}
/*
 * Final-release hook for an async channel. It currently only emits the
 * function-entry log and returns 0; nothing is released here.
 */
static int ev__async_free(ev_async_t* async)
{
    LOG_ENTER_FN;
    (void)async;
    return 0;
}
/*
 * Queue a copy of *op on the async channel and wake its loop by writing one
 * byte to the pipe.
 *
 * Returns 0 when the job is queued (a full pipe counts as success, since a
 * wake-up is already pending) and -1 when the job cannot be queued. Any
 * other write failure terminates the process.
 */
int ev_async_post(ev_async_t* async, ev_job_t* op)
{
    static const char wake_byte[1] = { '\0' };
    const int wake_len = 1;
    ev_job_t* slot;
    int rc;

    pthread_mutex_lock(&async->watch_mtx);
    slot = ev_array_push(&async->watch_jobs);
    if (slot == NULL) {
        LOG_ERROR("ev__async_post_job error\n");
        pthread_mutex_unlock(&async->watch_mtx);
        return -1;
    }
    *slot = *op;
    pthread_mutex_unlock(&async->watch_mtx);

    /* Retry the wake-up write for as long as it is interrupted. */
    for (;;) {
        rc = write(async->wfd, wake_byte, wake_len);
        if (!(rc == -1 && errno == EINTR)) {
            break;
        }
    }

    if (rc == wake_len) {
        return 0;
    }
    if (rc == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
        return 0;
    }
    exit(1);
    return 0;
}
// timeout op
/*
 * Start idle-timeout tracking for connection c: record the deadline
 * (now + max_timeout) and append c to its loop's timeout queue.
 * Does nothing when the configured max_timeout is not positive.
 */
void ev_add_timeout_watch(ev_tcp_t* c)
{
    if (ev_default_config()->max_timeout > 0) {
        c->timeout = (uint32_t)time(NULL) + ev_default_config()->max_timeout;
        TAILQ_INSERT_TAIL(&c->ev->streams_timeout, c, timeout_tqe);
    }
}
/*
 * Stop idle-timeout tracking for connection c by unlinking it from its
 * loop's timeout queue.
 *
 * Does nothing when the configured max_timeout is not positive, or when c
 * is not currently linked. Streams created by ev_tcp_new() start zeroed and
 * unlinked, and unlinking one of those would dereference a NULL
 * back-pointer.
 */
void ev_del_timeout_watch(ev_tcp_t* c)
{
    if (ev_default_config()->max_timeout <= 0) {
        return;
    }
    /* NOTE(review): assumes timeout_tqe is a standard TAILQ_ENTRY with
     * tqe_next/tqe_prev members - confirm against ft_event.h. */
    if (c->timeout_tqe.tqe_prev == NULL) {
        return;
    }
    TAILQ_REMOVE(&c->ev->streams_timeout, c, timeout_tqe);
    /* Mark the entry as unlinked so a second call is harmless. */
    c->timeout_tqe.tqe_next = NULL;
    c->timeout_tqe.tqe_prev = NULL;
}
/*
 * Restart the idle timeout of connection c by removing it from the timeout
 * queue and adding it again with a fresh deadline.
 * Does nothing when the configured max_timeout is not positive.
 */
void ev_reset_timeout_watch(ev_tcp_t* c)
{
    if (ev_default_config()->max_timeout > 0) {
        ev_del_timeout_watch(c);
        ev_add_timeout_watch(c);
    }
}
//////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////// tcp //////////////////////////////////////////////////////////////////////
/*
 * Wrap descriptor fd in a new TCP stream owned by loop ev.
 *
 * The stream starts zeroed, with input and output buffers sized from the
 * default configuration and a reference count of 1, and is appended to the
 * loop's stream list.
 *
 * Returns NULL if any allocation fails. fd is not closed in that case; it
 * remains the caller's responsibility.
 */
ev_tcp_t* ev_tcp_new(ev_loop_t* ev, int fd)
{
    ev_tcp_t* c = NULL;

    c = (ev_tcp_t*)malloc(sizeof(ev_tcp_t));
    if (c == NULL) {
        LOG_ERROR("client_t* c = malloc(sizeof(struct client_t)) == NULL\n");
        return NULL;
    }
    memset(c, 0, sizeof(ev_tcp_t));
    c->in_buf = ev_buf_new2(ev_default_config()->buf_trunk_size);
    c->out_buf = ev_buf_new2(ev_default_config()->buf_trunk_size);
    if (c->in_buf == NULL || c->out_buf == NULL) {
        /* The read/write paths dereference both buffers unconditionally. */
        LOG_ERROR("ev_tcp_new: buffer allocation failed\n");
        if (c->in_buf != NULL) {
            ev_buf_del(c->in_buf);
        }
        if (c->out_buf != NULL) {
            ev_buf_del(c->out_buf);
        }
        free(c);
        return NULL;
    }
    c->type = EV_TCP;
    c->fd = fd;
    c->ev = ev;
    c->refc = 1;
    ev_loop_put_stream(ev, (ev_stream_t*)c, 1);
    return c;
}
/*
 * Stop a TCP stream: leave the timeout queue, free both buffers, unregister
 * read and write interest, close the socket and finally invoke the optional
 * stop callback. Always returns 0.
 */
static int ev__tcp_stop(ev_tcp_t* c)
{
    LOG_ENTER_FN;
    ev_stream_t* as_stream = (ev_stream_t*)c;

    ev_del_timeout_watch(c);
    ev_buf_del(c->in_buf);
    ev_buf_del(c->out_buf);
    ev_event_del_in(c->ev->evp, c->fd, as_stream);
    ev_event_del_out(c->ev->evp, c->fd, as_stream);
    close(c->fd);
    if (c->stop_cb != NULL) {
        c->stop_cb(c);
    }
    return 0;
}
/*
 * Final-release hook for a TCP stream: invoke the optional free callback.
 * Always returns 0.
 */
static int ev__tcp_free(ev_tcp_t* c)
{
    LOG_ENTER_FN;
    if (c->free_cb != NULL) {
        c->free_cb(c);
    }
    return 0;
}
/*
 * Write handler for a TCP stream: push pending output to the socket.
 *
 * With nothing pending, write interest is simply dropped. Otherwise the
 * idle timeout is reset, the optional write callback runs and one send() is
 * attempted; write interest is dropped once the output buffer is drained.
 *
 * Returns 0 on success or EAGAIN. On any other failure the stream is
 * deleted via ev_stream_del() and -1 is returned.
 */
static int ev__tcp_write(ev_tcp_t* c)
{
    ev_stream_t* as_stream = (ev_stream_t*)c;

    if (ev_buf_empty(c->out_buf)) {
        if (ev_event_del_out(c->ev->evp, c->fd, as_stream) < 0) {
            ev_stream_del(as_stream);
            return -1;
        }
        return 0;
    }

    int sent = 0;
    const char* payload = ev_buf_lock(c->out_buf);
    size_t pending = ev_buf_size(c->out_buf);

    ev_reset_timeout_watch(c);
    if (c->write_cb) {
        c->write_cb(c);
    }

    /* Retry for as long as the call is interrupted by a signal. */
    do {
        sent = send(c->fd, payload, pending, 0);
    } while (sent < 0 && errno == EINTR);

    if (sent > 0) {
        int remain = ev_buf_seek(c->out_buf, sent);

        if (remain <= 0 && ev_event_del_out(c->ev->evp, c->fd, as_stream) < 0) {
            ev_stream_del(as_stream);
            return -1;
        }
    } else if (sent < 0) {
        if (errno == EAGAIN) {
            return 0;
        }
        LOG_ERROR("send data error");
        ev_stream_del(as_stream);
        return -1;
    }
    return 0;
}
/*
 * Read handler for a TCP stream: receive up to 4096 bytes, append them to
 * the input buffer and invoke the optional read callback.
 *
 * Returns 0 after a successful read or when no data is available
 * (EAGAIN/EWOULDBLOCK). On peer close or any other error the stream is
 * deleted via ev_stream_del() and -1 is returned.
 */
static int ev__tcp_read(ev_tcp_t* c)
{
    LOG("begin client_t read\n");
    int got = 0;
    int nrecv = 0;
    char chunk[4096] = {0};

    ev_reset_timeout_watch(c);

    /* Retry for as long as the call is interrupted by a signal. */
    do {
        got = recv(c->fd, chunk, sizeof(chunk), 0);
    } while (got < 0 && errno == EINTR);

    if (got == 0) {
        LOG_INFO("peer close\n");
        ev_stream_del((ev_stream_t*)c);
        return -1;
    }
    if (got < 0) {
        if (errno != EWOULDBLOCK && errno != EAGAIN) {
            ev_stream_del((ev_stream_t*)c);
            return -1;
        }
        return 0;
    }

    ev_buf_append(c->in_buf, chunk, got);
    if (c->read_cb) {
        c->read_cb(c);
    }
    return nrecv;
}
//////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////// loop //////////////////////////////////////////////////////////////////////
/*
 * Run the loop's pending jobs.
 *
 * The whole queue is taken under the lock (a fresh queue with room for 64
 * jobs is left in its place) and the jobs are then run with the lock
 * released.
 */
static void ev__loop_pending_queue_cb(ev_loop_t* ev)
{
    ev_array_t batch = NULL_ARRAY;
    int count = 0;
    int idx;

    pthread_mutex_lock(&ev->pending_mtx);
    if (!(ev_array_n(&ev->pending_jobs) > 0)) {
        pthread_mutex_unlock(&ev->pending_mtx);
        return;
    }
    ev_array_swap(&ev->pending_jobs, &batch);
    ev_array_init(&ev->pending_jobs, 64, sizeof(ev_job_t));
    pthread_mutex_unlock(&ev->pending_mtx);

    count = ev_array_n(&batch);
    for (idx = 0; idx < count; ++idx) {
        ev_job_t* job = ev_array_get(&batch, idx);
        job->cb(job->ptr);
    }
    ev_array_deinit(&batch);
}
/*
 * Expire idle connections.
 *
 * Walks the timeout queue from the front, removing and deleting every
 * stream whose deadline has passed, and stops at the first stream that is
 * still within its deadline.
 */
static void ev__loop_watch_timeout(ev_loop_t* ev)
{
    ev_tcp_t *conn, *next_conn;
    uint32_t now = time(NULL);

    TAILQ_FOREACH_SAFE(conn, &ev->streams_timeout, timeout_tqe, next_conn) {
        if (now < conn->timeout) {
            break;
        }
        TAILQ_REMOVE(&ev->streams_timeout, conn, timeout_tqe);
        LOG_INFO("%d time out", conn->fd);
        ev_stream_del((ev_stream_t*)conn);
    }
}
/*
 * Thread entry point of an event loop.
 *
 * Until the loop's exit flag is set, each iteration expires idle
 * connections, runs pending jobs, advances the timer wheel and polls for
 * I/O events with a timeout argument of 100.
 */
static void* ev__loop_run_loop(void* ptr)
{
    ev_loop_t* loop = NULL;

    LOG("run thread: %02x succ", pthread_self());
    loop = (ev_loop_t*)ptr;
    for (;;) {
        if (loop->exit) {
            break;
        }
        ev__loop_watch_timeout(loop);
        ev__loop_pending_queue_cb(loop);
        ev_timer_mgr_tick(loop->timer_mgr, ev_msec_now());
        ev_event_poll(loop->evp, 100);
    }
    LOG("thread %02x exit\n", pthread_self());
    pthread_exit(NULL);
}
/*
 * Event dispatcher registered with the poller.
 *
 * data - the ev_stream_t the event belongs to.
 * mask - bit set of kEventError / kEventRead / kEventWrite.
 *
 * Returns 0 when the event was handled, -1 when the stream was deleted.
 */
static int on_loop_cb(void* data, unsigned int mask)
{
    ev_stream_t* c = (ev_stream_t*)data;

    if (mask & kEventError) {
        ev_stream_del(c);
        return -1;
    }
    if (mask & kEventRead) {
        if (c->type == EV_TCP) {
            /* ev__tcp_read() already calls ev_stream_del() on every failing
             * path; deleting again here would drop the reference twice and
             * touch freed memory. */
            if (ev__tcp_read((ev_tcp_t*)c) < 0) {
                return -1;
            }
        } else if (c->type == EV_ASYNC) {
            if (ev__async_read((ev_async_t*)c) < 0) {
                ev_stream_del(c);
                return -1;
            }
        }
    }
    if (mask & kEventWrite) {
        if (c->type == EV_TCP) {
            /* Same contract as the read path: the stream is already gone
             * when ev__tcp_write() reports failure. */
            if (ev__tcp_write((ev_tcp_t*)c) < 0) {
                return -1;
            }
        }
    }
    return 0;
}
/*
 * Create an event loop and start its worker thread.
 *
 * Sets up the stream lists, the poller, the pending-job queue, a 256-slot
 * timer wheel with granularity 100, and the two mutexes, then spawns the
 * thread running ev__loop_run_loop().
 *
 * Returns NULL if any resource cannot be created; everything acquired up to
 * that point is released. Failure to start the thread terminates the
 * process.
 */
ev_loop_t* ev_loop_new()
{
    int have_pending_mtx = 0;
    ev_loop_t* ev = (ev_loop_t*)malloc(sizeof(ev_loop_t));

    if (ev == NULL) {
        return NULL;
    }
    memset(ev, 0, sizeof(ev_loop_t));
    TAILQ_INIT(&ev->streams_list);
    TAILQ_INIT(&ev->streams_timeout);
    ev->evp = ev_event_new(32, on_loop_cb);
    if (ev->evp == NULL) {
        perror("create evbase error\n");
        /* A loop without a poller cannot run. */
        goto err;
    }
    if (ev_array_init(&ev->pending_jobs, 64, sizeof(ev_job_t)) != 0) {
        goto err;
    }
    ev->timer_mgr = ev_timer_mgr_new(256, 100);
    if (ev->timer_mgr == NULL) {
        goto err;
    }
    if (pthread_mutex_init(&ev->pending_mtx, NULL) != 0) {
        goto err;
    }
    have_pending_mtx = 1;
    if (pthread_mutex_init(&ev->stream_mtx, NULL) != 0) {
        goto err;
    }
    if (0 != pthread_create(&ev->cur_thread, NULL, ev__loop_run_loop, (void*)ev)) {
        perror("pthread_create error\n");
        exit(1);
    }
    return ev;
err:
    perror("ev_loop_new error\n");
    if (have_pending_mtx) {
        pthread_mutex_destroy(&ev->pending_mtx);
    }
    if (ev->timer_mgr) {
        ev_timer_mgr_del(ev->timer_mgr);
    }
    /* Safe when the array was never initialised: the structure was zeroed
     * above and deinit skips a NULL storage pointer. */
    ev_array_deinit(&ev->pending_jobs);
    if (ev->evp) {
        ev_event_destroy(ev->evp);
    }
    free(ev);
    return NULL;
}
/*
 * Tear down an event loop: delete every stream on its list, destroy the
 * poller and both mutexes, then free the loop structure.
 *
 * NOTE(review): nothing here stops or joins the loop thread, and the
 * pending-job array and timer manager are not released - confirm that
 * callers handle those before calling this.
 */
void ev_loop_del(ev_loop_t* ev)
{
    ev_stream_t *stream, *next_stream;

    TAILQ_FOREACH_SAFE(stream, &ev->streams_list, node, next_stream) {
        ev_stream_del(stream);
    }

    ev_event_destroy(ev->evp);

    pthread_mutex_destroy(&ev->pending_mtx);
    pthread_mutex_destroy(&ev->stream_mtx);

    free(ev);
}
/*
 * Append a copy of *op to the loop's pending-job queue under the queue
 * lock. The job is dropped (with an error log) if the queue cannot grow.
 */
void ev_loop_put_pending_queue(ev_loop_t* ev, ev_job_t* op)
{
    ev_job_t* slot;

    pthread_mutex_lock(&ev->pending_mtx);
    slot = ev_array_push(&ev->pending_jobs);
    if (slot != NULL) {
        *slot = *op;
        pthread_mutex_unlock(&ev->pending_mtx);
        return;
    }
    LOG_ERROR("ev_loop_put_peeding_queue error\n");
    pthread_mutex_unlock(&ev->pending_mtx);
}
/*
 * Link stream c into the loop's stream list under the stream lock and bump
 * the stream counter. A non-zero put_tail appends; zero inserts at the head.
 */
void ev_loop_put_stream(ev_loop_t* ev, ev_stream_t* c, int put_tail)
{
    pthread_mutex_lock(&ev->stream_mtx);
    if (!put_tail) {
        TAILQ_INSERT_HEAD(&ev->streams_list, c, node);
    } else {
        TAILQ_INSERT_TAIL(&ev->streams_list, c, node);
    }
    ++ev->nstreams;
    pthread_mutex_unlock(&ev->stream_mtx);
}
/*
 * Unlink stream c from the loop's stream list and decrement the stream
 * counter. Only EV_TCP streams are handled; other types are ignored.
 * c must belong to ev (asserted).
 */
void ev_loop_remove_stream(ev_loop_t* ev, ev_stream_t* c)
{
    assert(c->ev == ev);

    if (c->type != EV_TCP) {
        return;
    }
    pthread_mutex_lock(&ev->stream_mtx);
    TAILQ_REMOVE(&ev->streams_list, c, node);
    --ev->nstreams;
    pthread_mutex_unlock(&ev->stream_mtx);
}
/*
 * Accept one connection from listening socket sfd and attach it to loop ev.
 *
 * The new socket is made non-blocking with TCP_NODELAY, wrapped in a TCP
 * stream, put under idle-timeout watch and registered for read events.
 *
 * Returns 0 on success or when no connection is pending (EAGAIN), and -1 on
 * any other failure.
 */
int ev_loop_accept(ev_loop_t* ev, int sfd)
{
    int conn_fd;
    ev_tcp_t* stream;

    /* Retry for as long as the call is interrupted by a signal. */
    do {
        conn_fd = accept(sfd, NULL, NULL);
    } while (conn_fd < 0 && errno == EINTR);

    if (conn_fd < 0) {
        if (errno == EAGAIN) {
            return 0;
        }
        LOG_ERROR("accept fd < 0 error\n");
        return -1;
    }

    LOG("incomming connect: %d\n", conn_fd);
    if (ev_socket_nonblocing(conn_fd) < 0) {
        LOG_ERROR("connect fd set nonblocking error\n");
        close(conn_fd);
        return -1;
    }
    if (ev_socket_nodelay(conn_fd) < 0) {
        LOG_ERROR("connect fd set nodelay error\n");
        close(conn_fd);
        return -1;
    }

    stream = ev_tcp_new(ev, conn_fd);
    if (stream == NULL) {
        LOG_ERROR("create client_t error\n");
        close(conn_fd);
        return -1;
    }
    ev_add_timeout_watch(stream);
    ev_event_add_in(ev->evp, conn_fd, (ev_stream_t*)stream);
    return 0;
}
/*
 * Final destruction of a stream whose reference count has reached zero
 * (asserted): run the type-specific free hook, unlink the stream from its
 * loop and free it. Always returns 1.
 */
static int ev__stream_force_del(void* ptr)
{
    ev_stream_t* stream = (ev_stream_t*)ptr;

    assert(stream->refc == 0);

    if (stream->type == EV_TCP) {
        ev__tcp_free((ev_tcp_t*)stream);
    } else if (stream->type == EV_ASYNC) {
        ev__async_free((ev_async_t*)stream);
    }

    ev_loop_remove_stream(stream->ev, stream);
    free(stream);
    return 1;
}
/*
 * Drop one reference to stream c, stopping it on the first call.
 *
 * The type-specific stop routine runs only for the caller that flips the
 * stop flag from 0 to 1. When the last reference is dropped the stream is
 * destroyed: directly if the caller is the loop thread, otherwise by
 * queueing the destruction on the loop's pending-job queue.
 */
void ev_stream_del(ev_stream_t* c)
{
    if (__sync_bool_compare_and_swap(&c->stop, 0, 1)) {
        if (c->type == EV_TCP) {
            ev__tcp_stop((ev_tcp_t*)c);
        } else if (c->type == EV_ASYNC) {
            ev__async_stop((ev_async_t*)c);
        }
    }

    /* Only the caller that takes the count from 1 to 0 destroys the stream. */
    if (__sync_fetch_and_sub(&c->refc, 1) != 1) {
        return;
    }

    if (pthread_self() == c->ev->cur_thread) {
        ev__stream_force_del(c);
        return;
    }

    LOG("ev_loop_put_pending_queue");
    ev_job_t deferred = { c, ev__stream_force_del };
    ev_loop_put_pending_queue(c->ev, &deferred);
}
/*
 * Start a non-blocking IPv4 TCP connection to ip:port on loop ev.
 *
 * Returns a stream with `connected` set when connect() completes at once,
 * or with `connecting` set when it is still in progress (EINPROGRESS).
 * Returns NULL on socket, resolver, allocation or connect failure.
 */
ev_tcp_t* ev_tcp_connect(ev_loop_t* ev, const char* ip, uint16_t port)
{
    ev_tcp_t* tcp = NULL;
    struct sockaddr_in sa;
    struct hostent* ent;
    int fd;

    fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd == -1) {
        LOG_ERROR("create tcp socket error %d", errno);
        return NULL;
    }
    /* A blocking socket would stall the caller inside connect(). */
    if (ev_socket_nonblocing(fd) < 0) {
        LOG_ERROR("set tcp socket nonblocking error %d", errno);
        close(fd);
        return NULL;
    }
    ev_socket_reuseaddr(fd);

    ent = gethostbyname(ip);
    /* Accept only a 4-byte IPv4 result: it is copied into sin_addr below. */
    if (ent == NULL || ent->h_addrtype != AF_INET ||
        ent->h_length != (int)sizeof(sa.sin_addr) ||
        ent->h_addr_list == NULL || ent->h_addr_list[0] == NULL) {
        /* gethostbyname() reports through h_errno, not errno. */
        LOG_ERROR("gethostbyname(%s) error: %d", ip, h_errno);
        close(fd);
        return NULL;
    }

    /* Zero the whole structure so no stack garbage reaches connect(). */
    memset(&sa, 0, sizeof(sa));
    memcpy(&sa.sin_addr, ent->h_addr_list[0], sizeof(sa.sin_addr));
    sa.sin_family = AF_INET;
    sa.sin_port = htons(port);

    tcp = ev_tcp_new(ev, fd);
    if (tcp == NULL) {
        close(fd);
        return NULL;
    }
    if (connect(fd, (const struct sockaddr*)&sa, sizeof(sa)) != 0) {
        if (errno != EINPROGRESS) {
            LOG_ERROR("connect (%s:%d) error: %d", ip, port, errno);
            ev_stream_del((ev_stream_t*)tcp);
            return NULL;
        } else {
            tcp->connecting = 1;
            return tcp;
        }
    }
    tcp->connected = 1;
    return tcp;
}
/*
 * Queue len bytes of data for sending on stream self.
 *
 * If the stream was still marked as connecting it is switched to connected
 * and the optional connect callback is invoked first. Write interest is
 * registered when the output buffer was empty.
 *
 * Returns the output buffer's length after appending, or 0 when there is
 * nothing to queue (NULL data or zero len).
 */
size_t ev_tcp_write(ev_tcp_t* self, const char* data, size_t len)
{
    if (self->connecting == 1) {
        self->connecting = 0;
        self->connected = 1;
        if (self->connect_cb) {
            self->connect_cb(self);
        }
    }

    if (!data || !len) {
        return 0;
    }
    if (ev_buf_empty(self->out_buf)) {
        ev_event_add_out(self->ev->evp, self->fd, (ev_stream_t*)self);
    }
    return ev_buf_append(self->out_buf, data, len);
}
static void* work_run(void* arg)
{
ev_thread_pool_t* pool = (ev_thread_pool_t*)arg;
while(!pool->exit) {
ev_job_t* job;
pthread_mutex_lock(&pool->mtx);
while(ev_array_n(&pool->jobs) == 0) {
++pool->idle_cnt;
pthread_cond_wait(&pool->cond, &pool->mtx);
--pool->idle_cnt;
if(pool->exit) {
goto exit;
}
}
LOG("run thread pool job");
job = ev_array_pop(&pool->jobs);
pthread_mutex_unlock(&pool->mtx);
if(job->cb) {
job->cb(job->ptr);
}
}
exit:
return NULL;
}
ev_thread_pool_t* ev_thread_pool_new(int nthread)
{
int i = 0;
ev_thread_pool_t* pool = (ev_thread_pool_t*)malloc(sizeof(ev_thread_pool_t) + sizeof(pthread_t) * nthread );
if(pool == NULL) {
perror("create thread pool error");
exit(0);
}
bzero(pool, sizeof(ev_thread_pool_t) + sizeof(pthread_t) * nthread);
pool->init_queue_cnt = 16;
pthread_mutex_init(&pool->mtx, NULL);
pthread_cond_init(&pool->cond, NULL);
ev_array_init(&pool->jobs, pool->init_queue_cnt, sizeof(ev_job_t));
pool->thread_cnt = nthread;
for(i = 0; i < nthread; ++i) {
pthread_create(&pool->thread[i], NULL, work_run, (void*)pool);
}
return pool;
}
/*
 * Append a copy of *op to the pool's job queue.
 *
 * The queue is modified under self->mtx. One waiting worker is signalled
 * when the idle counter shows that at least one worker is waiting.
 */
void ev_thread_pool_post(ev_thread_pool_t* self, ev_job_t* op)
{
    LOG("enter ev_thread_pool_post");

    pthread_mutex_lock(&self->mtx);

    /* Reserve a slot in the queue and copy the job into it. */
    ev_job_t* slot = ev_array_push(&self->jobs);
    *slot = *op;

    if(self->idle_cnt > 0) {
        LOG("enter pthread_cond_signal");
        pthread_cond_signal(&self->cond);
    }

    pthread_mutex_unlock(&self->mtx);
}
/*
 * Return the process-wide thread pool, creating it on first use with
 * default_npool workers taken from the default configuration.
 */
ev_thread_pool_t* ev_default_thread_pool()
{
    static ev_thread_pool_t* shared = NULL;

    if(shared != NULL) {
        return shared;
    }
    shared = ev_thread_pool_new(ev_default_config()->default_npool);
    return shared;
}
/*
 * Stop all workers of pool, wait for them, and release the pool.
 *
 * The exit flag is raised and broadcast while holding pool->mtx so that a
 * worker cannot miss the wakeup between testing its wait condition and
 * blocking on the condition variable. A NULL pool is ignored.
 *
 * NOTE(review): the storage behind pool->jobs is not released here; no
 * teardown function for it is visible in this part of the file - confirm.
 */
void ev_thread_pool_destroy(ev_thread_pool_t* pool)
{
    int i;

    if(pool == NULL) {
        return;
    }

    pthread_mutex_lock(&pool->mtx);
    pool->exit = 1;
    pthread_cond_broadcast(&pool->cond);
    pthread_mutex_unlock(&pool->mtx);

    for(i = 0; i < pool->thread_cnt; ++i) {
        pthread_join(pool->thread[i], NULL);
    }

    pthread_mutex_destroy(&pool->mtx);
    pthread_cond_destroy(&pool->cond);
    free(pool);
}
/*
 * Create an event poller (kqueue when USE_KQUEUE is defined, epoll otherwise).
 *
 * nevensts - number of entries in the ready-event buffer.
 * cb       - callback stored in ev_cb and invoked by ev_event_poll().
 *
 * Returns the new poller, or NULL if memory cannot be allocated.
 * Terminates the process if the kernel queue itself cannot be created.
 */
ev_event_t* ev_event_new(int nevensts, event_cb_pt cb)
{
    ev_event_t* poller = NULL;
    int queue_fd;

#ifdef USE_KQUEUE
    queue_fd = kqueue();
#else
    queue_fd = epoll_create(1024);
#endif
    if(queue_fd < 0) {
        perror("create evfd error\n");
        exit(0);
    }

    poller = (ev_event_t*)malloc(sizeof(ev_event_t));
    if(poller == NULL) {
        close(queue_fd);
        return NULL;
    }
    bzero(poller, sizeof(ev_event_t));

#ifdef USE_KQUEUE
    poller->events = (struct kevent*)malloc(sizeof(struct kevent) * nevensts);
#else
    poller->events = (struct epoll_event*)malloc(sizeof(struct epoll_event) * nevensts);
#endif
    if(poller->events != NULL) {
        poller->evfd = queue_fd;
        poller->nevents = nevensts;
        poller->ev_cb = cb;
        return poller;
    }

    /* Event buffer allocation failed: undo everything done so far. */
    close(queue_fd);
    free(poller);
    return NULL;
}
/*
 * Release a poller created by ev_event_new(): frees the ready-event buffer,
 * closes the kernel queue descriptor and frees the object itself.
 */
void ev_event_destroy(ev_event_t* self)
{
    /* free() accepts NULL, so no guard is needed for an absent buffer. */
    free(self->events);
    close(self->evfd);
    free(self);
}
/*
 * Add or remove kernel interest in fd.
 *
 * ev   - bit set of kEventRead / kEventWrite selecting the directions.
 * op   - kEventOpAdd to register; any other value removes.
 * data - opaque pointer handed back with each ready event.
 *
 * Returns 0 on success (or when ev selects nothing), -1 on failure.
 *
 * epoll keeps a single registration per fd, so both directions requested in
 * one call are merged into one mask and applied with one epoll_ctl(); issuing
 * them separately would let the second call replace the first on add, and
 * fail on delete. An add for an fd that is already registered falls back to
 * EPOLL_CTL_MOD.
 *
 * NOTE(review): on epoll, MOD replaces the whole interest mask and DEL drops
 * the fd entirely, so changing one direction in a separate call still affects
 * the other direction. Fixing that needs the caller's current mask, which is
 * not available here.
 */
static int ev__event_op(ev_event_t* self, int fd, int ev, int op, void* data)
{
#ifdef USE_KQUEUE
    struct kevent ke;
    int kop = (op == kEventOpAdd) ? (EV_ADD | EV_CLEAR) : EV_DELETE;

    if(ev & kEventRead) {
        EV_SET(&ke, fd, EVFILT_READ, kop, 0, 0, data);
        if(kevent(self->evfd, &ke, 1, NULL, 0, NULL) == -1) {
            return -1;
        }
    }
    if(ev & kEventWrite) {
        EV_SET(&ke, fd, EVFILT_WRITE, kop, 0, 0, data);
        if(kevent(self->evfd, &ke, 1, NULL, 0, NULL) == -1) {
            return -1;
        }
    }
#else
    struct epoll_event evp;
    int kop = (op == kEventOpAdd) ? EPOLL_CTL_ADD : EPOLL_CTL_DEL;

    bzero(&evp, sizeof(evp));
    /* data is a union: only the pointer member is used by ev_event_poll. */
    evp.data.ptr = data;
    if(ev & kEventRead) {
        evp.events |= EPOLLIN;
    }
    if(ev & kEventWrite) {
        evp.events |= EPOLLOUT;
    }
    if(evp.events == 0) {
        return 0;
    }

    if(epoll_ctl(self->evfd, kop, fd, &evp) < 0) {
        /* Already registered: update the existing registration instead. */
        if(errno != EEXIST) {
            return -1;
        }
        if(epoll_ctl(self->evfd, EPOLL_CTL_MOD, fd, &evp) < 0) {
            return -1;
        }
    }
#endif
    return 0;
}
/*
 * Register read interest for fd, passing c as the event payload.
 *
 * Does nothing when c already has recv_active set. On success recv_active
 * is set on c (when c is not NULL).
 * Returns 0 on success or when already active, -1 on failure.
 */
int ev_event_add_in(ev_event_t* self, int fd, ev_stream_t* c)
{
    if(c != NULL && c->recv_active) {
        return 0;
    }

    if(ev__event_op(self, fd, kEventRead, kEventOpAdd, c) >= 0) {
        if(c != NULL) {
            c->recv_active = 1;
        }
        return 0;
    }

    LOG_ERROR("ev_event_add_in error");
    return -1;
}
/*
 * Remove read interest for fd.
 *
 * Does nothing when c has recv_active equal to 0. On success recv_active
 * is cleared on c (when c is not NULL).
 * Returns 0 on success or when already inactive, -1 on failure.
 */
int ev_event_del_in(ev_event_t* self, int fd, ev_stream_t* c)
{
    if(c != NULL && c->recv_active == 0) {
        return 0;
    }

    if(ev__event_op(self, fd, kEventRead, kEventOpDel, NULL) >= 0) {
        if(c != NULL) {
            c->recv_active = 0;
        }
        return 0;
    }

    LOG_ERROR("ev_event_del_in error");
    return -1;
}
/*
 * Register write interest for fd, passing c as the event payload.
 *
 * Does nothing when c already has send_active set. On success send_active
 * is set on c (when c is not NULL).
 * Returns 0 on success or when already active, -1 on failure, which matches
 * ev_event_add_in() so callers can detect a failed registration.
 */
int ev_event_add_out(ev_event_t* self, int fd, ev_stream_t* c)
{
    if(c && c->send_active) {
        return 0;
    }
    if(ev__event_op(self, fd, kEventWrite, kEventOpAdd, c) < 0) {
        LOG_ERROR("ev_event_add_out error");
        return -1;
    }
    if(c) {
        c->send_active = 1;
    }
    return 0;
}
/*
 * Remove write interest for fd.
 *
 * Does nothing when c has send_active equal to 0. On success send_active
 * is cleared on c (when c is not NULL).
 * Returns 0 on success or when already inactive, -1 on failure, which
 * matches ev_event_del_in() so callers can detect a failed removal.
 */
int ev_event_del_out(ev_event_t* self, int fd, ev_stream_t* c)
{
    if(c && c->send_active == 0) {
        return 0;
    }
    if(ev__event_op(self, fd, kEventWrite, kEventOpDel, NULL) < 0) {
        LOG_ERROR("ev_event_del_out error");
        return -1;
    }
    if(c) {
        c->send_active = 0;
    }
    return 0;
}
/*
 * Make sure both read and write interest are registered for stream c.
 *
 * - both directions already active (== 1): nothing to do, returns 0;
 * - neither active: both are registered in one ev__event_op() call;
 * - only one active: the missing direction is added through
 *   ev_event_add_in() / ev_event_add_out().
 *
 * Returns 0 on success, -1 on failure or when c is NULL.
 */
int ev_event_add_all(ev_event_t* self, int fd, ev_stream_t* c)
{
    if(c == NULL) {
        return -1;
    }
    if(c->send_active == 1 && c->recv_active == 1) {
        return 0;
    }

    /* Write side is on already: only the read side can be missing. */
    if(c->send_active != 0) {
        return ev_event_add_in(self, fd, c);
    }
    /* Read side is on, write side is off. */
    if(c->recv_active != 0) {
        return ev_event_add_out(self, fd, c);
    }

    /* Neither direction is registered yet. */
    if(ev__event_op(self, fd, kEventAll, kEventOpAdd, c) < 0) {
        LOG_ERROR("ev_event_add_all error");
        return -1;
    }
    c->send_active = 1;
    c->recv_active = 1;
    return 0;
}
/*
 * Grow the ready-event buffer of self to hold n entries.
 *
 * Requests that do not exceed the current capacity are ignored.
 * On success both events and nevents are updated, so the next poll really
 * uses the larger buffer. On allocation failure the existing buffer and
 * capacity are left untouched.
 *
 * Returns 0 on success or when no growth is needed, -1 if realloc fails.
 */
int ev_event_resize(ev_event_t* self, int n)
{
    void* grown;

    if(n <= self->nevents) {
        return 0;
    }

    /* Keep the old pointer until realloc is known to have succeeded. */
#ifdef USE_KQUEUE
    grown = realloc(self->events, sizeof(struct kevent) * (size_t)n);
#else
    grown = realloc(self->events, sizeof(struct epoll_event) * (size_t)n);
#endif
    if(grown == NULL) {
        LOG_ERROR("ev_event_resize realloc error");
        return -1;
    }

#ifdef USE_KQUEUE
    self->events = (struct kevent*)grown;
#else
    self->events = (struct epoll_event*)grown;
#endif
    self->nevents = n;
    return 0;
}
/*
 * Wait for events on self and dispatch each one to self->ev_cb.
 *
 * timeout - milliseconds to wait; a negative value waits without a limit.
 *
 * Each ready entry is translated into a mask of kEventRead / kEventWrite /
 * kEventError and passed to the callback together with the payload that was
 * registered for the descriptor. Interrupted waits (EINTR) are retried.
 * When the buffer came back full, a resize to twice the capacity is
 * requested for the next call.
 *
 * Returns the number of ready events, 0 on timeout, or -1 on error (also
 * when an unlimited wait, timeout == -1, returns without events).
 */
int ev_event_poll(ev_event_t* self, int timeout)
{
    int n = 0;
    int i = 0;
#ifdef USE_KQUEUE
    do {
        if(timeout < 0) {
            n = kevent(self->evfd, NULL, 0, self->events, self->nevents, NULL);
        } else {
            struct timespec time = { timeout / 1000L, (timeout % 1000L) * 1000000L };
            n = kevent(self->evfd, NULL, 0, self->events, self->nevents, &time);
        }
    } while(n < 0 && errno == EINTR);

    if(n > 0) {
        for(i = 0; i < n; ++i) {
            int mask = 0;
            struct kevent* e = self->events + i;
            if(e->flags & EV_ERROR) {
                /* These per-event errors are skipped without a callback. */
                if(e->data == EBADF || e->data == EINVAL ||
                   e->data == ENOENT || e->data == EINTR) {
                    continue;
                }
                mask |= kEventError;
            }
            /* Filters are distinct values, not bit flags: compare exactly. */
            if(e->filter == EVFILT_READ) mask |= kEventRead;
            if(e->filter == EVFILT_WRITE) mask |= kEventWrite;
            if(self->ev_cb && mask != 0) {
                self->ev_cb(e->udata, mask);
            }
        }
        if(n == self->nevents) {
            ev_event_resize(self, self->nevents * 2);
        }
        return n;
    }
    if(n == 0) {
        if(timeout == -1) {
            LOG_ERROR("ev_event_poll kevent error");
            return -1;
        }
        return 0;
    }
    return -1;
#else
    do {
        n = epoll_wait(self->evfd, self->events, self->nevents, timeout);
    } while(n < 0 && errno == EINTR);

    if(n > 0) {
        for(i = 0; i < n; ++i) {
            int mask = 0;
            struct epoll_event* e = self->events + i;
            if(e->events & EPOLLERR) mask |= kEventError;
            /* Parenthesised: '&' binds tighter than '|', so the unbracketed
             * form was true for every event. A hang-up is reported as
             * readable. */
            if(e->events & (EPOLLIN | EPOLLHUP)) mask |= kEventRead;
            if(e->events & EPOLLOUT) mask |= kEventWrite;
            if(self->ev_cb && mask != 0) {
                self->ev_cb(e->data.ptr, mask);
            }
        }
        if(n == self->nevents) {
            ev_event_resize(self, self->nevents * 2);
        }
        return n;
    }
    if(n == 0) {
        if(timeout == -1) {
            LOG_ERROR("ev_event_poll epoll wait error");
            return -1;
        }
        return 0;
    }
    return -1;
#endif
}
//////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////// config //////////////////////////////////////////////////////////////////////
/*
 * Return a pointer to the first non-whitespace character of line
 * (possibly its terminating NUL). Returns NULL when line is NULL.
 */
static inline const char* ev_config_skip_start_space(const char* line) {
    const char* pl = line;
    if(pl == NULL) {
        return NULL;
    }
    /* <ctype.h> functions require a value representable as unsigned char. */
    while(isspace((unsigned char)*pl)) {
        pl++;
    }
    return pl;
}
/*
 * Return a pointer to the last non-whitespace character of line.
 *
 * For an empty string, or a string made only of whitespace, line itself is
 * returned; the scan never moves before the start of the buffer.
 * Returns NULL when line is NULL.
 */
static inline const char* ev_config_trim_end_space(const char* line) {
    const char* pl;
    size_t n;

    if(line == NULL) {
        return NULL;
    }
    n = strlen(line);
    if(n == 0) {
        return line;
    }
    pl = line + n - 1;
    /* <ctype.h> functions require a value representable as unsigned char. */
    while(pl > line && isspace((unsigned char)*pl)) {
        pl--;
    }
    return pl;
}
/*
 * Classify one configuration line.
 *
 * Returns NULL when the line is NULL, blank, or a comment (first
 * non-whitespace character is '#'); otherwise returns a pointer to the first
 * non-whitespace character.
 */
static inline const char* ev_config_skip_comment_empty(const char* line)
{
    const char* start = ev_config_skip_start_space(line);

    if(start == NULL) {
        return NULL;
    }
    switch(*start) {
    case '\0':  /* blank line */
    case '#':   /* comment line */
        return NULL;
    default:
        return start;
    }
}
/*
 * Parse a "key = <integer>" configuration line.
 *
 * line  - the line to inspect; leading whitespace is ignored.
 * key   - key name, compared case-insensitively as a prefix of the line.
 * value - receives the parsed number; left untouched on failure.
 *
 * Returns 0 when the key matched and a decimal int32 value was stored.
 * Returns -1 when the key does not match, the '=' is missing, no digits
 * follow, or the number does not fit into int32_t, so callers never treat an
 * unparsed line as handled.
 */
static inline int ev_config_read_int32(const char* line, const char* key, int32_t* value)
{
    const char* pl;
    char* end = NULL;
    size_t keylen;
    long parsed;

    if(line == NULL || key == NULL || value == NULL) {
        return -1;
    }

    pl = ev_config_skip_start_space(line);
    keylen = strlen(key);
    if(strncasecmp(pl, key, keylen) != 0) {
        return -1;
    }

    pl = ev_config_skip_start_space(pl + keylen);
    if(*pl != '=') {
        return -1;
    }
    ++pl;

    /* strtol skips leading blanks itself and reports "no digits" via end. */
    errno = 0;
    parsed = strtol(pl, &end, 10);
    if(end == pl || errno == ERANGE || parsed < INT32_MIN || parsed > INT32_MAX) {
        return -1;
    }

    *value = (int32_t)parsed;
    return 0;
}
/*
 * Parse a "key = <text>" configuration line.
 *
 * line  - the line to inspect; leading whitespace is ignored.
 * key   - key name, compared case-insensitively as a prefix of the line.
 * value - destination buffer of len bytes; always NUL-terminated on success.
 * len   - size of value in bytes.
 *
 * The stored text is what follows '=' with surrounding whitespace (including
 * the trailing newline) removed; it may be empty.
 *
 * Returns 0 when the key matched and the text was stored, -1 when the key
 * does not match, the '=' is missing, or an argument is invalid.
 * Terminates the process when the text does not fit into value.
 */
static inline int ev_config_read_string(const char* line, const char* key, char* value, int len)
{
    const char* pl;
    size_t keylen;
    size_t n = 0;

    if(line == NULL || key == NULL || value == NULL || len <= 0) {
        return -1;
    }

    pl = ev_config_skip_start_space(line);
    keylen = strlen(key);
    if(strncasecmp(pl, key, keylen) != 0) {
        return -1;
    }

    pl = ev_config_skip_start_space(pl + keylen);
    if(*pl != '=') {
        return -1;
    }
    pl = ev_config_skip_start_space(pl + 1);

    if(*pl != '\0') {
        /* The helper returns the LAST non-blank character, so the length is
         * the distance plus one. */
        const char* last = ev_config_trim_end_space(pl);
        n = (size_t)(last - pl) + 1;
    }

    /* One byte must remain for the terminator. */
    if(n >= (size_t)len) {
        perror("read_string buf size too small");
        exit(0);
    }

    memcpy(value, pl, n);
    value[n] = '\0';
    return 0;
}
/*
 * Dump the configuration cfg through LOG_SAFE.
 *
 * var is a label placed in the heading line ("<var> set:").
 * The line labelled "max_thread" shows cfg->max_nthread and the line
 * labelled "npool" shows cfg->default_npool; cfg->max_nwork_thread is not
 * printed.
 */
static void ev_config_print(const char* var, ev_config_t* cfg)
{
// One format string; the arguments below follow the order of its lines.
LOG_SAFE("%s set:\n"
"====================================\n"
"daemon = %d\n"
"logfile = %s\n"
"pidfile = %s\n"
"loglevel = %d\n"
"max_nconnect = %d\n"
"max_thread = %d\n"
"npool = %d\n"
"buf trunk size = %d\n"
"max timeout = %d\n"
"server port = %d\n"
"====================================",
var,
cfg->daemon,
cfg->logfile,
cfg->pidfile,
cfg->log_level,
cfg->max_nconnect,
cfg->max_nthread,
cfg->default_npool,
cfg->buf_trunk_size,
cfg->max_timeout,
cfg->port
);
}
/*
 * Reset cfg to the built-in defaults and log the result.
 *
 * The log file and pid file default to "a.log" / "a.pid" in the current
 * working directory; if that directory cannot be determined, "." is used.
 * Both paths are written with a bounded format call, so a long working
 * directory is truncated instead of overflowing the fields.
 *
 * Always returns 0.
 */
static int ev_config_set_default(ev_config_t* cfg)
{
    char cwd[256] = {0};

    bzero(cfg, sizeof(ev_config_t));

    if(getcwd(cwd, sizeof(cwd)) == NULL) {
        /* The buffer content is unspecified after a failure. */
        cwd[0] = '.';
        cwd[1] = '\0';
    }

    cfg->daemon = 0;
    snprintf(cfg->logfile, sizeof(cfg->logfile), "%s/a.log", cwd);
    cfg->log_level = EV_LOG_DEBUG;
    cfg->max_nconnect = 1024;
    cfg->max_nthread = 2;
    cfg->max_nwork_thread = 1;
    cfg->default_npool = 3;
    cfg->port = 7070;
    cfg->buf_trunk_size = 4096;
    snprintf(cfg->pidfile, sizeof(cfg->pidfile), "%s/a.pid", cwd);

    ev_config_print("default", cfg);
    return 0;
}
/*
 * Return the process-wide configuration, creating it with default values on
 * first use.
 *
 * The result is never NULL: callers dereference it directly, so the process
 * is terminated if the configuration cannot be allocated rather than
 * initialising through a NULL pointer.
 */
ev_config_t* ev_default_config()
{
    static ev_config_t* config = NULL;

    if(config == NULL) {
        config = malloc(sizeof(*config));
        if(config == NULL) {
            LOG_SAFE("malloc config error");
            exit(1);
        }
        ev_config_set_default(config);
    }
    return config;
}
static int ev_config_load(const char* path)
{
FILE* f = fopen(path, "r");
char line[256] = {0};
if(f == NULL) {
LOG_SAFE("load %s config error", path);
return -1;
}
ev_config_t* config = ev_default_config();
do {
bzero(line, sizeof(line));
if(!fgets(line, sizeof(line), f)) {
break;
}
const char* pl = ev_config_skip_comment_empty(line);
if(!pl) {
continue;
}
if(ev_config_read_int32(pl, MAX_THREAD, &config->max_nthread) == 0) {
if(config->max_nthread < 1) {
//
}
continue;
}
if(ev_config_read_int32(pl, SERV_PORT, (int*)&config->port) == 0) {
continue;
}
if(ev_config_read_int32(pl, MAX_TIMEOUT, &config->max_timeout) == 0) {
continue;
}
if(ev_config_read_int32(pl, MAX_CONNECTS, &config->max_nconnect) == 0) {
continue;
}
if(ev_config_read_int32(pl, DAEMON, &config->daemon) == 0) {
continue;
}
if(ev_config_read_string(pl, LOG_FILE_PATH, config->logfile, sizeof(config->logfile)) == 0) {
continue;
}
if(ev_config_read_string(pl, PID_FILE_PATH, config->pidfile, sizeof(config->pidfile)) == 0) {
continue;
}
if(ev_config_read_string(pl, DEFAULT_POOL_SIZE, config->pidfile, sizeof(config->pidfile)) == 0) {
continue;
}
LOG_SAFE("unknow config %s", line);
}while(1);
fclose(f);
ev_config_print("custom", config);
return 0;
}
/*
 * Poll callback for the listening socket.
 *
 * data is the ev_server_t that was registered with the listen fd. On a read
 * event the pending connection is handed to the next event loop in
 * round-robin order. The cursor is kept inside [0, max_nthread) so it can
 * never overflow or go negative, and nothing is dispatched when no loops are
 * configured.
 *
 * Always returns 0.
 */
static int on_new_connect(void* data, unsigned int mask)
{
    static int curloop = -1;

    if(mask & kEventRead) {
        ev_server_t* server = (ev_server_t*)data;
        int nloop = ev_default_config()->max_nthread;

        if(nloop < 1) {
            /* Avoid a modulo by zero when no loop threads exist. */
            return 0;
        }
        curloop = (curloop + 1) % nloop;
        ev_loop_accept(server->evloop[curloop], server->sfd);
    }
    return 0;
}
/*
 * Signal handler that requests shutdown by setting exit_main_loop to 1.
 * signum is not used.
 *
 * NOTE(review): LOG_ENTER_FN is defined elsewhere; if it performs formatted
 * or buffered output it is not async-signal-safe inside a handler - confirm.
 */
static void ev_sigint_handler(int signum)
{
LOG_ENTER_FN;
exit_main_loop = 1;
}
static void ev_init_signal()
{
struct sigaction saint;
signal(SIGPIPE, SIG_IGN);
sigemptyset(&saint.sa_mask);
saint.sa_flags = 0;
saint.sa_handler = ev_sigint_handler;
sigaction(SIGQUIT, &saint, NULL);
sigaction(SIGTERM, &saint, NULL);
sigaction(SIGINT, &saint, NULL);
sigaction(SIGUSR1, &saint, NULL);
}
/*
 * Run the server until a termination signal sets exit_main_loop.
 *
 * config_filepath - optional configuration file; NULL keeps the defaults.
 * pre_run         - optional hook called with the server just before the
 *                   accept loop starts.
 *
 * Steps: load the configuration, optionally daemonise, install signal
 * handlers, create max_nthread event loops and the default thread pool, open
 * the listening socket, then poll it every 200 ms and hand new connections
 * to the loops. On shutdown every loop is asked to exit, joined and deleted,
 * and the accept poller, the listening socket, the loop table and the server
 * object are released.
 *
 * Returns 0 after a clean shutdown, -1 if the configuration file cannot be
 * loaded. Terminates the process on unrecoverable set-up errors.
 */
int ev_server_run(const char* config_filepath, pre_server_run_pt pre_run)
{
    int i;

    if(config_filepath != NULL) {
        if(0 != ev_config_load(config_filepath)) {
            perror("load config file error");
            return -1;
        }
    }
    if(ev_default_config()->daemon == 1) {
        ev_daemon();
    }
    ev_init_signal();

    ev_config_t* config = ev_default_config();
    ev_server_t* server = malloc(sizeof(ev_server_t));
    if(server == NULL) {
        perror("malloc serer error");
        exit(0);
    }
    bzero((void*)server, sizeof(ev_server_t));

    server->evloop = (ev_loop_t**)malloc(config->max_nthread * sizeof(ev_loop_t*));
    if(server->evloop == NULL) {
        perror("malloc ev_loop_t error");
        exit(0);
    }
    for(i = 0; i < config->max_nthread; ++i) {
        server->evloop[i] = ev_loop_new();
        if(server->evloop[i] == NULL) {
            perror("gev[i] = evloop_t_new() error");
            exit(1);
        }
    }

    ev_default_thread_pool();

    server->sfd = ev_socket_listen(NULL, config->port);
    if(server->sfd == -1) {
        perror("create server sfd error");
        exit(0);
    }

    /* The accept poller can fail to allocate; do not dereference NULL. */
    server->evbase = ev_event_new(1, on_new_connect);
    if(server->evbase == NULL) {
        perror("create server event base error");
        exit(1);
    }
    /* Without this registration no connection would ever be accepted. */
    if(ev__event_op(server->evbase, server->sfd, kEventRead, kEventOpAdd, (void*)server) < 0) {
        perror("register server sfd error");
        exit(1);
    }

    if(pre_run) {
        pre_run(server);
    }

    while(!exit_main_loop) {
        ev_event_poll(server->evbase, 200);
    }

    for(i = 0; i < config->max_nthread; ++i) {
        server->evloop[i]->exit = 1;
        pthread_join(server->evloop[i]->cur_thread, NULL);
        ev_loop_del(server->evloop[i]);
    }

    ev_event_destroy(server->evbase);
    close(server->sfd);
    free(server->evloop);
    free(server);
    return 0;
}
C
1
https://gitee.com/linxyruffy/ft_event.git
git@gitee.com:linxyruffy/ft_event.git
linxyruffy
ft_event
ft_event
master

搜索帮助