EPOLL及消息队列实现

#include "smtpd_mock.h"
char* strsub (char *instr, unsigned start, unsigned end)
{
unsigned n = end - start;
char * outstr = (char *)malloc(n+1);
//bzero(outstr,n+1);
strncpy (outstr, instr + start, n);
outstr[n] = 0;
return outstr;
}
int setnonblocking(int sockfd)
{
if (fcntl(sockfd, F_SETFL, fcntl(sockfd, F_GETFD, 0)|O_NONBLOCK) == -1)
{
return -1;
}
return 0;
}
void smtp_echo(void* data)
{
int socket = *(int*)data;
char ebuf[128],buffer[BUFFER_SIZE];
int length = 0, z;
regex_t reg;
regmatch_t pm[10];
const size_t nmatch = 10;
const char * split = "/r/n";
char * pline, * cmd;

z = regcomp (®, smtp_cmd_format, REG_EXTENDED);
if (z != 0){
regerror (z,®, ebuf, sizeof (ebuf));
fprintf (stderr, "%s: regcomp()/n", ebuf);
return;
}
{
while (1) {
bzero(buffer,BUFFER_SIZE);
length = recv(socket,buffer,BUFFER_SIZE,0);
if (length == -1) {
if(errno == EAGAIN){
break;
}
syslog(LOG_ERR,"recv - %m");
break;
}
syslog(LOG_DEBUG,"%s",buffer);
pline = strtok (buffer,split);
while(pline!=NULL) {
syslog(LOG_DEBUG,"%s/n",pline);
if (0==(strcasecmp(pline, "."))){
smtp_cmd("HELO");
continue;
}
z = regexec (®, pline, nmatch, pm, 0);
if (z == REG_NOMATCH)
{
// do nothing;
}
else if (z != 0)
{
regerror (z,®, ebuf, sizeof (ebuf));
fprintf (stderr, "%s: regexec('%s')/n", ebuf, pline);
return ;
}
if(pm[1].rm_so != -1)
{
cmd = strsub (pline, pm[1].rm_so, pm[1].rm_eo);
syslog(LOG_NOTICE,"cmd => %s/n", cmd);
if(pm[2].rm_so != -1)
{
syslog(LOG_NOTICE,"other content => %s/n", strsub (pline, pm[2].rm_so, pm[2].rm_eo));
}

smtp_cmd(cmd,socket);
}
pline = strtok(NULL,split);
}

if(length < BUFFER_SIZE)
break;
}
}

regfree (®);
return;
}
void smtp_cmd(char * cmd,int socket)
{
char buffer[BUFFER_SIZE];
bzero(buffer, BUFFER_SIZE);
if(0 == (strcasecmp(cmd,"HELO")))
{
strcpy(buffer,"250 Regards from CharlesCui/r/n");
send(socket,buffer,strlen(buffer),0);
} else if(0==(strcasecmp(cmd,"QUIT")))
{
strcpy(buffer,"221 QUIT OK/r/n");
send(socket,buffer,strlen(buffer),0);
close(socket);
epoll_ctl(kdpfd, EPOLL_CTL_DEL, socket, &ev);
} else if(0==(strcasecmp(cmd,"NOOP")))
{
strcpy(buffer,"250 NOOP/r/n");
send(socket,buffer,strlen(buffer),0);
} else if(0==(strcasecmp(cmd,"DATA")))
{
strcpy(buffer,"354 End data with ./r/n");
send(socket,buffer,strlen(buffer),0);
} else if(0==(strcasecmp(cmd,"EHLO")))
{
strcpy(buffer,"334 250-mail/r/n250-PIPELINING/r/n250-AUTH LOGIN PLAIN/r/n250-AUTH=LOGIN PLAIN/r/n250 8BITMI/r/n");
send(socket,buffer,strlen(buffer),0);
} else if(0==(strcasecmp(cmd,"AUTH")))
{
strcpy(buffer,"334 dXNlcm5hbWU6/r/n");
send(socket,buffer,strlen(buffer),0);
} else if(0==(strcasecmp(cmd, "MAIL")))
{
strcpy(buffer,"250 Mail Ok/r/n");
send(socket,buffer,strlen(buffer),0);
} else if(0==(strcasecmp(cmd, "RCPT")))
{
strcpy(buffer,"250 Rcpt Ok/r/n");
send(socket,buffer,strlen(buffer),0);
} else if(0==(strcasecmp(cmd,"220")))
{
strcpy(buffer,"220 Welcome to CharlesCui's smtpd mock server./r/n");
send(socket,buffer,strlen(buffer),0);
} else
{
strcpy(buffer,"");
send(socket,buffer,strlen(buffer),0);
syslog(LOG_NOTICE,"Error smtp command.");
}
}
【EPOLL及消息队列实现】int init_smtp(int port)
{
struct sockaddr_in *server_addr;
server_addr = malloc(sizeof(struct sockaddr_in));
server_addr->sin_family = AF_INET;
server_addr->sin_addr.s_addr = htons(INADDR_ANY);
server_addr->sin_port = htons(port);

int server_socket = socket(AF_INET,SOCK_STREAM,0);
syslog(LOG_NOTICE,"init_smtp:server_socket => %d/n",server_socket);
setnonblocking(server_socket);
if( server_socket < 0)
{
syslog(LOG_ERR,"Create Socket Failed! - %m/n");
exit(1);
}

if( bind(server_socket,(struct sockaddr*)server_addr,sizeof(struct sockaddr_in)))
{
syslog(LOG_ERR,"Server Bind Port : %d Failed! - %m/n", port);
exit(1);
}

if ( listen(server_socket, g_listen_size) )
{
syslog(LOG_ERR,"Server Listen Failed! - %m/n");
exit(1);
}

struct rlimit rt;
rt.rlim_max = rt.rlim_cur = g_epoll_size;
if (setrlimit(RLIMIT_NOFILE, &rt) == -1)
{
syslog(LOG_ERR,"setrlimit - %m");
exit(1);
}
else
{
syslog(LOG_NOTICE,"设置系统资源参数成功!/n");
}
return server_socket;
}
void block_queue(void * param)
{
/*
姑娘们,排好对,等客了!
老鸨吩咐要做什么都知道了吗?(func为回调函数)
*/
void(* func)(void* );
int fd;
block_queue_node_t *head_node;

//param是全局变量bqp
block_queue_param_t* bque = (block_queue_param_t*)param;
func = bque->func;

for(; ; )
{
pthread_mutex_lock(&bque->mutex);
pthread_cond_wait(&bque->cond,&bque->mutex);
/*
来客啦!
*/
if(list_empty(&head))
{
//哪个小二瞎喊,命名一个客人都没来!
pthread_mutex_unlock(&bque->mutex);
continue;
}else
{
/*
大爷,跟我走吧,我那儿宽敞
从链表头部取出一个节点
*/
head_node = list_entry(head.next,block_queue_node_t,list);
fd = head_node->fd;
//大爷,你是我的了!
//同时删除该节点
list_del(&head_node->list);
/**/
free(head_node);
counter--;
}

pthread_mutex_unlock(&bque->mutex);
/*干*/
func(&fd);
}
}
int insert_queue(block_queue_param_t *bque,int fd)
{
//生成临时节点,用来保存fd
block_queue_node_t *b = (block_queue_node_t *)malloc(sizeof(block_queue_node_t));
b->fd = fd;

pthread_mutex_lock(&bque->mutex);

if(counter > g_listen_size){
//当客人数量超过小姐接待能力的时候
//就放弃接待该客人,并且返回1.
//青楼是残酷滴,一个萝卜一个坑
return 1;
}else{
counter++;
}
/*
将新增的节点插入到尾部,
相对应的,block_queue循环体中取节点时,
是从链表头取到的.
*/
list_add_tail(&b->list,&head);
/*
客人到!
姐妹们快抢客啊!(内核用broadcast通知各阻塞的线程)
*/
pthread_cond_broadcast(&bque->cond);
pthread_mutex_unlock(&bque->mutex);

return 0;
}
int init_threads()
{
size_t i=0;
//这是今天的流水账,
//客人们来了都会在这里(head链表)登记的.
//都知道今天各位姑娘要做什么吧(smtp_echo).
//为全局变量bqp设置属性
bqp.func = (void*)smtp_echo;
/*
不许抢客人!(互斥mutex)
说了多少次了,不管男女老幼长短粗细,
只有客人想不到,没有我们做不到!
别只盯着帅哥.
*/
pthread_cond_init(&bqp.cond,NULL);
pthread_mutex_init(&bqp.mutex,NULL);
/*
姑娘们起床了!
初始化各个线程
*/
for( i = 0; i < g_th_count; ++i)
{
pthread_t child_thread;
pthread_attr_t child_thread_attr;
pthread_attr_init(&child_thread_attr);
pthread_attr_setdetachstate(&child_thread_attr,PTHREAD_CREATE_DETACHED);
/*
养你们是要干活(block_queue)的,
没活的时候可以休息着(pthread_cond_wait)
活来了(pthread_cond_signal)就麻利点去接客(head链表非空)
*/
if( pthread_create(&child_thread,&child_thread_attr,(void *)block_queue, (void *)&bqp) < 0 )
{
syslog(LOG_ERR,"pthread_create Failed : %s - %m/n",strerror(errno));
return 1;
}
else
{
syslog(LOG_NOTICE,"pthread_create Success : %d/n",(int)child_thread);
}
}
}
int handler(void* fd)
{
syslog(LOG_NOTICE,"handler:fd => %d/n",*(int *)(fd));
//向全局变量bqp中插入一个节点
//姑娘们听好了,
//大爷都在排队呢,
//一个个麻利点,伺候起来了!
return insert_queue(&bqp,*(int *)fd);
}
void init_daemon(void)
{
int pid;
int i;
if(pid=fork())
exit(0); //是父进程,结束父进程
else if(pid< 0)
exit(1); //fork失败,退出
//是第一子进程,后台继续执行
setsid(); //第一子进程成为新的会话组长和进程组长
//并与控制终端分离
if(pid=fork())
exit(0); //是第一子进程,结束第一子进程
else if(pid< 0)
exit(1); //fork失败,退出
//是第二子进程,继续
//第二子进程不再是会话组长
for(i=0; i< NOFILE; ++i)//关闭打开的文件描述符
close(i);
chdir("/tmp"); //改变工作目录到/tmp
umask(0); //重设文件创建掩模
return;
}
int main(int argc, char **argv)
{
char ch;
int d = 0;
//处理argv
while( ( ch = getopt( argc, argv, "p:t:l:e:d?" ) ) != EOF )
{
switch(ch)
{
case 'p':
printf("SMTPD_PORT =>%s ", optarg);
g_port = atoi(optarg);
break;
case 't':
printf("THREADS_COUNT => %s ", optarg);
g_th_count = atoi(optarg);
break;
case 'l':
printf("LENGTH_OF_LISTEN_QUEUE => %s. ",optarg);
g_listen_size = atol(optarg);
break;
case 'e':
printf("MAX_EPOLL_SIZE => %s. ",optarg);
g_epoll_size = atol(optarg);
break;
case 'd':
printf("RUN AS DAEMON. ");
d = 1;
case '?':
printf("Useage: -p [SMTPD_PORT|8025] -t [THREADS_COUNT|100] -l [LENGTH_OF_LISTEN_QUEUE|1024] -e [MAX_EPOLL_SIZE|1000] -d (RUN AS DAEMON.)/n");
exit(1);
default:
printf("Not support option :%c/n",ch);
exit(2);
}
}
if(d == 1)
init_daemon();
//一天的流水账要记录下来啊
//初始化 syslog
char *ident = "Smtp Mock";
int logopt = LOG_PID | LOG_CONS;
int facility = LOG_USER;
openlog(ident, logopt, facility);
setlogmask(LOG_UPTO(LOG_ERR));

syslog(LOG_INFO,"syslog inited.");
//初始化链表
INIT_LIST_HEAD(&head);
//生成smtp套接字
//本店开张了,欢迎访问
int server_socket = init_smtp(g_port);
int n;

if(init_threads() == 0)
syslog(LOG_NOTICE,"Success full init_threads.");

/*
下面要把本店加入全球领先的企业管理系统中,
该系统节省人力资源,
不需要服务员傻等在门口,
而是客人到了就会通知服务员出来迎宾.
*/
kdpfd = epoll_create(g_epoll_size);
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = server_socket;
if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, server_socket, &ev) < 0) {
fprintf(stderr, "epoll set insertion error: fd=%d < 0",
server_socket);
return -1;
}
//老鸨(主线程)负责拉客,姑娘(子线程)负责接客
for(; ; ) {
struct sockaddr_in local;
socklen_t length = sizeof(local);
int client;
//epoll_wait实现了阻塞,而不是busy loop
nfds = epoll_wait(kdpfd, events, g_epoll_size, -1);
for(n = 0; n < nfds; ++n) {
//判断套接字
//看是熟客还是生客
if(events[n].data.fd == server_socket) {
//新新新新,新来的吧
//你是新新新新新来的吧
client = accept(server_socket, (struct sockaddr *) &local,&length);
//是生客就发一个新的id卡(client)
if(client < 0){
syslog(LOG_ERR,"accept - %m");
continue;
}
setnonblocking(client);
//先跟大爷打声招呼,显得我们姑娘主动些
smtp_cmd("220",client);
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = client;
/*
再发张VIP卡,
把大爷加入VIP客户列表,
享受天上人间的服务
*/
if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, client, &ev) < 0) {
fprintf(stderr, "epoll set insertion error: fd=%d < 0",
client);
return -1;
}
}
else
/*
这位大爷肯定来过好几次了,
否则怎么连后门都知道.
*/
/*
后屋一排姑娘,大爷您慢慢挑
老鸨就不奉陪了,姑娘们伺候着!
*/
if(handler((void *)&events[n].data.fd) != 0)
syslog(LOG_ERR,"handler ret != 0 - %m");
}
}
//打击色..情产业,
//被迫歇业了
close(server_socket);
return 0;
}

    推荐阅读