相比于发送数据,接收数据更复杂一些。接收数据这里和3层的接口是tcp_v4_rcv(我前面的blog有介绍3层和4层的接口的实现).而4层和用户空间,也就是系统调用是socket_recvmsg(其他的读取函数也都会调用这个函数).而这个系统调用会调用__sock_recvmsg.下面我们就先来看下这个函数。
它的主要功能是初始化sock_iocb,以便与将来数据从内核空间拷贝到用户空间。然后调用
recvmsg这个虚函数(tcp协议的话也就是tcp_recvmsg).
- static inline int __sock_recvmsg(struct kiocb *iocb, struct socket *sock,
- struct msghdr *msg, size_t size, int flags)
- {
- int err;
- struct sock_iocb *si = kiocb_to_siocb(iocb);
-
- si->sock = sock;
- si->scm = NULL;
- si->msg = msg;
- si->size = size;
- si->flags = flags;
-
- err = security_socket_recvmsg(sock, msg, size, flags);
- if (err)
- return err;
-
- return sock->ops->recvmsg(iocb, sock, msg, size, flags);
- }
static inline int __sock_recvmsg(struct kiocb *iocb, struct socket *sock,struct msghdr *msg, size_t size, int flags){int err;struct sock_iocb *si = kiocb_to_siocb(iocb);///初始化si。si->sock = sock;si->scm = NULL;si->msg = msg;si->size = size;si->flags = flags;err = security_socket_recvmsg(sock, msg, size, flags);if (err)return err;//调用tcp_recvmsgreturn sock->ops->recvmsg(iocb, sock, msg, size, flags);}
内核对待数据的接收分为2部分,一部分是当用户是阻塞的读取数据时,这时如果有数据则是直接拷贝到用户空间。而另一方面,如果是非阻塞,则会先把数据拷贝到接收队列。
而在内核中这个队列分为3种形式。分别是:
1 sock域结构的 sk_backlog队列。
2 tcp_sock的ucopy.prequeue队列。
3 sock结构的 receive_queue队列。
我们先来看两个主要的结构体,然后再来解释这3各队列的区别,首先是ucopy结构.
这个结构表示将要直接复制到用户空间的数据。
-
- struct {
-
- struct sk_buff_head prequeue;
-
- struct task_struct *task;
-
- struct iovec *iov;
-
- int memory;
-
- int len;
- ........................
- } ucopy;
/* Data for direct copy to user */struct {///prequeue队列。struct sk_buff_head prequeue;///表示当前所处的进程,其实也就是skb的接受者。struct task_struct *task;///数据区struct iovec *iov;///prequeue队列总的所占用的内存大小int memory;///这个域表示用户所请求的长度(要注意这个值是可变的,随着拷贝给用户的数据而减少)int len;........................} ucopy;
接下来是sock的sock_lock结构.
内核的注释很详细,这个锁主要是用来对软中断和进程上下文之间提供一个同步。
-
-
-
-
- typedef struct {
-
- spinlock_t slock;
-
- int owned;
-
- wait_queue_head_t wq;
-
-
-
-
-
-
- #ifdef CONFIG_DEBUG_LOCK_ALLOC
- struct lockdep_map dep_map;
- #endif
- } socket_lock_t;
/* This is the per-socket lock. The spinlock provides a synchronization* between user contexts and software interrupt processing, whereas the* mini-semaphore synchronizes multiple users amongst themselves.*/typedef struct {///自选锁spinlock_t slock;///如果有用户进程在使用这个sock 则owned为1,否则为0int owned;///等待队列,也就是当sock被锁住后,等待使用这个sock对象。wait_queue_head_t wq;/** We express the mutex-alike socket_lock semantics* to the lock validator by explicitly managing* the slock as a lock variant (in addition to* the slock itself):*/#ifdef CONFIG_DEBUG_LOCK_ALLOCstruct lockdep_map dep_map;#endif} socket_lock_t;
然后来看3个队列的区别。
首先sk_backlog队列是当当前的sock在进程上下文中被使用时,如果这个时候有数据到来,则将数据拷贝到sk_backlog.
prequeue则是数据buffer第一站一般都是这里,如果prequeue已满,则会拷贝数据到receive_queue队列种。
最后一个receive_queue也就是进程上下文第一个取buffer的队列。(后面介绍tcp_recvmsg时会再介绍这3个队列).
这里为什么要有prequeue呢,直接放到receive_queue不就好了.这里我是认receive_queue的处理比较繁琐(看tcp_rcv_established的实现就知道了,分为slow path和fast path),而软中断每次只能处理一个数据包(在一个cpu上),因此为了软中断能尽快完成,我们就可以先将数据放到prequeue中(
tcp_prequeue),然后软中断就直接返回.而处理prequeue就放到进程上下文去处理了.
最后在分析tcp_v4_rcv和tcp_recvmsg之前,我们要知道tcp_v4_rcv还是处于软中断上下文,而tcp_recvmsg是处于进程上下文,因此比如socket_lock_t才会提供一个owned来锁住对应的sock。而我们也就是需要这3个队列来进行软中断上下文和进程上下文之间的通信。最终当数据拷贝到对应队列,则软中断调用返回。这里要注意的是相同的函数在软中断上下文和进程上下文种调用是不同的,我们下面就会看到(比如tcp_rcv_established函数)
ok,现在来看tcp_v4_rcv的源码。这个函数是在软中断上下文中被调用的,我们这里来看下她的代码片断:
- int tcp_v4_rcv(struct sk_buff *skb)
- {
-
- const struct iphdr *iph;
- struct tcphdr *th;
- struct sock *sk;
- int ret;
- struct net *net = dev_net(skb->dev);
- ............................
-
-
- sk = __inet_lookup_skb(&tcp_hashinfo, skb, th->source, th->dest);
- if (!sk)
- goto no_tcp_socket;
-
- process:
-
-
- if (sk->sk_state == TCP_TIME_WAIT)
- goto do_time_wait;
- .................................
-
- bh_lock_sock_nested(sk);
- ret = 0;
-
- if (!sock_owned_by_user(sk)) {
- 。........................
- {
-
- if (!tcp_prequeue(sk, skb))
-
- ret = tcp_v4_do_rcv(sk, skb);
- }
- } else
-
- sk_add_backlog(sk, skb);
-
- bh_unlock_sock(sk);
-
- sock_put(sk);
-
- return ret;
- ...................................................
int tcp_v4_rcv(struct sk_buff *skb){///一些用到的变量const struct iphdr *iph;struct tcphdr *th;struct sock *sk;int ret;struct net *net = dev_net(skb->dev);............................//通过四元组得到对应的sock。sk = __inet_lookup_skb(&tcp_hashinfo, skb, th->source, th->dest);if (!sk)goto no_tcp_socket;process:///如果是time_wait状态,则进入相关处理(这次不会分析time_wait状态,以后分析tcp的断开状态变迁时,会详细分析这个).if (sk->sk_state == TCP_TIME_WAIT)goto do_time_wait;.................................///加下半部的锁bh_lock_sock_nested(sk);ret = 0;///这个宏很简单就是判断(sk)->sk_lock.owned.也就是当进程上下文在使用这个sock时为1.if (!sock_owned_by_user(sk)) {。........................{///先将buffer放到prequeue队列中。如果成功则返回1.if (!tcp_prequeue(sk, skb))///假设失败,则直接调用tcp_v4_do_rcv处理这个skb(其实也就是直接放到receive_queue中).ret = tcp_v4_do_rcv(sk, skb);}} else///当有进程在使用这个sock则放buf到sk_backlog中。sk_add_backlog(sk, skb);//解锁。bh_unlock_sock(sk);sock_put(sk);return ret;...................................................
上面的流程很简单,我们接下来来看几个跳过的函数,第一个是
tcp_prequeue。
这里我们可以看到sysctl_tcp_low_latency可以决定我们是否使用prequeue队列.
-
- static inline int tcp_prequeue(struct sock *sk, struct sk_buff *skb)
- {
- struct tcp_sock *tp = tcp_sk(sk);
-
-
- if (sysctl_tcp_low_latency || !tp->ucopy.task)
- return 0;
-
- __skb_queue_tail(&tp->ucopy.prequeue, skb);
-
- tp->ucopy.memory += skb->truesize;
-
- if (tp->ucopy.memory > sk->sk_rcvbuf) {
- struct sk_buff *skb1;
-
- BUG_ON(sock_owned_by_user(sk));
-
- while ((skb1 = __skb_dequeue(&tp->ucopy.prequeue)) != NULL) {
-
- sk_backlog_rcv(sk, skb1);
- NET_INC_STATS_BH(sock_net(sk),
- LINUX_MIB_TCPPREQUEUEDROPPED);
- }
-
- tp->ucopy.memory = 0;
- } else if (skb_queue_len(&tp->ucopy.prequeue) == 1) {
-
- wake_up_interruptible_poll(sk->sk_sleep,
- POLLIN | POLLRDNORM | POLLRDBAND);
-
-
- if (!inet_csk_ack_scheduled(sk))
- inet_csk_reset_xmit_timer(sk, ICSK_TIME_DACK,
- (3 * tcp_rto_min(sk)) / 4,
- TCP_RTO_MAX);
- }
- return 1;
- }
static inline int tcp_prequeue(struct sock *sk, struct sk_buff *skb){struct tcp_sock *tp = tcp_sk(sk);///如果启用tcp_low_latency或者ucopy.task为空则返回0.ucopy.task为空一般是表示进程空间有进程在等待sock的数据的到来,因此我们需要直接复制数据到receive队列。并唤醒它。if (sysctl_tcp_low_latency || !tp->ucopy.task)return 0;///加数据包到prequeue队列。__skb_queue_tail(&tp->ucopy.prequeue, skb);///update内存大小。tp->ucopy.memory += skb->truesize;///如果prequeue已满,则将处理prequeue队列。if (tp->ucopy.memory > sk->sk_rcvbuf) {struct sk_buff *skb1;BUG_ON(sock_owned_by_user(sk));///遍历prequeue队列。while ((skb1 = __skb_dequeue(&tp->ucopy.prequeue)) != NULL) {///这个函数最终也会调用tcp_v4_do_rcv(也就是加入到receive队列中).sk_backlog_rcv(sk, skb1);NET_INC_STATS_BH(sock_net(sk),LINUX_MIB_TCPPREQUEUEDROPPED);}///清空内存。tp->ucopy.memory = 0;} else if (skb_queue_len(&tp->ucopy.prequeue) == 1) {///这里表示这个数据包是prequeue的第一个包。然后唤醒等待队列。wake_up_interruptible_poll(sk->sk_sleep,POLLIN | POLLRDNORM | POLLRDBAND);///这里的定时器以后会详细介绍。if (!inet_csk_ack_scheduled(sk))inet_csk_reset_xmit_timer(sk, ICSK_TIME_DACK,(3 * tcp_rto_min(sk)) / 4,TCP_RTO_MAX);}return 1;}
我们这里只关注TCP_ESTABLISHED状态,来看tcp_v4_do_rcv:它主要是通过判断相应的tcp状态来进入相关的处理函数。
- int tcp_v4_do_rcv(struct sock *sk, struct sk_buff *skb)
- {
- struct sock *rsk;
- ...................................
- if (sk->sk_state == TCP_ESTABLISHED) {
- TCP_CHECK_TIMER(sk);
-
- if (tcp_rcv_established(sk, skb, tcp_hdr(skb), skb->len)) {
- rsk = sk;
- goto reset;
- }
- TCP_CHECK_TIMER(sk);
- return 0;
- }
-
- ........................................
- }
int tcp_v4_do_rcv(struct sock *sk, struct sk_buff *skb){struct sock *rsk;...................................if (sk->sk_state == TCP_ESTABLISHED) { /* Fast path */TCP_CHECK_TIMER(sk);///处理数据包。if (tcp_rcv_established(sk, skb, tcp_hdr(skb), skb->len)) {rsk = sk;goto reset;}TCP_CHECK_TIMER(sk);return 0;}........................................}
因此我们这里重点要看的函数就是tcp_rcv_established,当它在软中断上下文中被调用时,主要的目的是将skb加入到receive_queue队列中。因此这里我们只看这一部分,等下面分析tcp_recvmsg时,我们再来看进程上下文才会处理的一部分。
-
- if (!eaten) {
-
- if (tcp_checksum_complete_user(sk, skb))
-
- goto csum_error;
- ..................................................
-
- __skb_pull(skb, tcp_header_len);
-
-
- __skb_queue_tail(&sk->sk_receive_queue, skb);
- skb_set_owner_r(skb, sk);
-
- tp->rcv_nxt = TCP_SKB_CB(skb)->end_seq;
- }
- ............................................
///程序如何到达这里,我们在分析tcp_recvmsg时会再次分析tcp_rcv_established,那个时候会介绍这个。if (!eaten) {///进行checksumif (tcp_checksum_complete_user(sk, skb))goto csum_error;..................................................__skb_pull(skb, tcp_header_len);///最重要的在这里,我们可以看到直接将skb加入到sk_receive队列中。__skb_queue_tail(&sk->sk_receive_queue, skb);skb_set_owner_r(skb, sk);///更新rcv_nxt,也就是表示下一个接收序列起始号。tp->rcv_nxt = TCP_SKB_CB(skb)->end_seq;}............................................
接下来来看tcp_rcvmsg函数。
通过上面我们知道有找个队列可供我们取得skbuf,那么具体的次序是什么呢,我这里摘抄内核的注释,它讲的非常清楚:
引用
Look: we have the following (pseudo)queues:
1. packets in flight
2. backlog
3. prequeue
4. receive_queue
Each queue can be processed only if the next ones are empty. At this point we have empty receive_queue.But prequeue _can_ be not empty after 2nd iteration, when we jumped to start of loop because backlog
processing added something to receive_queue. We cannot release_sock(), because backlog containd packets arrived _after_ prequeued ones.
Shortly, algorithm is clear --- to process all the queues in order. We could make it more directly,requeueing packets from backlog to prequeue, if is not empty. It is more elegant, but eats cycles,
由于这个函数比较复杂,因此我们分段来分析这个函数。
首先是处理包之前的一些合法性判断,以及取得一些有用的值。
- int tcp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
- size_t len, int nonblock, int flags, int *addr_len)
- {
- ...................................
-
-
- lock_sock(sk);
-
- TCP_CHECK_TIMER(sk);
-
- err = -ENOTCONN;
- if (sk->sk_state == TCP_LISTEN)
- goto out;
-
-
-
- timeo = sock_rcvtimeo(sk, nonblock);
-
-
- if (flags & MSG_OOB)
- goto recv_urg;
-
-
- seq = &tp->copied_seq;
- if (flags & MSG_PEEK) {
- peek_seq = tp->copied_seq;
- seq = &peek_seq;
- }
-
-
- target = sock_rcvlowat(sk, flags & MSG_WAITALL, len);
- }
int tcp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,size_t len, int nonblock, int flags, int *addr_len){...................................///锁住当前的socket。lock_sock(sk);TCP_CHECK_TIMER(sk);err = -ENOTCONN;if (sk->sk_state == TCP_LISTEN)goto out;///得到超时时间(前面已经介绍过了).如果非阻塞则为0.timeo = sock_rcvtimeo(sk, nonblock);/* Urgent data needs to be handled specially. */if (flags & MSG_OOB)goto recv_urg;///取得当前tcp字节流中的未读数据的起始序列号。seq = &tp->copied_seq;if (flags & MSG_PEEK) {peek_seq = tp->copied_seq;seq = &peek_seq;}///主要是用来处理MSG_WAITALL套接字选项。这个选项是用来标记是否等待所有的数据到达才返回的。target = sock_rcvlowat(sk, flags & MSG_WAITALL, len);}
在上面我们看到了lock_sock,这个函数是用来锁住当前的sock, 我们来看它的详细实现,它最终会调用lock_sock_nested:
-
- void lock_sock_nested(struct sock *sk, int subclass)
- {
- might_sleep();
-
- spin_lock_bh(&sk->sk_lock.slock);
-
- if (sk->sk_lock.owned)
- __lock_sock(sk);
-
- sk->sk_lock.owned = 1;
-
- spin_unlock(&sk->sk_lock.slock);
-
-
-
- mutex_acquire(&sk->sk_lock.dep_map, subclass, 0, _RET_IP_);
- local_bh_enable();
- }
void lock_sock_nested(struct sock *sk, int subclass){might_sleep();///首先加锁。spin_lock_bh(&sk->sk_lock.slock);///如果owned为1,也就是有其他进程在使用这个sock。此时调用__lock_sock(这个函数用来休眠进程,进入等待队列)。if (sk->sk_lock.owned)__lock_sock(sk);///当sock可以使用了,则设置owned为1,标记被当前进程所使用。sk->sk_lock.owned = 1;///解锁。spin_unlock(&sk->sk_lock.slock);/** The sk_lock has mutex_lock() semantics here:*/mutex_acquire(&sk->sk_lock.dep_map, subclass, 0, _RET_IP_);local_bh_enable();}
我们再来看__lock_sock如何来处理的。
- static void __lock_sock(struct sock *sk)
- {
- DEFINE_WAIT(wait);
-
- for (;;) {
-
- prepare_to_wait_exclusive(&sk->sk_lock.wq, &wait,
- TASK_UNINTERRUPTIBLE);
-
- spin_unlock_bh(&sk->sk_lock.slock);
-
- schedule();
- spin_lock_bh(&sk->sk_lock.slock);
-
- if (!sock_owned_by_user(sk))
- break;
- }
- finish_wait(&sk->sk_lock.wq, &wait);
- }
static void __lock_sock(struct sock *sk){DEFINE_WAIT(wait);for (;;) {///加入等待队列,可以看到加入的等待队列是sl_lock.wq,也就是我们上面介绍过得。而这个等待队列的唤醒我们下面会介绍。prepare_to_wait_exclusive(&sk->sk_lock.wq, &wait,TASK_UNINTERRUPTIBLE);///解锁。spin_unlock_bh(&sk->sk_lock.slock);///让出cpu,进入休眠。schedule();spin_lock_bh(&sk->sk_lock.slock);///如果轮到我们处理这个sock,则跳出循环。if (!sock_owned_by_user(sk))break;}finish_wait(&sk->sk_lock.wq, &wait);}
ok,再回到tcp_recvmsg.接下来我们来看如何处理数据包。
下面这一段主要是用来从receive队列中读取数据。
-
- do {
- u32 offset;
-
-
- if (tp->urg_data && tp->urg_seq == *seq) {
- if (copied)
- break;
- if (signal_pending(current)) {
- copied = timeo ? sock_intr_errno(timeo) : -EAGAIN;
- break;
- }
- }
-
-
-
- skb_queue_walk(&sk->sk_receive_queue, skb) {
-
- if (before(*seq, TCP_SKB_CB(skb)->seq)) {
- printk(KERN_INFO "recvmsg bug: copied %X "
- "seq %X\n", *seq, TCP_SKB_CB(skb)->seq);
- break;
- }
-
- offset = *seq - TCP_SKB_CB(skb)->seq;
-
- if (tcp_hdr(skb)->syn)
- offset--;
-
- if (offset < skb->len)
- goto found_ok_skb;
- if (tcp_hdr(skb)->fin)
- goto found_fin_ok;
- WARN_ON(!(flags & MSG_PEEK));
- }
- ....................................
- }while(len > 0)
do {u32 offset;///是否有urgent数据,如果已经读取了一些数据或者有个未决的sigurg信号,则直接退出循环。if (tp->urg_data && tp->urg_seq == *seq) {if (copied)break;if (signal_pending(current)) {copied = timeo ? sock_intr_errno(timeo) : -EAGAIN;break;}}///开始处理buf,首先是从receive队列中读取buf。skb_queue_walk(&sk->sk_receive_queue, skb) {///开始遍历receive_queue.if (before(*seq, TCP_SKB_CB(skb)->seq)) {printk(KERN_INFO "recvmsg bug: copied %X ""seq %X\n", *seq, TCP_SKB_CB(skb)->seq);break;}///由于tcp是字节流,因此我们拷贝给用户空间,需要正序的拷贝给用户,这里的第一个seq前面已经描述了,表示当前的总的sock连接中的未读数据的起始序列号,而后一个seq表示当前skb的起始序列号。因此这个差值如果小于skb->len,就表示,当前的skb就是我们需要读取的那个skb(因为它的序列号最小).offset = *seq - TCP_SKB_CB(skb)->seq;///跳过syn。if (tcp_hdr(skb)->syn)offset--;///找到skb。if (offset < skb->len)goto found_ok_skb;if (tcp_hdr(skb)->fin)goto found_fin_ok;WARN_ON(!(flags & MSG_PEEK));}....................................}while(len > 0)
接下来是对tcp状态做一些校验。这里要注意,copied表示的是已经复制到用户空间的skb的大小。而len表示还需要拷贝多少数据。
-
-
- if (copied >= target && !sk->sk_backlog.tail)
- break;
-
- if (copied) {
- if (sk->sk_err ||
- sk->sk_state == TCP_CLOSE ||
- (sk->sk_shutdown & RCV_SHUTDOWN) ||
- !timeo ||
- signal_pending(current))
- break;
- } else {
-
-
- if (sock_flag(sk, SOCK_DONE))
- break;
-
- if (sk->sk_err) {
- copied = sock_error(sk);
- break;
- }
-
- if (sk->sk_shutdown & RCV_SHUTDOWN)
- break;
-
- if (sk->sk_state == TCP_CLOSE) {
- if (!sock_flag(sk, SOCK_DONE)) {
- copied = -ENOTCONN;
- break;
- }
- break;
- }
-
- if (!timeo) {
- copied = -EAGAIN;
- break;
- }
-
- if (signal_pending(current)) {
- copied = sock_intr_errno(timeo);
- break;
- }
- }
///如果复制的值大于等于所需要复制的,并且sk_backlog为空,则跳出循环。这是因为我们每次复制完毕之后,都需要将sk_backlog中的数据复制到receive队列中。if (copied >= target && !sk->sk_backlog.tail)break;if (copied) {if (sk->sk_err ||sk->sk_state == TCP_CLOSE ||(sk->sk_shutdown & RCV_SHUTDOWN) ||!timeo ||signal_pending(current))break;} else {///如没有复制到数据(也就是receive为空),则判断是否有错误发生。这里主要是状态的判断和超时的判断。if (sock_flag(sk, SOCK_DONE))break;if (sk->sk_err) {copied = sock_error(sk);break;}if (sk->sk_shutdown & RCV_SHUTDOWN)break;if (sk->sk_state == TCP_CLOSE) {if (!sock_flag(sk, SOCK_DONE)) {copied = -ENOTCONN;break;}break;}if (!timeo) {copied = -EAGAIN;break;}if (signal_pending(current)) {copied = sock_intr_errno(timeo);break;}}
然后就是根据已经复制的数据大小来清理receive队列中的数据,并且发送ACK给对端。然后就是给tcp_socket的ucopy域赋值,主要是iov域和task域。一个是数据区,一个是当前从属的进程。
-
- tcp_cleanup_rbuf(sk, copied);
-
- if (!sysctl_tcp_low_latency && tp->ucopy.task == user_recv) {
-
-
- if (!user_recv && !(flags & (MSG_TRUNC | MSG_PEEK))) {
-
- user_recv = current;
- tp->ucopy.task = user_recv;
- tp->ucopy.iov = msg->msg_iov;
- }
-
- tp->ucopy.len = len;
-
- WARN_ON(tp->copied_seq != tp->rcv_nxt &&
- !(flags & (MSG_PEEK | MSG_TRUNC)));
-
- if (!skb_queue_empty(&tp->ucopy.prequeue))
- goto do_prequeue;
-
-
- }
-
- if (copied >= target) {
-
- release_sock(sk);
- lock_sock(sk);
- } else
-
- sk_wait_data(sk, &timeo);
tcp_cleanup_rbuf(sk, copied);if (!sysctl_tcp_low_latency && tp->ucopy.task == user_recv) {///循环的第一次的话user_recv为空,因此给ucopy得想关域赋值。if (!user_recv && !(flags & (MSG_TRUNC | MSG_PEEK))) {//进程为当前进程。user_recv = current;tp->ucopy.task = user_recv;tp->ucopy.iov = msg->msg_iov;}///长度为还需拷贝的数据的长度。tp->ucopy.len = len;WARN_ON(tp->copied_seq != tp->rcv_nxt &&!(flags & (MSG_PEEK | MSG_TRUNC)));///如果prequeue不为空则跳到 do_prequeue,处理backlog队列。if (!skb_queue_empty(&tp->ucopy.prequeue))goto do_prequeue;/* __ Set realtime policy in scheduler __ */}///已经复制完毕,则开始拷贝back_log队列到receive队列。if (copied >= target) {/* Do not sleep, just process backlog. */release_sock(sk);lock_sock(sk);} else///否则进入休眠,等待数据的到来。sk_wait_data(sk, &timeo);
上面的分析中有release_sock函数,这个函数用来release这个sock,也就是对这个sock解除锁定。然后唤醒等待队列。
这里要注意,sock一共有两个等待队列,一个是sock的sk_sleep等待队列,这个等待队列用来等待数据的到来。一个是ucopy域的等待队列wq,这个表示等待使用这个sock。
- void release_sock(struct sock *sk)
- {
-
- mutex_release(&sk->sk_lock.dep_map, 1, _RET_IP_);
-
- spin_lock_bh(&sk->sk_lock.slock);
-
- if (sk->sk_backlog.tail)
- __release_sock(sk);
-
- sk->sk_lock.owned = 0;
-
- if (waitqueue_active(&sk->sk_lock.wq))
- wake_up(&sk->sk_lock.wq);
- spin_unlock_bh(&sk->sk_lock.slock);
- }
void release_sock(struct sock *sk){mutex_release(&sk->sk_lock.dep_map, 1, _RET_IP_);spin_lock_bh(&sk->sk_lock.slock);///如果backlog队列不为空,则调用__release_sock处理if (sk->sk_backlog.tail)__release_sock(sk);///处理完毕则给owened赋值为0.释放对这个sock的控制。sk->sk_lock.owned = 0;///唤醒wq上的所有元素。if (waitqueue_active(&sk->sk_lock.wq))wake_up(&sk->sk_lock.wq);spin_unlock_bh(&sk->sk_lock.slock);}
然后来看主要的处理函数__release_sock,它主要是遍历backlog队列,然后处理skb。这里它有两个循环,外部循环是遍历backlog,而内部循环是遍历skb(也就是数据)。
- static void __release_sock(struct sock *sk)
- {
- struct sk_buff *skb = sk->sk_backlog.head;
-
-
- do {
- sk->sk_backlog.head = sk->sk_backlog.tail = NULL;
- bh_unlock_sock(sk);
-
- do {
- struct sk_buff *next = skb->next;
-
- skb->next = NULL;
-
-
- sk_backlog_rcv(sk, skb);
- cond_resched_softirq();
-
- skb = next;
- } while (skb != NULL);
-
- bh_lock_sock(sk);
- } while ((skb = sk->sk_backlog.head) != NULL);
- }
static void __release_sock(struct sock *sk){struct sk_buff *skb = sk->sk_backlog.head;///遍历backlog队列。do {sk->sk_backlog.head = sk->sk_backlog.tail = NULL;bh_unlock_sock(sk);do {struct sk_buff *next = skb->next;skb->next = NULL;///这个函数我们知道最终会调tcp_v4_do_rcv.而在tcp_v4_do_rcv中,会把数据复制到receive_queue队列中。sk_backlog_rcv(sk, skb);cond_resched_softirq();skb = next;} while (skb != NULL);bh_lock_sock(sk);} while ((skb = sk->sk_backlog.head) != NULL);}
而当数据tp->ucopy.prequeue为空,并且所复制的数据不能达到所期望的值,此时我们进入sk_wait_data等待数据的到来。
- #define sk_wait_event(__sk, __timeo, condition) \
- ({int __rc; \
-
- release_sock(__sk); \
- __rc = condition;
-
- if (!__rc) {
-
- *(__timeo) = schedule_timeout(*(__timeo)); \
- } \
- lock_sock(__sk); \
- __rc = condition; \
- __rc; \
- })
-
-
-
- int sk_wait_data(struct sock *sk, long *timeo)
- {
- int rc;
- DEFINE_WAIT(wait);
-
- prepare_to_wait(sk->sk_sleep, &wait, TASK_INTERRUPTIBLE);
- set_bit(SOCK_ASYNC_WAITDATA, &sk->sk_socket->flags);
-
- rc = sk_wait_event(sk, timeo, !skb_queue_empty(&sk->sk_receive_queue));
- clear_bit(SOCK_ASYNC_WAITDATA, &sk->sk_socket->flags);
- finish_wait(sk->sk_sleep, &wait);
- return rc;
- }
#define sk_wait_event(__sk, __timeo, condition) ({int __rc; ///这个我们通过上面知道,会将数据复制到receive-queue队列。release_sock(__sk); __rc = condition;///当sk_wait_data调用时,rc是用来判断receive_queue是否为空的, if (!__rc) {///如果为空则会休眠等待,sk_sleep等待队列的唤醒。 *(__timeo) = schedule_timeout(*(__timeo)); } lock_sock(__sk); __rc = condition; __rc; })int sk_wait_data(struct sock *sk, long *timeo){int rc;DEFINE_WAIT(wait);///加入sk_sleep的等待队列prepare_to_wait(sk->sk_sleep, &wait, TASK_INTERRUPTIBLE);set_bit(SOCK_ASYNC_WAITDATA, &sk->sk_socket->flags);///处理事件rc = sk_wait_event(sk, timeo, !skb_queue_empty(&sk->sk_receive_queue));clear_bit(SOCK_ASYNC_WAITDATA, &sk->sk_socket->flags);finish_wait(sk->sk_sleep, &wait);return rc;}
接下来就是一些域的更新,以及处理prequeue队列:
-
-
- if (user_recv) {
- int chunk;
-
-
-
- if ((chunk = len - tp->ucopy.len) != 0) {
- NET_ADD_STATS_USER(sock_net(sk), LINUX_MIB_TCPDIRECTCOPYFROMBACKLOG, chunk);
- len -= chunk;
- copied += chunk;
- }
-
- if (tp->rcv_nxt == tp->copied_seq &&
- !skb_queue_empty(&tp->ucopy.prequeue)) {
- do_prequeue:
-
- tcp_prequeue_process(sk);
-
-
- if ((chunk = len - tp->ucopy.len) != 0) {
- NET_ADD_STATS_USER(sock_net(sk), LINUX_MIB_TCPDIRECTCOPYFROMPREQUEUE, chunk);
- len -= chunk;
- copied += chunk;
- }
- }
- }
- ...................................
- continue;
if (user_recv) {int chunk;/* __ Restore normal policy in scheduler __ *////这个判断主要是由于在release_sock中,有可能会将数据直接复制到用户空间了。此时我们需要更新len以及copied域。if ((chunk = len - tp->ucopy.len) != 0) {NET_ADD_STATS_USER(sock_net(sk), LINUX_MIB_TCPDIRECTCOPYFROMBACKLOG, chunk);len -= chunk;copied += chunk;}///tp->rcv_nxt == tp->copied_seq主要用来判断是否receive队列中还需要数据要执行吗(下面会说明为什么)。if (tp->rcv_nxt == tp->copied_seq &&!skb_queue_empty(&tp->ucopy.prequeue)) {do_prequeue:///执行prequeuetcp_prequeue_process(sk);///和上面一样,更新len和cpoied域。if ((chunk = len - tp->ucopy.len) != 0) {NET_ADD_STATS_USER(sock_net(sk), LINUX_MIB_TCPDIRECTCOPYFROMPREQUEUE, chunk);len -= chunk;copied += chunk;}}}...................................continue;
在分析tcp_prequeue_process之前,我们先来看下什么情况下release_sock会直接复制数据到用户空间。我们知道它最终会调用tcp_rcv_established函数,因此来看tcp_rcv_established的代码片断
内核接收到的数据包有可能不是正序的,可是内核传递给用户空间的数据必须是正序的,只有这样才能拷贝给用户空间。
-
- else {
- int eaten = 0;
- int copied_early = 0;
-
- if (tp->copied_seq == tp->rcv_nxt &&
- len - tcp_header_len <= tp->ucopy.len) {
- .........................
-
-
- if (tp->ucopy.task == current &&
- sock_owned_by_user(sk) && !copied_early) {
- __set_current_state(TASK_RUNNING);
-
- if (!tcp_copy_to_iovec(sk, skb, tcp_header_len))
- eaten = 1;
- }
-
- ...............................
else {int eaten = 0;int copied_early = 0;///判断从这里开始。copied_seq表示未读的skb的序列号。而rcv_nxt为我们所期望接收的下一个数据的序列号。这里我们是要保证字节流的正序。而第二个条件len - tcp_header_len <= tp->ucopy.len这个说明用户请求的数据还没有复制够。如果已经复制够了,则会复制数据到receive_queue队列。if (tp->copied_seq == tp->rcv_nxt &&len - tcp_header_len <= tp->ucopy.len) {.........................///然后判断从属的进程必须等于当前调用进程。并且必须为进程上下文。if (tp->ucopy.task == current &&sock_owned_by_user(sk) && !copied_early) {__set_current_state(TASK_RUNNING);///开始复制数据到用户空间。if (!tcp_copy_to_iovec(sk, skb, tcp_header_len))eaten = 1;}...............................
通过上面的判断条件我们很容易看出前面调用release_sock,为何有时将数据拷贝到用户空间,有时拷贝到receive队列。
ok,最后我们来看下tcp_prequeue_process的实现。它的实现很简单,就是遍历prequeue,然后处理buf。这里要注意,它会处理完所有的prequeue,也就是会清空prequeue.
- static void tcp_prequeue_process(struct sock *sk)
- {
- struct sk_buff *skb;
- struct tcp_sock *tp = tcp_sk(sk);
-
- NET_INC_STATS_USER(sock_net(sk), LINUX_MIB_TCPPREQUEUED);
-
-
-
- local_bh_disable();
-
- while ((skb = __skb_dequeue(&tp->ucopy.prequeue)) != NULL)
-
- sk_backlog_rcv(sk, skb);
- local_bh_enable();
-
- tp->ucopy.memory = 0;
- }
static void tcp_prequeue_process(struct sock *sk){struct sk_buff *skb;struct tcp_sock *tp = tcp_sk(sk);NET_INC_STATS_USER(sock_net(sk), LINUX_MIB_TCPPREQUEUED);/* RX process wants to run with disabled BHs, though it is not* necessary */local_bh_disable();///遍历并处理skb。while ((skb = __skb_dequeue(&tp->ucopy.prequeue)) != NULL)///最终会调用tcp_rcv_established.sk_backlog_rcv(sk, skb);local_bh_enable();///内存清空为0.tp->ucopy.memory = 0;}
最后简要的分析下数据如何复制到用户空间。这里的主要函数是skb_copy_datagram_iovec。最终都是通过这个函数复制到用户空间的。
我们知道内核存储数据有两种形式如果支持S/G IO的网卡,它会保存数据到skb_shinfo(skb)->frags(详见前面的blog),否则则会保存在skb的data区中。
因此这里也是分为两部分处理。
还有一个就是这里遍历frags也是遍历两次,第一次遍历是查找刚好
- int skb_copy_datagram_iovec(const struct sk_buff *skb, int offset,
- struct iovec *to, int len)
- {
- int start = skb_headlen(skb);
- int i, copy = start - offset;
- struct sk_buff *frag_iter;
-
- if (copy > 0) {
- if (copy > len)
- copy = len;
-
- if (memcpy_toiovec(to, skb->data + offset, copy))
- goto fault;
- if ((len -= copy) == 0)
- return 0;
- offset += copy;
- }
-
-
- for (i = 0; i < skb_shinfo(skb)->nr_frags; i++) {
- int end;
-
- WARN_ON(start > offset + len);
-
- end = start + skb_shinfo(skb)->frags[i].size;
-
- if ((copy = end - offset) > 0) {
- int err;
- u8 *vaddr;
- skb_frag_t *frag = &skb_shinfo(skb)->frags[i];
- struct page *page = frag->page;
-
- if (copy > len)
- copy = len;
-
- vaddr = kmap(page);
-
- err = memcpy_toiovec(to, vaddr + frag->page_offset +
- offset - start, copy);
- kunmap(page);
- if (err)
- goto fault;
-
- if (!(len -= copy))
- return 0;
-
- offset += copy;
- }
-
- start = end;
- }
-
-
- skb_walk_frags(skb, frag_iter) {
- int end;
-
- WARN_ON(start > offset + len);
-
- end = start + frag_iter->len;
- if ((copy = end - offset) > 0) {
- if (copy > len)
- copy = len;
-
- if (skb_copy_datagram_iovec(frag_iter,
- offset - start,
- to, copy))
- goto fault;
- if ((len -= copy) == 0)
- return 0;
- offset += copy;
- }
- start = end;
- }
- if (!len)
- return 0;
-
- fault:
- return -EFAULT;
- }