Linux网卡数据包的接收
在探测函数中,设置了netdev->open = e100_open; 指定了设备的open函数为e100_open:
CODE:
static int e100_open(struct net_device *netdev)
{ struct nic *nic = netdev_priv(netdev); int err = 0; netif_carrier_off(netdev); if((err = e100_up(nic))) DPRINTK(IFUP, ERR, "Cannot open interface, aborting.\n"); return err; } 大多数涉及物理设备可以感知信号载波(carrier)的存在,载波的存在意味着设备可以工作 据个例子来讲:当一个用户拔掉了网线,也就意味着信号载波的消失。 netif_carrier_off:关闭载波信号; netif_carrier_on:打开载波信号; netif_carrier_ok:检测载波信号; 对于探测网卡网线是否连接,这一组函数被使用得较多; 接着,调用e100_up函数启动网卡,这个“启动”的过程,最重要的步骤有: 1、调用request_irq向内核注册中断; 2、调用netif_wake_queue函数来重新启动传输队例;
CODE:
static int e100_up(struct nic *nic)
{ int err; if((err = e100_rx_alloc_list(nic))) return err; if((err = e100_alloc_cbs(nic))) goto err_rx_clean_list; if((err = e100_hw_init(nic))) goto err_clean_cbs; e100_set_multicast_list(nic->netdev); e100_start_receiver(nic, 0); mod_timer(&nic->watchdog, jiffies); if((err = request_irq(nic->pdev->irq, e100_intr, SA_SHIRQ, nic->netdev->name, nic->netdev))) goto err_no_irq; netif_wake_queue(nic->netdev); netif_poll_enable(nic->netdev); /* enable ints _after_ enabling poll, preventing a race between * disable ints+schedule */ e100_enable_irq(nic); return 0; err_no_irq: del_timer_sync(&nic->watchdog); err_clean_cbs: e100_clean_cbs(nic); err_rx_clean_list: e100_rx_clean_list(nic); return err; } 这样,中断函数e100_intr将被调用; 三、网卡中断 从本质上来讲,中断,是一种电信号,当设备有某种事件发生的时候,它就会产生中断,通过总线把电信号发送给中断控制器,如果中断的线是激活的,中断控制器 就把电信号发送给处理器的某个特定引脚。处理器于是立即停止自己正在做的事,跳到内存中内核设置的中断处理程序的入口点,进行中断处理。 在内核中断处理中,会检测中断与我们刚才注册的中断号匹配,于是,注册的中断处理函数就被调用了。 当需要发/收数据,出现错误,连接状态变化等,网卡的中断信号会被触发。当接收到中断后,中断函数读取中断状态位,进行合法性判断,如判断中断信号是否是自己的等,然后,应答设备中断——OK,我已经知道了,你回去继续工作吧…… 接着,它就屏蔽此中断,然后netif_rx_schedule函数接收,接收函数 会在未来某一时刻调用设备的poll函数(对这里而言,注册的是e100_poll)实现设备的轮询:
CODE:
static irqreturn_t e100_intr(int irq, void *dev_id, struct pt_regs *regs)
{ struct net_device *netdev = dev_id; struct nic *nic = netdev_priv(netdev); u8 stat_ack = readb(&nic->csr->scb.stat_ack); DPRINTK(INTR, DEBUG, "stat_ack = 0x%02X\n", stat_ack); if(stat_ack == stat_ack_not_ours || /* Not our interrupt */ stat_ack == stat_ack_not_present) /* Hardware is ejected */ return IRQ_NONE; /* Ack interrupt(s) */ writeb(stat_ack, &nic->csr->scb.stat_ack); /* We hit Receive No Resource (RNR); restart RU after cleaning */ if(stat_ack & stat_ack_rnr) nic->ru_running = RU_SUSPENDED; e100_disable_irq(nic); netif_rx_schedule(netdev); return IRQ_HANDLED; } 对于数据包的接收而言,我们关注的是poll函数中,调用e100_rx_clean进行数据的接收:
CODE:
static int e100_poll(struct net_device *netdev, int *budget)
{ struct nic *nic = netdev_priv(netdev); /* * netdev->quota是当前CPU能够从所有接口中接收数据包的最大数目,budget是在 * 初始化阶段分配给接口的weight值,轮询函数必须接受二者之间的最小值。表示 * 轮询函数本次要处理的数据包个数。 */ unsigned int work_to_do = min(netdev->quota, *budget); unsigned int work_done = 0; int tx_cleaned; /*进行数据包的接收和传输*/ e100_rx_clean(nic, &work_done, work_to_do); tx_cleaned = e100_tx_clean(nic); /*接收和传输完成后,就退出poll模块,重启中断*/ /* If no Rx and Tx cleanup work was done, exit polling mode. */ if((!tx_cleaned && (work_done == 0)) || !netif_running(netdev)) { netif_rx_complete(netdev); e100_enable_irq(nic); return 0; } *budget -= work_done; netdev->quota -= work_done; return 1; } static inline void e100_rx_clean(struct nic *nic, unsigned int *work_done, unsigned int work_to_do) { struct rx *rx; int restart_required = 0; struct rx *rx_to_start = NULL; /* are we already rnr? then pay attention!!! this ensures that * the state machine progression never allows a start with a * partially cleaned list, avoiding a race between hardware * and rx_to_clean when in NAPI mode */ if(RU_SUSPENDED == nic->ru_running) restart_required = 1; /* Indicate newly arrived packets */ for(rx = nic->rx_to_clean; rx->skb; rx = nic->rx_to_clean = rx->next) { int err = e100_rx_indicate(nic, rx, work_done, work_to_do); if(-EAGAIN == err) { /* hit quota so have more work to do, restart once * cleanup is complete */ restart_required = 0; break; } else if(-ENODATA == err) break; /* No more to clean */ } /* save our starting point as the place we'll restart the receiver */ if(restart_required) rx_to_start = nic->rx_to_clean; /* Alloc new skbs to refill list */ for(rx = nic->rx_to_use; !rx->skb; rx = nic->rx_to_use = rx->next) { if(unlikely(e100_rx_alloc_skb(nic, rx))) break; /* Better luck next time (see watchdog) */ } if(restart_required) { // ack the rnr? writeb(stat_ack_rnr, &nic->csr->scb.stat_ack); e100_start_receiver(nic, rx_to_start); if(work_done) (*work_done)++; } } 四、网卡的数据接收 内核如何从网卡接受数据,传统的经典过程: 1、数据到达网卡; 2、网卡产生一个中断给内核; 3、内核使用I/O指令,从网卡I/O区域中去读取数据; 我们在许多网卡驱动中,都可以在网卡的中断函数中见到这一过程。 但是,这一种方法,有一种重要的问题,就是大流量的数据来到,网卡会产生大量的中断,内核在中断上下文中,会浪费大量的资源来处理中断本身。所以,一个问 题是,“可不可以不使用中断”,这就是轮询技术,所谓NAPI技术,说来也不神秘,就是说,内核屏蔽中断,然后隔一会儿就去问网卡,“你有没有数据 啊?”…… 从这个描述本身可以看到,哪果数据量少,轮询同样占用大量的不必要的CPU资源,大家各有所长吧,呵呵…… OK,另一个问题,就是从网卡的I/O区域,包括I/O寄存器或I/O内存中去读取数据,这都要CPU去读,也要占用CPU资源,“CPU从I/O区域 读,然后把它放到内存(这个内存指的是系统本身的物理内存,跟外设的内存不相干,也叫主内存)中”。于是自然地,就想到了DMA技术——让网卡直接从主内 存之间读写它们的I/O数据,CPU,这儿不干你事,自己找乐子去: 1、首先,内核在主内存中为收发数据建立一个环形的缓冲队列(通常叫DMA环形缓冲区)。 2、内核将这个缓冲区通过DMA映射,把这个队列交给网卡; 3、网卡收到数据,就直接放进这个环形缓冲区了——也就是直接放进主内存了;然后,向系统产生一个中断; 4、内核收到这个中断,就取消DMA映射,这样,内核就直接从主内存中读取数据; ——呵呵,这一个过程比传统的过程少了不少工作,因为设备直接把数据放进了主内存,不需要CPU的干预,效率是不是提高不少? 对应以上4步,来看它的具体实现: 1、分配环形DMA缓冲区 Linux内核中,用skb来描述一个缓存,所谓分配,就是建立一定数量的skb,然后把它们组织成一个双向链表; 2、建立DMA映射 内核通过调用 dma_map_single(struct device *dev,void *buffer,size_t size,enum dma_data_direction direction) 建立映射关系。 struct device *dev,描述一个设备; buffer:把哪个地址映射给设备;也就是某一个skb——要映射全部,当然是做一个双向链表的循环即可; size:缓存大小; direction:映射方向——谁传给谁:一般来说,是“双向”映射,数据在设备和内存之间双向流动; 对于PCI设备而言(网卡一般是PCI的),通过另一个包裹函数pci_map_single,这样,就把buffer交给设备了!设备可以直接从里边读/取数据。 3、这一步由硬件完成; 4、取消映射 dma_unmap_single,对PCI而言,大多调用它的包裹函数pci_unmap_single,不取消的话,缓存控制权还在设备手里,要调用它,把主动权掌握在CPU手里——因为我们已经接收到数据了,应该由CPU把数据交给上层网络栈; 当然,不取消之前,通常要读一些状态位信息,诸如此类,一般是调用 dma_sync_single_for_cpu() 让CPU在取消映射前,就可以访问DMA缓冲区中的内容。 关于DMA映射的更多内容,可以参考《Linux设备驱动程序》“内存映射和DMA”章节相关内容! OK,有了这些知识,我们就可以来看e100的代码了,它跟上面讲的步骤基本上一样的——绕了这么多圈子,就是想绕到e100上面了,呵呵! 在e100_open函数中,调用e100_up,我们前面分析它时,略过了一个重要的东东,就是环形缓冲区的建立,这一步,是通过 e100_rx_alloc_list函数调用完成的:
CODE:
static int e100_rx_alloc_list(struct nic *nic)
{ struct rx *rx; unsigned int i, count = nic->params.rfds.count; nic->rx_to_use = nic->rx_to_clean = NULL; nic->ru_running = RU_UNINITIALIZED; /*结构struct rx用来描述一个缓冲区节点,这里分配了count个*/ if(!(nic->rxs = kmalloc(sizeof(struct rx) * count, GFP_ATOMIC))) return -ENOMEM; memset(nic->rxs, 0, sizeof(struct rx) * count); /*虽然是连续分配的,不过还是遍历它,建立双向链表,然后为每一个rx的skb指针分员分配空间 skb用来描述内核中的一个数据包,呵呵,说到重点了*/ for(rx = nic->rxs, i = 0; i < count; rx++, i++) { rx->next = (i + 1 < count) ? rx + 1 : nic->rxs; rx->prev = (i == 0) ? nic->rxs + count - 1 : rx - 1; if(e100_rx_alloc_skb(nic, rx)) { /*分配缓存*/ e100_rx_clean_list(nic); return -ENOMEM; } } nic->rx_to_use = nic->rx_to_clean = nic->rxs; nic->ru_running = RU_SUSPENDED; return 0; }
CODE:
#define RFD_BUF_LEN (sizeof(struct rfd) + VLAN_ETH_FRAME_LEN)
static inline int e100_rx_alloc_skb(struct nic *nic, struct rx *rx) { /*skb缓存的分配,是通过调用系统函数dev_alloc_skb来完成的,它同内核栈中通常调用alloc_skb的区别在于, 它是原子的,所以,通常在中断上下文中使用*/ if(!(rx->skb = dev_alloc_skb(RFD_BUF_LEN + NET_IP_ALIGN))) return -ENOMEM; /*初始化必要的成员 */ rx->skb->dev = nic->netdev; skb_reserve(rx->skb, NET_IP_ALIGN); /*这里在数据区之前,留了一块sizeof(struct rfd) 这么大的空间,该结构的 一个重要作用,用来保存一些状态信息,比如,在接收数据之前,可以先通过 它,来判断是否真有数据到达等,诸如此类*/ memcpy(rx->skb->data, &nic->blank_rfd, sizeof(struct rfd)); /*这是最关键的一步,建立DMA映射,把每一个缓冲区rx->skb->data都映射给了设备,缓存区节点 rx利用dma_addr保存了每一次映射的地址,这个地址后面会被用到*/ rx->dma_addr = pci_map_single(nic->pdev, rx->skb->data, RFD_BUF_LEN, PCI_DMA_BIDIRECTIONAL); if(pci_dma_mapping_error(rx->dma_addr)) { dev_kfree_skb_any(rx->skb); rx->skb = 0; rx->dma_addr = 0; return -ENOMEM; } /* Link the RFD to end of RFA by linking previous RFD to * this one, and clearing EL bit of previous. */ if(rx->prev->skb) { struct rfd *prev_rfd = (struct rfd *)rx->prev->skb->data; /*put_unaligned(val,ptr);用到把var放到ptr指针的地方,它能处理处理内存对齐的问题 prev_rfd是在缓冲区开始处保存的一点空间,它的link成员,也保存了映射后的地址*/ put_unaligned(cpu_to_le32(rx->dma_addr), (u32 *)&prev_rfd->link); wmb(); prev_rfd->command &= ~cpu_to_le16(cb_el); pci_dma_sync_single_for_device(nic->pdev, rx->prev->dma_addr, sizeof(struct rfd), PCI_DMA_TODEVICE); } return 0; } e100_rx_alloc_list函数在一个循环中,建立了环形缓冲区,并调用e100_rx_alloc_skb为每个缓冲区分配了空间,并做了 DMA映射。这样,我们就可以来看接收数据的过程了。 前面我们讲过,中断函数中,调用netif_rx_schedule,表明使用轮询技术,系统会在未来某一时刻,调用设备的poll函数:
CODE:
static int e100_poll(struct net_device *netdev, int *budget)
{ struct nic *nic = netdev_priv(netdev); unsigned int work_to_do = min(netdev->quota, *budget); unsigned int work_done = 0; int tx_cleaned; e100_rx_clean(nic, &work_done, work_to_do); tx_cleaned = e100_tx_clean(nic); /* If no Rx and Tx cleanup work was done, exit polling mode. */ if((!tx_cleaned && (work_done == 0)) || !netif_running(netdev)) { netif_rx_complete(netdev); e100_enable_irq(nic); return 0; } *budget -= work_done; netdev->quota -= work_done; return 1; } 目前,我们只关心rx,所以,e100_rx_clean函数就成了我们关注的对像,它用来从缓冲队列中接收全部数据(这或许是取名为clean的原因吧!):
CODE:
static inline void e100_rx_clean(struct nic *nic, unsigned int *work_done,
unsigned int work_to_do) { struct rx *rx; int restart_required = 0; struct rx *rx_to_start = NULL; /* are we already rnr? then pay attention!!! this ensures that * the state machine progression never allows a start with a * partially cleaned list, avoiding a race between hardware * and rx_to_clean when in NAPI mode */ if(RU_SUSPENDED == nic->ru_running) restart_required = 1; /* 函数最重要的工作,就是遍历环形缓冲区,接收数据*/ for(rx = nic->rx_to_clean; rx->skb; rx = nic->rx_to_clean = rx->next) { int err = e100_rx_indicate(nic, rx, work_done, work_to_do); if(-EAGAIN == err) { /* hit quota so have more work to do, restart once * cleanup is complete */ restart_required = 0; break; } else if(-ENODATA == err) break; /* No more to clean */ } /* save our starting point as the place we'll restart the receiver */ if(restart_required) rx_to_start = nic->rx_to_clean; /* Alloc new skbs to refill list */ for(rx = nic->rx_to_use; !rx->skb; rx = nic->rx_to_use = rx->next) { if(unlikely(e100_rx_alloc_skb(nic, rx))) break; /* Better luck next time (see watchdog) */ } if(restart_required) { // ack the rnr? writeb(stat_ack_rnr, &nic->csr->scb.stat_ack); e100_start_receiver(nic, rx_to_start); if(work_done) (*work_done)++; } }
CODE:
static inline int e100_rx_indicate(struct nic *nic, struct rx *rx,
unsigned int *work_done, unsigned int work_to_do) { struct sk_buff *skb = rx->skb; struct rfd *rfd = (struct rfd *)skb->data; u16 rfd_status, actual_size; if(unlikely(work_done && *work_done >= work_to_do)) return -EAGAIN; /* 读取数据之前,也就是取消DMA映射之前,需要先读取cb_complete 状态位, 以确定数据是否真的准备好了,并且,rfd的actual_size中,也包含了真实的数据大小 pci_dma_sync_single_for_cpu函数前面已经介绍过,它让CPU在取消DMA映射之前,具备 访问DMA缓存的能力*/ pci_dma_sync_single_for_cpu(nic->pdev, rx->dma_addr, sizeof(struct rfd), PCI_DMA_FROMDEVICE); rfd_status = le16_to_cpu(rfd->status); DPRINTK(RX_STATUS, DEBUG, "status=0x%04X\n", rfd_status); /* If data isn't ready, nothing to indicate */ if(unlikely(!(rfd_status & cb_complete))) return -ENODATA; /* Get actual data size */ actual_size = le16_to_cpu(rfd->actual_size) & 0x3FFF; if(unlikely(actual_size > RFD_BUF_LEN - sizeof(struct rfd))) actual_size = RFD_BUF_LEN - sizeof(struct rfd); /* 取消映射,因为通过DMA,网卡已经把数据放在了主内存中,这里一取消,也就意味着, CPU可以处理主内存中的数据了 */ pci_unmap_single(nic->pdev, rx->dma_addr, RFD_BUF_LEN, PCI_DMA_FROMDEVICE); /* this allows for a fast restart without re-enabling interrupts */ if(le16_to_cpu(rfd->command) & cb_el) nic->ru_running = RU_SUSPENDED; /*正确地设置data指针,因为最前面有一个sizeof(struct rfd)大小区域,跳过它*/ skb_reserve(skb, sizeof(struct rfd)); /*更新skb的tail和len指针,也是就更新接收到这么多数据的长度*/ skb_put(skb, actual_size); /*设置协议位*/ skb->protocol = eth_type_trans(skb, nic->netdev); if(unlikely(!(rfd_status & cb_ok))) { /* Don't indicate if hardware indicates errors */ nic->net_stats.rx_dropped++; dev_kfree_skb_any(skb); } else if(actual_size > nic->netdev->mtu + VLAN_ETH_HLEN) { /* Don't indicate oversized frames */ nic->rx_over_length_errors++; nic->net_stats.rx_dropped++; dev_kfree_skb_any(skb); } else { /*网卡驱动要做的最后一步,就是统计接收计数器,设置接收时间戳,然后调用netif_receive_skb, 把数据包交给上层协议栈,自己的光荣始命也就完成了*/ nic->net_stats.rx_packets++; nic->net_stats.rx_bytes += actual_size; nic->netdev->last_rx = jiffies; netif_receive_skb(skb); if(work_done) (*work_done)++; } rx->skb = NULL; return 0; } 网卡驱动执行到这里,数据接收的工作,也就处理完成了。但是,使用这一种方法的驱动,省去了网络栈中一个重要的内容,就是 “队列层”,让我们来看看,传统中断接收数据包模式下,使用netif_rx函数调用,又会发生什么。 PS:九贱没有去研究过所谓的“零拷贝”技术,不太清楚,它同这种DMA直取方式有何不同?难道是把网卡中的I/O内存直接映射到主内存中,这样CPU就 可以像读取主内存一样,读取网卡的内存,但是这要求设备要有好大的I/O内存来做缓冲呀!!^o^,外行了……希望哪位DX提点! 五、队列层 1、软中断与下半部 当用中断处理的时候,为了减少中断处理的工作量,比如,一般中断处理时,需要屏蔽其它中断,如果中断处理时间过长,那么其它中断 有可能得不到及时处理,也以,有一种机制,就是把“不必马上处理”的工作,推迟一点,让它在中断处理后的某一个时刻得到处理。这就 是下半部。 下半部只是一个机制,它在Linux中,有多种实现方式,其中一种对时间要求最严格的实现方式,叫“软中断”,可以使用: open_softirq() 来向内核注册一个软中断, 然后,在合适的时候,调用 raise_softirq_irqoff() 触发它。 如果采用中断方式接收数据(这一节就是在说中断方式接收,后面,就不用这种假设了),同样也需要软中断,可以调用 open_softirq(NET_RX_SOFTIRQ, net_rx_action, NULL); 向内核注册一个名为NET_RX_SOFTIR的软中断,net_rx_action是软中断的处理函数。 然后,在驱动中断处理完后的某一个时刻,调用 raise_softirq_irqoff(NET_RX_SOFTIRQ); 触发它,这样net_rx_action将得到执行。 2、队列层 什么是队列层?通常,在网卡收发数据的时候,需要维护一个缓冲区队列,来缓存可能存在的突发数据,类似于前面的DMA环形缓冲区。 队列层中,包含了一个叫做struct softnet_data:
CODE:
struct softnet_data
{ /*throttle 用于拥塞控制,当拥塞发生时,throttle将被设置,后续进入的数据包将被丢弃*/ int throttle; /*netif_rx函数返回的拥塞级别*/ int cng_level; int avg_blog; /*softnet_data 结构包含一个指向接收和传输队列的指针,input_pkt_queue成员指向准备传送 给网络层的sk_buffs包链表的首部的指针,这个队列中的包是由netif_rx函数递交的*/ struct sk_buff_head input_pkt_queue; struct list_head poll_list; struct net_device *output_queue; struct sk_buff *completion_queue; struct net_device backlog_dev; /* Sorry. 8) */ }; 内核使用了一个同名的变量softnet_data,它是一个Per-CPU变量,每个CPU都有一个。 net/core/dev.c
CODE:
DECLARE_PER_CPU(struct softnet_data,softnet_data);
CODE:
/*
* 网络模块的核心处理模块. */ static int __init net_dev_init(void) { int i, rc = -ENOMEM; BUG_ON(!dev_boot_phase); net_random_init(); if (dev_proc_init()) /*初始化proc文件系统*/ goto out; if (netdev_sysfs_init()) /*初始化sysfs文件系统*/ goto out; /*ptype_all和ptype_base是重点,后面会详细分析,它们都是 struct list_head类型变量,这里初始化链表成员*/ INIT_LIST_HEAD(&ptype_all); for (i = 0; i < 16; i++) INIT_LIST_HEAD(&ptype_base[i]); for (i = 0; i < ARRAY_SIZE(dev_name_head); i++) INIT_HLIST_HEAD(&dev_name_head[i]); for (i = 0; i < ARRAY_SIZE(dev_index_head); i++) INIT_HLIST_HEAD(&dev_index_head[i]); /* * 初始化包接收队列,这里我们的重点了. */ /*遍历每一个CPU,取得它的softnet_data,我们说过,它是一个struct softnet_data的Per-CPU变量*/ for (i = 0; i < NR_CPUS; i++) { struct softnet_data *queue; /*取得第i个CPU的softnet_data,因为队列是包含在它里边的,所以,我会直接说,“取得队列”*/ queue = &per_cpu(softnet_data, i); /*初始化队列头*/ skb_queue_head_init(&queue->input_pkt_queue); queue->throttle = 0; queue->cng_level = 0; queue->avg_blog = 10; /* arbitrary non-zero */ queue->completion_queue = NULL; INIT_LIST_HEAD(&queue->poll_list); set_bit(__LINK_STATE_START, &queue->backlog_dev.state); queue->backlog_dev.weight = weight_p; /*这里,队列中backlog_dev设备,它是一个伪网络设备,不对应任何物理设备,它的poll函数,指向了 process_backlog,后面我们会详细分析*/ queue->backlog_dev.poll = process_backlog; atomic_set(&queue->backlog_dev.refcnt, 1); } #ifdef OFFLINE_SAMPLE samp_timer.expires = jiffies + (10 * HZ); add_timer(&samp_timer); #endif dev_boot_phase = 0; /*注册收/发软中断*/ open_softirq(NET_TX_SOFTIRQ, net_tx_action, NULL); open_softirq(NET_RX_SOFTIRQ, net_rx_action, NULL); hotcpu_notifier(dev_cpu_callback, 0); dst_init(); dev_mcast_init(); rc = 0; out: return rc; } 这样,初始化完成后,在驱动程序中,在中断处理函数中,会调用netif_rx将数据交上来,这与采用轮询技术,有本质的不同:
CODE:
int netif_rx(struct sk_buff *skb)
{ int this_cpu; struct softnet_data *queue; unsigned long flags; /* if netpoll wants it, pretend we never saw it */ if (netpoll_rx(skb)) return NET_RX_DROP; /*接收时间戳未设置,设置之*/ if (!skb->stamp.tv_sec) net_timestamp(&skb->stamp); /* * 这里准备将数据包放入接收队列,需要禁止本地中断,在入队操作完成后,再打开中断. */ local_irq_save(flags); /*获取当前CPU对应的softnet_data变量*/ this_cpu = smp_processor_id(); queue = &__get_cpu_var(softnet_data); /*接收计数器累加*/ __get_cpu_var(netdev_rx_stat).total++; /*接收队列是否已满*/ if (queue->input_pkt_queue.qlen <= netdev_max_backlog) { if (queue->input_pkt_queue.qlen) { if (queue->throttle) /*拥塞发生了,丢弃数据包*/ goto drop; /*数据包入队操作*/ enqueue: dev_hold(skb->dev); /*累加设备引入计数器*/ __skb_queue_tail(&queue->input_pkt_queue, skb); /*将数据包加入接收队列*/ #ifndef OFFLINE_SAMPLE get_sample_stats(this_cpu); #endif local_irq_restore(flags); return queue->cng_level; } /* * 驱动程序不断地调用net_rx函数,实现接收数据包的入队操作,当qlen == 0时, 则进入这段代码,这里,如果已经被设置拥塞标志的话,则清除它,因为这里将要调用软中断,开始将数据包交给 上层了,即上层协议的接收函数将执行出队操作,拥塞自然而然也就不存在了。 */ if (queue->throttle) queue->throttle = 0; /* * netif_rx_schedule函数完成两件重要的工作: * 1、将bakclog_dev设备加入“处理数据包的设备”的链表当中; * 2、触发软中断函数,进行数据包接收处理; */ netif_rx_schedule(&queue->backlog_dev); goto enqueue; } /*前面判断了队列是否已满,如果已满而标志未设置,设置之,并累加拥塞计数器*/ if (!queue->throttle) { queue->throttle = 1; __get_cpu_var(netdev_rx_stat).throttled++; } /*拥塞发生,累加丢包计数器,释放数据包*/ drop: __get_cpu_var(netdev_rx_stat).dropped++; local_irq_restore(flags); kfree_skb(skb); return NET_RX_DROP; } 从 这段代码的分析中,我们可以看到,当第一个数据包被接收后,因为qlen==0,所以首先会调用netif_rx_schedule触发软中断,然后利用 goto跳转至入队。因为软中断被触发后,将执行出队操作,把数据交往上层处理。而当这个时候,又有数据包进入,即网卡中断产生,因为它的优先级高过软中 断,这样,出队操作即被中断,网卡中断程序再将被调用,netif_rx函数又再次被执行,如果队列未满,就入队返回。中断完成后,软中断的执行过程被恢 复而继续执行出队——如此生产者/消费者循环不止,生生不息…… netif_rx调用netif_rx_schedule进一步处理数据包,我们注意到: 1、前面讨论过,采用轮询技术时,同样地,也是调用netif_rx_schedule,把设备自己传递了过去; 2、这里,采用中断方式,传递的是队列中的一个“伪设备”,并且,这个伪设备的poll函数指针,指向了一个叫做process_backlog的函数; netif_rx_schedule函数完成两件重要的工作: 1、将bakclog_dev设备加入“处理数据包的设备”的链表当中; 2、触发软中断函数,进行数据包接收处理; 这样,我们可以猜想,在软中断函数中,不论是伪设备bakclog_dev,还是真实的设备(如前面讨论过的e100),都会被软中断函数以: dev->poll() 的形式调用,对于e100来说,poll函数的接收过程已经分析了,而对于其它所有没有采用轮询技术的网络设备来说,它们将统统调用 process_backlog函数(我觉得把它改名为pseudo-poll是否更合适一些^o^)。 OK,我想分析到这里,关于中断处理与轮询技术的差异,已经基本分析开了…… 继续来看,netif_rx_schedule进一步调用__netif_rx_schedule:
CODE:
/* Try to reschedule poll. Called by irq handler. */
static inline void netif_rx_schedule(struct net_device *dev) { if (netif_rx_schedule_prep(dev)) __netif_rx_schedule(dev); }
CODE:
/* Add interface to tail of rx poll list. This assumes that _prep has
* already been called and returned 1. */ static inline void __netif_rx_schedule(struct net_device *dev) { unsigned long flags; local_irq_save(flags); dev_hold(dev); /*伪设备也好,真实的设备也罢,都被加入了队列层的设备列表*/ list_add_tail(&dev->poll_list, &__get_cpu_var(softnet_data).poll_list); if (dev->quota < 0) dev->quota += dev->weight; else dev->quota = dev->weight; /*触发软中断*/ __raise_softirq_irqoff(NET_RX_SOFTIRQ); local_irq_restore(flags); } 软中断被触发,注册的net_rx_action函数将被调用:
CODE:
/*接收的软中断处理函数*/
static void net_rx_action(struct softirq_action *h) { struct softnet_data *queue = &__get_cpu_var(softnet_data); unsigned long start_time = jiffies; int budget = netdev_max_backlog; local_irq_disable(); /* * 遍历队列的设备链表,如前所述,__netif_rx_schedule已经执行了 * list_add_tail(&dev->poll_list, &__get_cpu_var(softnet_data).poll_list); * 设备bakclog_dev已经被添加进来了 */ while (!list_empty(&queue->poll_list)) { struct net_device *dev; if (budget <= 0 || jiffies - start_time > 1) goto softnet_break; local_irq_enable(); /*取得链表中的设备*/ dev = list_entry(queue->poll_list.next, struct net_device, poll_list); netpoll_poll_lock(dev); /*调用设备的poll函数,处理接收数据包,这样,采用轮询技术的网卡,它的真实的poll函数将被调用, 这就回到我们上一节讨论的e100_poll函数去了,而对于采用传统中断处理的设备,它们调用的,都将是 bakclog_dev的process_backlog函数*/ if (dev->quota <= 0 || dev->poll(dev, &budget)) { netpoll_poll_unlock(dev); /*处理完成后,把设备从设备链表中删除,又重置于末尾*/ local_irq_disable(); list_del(&dev->poll_list); list_add_tail(&dev->poll_list, &queue->poll_list); if (dev->quota < 0) dev->quota += dev->weight; else dev->quota = dev->weight; } else { netpoll_poll_unlock(dev); dev_put(dev); local_irq_disable(); } } out: local_irq_enable(); return; softnet_break: __get_cpu_var(netdev_rx_stat).time_squeeze++; __raise_softirq_irqoff(NET_RX_SOFTIRQ); goto out; } 对于dev->poll(dev, &budget)的调用,一个真实的poll函数的例子,我们已经分析过了,现在来看process_backlog,
CODE:
static int process_backlog(struct net_device *backlog_dev, int *budget)
{ int work = 0; int quota = min(backlog_dev->quota, *budget); struct softnet_data *queue = &__get_cpu_var(softnet_data); unsigned long start_time = jiffies; backlog_dev->weight = weight_p; /*在这个循环中,执行出队操作,把数据从队列中取出来,交给netif_receive_skb,直至队列为空*/ for (;;) { struct sk_buff *skb; struct net_device *dev; local_irq_disable(); skb = __skb_dequeue(&queue->input_pkt_queue); if (!skb) goto job_done; local_irq_enable(); dev = skb->dev; netif_receive_skb(skb); dev_put(dev); work++; if (work >= quota || jiffies - start_time > 1) break; } backlog_dev->quota -= work; *budget -= work; return -1; /*当队列中的数据包被全部处理后,将执行到这里*/ job_done: backlog_dev->quota -= work; *budget -= work; list_del(&backlog_dev->poll_list); smp_mb__before_clear_bit(); netif_poll_enable(backlog_dev); if (queue->throttle) queue->throttle = 0; local_irq_enable(); return 0; } 这个函数重要的工作,就是出队,然后调用netif_receive_skb()将数据包交给上层,这与上一节讨论的poll是一样的。这也是为什么, 在网卡驱动的编写中,采用中断技术,要调用netif_rx,而采用轮询技术,要调用netif_receive_skb啦! 到了这里,就处理完数据包与设备相关的部分了,数据包将进入上层协议栈…… |
联系客服