npt66: checksum applied to src address instead of dst address on rx
[vpp.git] / test / test_npt66.py
1 #!/usr/bin/env python3
2
3 import unittest
4 import ipaddress
5 from framework import VppTestCase, VppTestRunner
6
7 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
8 from scapy.layers.l2 import Ether
9 from scapy.packet import Raw
10
11
class TestNPT66(VppTestCase):
    """NPTv6 Test Case"""

    def setUp(self):
        """Create two packet-generator interfaces and configure IPv6 on them."""
        super().setUp()

        # create 2 pg interfaces
        self.create_pg_interfaces(range(2))

        for pg_if in self.pg_interfaces:
            pg_if.admin_up()
            pg_if.config_ip6()
            pg_if.resolve_ndp()

    def tearDown(self):
        """Undo the per-interface configuration done in setUp()."""
        for pg_if in self.pg_interfaces:
            pg_if.unconfig_ip6()
            pg_if.admin_down()
        super().tearDown()

    def _verify_icmp6_checksum(self, pkt):
        """Assert that the ICMPv6 checksum carried by *pkt* is correct.

        The received checksum is remembered, the field is deleted and the
        packet is rebuilt from its own bytes; the checksum found in the
        rebuilt packet (presumably recomputed by Scapy when the field is
        unset) must equal the one that was received.

        Returns the rebuilt packet so the caller keeps working on the same
        object the checksum was verified against.
        """
        received_cksum = pkt[ICMPv6EchoRequest].cksum
        del pkt[ICMPv6EchoRequest].cksum
        rebuilt = pkt.__class__(bytes(pkt))
        self.assertEqual(received_cksum, rebuilt[ICMPv6EchoRequest].cksum)
        return rebuilt

    def send_and_verify(self, internal):
        """Send a packet pg0 -> pg1 and one back, checking both directions.

        internal: the internal prefix as a string (e.g. "fc00:1::/48"). The
            source address of the outgoing packet is the prefix's address
            plus one.

        Outbound (pg0 -> pg1) only the ICMPv6 checksum is verified. For the
        return direction (pg1 -> pg0) the destination address must equal the
        source address originally sent, and the checksum is verified again.
        """
        sendif = self.pg0
        recvif = self.pg1
        local_mac = self.pg0.local_mac
        remote_mac = self.pg0.remote_mac
        src = ipaddress.ip_interface(internal).ip + 1
        dst = self.pg1.remote_ip6

        p = (
            Ether(dst=local_mac, src=remote_mac)
            / IPv6(src=src, dst=dst)
            / ICMPv6EchoRequest()
            / Raw(b"Request")
        )
        for rx in self.send_and_expect(sendif, p, recvif):
            rx = self._verify_icmp6_checksum(rx)

            # Generate a reply by swapping the addresses of what was received.
            # NOTE: the "reply" is itself built as an echo request; only the
            # addresses and the checksum are checked, not the ICMPv6 type.
            reply = (
                Ether(dst=rx[Ether].src, src=local_mac)
                / IPv6(src=rx[IPv6].dst, dst=rx[IPv6].src)
                / ICMPv6EchoRequest()
                / Raw(b"Reply")
            )

            for r in self.send_and_expect(recvif, reply, sendif):
                # The return packet must be addressed to the original source.
                self.assertEqual(str(p[IPv6].src), r[IPv6].dst)
                self._verify_icmp6_checksum(r)

    def do_test(self, internal, external):
        """Bind internal <-> external on pg1, run the traffic check, unbind.

        internal: internal prefix string passed to the binding API and used
            for the route towards pg0.
        external: external prefix string passed to the binding API.

        The binding is removed in a ``finally`` clause so that a failed
        verification does not leave it configured on pg1.
        """
        self.vapi.npt66_binding_add_del(
            sw_if_index=self.pg1.sw_if_index,
            internal=internal,
            external=external,
            is_add=True,
        )
        try:
            ## TODO use route api
            # NOTE(review): this route is never removed again - confirm that
            # leaving it in place between do_test() calls is intentional.
            self.vapi.cli(f"ip route add {internal} via {self.pg0.remote_ip6}")

            self.send_and_verify(internal)
        finally:
            self.vapi.npt66_binding_add_del(
                sw_if_index=self.pg1.sw_if_index,
                internal=internal,
                external=external,
                is_add=False,
            )

    def test_npt66_simple(self):
        """Send and receive a packet through NPT66"""

        # Each call uses a different internal/external prefix pair.
        self.do_test("fd00:0000:0000::/48", "2001:4650:c3ed::/48")
        self.do_test("fc00:1::/48", "2001:db8:1::/48")
        self.do_test("fc00:1234::/32", "2001:db8:1::/32")
        self.do_test("fc00:1234::/63", "2001:db8:1::/56")
95
96
# When executed directly, run the tests in this module with VppTestRunner
# instead of unittest's default runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)