BPF_PROG_TEST_RUNでXDPプログラムの挙動をテストする
eBPFには BPF_PROG_TEST_RUN
と呼ばれる機能がある。これを活用すると、XDPやtc-bpf(8)
向けに実装したパケット処理プログラムの挙動をテストすることができる。
BPF_PROG_TEST_RUN?
BPF_PROG_TEST_RUN
はbpf
syscall経由で使用できる1機能。テストしたいeBPFプログラムとパケットのバイト列を入力として与えると、実行結果の返り値と処理後のパケットバイト列、および処理にかかった時間が出力として得られる。このとき、テスト対象のプログラムは実際にネットワークインタフェースにアタッチされるわけではないので、実行環境に影響を与えることなくテストを遂行できる。
libbpfに bpf_prog_test_run()
というラッパ関数が用意されているので、実用上はこれを経由して使用するのが便利。
libbpf/bpf.c at d5b146fec50d7aa126fe98323aeaee688d4af289 · libbpf/libbpf · GitHub
int bpf_prog_test_run(int prog_fd, int repeat, void *data, __u32 size, void *data_out, __u32 *size_out, __u32 *retval, __u32 *duration)
入力のバイト列 (XDPで言うところのstruct xdp_md *
のdata
) を *data
で与えると、返り値と処理後のバイト列がそれぞれ *retval
および*data_out
経由で得られる。
使用例
ここではPythonからBPF_PROG_TEST_RUN
を実行してみる。
BCCのこのPRがマージされると、Pythonからlibbpfの関数を呼び出せるようになる。今回はこのブランチをビルドしてテストに使用した。
$ uname -a Linux xdp 4.19.15-300.fc29.x86_64 #1 SMP Mon Jan 14 16:32:35 UTC 2019 x86_64 x86_64 x86_64 GNU/Linux $ cat /etc/fedora-release Fedora release 29 (Twenty Nine) $ python3 -V Python 3.7.2
テスト対象として、IPv4の80/tcp宛パケットをドロップするXDPプログラムを仮定する。
#include <uapi/linux/if_ether.h> #include <uapi/linux/in.h> #include <uapi/linux/ip.h> #include <uapi/linux/tcp.h> int drop_ipv4_tcp_80(struct xdp_md *ctx) { void *data = (void *)(long)ctx->data; void *data_end = (void *)(long)ctx->data_end; struct ethhdr *eth; struct iphdr *iph; struct tcphdr *th; eth = data; if ((void *)(eth + 1) > data_end) return XDP_DROP; if (eth->h_proto != htons(ETH_P_IP)) return XDP_PASS; iph = (struct iphdr *)(eth + 1); if ((void *)(iph + 1) > data_end) return XDP_DROP; if (iph->ihl != 5 || iph->frag_off & htons(0x2000 | 0x1FFF)) return XDP_PASS; if (iph->protocol != IPPROTO_TCP) { return XDP_PASS; } th = (struct tcphdr *)(iph + 1); if ((void *)(th + 1) > data_end) return XDP_DROP; if (th->dest == htons(80)) return XDP_DROP; return XDP_PASS; }
次に、このプログラムを bpf_prog_test_run
でテストするコードをPythonで用意する。今回はテストケースを unittest
で記述した。
import unittest from bcc import BPF, libbcc import ctypes from scapy.all import * class PacketDropTestCase(unittest.TestCase): bpf = None func = None DATA_OUT_LEN = 1514 def _run_test(self, data, data_out_expect, retval_expect, repeat=1): size = len(data) data = ctypes.create_string_buffer(raw(data), size) data_out = ctypes.create_string_buffer(self.DATA_OUT_LEN) size_out = ctypes.c_uint32() retval = ctypes.c_uint32() duration = ctypes.c_uint32() ret = libbcc.lib.bpf_prog_test_run(self.func.fd, repeat, ctypes.byref(data), size, ctypes.byref(data_out), ctypes.byref(size_out), ctypes.byref(retval), ctypes.byref(duration)) self.assertEqual(ret, 0) self.assertEqual(retval.value, retval_expect) if data_out_expect: self.assertEqual(data_out[:size_out.value], raw(data_out_expect)) def setUp(self): self.bpf = BPF(src_file=b"drop_ipv4_tcp_80.c") self.func = self.bpf.load_func(b"drop_ipv4_tcp_80", BPF.XDP) def test_ipv4_tcp_80(self): packet_in = Ether() / IP() / TCP(dport=80) self._run_test(packet_in, None, BPF.XDP_DROP) def test_ipv4_udp_80(self): packet_in = Ether() / IP() / UDP(dport=80) self._run_test(packet_in, packet_in, BPF.XDP_PASS) def test_ipv4_tcp_443(self): packet_in = Ether() / IP() / TCP(dport=443) self._run_test(packet_in, packet_in, BPF.XDP_PASS) def test_ipv6_tcp_80(self): packet_in = Ether() / IPv6() / TCP(dport=80) self._run_test(packet_in, packet_in, BPF.XDP_PASS) if __name__ == '__main__': unittest.main()
これを実行すると、テスト対象のプログラムがどうやら正しく実装されていそうなことが分かる。
$ sudo python3 test_xdp_prog.py .... ---------------------------------------------------------------------- Ran 4 tests in 1.538s OK
続いてテスト対象のプログラムに意図的にバグを仕込んだ上でテストを再実行する。
$ diff -u drop_ipv4_tcp_80.c{.orig,} --- drop_ipv4_tcp_80.c.orig 2019-01-19 08:32:41.388966124 +0000 +++ drop_ipv4_tcp_80.c 2019-01-19 08:33:08.758135901 +0000 @@ -26,6 +26,7 @@ return XDP_PASS; if (iph->protocol != IPPROTO_TCP) { + iph->ttl -= 1; return XDP_PASS; } @@ -33,7 +34,7 @@ if ((void *)(th + 1) > data_end) return XDP_DROP; - if (th->dest == htons(80)) + if (th->dest == 80) return XDP_DROP; return XDP_PASS;
$ sudo python3 test_xdp_prog.py .FF. ====================================================================== FAIL: test_ipv4_tcp_80 (__main__.PacketDropTestCase) ---------------------------------------------------------------------- Traceback (most recent call last): File "test_xdp_prog.py", line 40, in test_ipv4_tcp_80 self._run_test(packet_in, None, BPF.XDP_DROP) File "test_xdp_prog.py", line 30, in _run_test self.assertEqual(retval.value, retval_expect) AssertionError: 2 != 1 ====================================================================== FAIL: test_ipv4_udp_80 (__main__.PacketDropTestCase) ---------------------------------------------------------------------- Traceback (most recent call last): File "test_xdp_prog.py", line 44, in test_ipv4_udp_80 self._run_test(packet_in, packet_in, BPF.XDP_PASS) File "test_xdp_prog.py", line 32, in _run_test self.assertEqual(data_out[:size_out.value], raw(data_out_expect)) AssertionError: b'\xf[77 chars]0\x00?\x11|\xce\x7f\x00\x00\x01\x7f\x00\x00\x0[20 chars]x01W' != b'\xf[77 chars]0\x00@\x11|\xce\x7f\x00\x00\x01\x7f\x00\x00\x0[20 chars]x01W' ---------------------------------------------------------------------- Ran 4 tests in 1.510s FAILED (failures=2)
先程までパスしていたテストケースが通らなくなっており、無事問題を発見することができた。