BPF_PROG_TEST_RUNでXDPプログラムの挙動をテストする

eBPFには BPF_PROG_TEST_RUN と呼ばれる機能がある。これを活用すると、XDPやtc-bpf(8)向けに実装したパケット処理プログラムの挙動をテストすることができる。

BPF_PROG_TEST_RUN?

BPF_PROG_TEST_RUNbpf syscall経由で使用できる1機能。テストしたいeBPFプログラムとパケットのバイト列を入力として与えると、実行結果の返り値と処理後のパケットバイト列、および処理にかかった時間が出力として得られる。このとき、テスト対象のプログラムは実際にネットワークインタフェースにアタッチされるわけではないので、実行環境に影響を与えることなくテストを遂行できる。

lwn.net

libbpfbpf_prog_test_run() というラッパ関数が用意されているので、実用上はこれを経由して使用するのが便利。

libbpf/bpf.c at d5b146fec50d7aa126fe98323aeaee688d4af289 · libbpf/libbpf · GitHub

int bpf_prog_test_run(int prog_fd, int repeat, void *data, __u32 size,
              void *data_out, __u32 *size_out, __u32 *retval,
              __u32 *duration)

入力のバイト列 (XDPで言うところのstruct xdp_md *data) を *data で与えると、返り値と処理後のバイト列がそれぞれ *retvalおよび*data_out経由で得られる。

使用例

ここではPythonからBPF_PROG_TEST_RUNを実行してみる。 BCCこのPRがマージされると、Pythonからlibbpfの関数を呼び出せるようになる。今回はこのブランチをビルドしてテストに使用した。

$ uname -a
Linux xdp 4.19.15-300.fc29.x86_64 #1 SMP Mon Jan 14 16:32:35 UTC 2019 x86_64 x86_64 x86_64 GNU/Linux
$ cat /etc/fedora-release 
Fedora release 29 (Twenty Nine)
$ python3 -V
Python 3.7.2

テスト対象として、IPv4の80/tcp宛パケットをドロップするXDPプログラムを仮定する。

#include <uapi/linux/if_ether.h>
#include <uapi/linux/in.h>
#include <uapi/linux/ip.h>
#include <uapi/linux/tcp.h>

int drop_ipv4_tcp_80(struct xdp_md *ctx) {
  void *data = (void *)(long)ctx->data;
  void *data_end = (void *)(long)ctx->data_end;

  struct ethhdr *eth;
  struct iphdr *iph;
  struct tcphdr *th;

  eth = data;
  if ((void *)(eth + 1) > data_end)
    return XDP_DROP;

  if (eth->h_proto != htons(ETH_P_IP))
    return XDP_PASS;

  iph = (struct iphdr *)(eth + 1);
  if ((void *)(iph + 1) > data_end)
    return XDP_DROP;

  if (iph->ihl != 5 || iph->frag_off & htons(0x2000 | 0x1FFF))
    return XDP_PASS;

  if (iph->protocol != IPPROTO_TCP) {
    return XDP_PASS;
  }

  th = (struct tcphdr *)(iph + 1);
  if ((void *)(th + 1) > data_end)
    return XDP_DROP;

  if (th->dest == htons(80))
    return XDP_DROP;

  return XDP_PASS;
}

次に、このプログラムを bpf_prog_test_run でテストするコードをPythonで用意する。今回はテストケースを unittest で記述した。

import unittest

from bcc import BPF, libbcc
import ctypes
from scapy.all import *


class PacketDropTestCase(unittest.TestCase):
    bpf = None
    func = None

    DATA_OUT_LEN = 1514

    def _run_test(self, data, data_out_expect, retval_expect, repeat=1):
        size = len(data)
        data = ctypes.create_string_buffer(raw(data), size)
        data_out = ctypes.create_string_buffer(self.DATA_OUT_LEN)
        size_out = ctypes.c_uint32()
        retval = ctypes.c_uint32()
        duration = ctypes.c_uint32()

        ret = libbcc.lib.bpf_prog_test_run(self.func.fd, repeat,
                                           ctypes.byref(data), size,
                                           ctypes.byref(data_out),
                                           ctypes.byref(size_out),
                                           ctypes.byref(retval),
                                           ctypes.byref(duration))
        self.assertEqual(ret, 0)

        self.assertEqual(retval.value, retval_expect)
        if data_out_expect:
            self.assertEqual(data_out[:size_out.value], raw(data_out_expect))

    def setUp(self):
        self.bpf = BPF(src_file=b"drop_ipv4_tcp_80.c")
        self.func = self.bpf.load_func(b"drop_ipv4_tcp_80", BPF.XDP)

    def test_ipv4_tcp_80(self):
        packet_in = Ether() / IP() / TCP(dport=80)
        self._run_test(packet_in, None, BPF.XDP_DROP)

    def test_ipv4_udp_80(self):
        packet_in = Ether() / IP() / UDP(dport=80)
        self._run_test(packet_in, packet_in, BPF.XDP_PASS)

    def test_ipv4_tcp_443(self):
        packet_in = Ether() / IP() / TCP(dport=443)
        self._run_test(packet_in, packet_in, BPF.XDP_PASS)

    def test_ipv6_tcp_80(self):
        packet_in = Ether() / IPv6() / TCP(dport=80)
        self._run_test(packet_in, packet_in, BPF.XDP_PASS)


if __name__ == '__main__':
    unittest.main()

これを実行すると、テスト対象のプログラムがどうやら正しく実装されていそうなことが分かる。

$ sudo python3 test_xdp_prog.py 
....
----------------------------------------------------------------------
Ran 4 tests in 1.538s

OK

続いてテスト対象のプログラムに意図的にバグを仕込んだ上でテストを再実行する。

$ diff -u drop_ipv4_tcp_80.c{.orig,}
--- drop_ipv4_tcp_80.c.orig     2019-01-19 08:32:41.388966124 +0000
+++ drop_ipv4_tcp_80.c  2019-01-19 08:33:08.758135901 +0000
@@ -26,6 +26,7 @@
     return XDP_PASS;
 
   if (iph->protocol != IPPROTO_TCP) {
+    iph->ttl -= 1;
     return XDP_PASS;
   }
 
@@ -33,7 +34,7 @@
   if ((void *)(th + 1) > data_end)
     return XDP_DROP;
 
-  if (th->dest == htons(80))
+  if (th->dest == 80)
     return XDP_DROP;
 
   return XDP_PASS;
$ sudo python3 test_xdp_prog.py 
.FF.
======================================================================
FAIL: test_ipv4_tcp_80 (__main__.PacketDropTestCase)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "test_xdp_prog.py", line 40, in test_ipv4_tcp_80
    self._run_test(packet_in, None, BPF.XDP_DROP)
  File "test_xdp_prog.py", line 30, in _run_test
    self.assertEqual(retval.value, retval_expect)
AssertionError: 2 != 1

======================================================================
FAIL: test_ipv4_udp_80 (__main__.PacketDropTestCase)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "test_xdp_prog.py", line 44, in test_ipv4_udp_80
    self._run_test(packet_in, packet_in, BPF.XDP_PASS)
  File "test_xdp_prog.py", line 32, in _run_test
    self.assertEqual(data_out[:size_out.value], raw(data_out_expect))
AssertionError: b'\xf[77 chars]0\x00?\x11|\xce\x7f\x00\x00\x01\x7f\x00\x00\x0[20 chars]x01W' != b'\xf[77 chars]0\x00@\x11|\xce\x7f\x00\x00\x01\x7f\x00\x00\x0[20 chars]x01W'

----------------------------------------------------------------------
Ran 4 tests in 1.510s

FAILED (failures=2)

先程までパスしていたテストケースが通らなくなっており、無事問題を発見することができた。