linux内核select/poll,epoll实现与区别
下面文章在这段时间内研究select/poll/epoll的内核实现的一点心得体会:
select,poll,epoll都是多路复用IO的函数,简单说就是在一个线程里,可以同时处理多个文件描述符的读写。
select/poll的实现很类似,epoll是从select/poll扩展而来,主要是为了解决select/poll天生的缺陷。
epoll在内核版本2.6以上才出现的新的函数,而他们在linux内核中的实现都是十分相似。
这三种函数都需要设备驱动提供poll回调函数,对于套接字而言,他们是tcp_poll,udp_poll和datagram_poll;
对于自己开发的设备驱动而言,是自己实现的poll接口函数。
select实现(2.6的内核,其他版本的内核,应该都相差不多)
应用程序调用select,进入内核调用sys_select,做些简单初始化工作,接着进入core_sys_select,
此函数主要工作是把描述符集合从用户空间复制到内核空间,最终进入do_select,完成其主要的功能。
do_select里,调用poll_initwait,主要工作是注册poll_wait的回调函数为__pollwait,
当在设备驱动的poll回调函数里调用poll_wait,其实就是调用__pollwait,
__pollwait的主要工作是把当前进程挂载到等待队列里,当等待的事件到来就会唤醒此进程。
接着执行for循环,循环里首先遍历每个文件描述符,调用对应描述符的poll回调函数,检测是否就绪,
遍历完所有描述符之后,只要有描述符处于就绪状态,信号中断,出错或者超时,就退出循环,
否则会调用schedule_xxx函数,让当前进程睡眠,一直到超时或者有描述符就绪被唤醒。
接着又会再次遍历每个描述符,调用poll再次检测。
如此循环,直到符合条件才会退出。
以下是2.6.31内核的有关select函数的部分片段:
他们调用关系:
select-->sys_select-->core_sys_select-->do_select
intdo_select(intn,fd_set_bits*fds,structtimespec*end_time)
{
ktime_texpire,*to=NULL;
structpoll_wqueuestable;
poll_table*wait;
intretval,i,timed_out=0;
unsignedlongslack=0;
///这里为了获得集合中的最大描述符,这样可减少循环中遍历的次数。
///也就是为什么linux中select第一个参数为何如此重要了
rcu_read_lock();
retval=max_select_fd(n,fds);
rcu_read_unlock();
if(retval<0)
returnretval;
n=retval;
////初始化poll_table结构,其中一个重要任务是把__pollwait函数地址赋值给它,
poll_initwait(&table);
wait=&table.pt;
if(end_time&&!end_time->tv_sec&&!end_time->tv_nsec){
wait=NULL;
timed_out=1;
}
if(end_time&&!timed_out)
slack=estimate_accuracy(end_time);
retval=0;
///主循环,将会在这里完成描述符的状态轮训
for(;;){
unsignedlong*rinp,*routp,*rexp,*inp,*outp,*exp;
inp=fds->in;outp=fds->out;exp=fds->ex;
rinp=fds->res_in;routp=fds->res_out;rexp=fds->res_ex;
for(i=0;i<n;++rinp,++routp,++rexp){
unsignedlongin,out,ex,all_bits,bit=1,mask,j;
unsignedlongres_in=0,res_out=0,res_ex=0;
conststructfile_operations*f_op=NULL;
structfile*file=NULL;
///select中fd_set以及do_select中的fd_set_bits参数,都是按照位来保存描述符,意思是比如申请一个1024位的内存,
///如果第28位置1,说明此集合有描述符28,
in=*inp++;out=*outp++;ex=*exp++;
all_bits=in|out|ex;//检测读写异常3个集合中有无描述符
if(all_bits==0){
i+=__NFDBITS;
continue;
}
for(j=0;j<__NFDBITS;++j,++i,bit<<=1){
intfput_needed;
if(i>=n)
break;
if(!(bit&all_bits))
continue;
file=fget_light(i,&fput_needed);///通过描述符index获得structfile结构指针,
if(file){
f_op=file->f_op;//通过structfile获得file_operations,这是操作文件的回调函数集合。
mask=DEFAULT_POLLMASK;
if(f_op&&f_op->poll){
wait_key_set(wait,in,out,bit);
mask=(*f_op->poll)(file,wait);//调用我们的设备中实现的poll函数,
//因此,为了能让select正常工作,在我们设备驱动中,必须要提供poll的实现,
}
fput_light(file,fput_needed);
if((mask&POLLIN_SET)&&(in&bit)){
res_in|=bit;
retval++;
wait=NULL;///此处包括以下的,把wait设置为NULL,是因为检测到mask=(*f_op->poll)(file,wait);描述符已经就绪
///无需再把当前进程添加到等待队列里,do_select遍历完所有描述符之后就会退出。
}
if((mask&POLLOUT_SET)&&(out&bit)){
res_out|=bit;
retval++;
wait=NULL;
}
if((mask&POLLEX_SET)&&(ex&bit)){
res_ex|=bit;
retval++;
wait=NULL;
}
}
}
if(res_in)
*rinp=res_in;
if(res_out)
*routp=res_out;
if(res_ex)
*rexp=res_ex;
cond_resched();
}
wait=NULL;//已经遍历完一遍,该加到等待队列的,都已经加了,无需再加,因此设置为NULL
if(retval||timed_out||signal_pending(current))//描述符就绪,超时,或者信号中断就退出循环
break;
if(table.error){//出错退出循环
retval=table.error;
break;
}
/*
*Ifthisisthefirstloopandwehaveatimeout
*given,thenweconverttoktime_tandsettheto
*pointertotheexpiryvalue.
*/
if(end_time&&!to){
expire=timespec_to_ktime(*end_time);
to=&expire;
}
/////让进程休眠,直到超时,或者被就绪的描述符唤醒,
if(!poll_schedule_timeout(&table,TASK_INTERRUPTIBLE,
to,slack))
timed_out=1;
}
poll_freewait(&table);
returnretval;
}
voidpoll_initwait(structpoll_wqueues*pwq)
{
init_poll_funcptr(&pwq->pt,__pollwait);//设置poll_table的回调函数为__pollwait,这样当我们在驱动中调用poll_wait就会调用到__pollwait
........
}
staticvoid__pollwait(structfile*filp,wait_queue_head_t*wait_address,
poll_table*p)
{
...................
init_waitqueue_func_entry(&entry->wait,pollwake);//设置唤醒进程调用的回调函数,当在驱动中调用wake_up唤醒队列时候,
//pollwake会被调用,这里其实就是调用队列的默认函数default_wake_function
//用来唤醒睡眠的进程。
add_wait_queue(wait_address,&entry->wait);//加入到等待队列
}
intcore_sys_select(intn,fd_set__user*inp,fd_set__user*outp,
fd_set__user*exp,structtimespec*end_time)
{
........
//把描述符集合从用户空间复制到内核空间
if((ret=get_fd_set(n,inp,fds.in))||
(ret=get_fd_set(n,outp,fds.out))||
(ret=get_fd_set(n,exp,fds.ex)))
.........
ret=do_select(n,&fds,end_time);
.............
////把do_select返回集合,从内核空间复制到用户空间
if(set_fd_set(n,inp,fds.res_in)||
set_fd_set(n,outp,fds.res_out)||
set_fd_set(n,exp,fds.res_ex))
ret=-EFAULT;
............
}
poll的实现跟select基本差不多,按照
poll-->do_sys_poll-->do_poll-->do_pollfd的调用序列
其中do_pollfd是对每个描述符调用其回调poll状态轮训。
poll比select的好处就是没有描述多少限制,select有1024的限制,描述符不能超过此值,poll不受限制。
我们从上面代码分析,可以总结出select/poll天生的缺陷:
1)每次调用select/poll都需要要把描述符集合从用户空间copy到内核空间,检测完成之后,又要把检测的结果集合从内核空间copy到用户空间
当描述符很多,而且select经常被唤醒,这种开销会比较大
2)如果说描述符集合来回复制不算什么,那么多次的全部描述符遍历就比较恐怖了,
我们在应用程序中,每次调用select/poll都必须首先遍历描述符,把他们加到fd_set集合里,这是应用层的第一次遍历,
接着进入内核空间,至少进行一次遍历和调用每个描述符的poll回调检测,一般可能是2次遍历,第一次没发现就绪描述符,
加入等待队列,第二次是被唤醒,接着再遍历一遍。再回到应用层,我们还必须再次遍历所有描述符,用FD_ISSET检测结果集。
如果描述符很多,这种遍历就很消耗CPU资源了。
3)描述符多少限制,当然poll没有限制,select却有1024的硬性限制,除了修改内核增加1024限制外没别的办法。
既然有这么些缺点,那不是select/poll变得一无是处了,那就大错特错了。
他们依然是代码移植的最好函数,因为几乎所有平台都有对它们的实现提供接口。
在描述符不是太多,他们依然十分出色的完成多路复用IO,
而且如果每个连接上的描述符都处于活跃状态,他们的效率其实跟epoll也差不了多少。
曾经使用多个线程+每个线程采用poll的办法开发TCP服务器,处理文件收发,连接达到几千个,
当时的瓶颈已经不在网络IO,而在磁盘IO了。
我们再来看epoll为了解决select/poll天生的缺陷,是如何实现的。
epoll只是select/poll的扩展,他不是在linux内核中另起炉灶,做颠覆性的设计的,他只是在select的基础上来解决他们的缺陷。
他的底层依然需要设备驱动提供poll回调来作为状态检测基础。
epoll分为三个函数epoll_create,epoll_ctl,epoll_wait。
他们的实现在eventpoll.c代码里。
epoll_create创建epoll设备,用来管理所有添加进去的描述符,epoll_ctl用来添加新的描述符,修改或者删除描述符。
epoll_wait等待描述符事件。
epoll_wait的等待已经不再是轮训方式的等待了,epoll内部有个描述符就绪队列,epoll_wait只检测这个队列即可,
他采用睡眠一会检测一下的方式,如果发现描述符就绪队列不为空,就把此队列中的描述符copy到用户空间,然后返回。
描述符就绪队列里的数据又是从何而来的?
原来使用epoll_ctl添加新描述符时候,epoll_ctl内核实现里会修改两个回调函数,
一个是poll_table结构里的qproc回调函数指针,
在select中是__pollwait函数,在epoll中换成ep_ptable_queue_proc,
当在epoll_ctl中调用新添加的描述符的poll回调时候,底层驱动就会调用poll_wait添加等待队列,
底层驱动调用poll_wait时候,
其实就是调用ep_ptable_queue_proc,此函数会修改等待队列的回调函数为ep_poll_callback,并加入到等待队列头里;
一旦底层驱动发现数据就绪,就会调用wake_up唤醒等待队列,从而ep_poll_callback将被调用,
在ep_poll_callback中会把这个就绪的描述符添加到epoll的描述符就绪队列里,并同时唤醒epoll_wait所在的进程。
如此这般,就是epoll的内核实现的精髓。
看他是如何解决select/poll的缺陷的,首先他通过epoll_ctl的EPOLL_CTL_ADD命令把描述符添加进epoll内部管理器里,
只需添加一次即可,直到用epoll_ctl的EPOLL_CTL_DEL命令删除此描述符为止,
而不像select/poll是每次执行都必须添加,很显然大量减少了描述符在内核和用户空间不断的来回copy的开销。
其次虽然epoll_wait内部也是循环检测,但是它只需检测描述符就绪队列是否为空即可,
比起select/poll必须轮训每个描述符的poll,其开销简直可以忽略不计。
他同时也没描述符多少的限制,只要你机器的内存够大,就能容纳非常多的描述符。
以下是epoll相关部分内核代码片段:
structepitem{
/*RBtreenodeusedtolinkthisstructuretotheeventpollRBtree*/
structrb_noderbn;//红黑树节点,
structepoll_filefdffd;//存储此变量对应的描述符
structepoll_eventevent;//用户定义的结构
/*其他成员*/
};
structeventpoll{
/*其他成员*/
.......
/*Waitqueueusedbyfile->poll()*/
wait_queue_head_tpoll_wait;
/*Listofreadyfiledescriptors*/
structlist_headrdllist;///描述符就绪队列,挂载的是epitem结构
/*RBtreerootusedtostoremonitoredfdstructs*/
structrb_rootrbr;///存储新添加的描述符的红黑树根,此成员用来存储添加进来的所有描述符。挂载的是epitem结构
.........
};
//epoll_create
SYSCALL_DEFINE1(epoll_create1,int,flags)
{
interror;
structeventpoll*ep=NULL;
/*其他代码*/
......
//分配eventpoll结构,这个结构是epoll的灵魂,他包含了所有需要处理得数据。
error=ep_alloc(&ep);
if(error<0)
returnerror;
error=anon_inode_getfd("[eventpoll]",&eventpoll_fops,ep,
flags&O_CLOEXEC);///打开eventpoll的描述符,并把ep存储到file->private_data变量里。
if(error<0)
ep_free(ep);
returnerror;
}
SYSCALL_DEFINE4(epoll_ctl,int,epfd,int,op,int,fd,
structepoll_event__user*,event)
{
/*其他代码*/
.....
ep=file->private_data;
......
epi=ep_find(ep,tfile,fd);///从eventpoll的rbr里查找描述符是fd的epitem,
error=-EINVAL;
switch(op){
caseEPOLL_CTL_ADD:
if(!epi){
epds.events|=POLLERR|POLLHUP;
error=ep_insert(ep,&epds,tfile,fd);//在这个函数里添加新描述符,同时修改重要的回调函数。
//同时还调用描述符的poll,查看就绪状态
}else
error=-EEXIST;
break;
/*其他代码*/
........
}
staticintep_insert(structeventpoll*ep,structepoll_event*event,
structfile*tfile,intfd)
{
...../*其他代码*/
init_poll_funcptr(&epq.pt,ep_ptable_queue_proc);//设置poll_tabe回调函数为ep_ptable_queue_proc
//ep_ptable_queue_proc会设置等待队列的回调指针为ep_epoll_callback,同时添加等待队列。
......../*其他代码*/
revents=tfile->f_op->poll(tfile,&epq.pt);//调用描述符的poll回调,在此函数里ep_ptable_queue_proc会被调用
......./*其他代码*/
ep_rbtree_insert(ep,epi);//把新生成关于epitem添加到红黑树里
....../*其他代码*/
if((revents&event->events)&&!ep_is_linked(&epi->rdllink)){
list_add_tail(&epi->rdllink,&ep->rdllist);//如果上边的poll调用,检测到描述符就绪,添加本描述符到就绪队列里。
if(waitqueue_active(&ep->wq))
wake_up_locked(&ep->wq);
if(waitqueue_active(&ep->poll_wait))
pwake++;
}
....../*其他代码*/
/*Wehavetocallthisoutsidethelock*/
if(pwake)
ep_poll_safewake(&ep->poll_wait);//如果描述符就绪队列不为空,则唤醒epoll_wait所在的进程。
........./*其他代码*/
}
//这个函数设置等待队列回调函数为ep_poll_callback,
//这样到底层有数据唤醒等待队列时候,ep_poll_callback就会被调用,从而把就绪的描述符加到就绪队列。
staticvoidep_ptable_queue_proc(structfile*file,wait_queue_head_t*whead,
poll_table*pt)
{
structepitem*epi=ep_item_from_epqueue(pt);
structeppoll_entry*pwq;
if(epi->nwait>=0&&(pwq=kmem_cache_alloc(pwq_cache,GFP_KERNEL))){
init_waitqueue_func_entry(&pwq->wait,ep_poll_callback);
pwq->whead=whead;
pwq->base=epi;
add_wait_queue(whead,&pwq->wait);
list_add_tail(&pwq->llink,&epi->pwqlist);
epi->nwait++;
}else{
/*Wehavetosignalthatanerroroccurred*/
epi->nwait=-1;
}
}
staticintep_poll_callback(wait_queue_t*wait,unsignedmode,intsync,void*key)
{
intpwake=0;
unsignedlongflags;
structepitem*epi=ep_item_from_wait(wait);
structeventpoll*ep=epi->ep;
........./*其他代码*/
if(!ep_is_linked(&epi->rdllink))
list_add_tail(&epi->rdllink,&ep->rdllist);//把当前就绪的描述epitem结构添加到就绪队列里
........./*其他代码*/
if(pwake)
ep_poll_safewake(&ep->poll_wait);//如果队列不为空,唤醒epoll_wait所在进程
........./*其他代码*/
}
epoll_wait内核代码里主要是调用ep_poll,列出ep_poll部分代码片段:
staticintep_poll(structeventpoll*ep,structepoll_event__user*events,
intmaxevents,longtimeout)
{
intres,eavail;
unsignedlongflags;
longjtimeout;
wait_queue_twait;
........./*其他代码*/
if(list_empty(&ep->rdllist)){
init_waitqueue_entry(&wait,current);
wait.flags|=WQ_FLAG_EXCLUSIVE;
__add_wait_queue(&ep->wq,&wait);
//如果检测到就绪队列为空,添加当前进程到等待队列,并执行否循环
for(;;){
set_current_state(TASK_INTERRUPTIBLE);
if(!list_empty(&ep->rdllist)||!jtimeout)//如果就绪队列不为空,或者超时则退出循环
break;
if(signal_pending(current)){//如果信号中断,退出循环
res=-EINTR;
break;
}
spin_unlock_irqrestore(&ep->lock,flags);
jtimeout=schedule_timeout(jtimeout);//睡眠,知道被唤醒或者超时为止。
spin_lock_irqsave(&ep->lock,flags);
}
__remove_wait_queue(&ep->wq,&wait);
set_current_state(TASK_RUNNING);
}
........./*其他代码*/
if(!res&&eavail&&
!(res=ep_send_events(ep,events,maxevents))&&jtimeout)
gotoretry;
//ep_send_events主要任务是把就绪队列的就绪描述符copy到用户空间的epoll_event数组里,
returnres;
}
可以看到ep_poll既epoll_wait的循环是相当轻松的循环,他只是简单检测就绪队列而已,因此他的开销很小。
我们最后看看描述符就绪时候,是如何通知给select/poll/epoll的,以网络套接字的TCP协议来进行说明。
tcp协议对应的poll回调是tcp_poll,对应的等待队列头是structsock结构里sk_sleep成员,
在tcp_poll中会把sk_sleep加入到等待队列,等待数据就绪。
当物理网卡接收到数据包,引发硬件中断,驱动在中断ISR例程里,构建skb包,把数据copy进skb,接着调用netif_rx
把skb挂载到CPU相关的input_pkt_queue队列,同时引发软中断,在软中断的net_rx_action回调函数里从input_pkt_queue里取出
skb数据包,通过分析,调用协议相关的回调函数,这样层层传递,一直到structsock,此结构里的sk_data_ready回调指针被调用
sk_data_ready指向sock_def_readable函数,sock_def_readable函数其实就是wake_up唤醒sock结构里的sk_sleep。
以上机制,对select/poll/epoll都是一样的,接下来唤醒sk_sleep方式就不一样了,因为他们指向了不同的回调函数。
在select/poll实现中,等待队列回调函数是pollwake其实就是调用default_wake_function,唤醒被select阻塞住的进程。
epoll实现中,等待回调函数是ep_poll_callback,此回调函数只是把就绪描述符加入到epoll的就绪队列里。
所以呢select/poll/epoll其实他们在内核实现中,差别也不是太大,其实都差不多。
epoll虽然效率不错,可以跟windows平台中的完成端口比美,但是移植性太差,
目前几乎就只有linux平台才实现了epoll而且必须是2.6以上的内核版本。