背景

最初学习计算机网络的时候,通常都是从 TCP/IP 网络分层模型入手,学习各种协议,如 TCP、IP等,以及相关的原理,并未过多关注整个协议栈具体是如何实现的。在开发的过程中,通过高级语言往往只需要几行就能实现一个简单的网络程序,并从网络接收数据。然而数据具体是如何从网卡达到我们的进程中的呢?在这个过程中网卡和内核到底又做了些什么呢?数据在这个过程中是如何流转、复制的呢?带着这些问题,笔者最近学习了下 Linux 网络数据包的接收,并总结如下。

通过阅读本文应该能够了解:

  1. Linux 网络栈是如何接收数据的,数据从网卡到内核各个部分是如何进行流转的;
  2. 网络栈在正式工作之前需要经过哪些初始化,这些初始化工作的目的是什么;
  3. 数据包从网卡到内核需要经过几次复制;
  4. 网络相关的硬中断、软中断分别是如何实现的,二者是如何配合的;
  5. ethtooltcpdumpiptables 等分别工作在何处,原理是什么。

本文基于内核 3.10 版本,网卡举例使用的是 igb 网卡。

系统初始化

Linux 系统在接收数据包之前需要做很多的准备工作,比如创建内核 ksoftirqd 线程,便于后续处理软中断;网络子系统初始化,注册各个协议的处理函数,便于后面的协议栈处理;网卡设备子系统初始化,便于后面从网卡接收数据包。这里首先对这些初始化的过程进行记录,其中重点是网卡设备的初始化。

内核线程 ksoftirqd 初始化

Linux 在接收网络数据的时候需要用到中断机制,包括硬中断和软中断。 其中 Linux 的软中断都是在内核线程 ksoftirq 中进行的,该线程在系统中有 N 个 (N=机器核数),线程被 CPU 调度。

系统初始化的时候创建了这一线程,之后它就进入到这自己的线程循环函数 ksoftirqd_should_runrun_ksoftirqd,判断有无中断需要处理。这里的中断不仅仅包括网络相关的软中断,还有其他的中断类型, 具体可以查看 Linux 的定义

关于Linux中断相关的原理与实现,推荐阅读下面这篇文章:

网络子系统初始化

Linux 内核通过 subsys_initcall调用来初始化各个子系统,这里我们来看一看网络子系统的初始化 net_dev_init

static int __init net_dev_init(void)
{
  ...
  ...
	for_each_possible_cpu(i) {
		struct softnet_data *sd = &per_cpu(softnet_data, i);

		memset(sd, 0, sizeof(*sd));
		skb_queue_head_init(&sd->input_pkt_queue);
		skb_queue_head_init(&sd->process_queue);
		sd->completion_queue = NULL;
		INIT_LIST_HEAD(&sd->poll_list);
		sd->output_queue = NULL;
		sd->output_queue_tailp = &sd->output_queue;
#ifdef CONFIG_RPS
		sd->csd.func = rps_trigger_softirq;
		sd->csd.info = sd;
		sd->csd.flags = 0;
		sd->cpu = i;
#endif

		sd->backlog.poll = process_backlog;
		sd->backlog.weight = weight_p;
		sd->backlog.gro_list = NULL;
		sd->backlog.gro_count = 0;
	}

	...
	open_softirq(NET_TX_SOFTIRQ, net_tx_action);
	open_softirq(NET_RX_SOFTIRQ, net_rx_action);

out:
	return rc;
}

subsys_initcall(net_dev_init);

网络子系统初始化的核心是为每个 CPU 都申请了一个 softnet_data数据结构,并且为网络中断注册了对应的处理函数,其中 NET_TX_SOFTIRQ 的处理函数是 net_tx_action, NET_RX_SOFTIRQ的处理函数是net_rx_action。中断和其对应的处理函数之间是如何对应的可以跟踪阅读 open_softirq

最后,来看一看这个 softnet_data 数据结构:

struct softnet_data {
    struct Qdisc		*output_queue;          // 指向输出队列的指针
    struct Qdisc		**output_queue_tailp;   // 指向输出队列尾部的指针
    struct list_head	poll_list;              // 用于轮询的链表
    struct sk_buff		*completion_queue;      // 指向完成队列的指针
    struct sk_buff_head	process_queue;          // 处理队列

    /* stats */
    unsigned int		processed;              // 已处理的数据包数量
    unsigned int		time_squeeze;           // 时间挤压计数
    unsigned int		cpu_collision;          // CPU冲突计数
    unsigned int		received_rps;           // 接收到的RPS(接收包调度)计数

#ifdef CONFIG_RPS
    struct softnet_data	*rps_ipi_list;          // RPS IPI(中断处理程序)列表

    /* Elements below can be accessed between CPUs for RPS */
    struct call_single_data	csd ____cacheline_aligned_in_smp; // 用于单个CPU调用的数据,缓存行对齐
    struct softnet_data	*rps_ipi_next;         // 下一个RPS IPI
    unsigned int		cpu;                    // CPU编号
    unsigned int		input_queue_head;       // 输入队列头部
    unsigned int		input_queue_tail;       // 输入队列尾部
#endif
    unsigned int		dropped;                // 丢弃的数据包数量
    struct sk_buff_head	input_pkt_queue;        // 输入数据包队列
    struct napi_struct	backlog;                // NAPI(新API)结构体,用于处理网络数据包的后台日志
};

协议栈注册

网络传输的过程中涉及到各种协议,每种协议也都有其对应的处理函数,如 IP 协议对应的是 ip_rcv(), TCP,UDP 协议对应的是 tcp_v4_rcvudp_rcv。这些协议的处理函数也是通过内核注册的,内核在初始化的时候通过 fs_initcall调用 inet_init进行网络协议栈的注册。

具体来说,inet_init 中调用 inet_add_protocol 将 udp 和 tcp 的处理函数注册到了 inet_protos 数组中,将 ip 处理函数注册到了 ptype_base 哈希表中。后面软中断中以及对应的调用中就是通过这些数据结构找到对应的处理函数的。

网卡设备初始化

每个驱动程序会通过 module_init 向内核注册一个初始化函数。驱动被加载的时候,这个初始化函数会被调用,调用完成内核就知道了驱动的相关信息,比如驱动的 name, 驱动注册的 probe 函数。网卡被识别之后,内核就会调用 probe 函数。probe 做的事情因厂商和设备而异,其目的是为了让设备 ready,典型的过程包括:

  • PCI 设备启动
  • 设置 DMA 掩码
  • 注册设备驱动支持的 ethtool

例如, intel 系列的 igb 网卡igb_probe过程主要分为这样几个步骤:

  1. 启用 PCI 设备并设置 DMA 掩码;
  2. 请求 PCI 设备的内存资源;
  3. 设置网络设备操作函数 netdev->netdev_ops = &igb_netdev_ops;, (netdev);igb_set_ethtool_ops;
  4. 注册 ethtool 处理函数;
  5. 复制 MAC 地址等信息.

其中第5步,就是 ethtool 工具的注册,使用该工具的时候,内核就会找到对应的回调函数,对于 igb 网卡设备来说,函数的实现在 drivers/net/ethernet/intel/igb/igb_ethtool.c 下。第3步,网络设备操作函数注册中包含了 igb_open等函数,会在网卡启动的时候调用。

igb_probe 调用的过程中还会通过 igb_alloc_q_vector注册一个 NAPI 机制必须的 poll 函数,对于 igb 网卡设备来说,这个函数就是 igb_poll

网卡启动

完成了上面的初始化之后,就可以启动网卡了。在上面网卡设备的初始化过程中,驱动向内核注册了 netdev_ops 变量,启动网卡的时候,其中 .ndo_open上指向的 igb_open方法就会被调用。

static const struct net_device_ops igb_netdev_ops = {
	.ndo_open		= igb_open,
	.ndo_stop		= igb_close,
	.ndo_start_xmit		= igb_xmit_frame,
	.ndo_get_stats64	= igb_get_stats64,
	.ndo_set_rx_mode	= igb_set_rx_mode,
	.ndo_set_mac_address	= igb_set_mac,
	.ndo_change_mtu		= igb_change_mtu,
	.ndo_do_ioctl		= igb_ioctl,
	.ndo_tx_timeout		= igb_tx_timeout,
	.ndo_validate_addr	= eth_validate_addr,
	.ndo_vlan_rx_add_vid	= igb_vlan_rx_add_vid,
	.ndo_vlan_rx_kill_vid	= igb_vlan_rx_kill_vid,
	.ndo_set_vf_mac		= igb_ndo_set_vf_mac,
	.ndo_set_vf_vlan	= igb_ndo_set_vf_vlan,
	.ndo_set_vf_tx_rate	= igb_ndo_set_vf_bw,
	.ndo_set_vf_spoofchk	= igb_ndo_set_vf_spoofchk,
	.ndo_get_vf_config	= igb_ndo_get_vf_config,
#ifdef CONFIG_NET_POLL_CONTROLLER
	.ndo_poll_controller	= igb_netpoll,
#endif
	.ndo_fix_features	= igb_fix_features,
	.ndo_set_features	= igb_set_features,
};

在这个函数中会分配传输/接收描述符数组,注册中断处理函数,并启用 NAPI。

/**
 *  igb_open - Called when a network interface is made active
 *  @netdev: network interface device structure
 *
 *  Returns 0 on success, negative value on failure
 *
 *  The open entry point is called when a network interface is made
 *  active by the system (IFF_UP).  At this point all resources needed
 *  for transmit and receive operations are allocated, the interrupt
 *  handler is registered with the OS, the watchdog timer is started,
 *  and the stack is notified that the interface is ready.
 **/
static int __igb_open(struct net_device *netdev, bool resuming)
{
	struct igb_adapter *adapter = netdev_priv(netdev);
	struct e1000_hw *hw = &adapter->hw;
	struct pci_dev *pdev = adapter->pdev;
	int err;
	int i;

	/* disallow open during test */
	if (test_bit(__IGB_TESTING, &adapter->state)) {
		WARN_ON(resuming);
		return -EBUSY;
	}

	if (!resuming)
		pm_runtime_get_sync(&pdev->dev);

	netif_carrier_off(netdev);

	/* allocate transmit descriptors */
  // 分配传输描述符数组
	err = igb_setup_all_tx_resources(adapter);
	if (err)
		goto err_setup_tx;

	/* allocate receive descriptors */
  // 分配接收描述符数组
	err = igb_setup_all_rx_resources(adapter);
	if (err)
		goto err_setup_rx;

	igb_power_up_link(adapter);

	/* before we allocate an interrupt, we must be ready to handle it.
	 * Setting DEBUG_SHIRQ in the kernel makes it fire an interrupt
	 * as soon as we call pci_request_irq, so we have to setup our
	 * clean_rx handler before we do so.
	 */
	igb_configure(adapter);

  // 注册中断处理函数
	err = igb_request_irq(adapter);
	if (err)
		goto err_req_irq;

	/* Notify the stack of the actual queue counts. */
	err = netif_set_real_num_tx_queues(adapter->netdev,
					   adapter->num_tx_queues);
	if (err)
		goto err_set_queues;

	err = netif_set_real_num_rx_queues(adapter->netdev,
					   adapter->num_rx_queues);
	if (err)
		goto err_set_queues;

	/* From here on the code is the same as igb_up() */
	clear_bit(__IGB_DOWN, &adapter->state);


  // 启用 NAPI
	for (i = 0; i < adapter->num_q_vectors; i++)
		napi_enable(&(adapter->q_vector[i]->napi));

	/* Clear any pending interrupts. */
	rd32(E1000_ICR);

	igb_irq_enable(adapter);

	/* notify VFs that reset has been completed */
	if (adapter->vfs_allocated_count) {
		u32 reg_data = rd32(E1000_CTRL_EXT);
		reg_data |= E1000_CTRL_EXT_PFRSTD;
		wr32(E1000_CTRL_EXT, reg_data);
	}

	netif_tx_start_all_queues(netdev);

	if (!resuming)
		pm_runtime_put(&pdev->dev);

	/* start the watchdog. */
	hw->mac.get_link_status = 1;
	schedule_work(&adapter->watchdog_task);

	return 0;

err_set_queues:
	igb_free_irq(adapter);
err_req_irq:
	igb_release_hw_control(adapter);
	igb_power_down_link(adapter);
	igb_free_all_rx_resources(adapter);
err_setup_rx:
	igb_free_all_tx_resources(adapter);
err_setup_tx:
	igb_reset(adapter);
	if (!resuming)
		pm_runtime_put(&pdev->dev);

	return err;
}

下面是描述符数组的具体创建过程:

// https://github.com/torvalds/linux/blob/8bb495e3f02401ee6f76d1b1d77f3ac9f079e376/drivers/net/ethernet/intel/igb/igb_main.c#L3063-L3086
static int igb_setup_all_rx_resources(struct igb_adapter *adapter)
{
	struct pci_dev *pdev = adapter->pdev;
	int i, err = 0;

	for (i = 0; i < adapter->num_rx_queues; i++) {
		err = igb_setup_rx_resources(adapter->rx_ring[i]);
		if (err) {
			dev_err(&pdev->dev,
				"Allocation for Rx Queue %u failed\n", i);
			for (i--; i >= 0; i--)
				igb_free_rx_resources(adapter->rx_ring[i]);
			break;
		}
	}

	return err;
}
// https://github.com/torvalds/linux/blob/8bb495e3f02401ee6f76d1b1d77f3ac9f079e376/drivers/net/ethernet/intel/igb/igb_main.c#L3023-L3060
/**
 *  igb_setup_rx_resources - allocate Rx resources (Descriptors)
 *  @rx_ring: Rx descriptor ring (for a specific queue) to setup
 *
 *  Returns 0 on success, negative on failure
 **/
int igb_setup_rx_resources(struct igb_ring *rx_ring)
{
	struct device *dev = rx_ring->dev;
	int size;

	size = sizeof(struct igb_rx_buffer) * rx_ring->count;

	rx_ring->rx_buffer_info = vzalloc(size);
	if (!rx_ring->rx_buffer_info)
		goto err;

	/* Round up to nearest 4K */
	rx_ring->size = rx_ring->count * sizeof(union e1000_adv_rx_desc);
	rx_ring->size = ALIGN(rx_ring->size, 4096);

	rx_ring->desc = dma_alloc_coherent(dev, rx_ring->size,
					   &rx_ring->dma, GFP_KERNEL);
	if (!rx_ring->desc)
		goto err;

	rx_ring->next_to_alloc = 0;
	rx_ring->next_to_clean = 0;
	rx_ring->next_to_use = 0;

	return 0;

err:
	vfree(rx_ring->rx_buffer_info);
	rx_ring->rx_buffer_info = NULL;
	dev_err(dev, "Unable to allocate memory for the Rx descriptor ring\n");
	return -ENOMEM;
}

最后看下中断函数的注册igb_request_irq(adapter), 顺着调用链,可以看到,多队列网卡会为每个队列注册中断,对应的中断处理函数是 igb_mix_ring.

/**
 *  igb_request_msix - Initialize MSI-X interrupts
 *  @adapter: board private structure to initialize
 *
 *  igb_request_msix allocates MSI-X vectors and requests interrupts from the
 *  kernel.
 **/
static int igb_request_msix(struct igb_adapter *adapter)
{
	struct net_device *netdev = adapter->netdev;
	struct e1000_hw *hw = &adapter->hw;
	int i, err = 0, vector = 0, free_vector = 0;

	err = request_irq(adapter->msix_entries[vector].vector,
			  igb_msix_other, 0, netdev->name, adapter);
	if (err)
		goto err_out;

	for (i = 0; i < adapter->num_q_vectors; i++) {
		struct igb_q_vector *q_vector = adapter->q_vector[i];

		vector++;

		q_vector->itr_register = hw->hw_addr + E1000_EITR(vector);

		...
		...

		err = request_irq(adapter->msix_entries[vector].vector,
				  igb_msix_ring, 0, q_vector->name,
				  q_vector);
		if (err)
			goto err_free;
	}

	igb_configure_msix(adapter);
	return 0;

err_free:
	/* free already assigned IRQs */
	free_irq(adapter->msix_entries[free_vector++].vector, adapter);

	vector--;
	for (i = 0; i < vector; i++) {
		free_irq(adapter->msix_entries[free_vector++].vector,
			 adapter->q_vector[i]);
	}
err_out:
	return err;
}

数据接收

网卡收包

当上面的一切初始化完成之后就网卡就可以收包了,数据从网线进入网卡,通过 DMA 直接将数据写到 Ring Buffer,这里涉及到第一次数据复制:从网卡通过DMA将数据复制到 Ring Buffer。当DMA完成之后,网卡会向 CPU 发一个硬中断,通知 CPU 有数据到达。

硬中断处理

CPU 收到网卡发起硬中断之后,就会调用驱动注册的硬中断处理函数。对于 igb 网卡,这个处理函数就是在网卡启动这一节最后提到的 igb_mix_ring

static irqreturn_t igb_msix_ring(int irq, void *data)
{
	struct igb_q_vector *q_vector = data;

	/* Write the ITR value calculated from the previous interrupt. */
	igb_write_itr(q_vector);

	napi_schedule(&q_vector->napi);

	return IRQ_HANDLED;
}

这里顺着调用链看,最后是在 ____napi_schedule

// https://github.com/torvalds/linux/blob/8bb495e3f02401ee6f76d1b1d77f3ac9f079e376/net/core/dev.c#L4065-L4077
void __napi_schedule(struct napi_struct *n)
{
	unsigned long flags;

	local_irq_save(flags);
	____napi_schedule(&__get_cpu_var(softnet_data), n);
	local_irq_restore(flags);
}
// https://github.com/torvalds/linux/blob/8bb495e3f02401ee6f76d1b1d77f3ac9f079e376/net/core/dev.c#L2874-L2879
/* Called with irq disabled */
static inline void ____napi_schedule(struct softnet_data *sd,
				     struct napi_struct *napi)
{
	list_add_tail(&napi->poll_list, &sd->poll_list);
	__raise_softirq_irqoff(NET_RX_SOFTIRQ);
}

____napi_schedule 中主要完成了两件事:

  1. 将从驱动中传来的 napi_struct 添加到一个 poll list,这个poll list 是 attach 在 CPU 变量 softness_data 上的;
  2. 触发一个 NET_RX_SOFTIRQ 类型的软中断。

这里可以看到驱动的硬中断处理函数做的事情很少,因为硬中断的一个目标就是要快,但软中断将会在和硬中断相同的 CPU 上执行。这就 是为什么给每个 CPU 一个特定的硬中断非常重要:这个 CPU 不仅处理这个硬中断,而且通 过 NAPI 处理接下来的软中断来收包。

软中断处理

内核的软中断都是在 ksoftirqd 内核线程中处理的,介绍 ksoftirqd线程初始化的时候提到过它的循环函数 ksoftirqd_should_runrun_ksoftirqd

// https://github.com/torvalds/linux/blob/8bb495e3f02401ee6f76d1b1d77f3ac9f079e376/kernel/softirq.c#L758C1-L761C2
static int ksoftirqd_should_run(unsigned int cpu)
{
	return local_softirq_pending();
}
// https://github.com/torvalds/linux/blob/8bb495e3f02401ee6f76d1b1d77f3ac9f079e376/kernel/softirq.c#L763-L774
static void run_ksoftirqd(unsigned int cpu)
{
	local_irq_disable();              // 关闭所在 CPU 的所有硬中断
	if (local_softirq_pending()) {
		__do_softirq();               // 重新打开所在 CPU 的所有硬中断
		rcu_note_context_switch(cpu);
		local_irq_enable();
		cond_resched();               // 将 CPU 交还给调度器
		return;
	}
	local_irq_enable();               // 重新打开所在 CPU 的所有硬中断
}

这里如果硬中断写入了标记,那么在 local_softirq_pending 就能够读取到。在 __do_softirq 中就会根据当前 CPU 的软中断类型,调用注册的 action 方法。对于接收数据包来说就是 net_rx_action

软中断处理函数 net_rx_action

软中断处理函数 net_rx_action 的核心逻辑是从当前 CPU 获取 softnet_data 变量,对其 poll_list进行遍历,执行网卡驱动注册的 poll 函数,例如对于 igb 网卡就是 igb_poll

// https://github.com/torvalds/linux/blob/8bb495e3f02401ee6f76d1b1d77f3ac9f079e376/net/core/dev.c#L4149-L4242
static void net_rx_action(struct softirq_action *h)
{
	struct softnet_data *sd = &__get_cpu_var(softnet_data);  // 该 CPU 的 softnet_data 统计
	unsigned long time_limit = jiffies + 2;                  // 该 CPU 的所有 NAPI 变量的总 time limit
	unsigned long time_limit = jiffies + 2;                  // 该 CPU 的所有 NAPI 变量的总 time limit
	int budget = netdev_budget;
	void *have;


	local_irq_disable();


	while (!list_empty(&sd->poll_list)) {
		struct napi_struct *n;
		int work, weight;


		/* If softirq window is exhuasted then punt.
		 * Allow this to run for 2 jiffies since which will allow
		 * an average latency of 1.5/HZ.
		 */
		if (unlikely(budget <= 0 || time_after_eq(jiffies, time_limit)))
			goto softnet_break;


		local_irq_enable();


		/* Even though interrupts have been re-enabled, this
		 * access is safe because interrupts can only add new
		 * entries to the tail of this list, and only ->poll()
		 * calls can remove this head entry from the list.
		 */
		n = list_first_entry(&sd->poll_list, struct napi_struct, poll_list);


		have = netpoll_poll_lock(n);


		weight = n->weight;


		/* This NAPI_STATE_SCHED test is for avoiding a race
		 * with netpoll's poll_napi().  Only the entity which
		 * obtains the lock and sees NAPI_STATE_SCHED set will
		 * actually make the ->poll() call.  Therefore we avoid
		 * accidentally calling ->poll() when NAPI is not scheduled.
		 */
		work = 0;
		if (test_bit(NAPI_STATE_SCHED, &n->state)) {
			work = n->poll(n, weight);
			trace_napi_poll(n);
		}


		WARN_ON_ONCE(work > weight);


		budget -= work;


		local_irq_disable();


		/* Drivers must not modify the NAPI state if they
		 * consume the entire weight.  In such cases this code
		 * still "owns" the NAPI instance and therefore can
		 * move the instance around on the list at-will.
		 */
		if (unlikely(work == weight)) {
			if (unlikely(napi_disable_pending(n))) {
				local_irq_enable();
				napi_complete(n);
				local_irq_disable();
			} else {
				if (n->gro_list) {
					/* flush too old packets
					 * If HZ < 1000, flush all packets.
					 */
					local_irq_enable();
					napi_gro_flush(n, HZ >= 1000);
					local_irq_disable();
				}
				list_move_tail(&n->poll_list, &sd->poll_list);
			}
		}


		netpoll_poll_unlock(have);
	}
out:
	net_rps_action_and_irq_enable(sd);


#ifdef CONFIG_NET_DMA
	/*
	 * There may not be any more sk_buffs coming right now, so push
	 * any pending DMA copies to hardware
	 */
	dma_issue_pending_all();
#endif


	return;


softnet_break:
	sd->time_squeeze++;
	__raise_softirq_irqoff(NET_RX_SOFTIRQ);
	goto out;
}

驱动 poll 函数处理 —— RingBuffer 取下数据帧

网卡 poll 函数的核心就是将数据帧从 RingBuffer 上取下来,然后初始化一个 struct sk_buff *skb 结构体变量,也就是我们最常打交道的内核协议栈中的数据包。这里也涉及到第二次数据复制:从 Ring buffer 复制到 skb 结构体

收取完数据后,会对其进行一些校验, 然后开始设置 sbk 变量的 time_stamp, VLAN id,protocol 等字段。接下来会进入 napi_gro_receive(),这个函数代表的是网卡 GRO 特性,GRO

之后的调用链是 napi_gro_receive() -> napi_skb_finish -> netif_receive_skb,在 netif_receive_skb 中数据被送往协议栈。

顺着调用链可以看到关键的处理逻辑如下,这里记录两个点:

一是我们常用的 tcpdump 抓包点就在这个函数中,它将抓包函数以协议的方式挂在 ptype_all上,设备层遍历所有的协议就能进行抓包了。

二是在此处会从 skb 中取出协议,根据协议类型送往具体的处理函数中。这里这个函数是如何找到的呢?还记得在之前介绍协议初始化的时候提到过,协议的处理的函数是注册在数据结构中的,具体来说这里的 ptype_base 是一个哈希表,ip_rcv 函数地址就在这个函数中。这里在对应的位置顺着调用栈 __netif_receive_skb_core() --> deliver_skb() 可以看到最后有调用 pt_prev->func(skb, skb->dev, pt_prev, orig_dev); 就是调用的协议栈的处理函数。对于 IP 包就会被送完 ip_rcv 中处理。

ip_rcv 的核心实际上就是最后的 NF_HOOK(NFPROTO_IPV4, NF_INET_PRE_ROUTING, skb, dev, NULL,ip_rcv_finish);它在最后会以 netfilter hook 的方式调用 ip_rcv_finish() 方法。 这样任何 iptables 规则都能在 packet 刚进入 IP 层协议的时候被应用。

static int __netif_receive_skb_core(struct sk_buff *skb, bool pfmemalloc)
{
	struct packet_type *ptype, *pt_prev;
	rx_handler_func_t *rx_handler;
	struct net_device *orig_dev;
	struct net_device *null_or_dev;
	bool deliver_exact = false;
	int ret = NET_RX_DROP;
	__be16 type;

	net_timestamp_check(!netdev_tstamp_prequeue, skb);

	trace_netif_receive_skb(skb);

	/* if we've gotten here through NAPI, check netpoll */
	if (netpoll_receive_skb(skb))
		goto out;

	orig_dev = skb->dev;

	skb_reset_network_header(skb);
	if (!skb_transport_header_was_set(skb))
		skb_reset_transport_header(skb);
	skb_reset_mac_len(skb);

	pt_prev = NULL;

	rcu_read_lock();

another_round:
	skb->skb_iif = skb->dev->ifindex;

	__this_cpu_inc(softnet_data.processed);

	if (skb->protocol == cpu_to_be16(ETH_P_8021Q) ||
	    skb->protocol == cpu_to_be16(ETH_P_8021AD)) {
		skb = vlan_untag(skb);
		if (unlikely(!skb))
			goto unlock;
	}

#ifdef CONFIG_NET_CLS_ACT
	if (skb->tc_verd & TC_NCLS) {
		skb->tc_verd = CLR_TC_NCLS(skb->tc_verd);
		goto ncls;
	}
#endif

	if (pfmemalloc)
		goto skip_taps;

	list_for_each_entry_rcu(ptype, &ptype_all, list) {  // 抓包
		if (!ptype->dev || ptype->dev == skb->dev) {
			if (pt_prev)
				ret = deliver_skb(skb, pt_prev, orig_dev);
			pt_prev = ptype;
		}
	}

skip_taps:
#ifdef CONFIG_NET_CLS_ACT
	skb = handle_ing(skb, &pt_prev, &ret, orig_dev);
	if (!skb)
		goto unlock;
ncls:
#endif

	if (pfmemalloc && !skb_pfmemalloc_protocol(skb))
		goto drop;

	if (vlan_tx_tag_present(skb)) {
		if (pt_prev) {
			ret = deliver_skb(skb, pt_prev, orig_dev);
			pt_prev = NULL;
		}
		if (vlan_do_receive(&skb))
			goto another_round;
		else if (unlikely(!skb))
			goto unlock;
	}

	rx_handler = rcu_dereference(skb->dev->rx_handler);
	if (rx_handler) {
		if (pt_prev) {
			ret = deliver_skb(skb, pt_prev, orig_dev);
			pt_prev = NULL;
		}
		switch (rx_handler(&skb)) {
		case RX_HANDLER_CONSUMED:
			ret = NET_RX_SUCCESS;
			goto unlock;
		case RX_HANDLER_ANOTHER:
			goto another_round;
		case RX_HANDLER_EXACT:
			deliver_exact = true;
		case RX_HANDLER_PASS:
			break;
		default:
			BUG();
		}
	}

	if (vlan_tx_nonzero_tag_present(skb))
		skb->pkt_type = PACKET_OTHERHOST;

	/* deliver only exact match when indicated */
	null_or_dev = deliver_exact ? skb->dev : NULL;

	type = skb->protocol;
	list_for_each_entry_rcu(ptype,
			&ptype_base[ntohs(type) & PTYPE_HASH_MASK], list) {             // 根据协议送往协议栈处理
		if (ptype->type == type &&
		    (ptype->dev == null_or_dev || ptype->dev == skb->dev ||
		     ptype->dev == orig_dev)) {
			if (pt_prev)
				ret = deliver_skb(skb, pt_prev, orig_dev);
			pt_prev = ptype;
		}
	}

	if (pt_prev) {
		if (unlikely(skb_orphan_frags(skb, GFP_ATOMIC)))
			goto drop;
		else
			ret = pt_prev->func(skb, skb->dev, pt_prev, orig_dev);
	} else {
drop:
		atomic_long_inc(&skb->dev->rx_dropped);
		kfree_skb(skb);
		/* Jamal, now you will not able to escape explaining
		 * me how you were going to use this. :-)
		 */
		ret = NET_RX_DROP;
	}

unlock:
	rcu_read_unlock();
out:
	return ret;
}

协议栈处理

L3 层协议处理 (IPv4)

根据前面的介绍根据 ptype_base 能够找到协议对应的处理函数,对于 IP 协议包就会被送往 ip_rcv 函数中处理。

/*
 * 	Main IP Receive routine.
 */
int ip_rcv(struct sk_buff *skb, struct net_device *dev, struct packet_type *pt, struct net_device *orig_dev)
{
	const struct iphdr *iph;
	u32 len;


	/* When the interface is in promisc. mode, drop all the crap
	 * that it receives, do not try to analyse it.
	 */
	if (skb->pkt_type == PACKET_OTHERHOST)
		goto drop;




	IP_UPD_PO_STATS_BH(dev_net(dev), IPSTATS_MIB_IN, skb->len);

	// 更新统计信息
	if ((skb = skb_share_check(skb, GFP_ATOMIC)) == NULL) {
		IP_INC_STATS_BH(dev_net(dev), IPSTATS_MIB_INDISCARDS);
		goto out;
	}


	if (!pskb_may_pull(skb, sizeof(struct iphdr)))
		goto inhdr_error;


	iph = ip_hdr(skb);

	// 校验
	/*
	 *	RFC1122: 3.2.1.2 MUST silently discard any IP frame that fails the checksum.
	 *
	 *	Is the datagram acceptable?
	 *
	 *	1.	Length at least the size of an ip header
	 *	2.	Version of 4
	 *	3.	Checksums correctly. [Speed optimisation for later, skip loopback checksums]
	 *	4.	Doesn't have a bogus length
	 */


	if (iph->ihl < 5 || iph->version != 4)
		goto inhdr_error;


	if (!pskb_may_pull(skb, iph->ihl*4))
		goto inhdr_error;


	iph = ip_hdr(skb);


	if (unlikely(ip_fast_csum((u8 *)iph, iph->ihl)))
		goto csum_error;


	len = ntohs(iph->tot_len);
	if (skb->len < len) {
		IP_INC_STATS_BH(dev_net(dev), IPSTATS_MIB_INTRUNCATEDPKTS);
		goto drop;
	} else if (len < (iph->ihl*4))
		goto inhdr_error;


	/* Our transport medium may have padded the buffer out. Now we know it
	 * is IP we can trim to the true length of the frame.
	 * Note this now means skb->len holds ntohs(iph->tot_len).
	 */
	if (pskb_trim_rcsum(skb, len)) {
		IP_INC_STATS_BH(dev_net(dev), IPSTATS_MIB_INDISCARDS);
		goto drop;
	}


	/* Remove any debris in the socket control block */
	memset(IPCB(skb), 0, sizeof(struct inet_skb_parm));


	/* Must drop socket now because of tproxy. */
	skb_orphan(skb);


	return NF_HOOK(NFPROTO_IPV4, NF_INET_PRE_ROUTING, skb, dev, NULL,
		       ip_rcv_finish);


csum_error:
	IP_INC_STATS_BH(dev_net(dev), IPSTATS_MIB_CSUMERRORS);
inhdr_error:
	IP_INC_STATS_BH(dev_net(dev), IPSTATS_MIB_INHDRERRORS);
drop:
	kfree_skb(skb);
out:
	return NET_RX_DROP;
}

ip_rcv 中会计算统计信息,进行校验,在最后会以 netfilter hook 的方式调用 ip_rcv_finish() 方法, iptables 规则就都能在 packet 刚进入 IP 层协议的时候被应用。在 netfilter 中完成对数据的处理之后, 如果数据没有被丢掉,就会调用 ip_rcv_finish

ip_rcv_finish 为了能将包送到合适的目的地,需要一个路由 子系统的 dst_entry 变量。路由子系统完成工作后,会更新计数器,然后调用 dst_input(skb),后者会进一步调用 dst_entry 变量中的 input 方法,这个方法是一个函数指针,由路由子系统初始化。例如 ,如果 packet 的最终目的地是本机,路由子系统会将 ip_local_deliver 赋 给 input。

int ip_local_deliver(struct sk_buff *skb)
{
	/*
	 *	Reassemble IP fragments.
	 */


	if (ip_is_fragment(ip_hdr(skb))) {
		if (ip_defrag(skb, IP_DEFRAG_LOCAL_DELIVER))
			return 0;
	}


	return NF_HOOK(NFPROTO_IPV4, NF_INET_LOCAL_IN, skb, skb->dev, NULL,
		       ip_local_deliver_finish);
}

ip_local_deliver 的处理逻辑和之前的类似,如果包没有被丢弃,那么会调用 ip_local_deliver_finish 函数。

ip_local_deliver_finish 中会获取更上一层的处理协议,在上面介绍协议栈的注册的时候提到过,tcp 和 udp 的协议处理函数就注册在 inet_protos 中。这样通过调用相应的函数就能够将 skb 包进一步派送到更上层的协议中。

// https://github.com/torvalds/linux/blob/8bb495e3f02401ee6f76d1b1d77f3ac9f079e376/net/ipv4/ip_input.c#L189-L242
static int ip_local_deliver_finish(struct sk_buff *skb)
{
	struct net *net = dev_net(skb->dev);


	__skb_pull(skb, ip_hdrlen(skb));


	/* Point into the IP datagram, just past the header. */
	skb_reset_transport_header(skb);


	rcu_read_lock();
	{
		int protocol = ip_hdr(skb)->protocol;
		const struct net_protocol *ipprot;
		int raw;


	resubmit:
		raw = raw_local_deliver(skb, protocol);


		ipprot = rcu_dereference(inet_protos[protocol]);
		if (ipprot != NULL) {
			int ret;


			if (!ipprot->no_policy) {
				if (!xfrm4_policy_check(NULL, XFRM_POLICY_IN, skb)) {
					kfree_skb(skb);
					goto out;
				}
				nf_reset(skb);
			}
			ret = ipprot->handler(skb);
			if (ret < 0) {
				protocol = -ret;
				goto resubmit;
			}
			IP_INC_STATS_BH(net, IPSTATS_MIB_INDELIVERS);
		} else {
			if (!raw) {
				if (xfrm4_policy_check(NULL, XFRM_POLICY_IN, skb)) {
					IP_INC_STATS_BH(net, IPSTATS_MIB_INUNKNOWNPROTOS);
					icmp_send(skb, ICMP_DEST_UNREACH,
						  ICMP_PROT_UNREACH, 0);
				}
				kfree_skb(skb);
			} else {
				IP_INC_STATS_BH(net, IPSTATS_MIB_INDELIVERS);
				consume_skb(skb);
			}
		}
	}
 out:
	rcu_read_unlock();


	return 0;
}

L4 层协议处理 (TCP/UDP)

L3 层处理之后会将 skb 送往更上一层的 L4 层进行处理,此处将以为 udp 为例进行介绍,TCP 相关的内容更加负责。

在介绍协议层初始化的时候提到过,UDP 注册的协议处理函数是 udp_rcv, 这个函数非常简单,只有一行,调用 __udp4_lib_rcv() 接收 UDP 报文, 其中指定了协议类型是 IPPROTO_UDP;这是因为 __udp4_lib_rcv() 封装了两种 UDP 协议的处理。

// https://github.com/torvalds/linux/blob/8bb495e3f02401ee6f76d1b1d77f3ac9f079e376/net/ipv4/udp.c#L1765-L1768

int udp_rcv(struct sk_buff *skb)
{
	return __udp4_lib_rcv(skb, &udp_table, IPPROTO_UDP);
}

__udp4_lib_rcv 函数用于处理接收到的UDP数据包。它验证数据包的有效性,检查校验和,并将数据包传递给相应的套接字。如果没有找到匹配的套接字,则丢弃数据包或发送ICMP错误消息。

https://github.com/torvalds/linux/blob/8bb495e3f02401ee6f76d1b1d77f3ac9f079e376/net/ipv4/udp.c#L1671-L1763
int __udp4_lib_rcv(struct sk_buff *skb, struct udp_table *udptable,
		   int proto)
{
	struct sock *sk;
	struct udphdr *uh;
	unsigned short ulen;
	struct rtable *rt = skb_rtable(skb);
	__be32 saddr, daddr;
	struct net *net = dev_net(skb->dev);


	/*
	 *  Validate the packet.
	 */
	if (!pskb_may_pull(skb, sizeof(struct udphdr)))
		goto drop;		/* No space for header. */


	uh   = udp_hdr(skb);
	ulen = ntohs(uh->len);
	saddr = ip_hdr(skb)->saddr;
	daddr = ip_hdr(skb)->daddr;


	if (ulen > skb->len)
		goto short_packet;


	if (proto == IPPROTO_UDP) {
		/* UDP validates ulen. */
		if (ulen < sizeof(*uh) || pskb_trim_rcsum(skb, ulen))
			goto short_packet;
		uh = udp_hdr(skb);
	}


	if (udp4_csum_init(skb, uh, proto))
		goto csum_error;


	if (rt->rt_flags & (RTCF_BROADCAST|RTCF_MULTICAST))
		return __udp4_lib_mcast_deliver(net, skb, uh,
				saddr, daddr, udptable);


	sk = __udp4_lib_lookup_skb(skb, uh->source, uh->dest, udptable);


	if (sk != NULL) {
		int ret = udp_queue_rcv_skb(sk, skb);
		sock_put(sk);


		/* a return value > 0 means to resubmit the input, but
		 * it wants the return to be -protocol, or 0
		 */
		if (ret > 0)
			return -ret;
		return 0;
	}


	if (!xfrm4_policy_check(NULL, XFRM_POLICY_IN, skb))
		goto drop;
	nf_reset(skb);


	/* No socket. Drop packet silently, if checksum is wrong */
	if (udp_lib_checksum_complete(skb))
		goto csum_error;


	UDP_INC_STATS_BH(net, UDP_MIB_NOPORTS, proto == IPPROTO_UDPLITE);
	icmp_send(skb, ICMP_DEST_UNREACH, ICMP_PORT_UNREACH, 0);


	/*
	 * Hmm.  We got an UDP packet to a port to which we
	 * don't wanna listen.  Ignore it.
	 */
	kfree_skb(skb);
	return 0;


short_packet:
	LIMIT_NETDEBUG(KERN_DEBUG "UDP%s: short packet: From %pI4:%u %d/%d to %pI4:%u\n",
		       proto == IPPROTO_UDPLITE ? "Lite" : "",
		       &saddr, ntohs(uh->source),
		       ulen, skb->len,
		       &daddr, ntohs(uh->dest));
	goto drop;


csum_error:
	/*
	 * RFC1122: OK.  Discards the bad packet silently (as far as
	 * the network is concerned, anyway) as per 4.1.3.4 (MUST).
	 */
	LIMIT_NETDEBUG(KERN_DEBUG "UDP%s: bad checksum. From %pI4:%u to %pI4:%u ulen %d\n",
		       proto == IPPROTO_UDPLITE ? "Lite" : "",
		       &saddr, ntohs(uh->source), &daddr, ntohs(uh->dest),
		       ulen);
	UDP_INC_STATS_BH(net, UDP_MIB_CSUMERRORS, proto == IPPROTO_UDPLITE);
drop:
	UDP_INC_STATS_BH(net, UDP_MIB_INERRORS, proto == IPPROTO_UDPLITE);
	kfree_skb(skb);
	return 0;
}

这里具体看一下如何送入 socket 队列。在 __udp4_lib_lookup_skb 函数中会根据源端口和目的端口找到对应的 socket,如果找到了就会调用 udp_queue_rcv_skb 函数将 skb 送入 socket 队列。

// https://github.com/torvalds/linux/blob/8bb495e3f02401ee6f76d1b1d77f3ac9f079e376/net/ipv4/udp.c#L1440-L1547
int udp_queue_rcv_skb(struct sock *sk, struct sk_buff *skb)
{
	struct udp_sock *up = udp_sk(sk);
	int rc;
	int is_udplite = IS_UDPLITE(sk);


	/*
	 *	Charge it to the socket, dropping if the queue is full.
	 */
	if (!xfrm4_policy_check(sk, XFRM_POLICY_IN, skb))
		goto drop;
	nf_reset(skb);


	if (static_key_false(&udp_encap_needed) && up->encap_type) {
		int (*encap_rcv)(struct sock *sk, struct sk_buff *skb);


		/*
		 * This is an encapsulation socket so pass the skb to
		 * the socket's udp_encap_rcv() hook. Otherwise, just
		 * fall through and pass this up the UDP socket.
		 * up->encap_rcv() returns the following value:
		 * =0 if skb was successfully passed to the encap
		 *    handler or was discarded by it.
		 * >0 if skb should be passed on to UDP.
		 * <0 if skb should be resubmitted as proto -N
		 */


		/* if we're overly short, let UDP handle it */
		encap_rcv = ACCESS_ONCE(up->encap_rcv);
		if (skb->len > sizeof(struct udphdr) && encap_rcv != NULL) {
			int ret;


			ret = encap_rcv(sk, skb);
			if (ret <= 0) {
				UDP_INC_STATS_BH(sock_net(sk),
						 UDP_MIB_INDATAGRAMS,
						 is_udplite);
				return -ret;
			}
		}


		/* FALLTHROUGH -- it's a UDP Packet */
	}


	/*
	 * 	UDP-Lite specific tests, ignored on UDP sockets
	 */
	if ((is_udplite & UDPLITE_RECV_CC)  &&  UDP_SKB_CB(skb)->partial_cov) {


		/*
		 * MIB statistics other than incrementing the error count are
		 * disabled for the following two types of errors: these depend
		 * on the application settings, not on the functioning of the
		 * protocol stack as such.
		 *
		 * RFC 3828 here recommends (sec 3.3): "There should also be a
		 * way ... to ... at least let the receiving application block
		 * delivery of packets with coverage values less than a value
		 * provided by the application."
		 */
		if (up->pcrlen == 0) {          /* full coverage was set  */
			LIMIT_NETDEBUG(KERN_WARNING "UDPLite: partial coverage %d while full coverage %d requested\n",
				       UDP_SKB_CB(skb)->cscov, skb->len);
			goto drop;
		}
		/* The next case involves violating the min. coverage requested
		 * by the receiver. This is subtle: if receiver wants x and x is
		 * greater than the buffersize/MTU then receiver will complain
		 * that it wants x while sender emits packets of smaller size y.
		 * Therefore the above ...()->partial_cov statement is essential.
		 */
		if (UDP_SKB_CB(skb)->cscov  <  up->pcrlen) {
			LIMIT_NETDEBUG(KERN_WARNING "UDPLite: coverage %d too small, need min %d\n",
				       UDP_SKB_CB(skb)->cscov, up->pcrlen);
			goto drop;
		}
	}


	if (rcu_access_pointer(sk->sk_filter) &&
	    udp_lib_checksum_complete(skb))
		goto csum_error;




	if (sk_rcvqueues_full(sk, skb, sk->sk_rcvbuf))
		goto drop;


	rc = 0;


	ipv4_pktinfo_prepare(skb);
	bh_lock_sock(sk);
	if (!sock_owned_by_user(sk))
		rc = __udp_queue_rcv_skb(sk, skb);
	else if (sk_add_backlog(sk, skb, sk->sk_rcvbuf)) {
		bh_unlock_sock(sk);
		goto drop;
	}
	bh_unlock_sock(sk);


	return rc;


csum_error:
	UDP_INC_STATS_BH(sock_net(sk), UDP_MIB_CSUMERRORS, is_udplite);
drop:
	UDP_INC_STATS_BH(sock_net(sk), UDP_MIB_INERRORS, is_udplite);
	atomic_inc(&sk->sk_drops);
	kfree_skb(skb);
	return -1;
}

udp_queue_rcv_skb 函数用于将接收到的UDP数据包加入到相应的套接字接收队列中。它会进行一系列检查和处理,包括策略检查、封装处理、UDP-Lite特定检查、校验和验证、队列容量检查等。

顺着调用链 udp_queue_rcv_skb -> __udp_queue_rcv_skb -> sock_queue_rcv_skb, 可以看到最后的 sock_queue_rcv_skb 函数将数据包加入到套接字的接收队列中。最后,所有在这个 socket 上等待数据的进程都收到一个通知通过 sk_data_ready 通知处理函数。

int sock_queue_rcv_skb(struct sock *sk, struct sk_buff *skb)
{
    int err;
    int skb_len;
    unsigned long flags;
    struct sk_buff_head *list = &sk->sk_receive_queue;

    // 检查接收队列是否已满
    if (atomic_read(&sk->sk_rmem_alloc) >= sk->sk_rcvbuf) {
        atomic_inc(&sk->sk_drops); // 增加丢包计数
        trace_sock_rcvqueue_full(sk, skb); // 记录队列已满事件
        return -ENOMEM; // 返回内存不足错误
    }

    // 过滤数据包
    err = sk_filter(sk, skb);
    if (err)
        return err; // 如果过滤失败,返回错误

    // 检查是否有足够的内存来接收数据包
    if (!sk_rmem_schedule(sk, skb, skb->truesize)) {
        atomic_inc(&sk->sk_drops); // 增加丢包计数
        return -ENOBUFS; // 返回缓冲区不足错误
    }

    skb->dev = NULL; // 清除数据包的设备指针
    skb_set_owner_r(skb, sk); // 设置数据包的所有者

    // 缓存数据包长度
    skb_len = skb->len;

    // 确保不泄漏无引用计数的目标
    skb_dst_force(skb);

    // 加锁并将数据包加入接收队列
    spin_lock_irqsave(&list->lock, flags);
    skb->dropcount = atomic_read(&sk->sk_drops); // 设置数据包的丢包计数
    __skb_queue_tail(list, skb); // 将数据包加入队列尾部
    spin_unlock_irqrestore(&list->lock, flags);

    // 如果套接字未关闭,调用数据准备就绪回调函数
    if (!sock_flag(sk, SOCK_DEAD))
        sk->sk_data_ready(sk, skb_len);
    return 0; // 成功返回 0
}

到此,一个数据包就完成了从到达网卡,依次穿过协议栈,到达 socket 的过程。

小结

本文分析了 Linux 网络接收数据的大致流程。当用户执行网 recvfrom 系统调用之后,Linux 就陷入内核态工作了。如果此时接收队列没有数据,进程就会进入睡眠态,被挂起。

Linux 网络在接收数据之前需要做很多的准备工作,包括:

  • 创建 ksoftirqd 线程,设置自己的线程函数,后面需要其处理软中断;
  • 注册协议栈,Linux 要实现很多的协议,如 IP,ARP,ICMP,TCP,UDP 等,每种协议都需要注册自己的协议处理函数;
  • 网卡驱动初始化,每个驱动都有自己的初始化函数,内核调用驱动初始化,在整个过程中会准备好 DMA,把 NAPI 的 poll 函数地址告诉内核。
  • 启动网卡,分配 TX,RX 队列,注册中断对应的处理函数。

当数据到来时,处理步骤总结如下:

  1. 网卡将数据 DMA 到内存的 RingBuffer 中,然后向 CPU 发出中断通知;
  2. CPU 相应中断,调用网卡驱动时注册的中断处理函数;
  3. 中断函数中发出对应的网络软中断;
  4. 内核发现软中断请求到来,先关闭硬中断;
  5. ksoftirqd线程调用驱动的 poll 函数接收数据包;
  6. poll 函数将数据包送到协议栈注册的 ip_rcv 函数中;
  7. ip_rcv 将数据包送到上层协议注册的 udp_rcv 中(如果是 tcp 协议送往 tcp_v4_rcv)。

参考

  1. Linux 网络栈接收数据(RX):原理及内核实现(2022)