npt66: icmp6 alg to handle icmp6 error messages
[vpp.git] / test / test_npt66.py
1 #!/usr/bin/env python3
2
3 import unittest
4 import ipaddress
5 from framework import VppTestCase, VppTestRunner
6
7 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6DestUnreach
8 from scapy.layers.l2 import Ether
9 from scapy.packet import Raw
10
11
12 class TestNPT66(VppTestCase):
13     """NPTv6 Test Case"""
14
15     extra_vpp_plugin_config = [
16         "plugin npt66_plugin.so {enable}",
17     ]
18
19     def setUp(self):
20         super(TestNPT66, self).setUp()
21
22         # create 2 pg interfaces
23         self.create_pg_interfaces(range(2))
24
25         for i in self.pg_interfaces:
26             i.admin_up()
27             i.config_ip6()
28             i.resolve_ndp()
29
30     def tearDown(self):
31         for i in self.pg_interfaces:
32             i.unconfig_ip6()
33             i.admin_down()
34         super(TestNPT66, self).tearDown()
35
36     def send_and_verify(self, internal, reply_icmp_error=False):
37         sendif = self.pg0
38         recvif = self.pg1
39         local_mac = self.pg0.local_mac
40         remote_mac = self.pg0.remote_mac
41         src = ipaddress.ip_interface(internal).ip + 1
42         dst = self.pg1.remote_ip6
43
44         p = (
45             Ether(dst=local_mac, src=remote_mac)
46             / IPv6(src=src, dst=dst)
47             / ICMPv6EchoRequest()
48             / Raw(b"Request")
49         )
50         # print('Sending packet')
51         # p.show2()
52         rxs = self.send_and_expect(sendif, p, recvif)
53         for rx in rxs:
54             # print('Received packet')
55             # rx.show2()
56             original_cksum = rx[ICMPv6EchoRequest].cksum
57             del rx[ICMPv6EchoRequest].cksum
58             rx = rx.__class__(bytes(rx))
59             self.assertEqual(original_cksum, rx[ICMPv6EchoRequest].cksum)
60
61             # Generate a replies
62             if reply_icmp_error:
63                 # print('Generating an ICMP error message')
64                 reply = (
65                     Ether(dst=rx[Ether].src, src=local_mac)
66                     / IPv6(src=rx[IPv6].dst, dst=rx[IPv6].src)
67                     / ICMPv6DestUnreach()
68                     / rx[IPv6]
69                 )
70                 # print('Sending ICMP error message reply')
71                 # reply.show2()
72                 replies = self.send_and_expect(recvif, reply, sendif)
73                 for r in replies:
74                     # print('Received ICMP error message reply on the other side')
75                     # r.show2()
76                     self.assertEqual(str(p[IPv6].src), r[IPv6].dst)
77                     original_cksum = r[ICMPv6EchoRequest].cksum
78                     del r[ICMPv6EchoRequest].cksum
79                     r = r.__class__(bytes(r))
80                     self.assertEqual(original_cksum, r[ICMPv6EchoRequest].cksum)
81
82             else:
83                 reply = (
84                     Ether(dst=rx[Ether].src, src=local_mac)
85                     / IPv6(src=rx[IPv6].dst, dst=rx[IPv6].src)
86                     / ICMPv6EchoRequest()
87                     / Raw(b"Reply")
88                 )
89
90                 replies = self.send_and_expect(recvif, reply, sendif)
91                 for r in replies:
92                     r.show2()
93                     self.assertEqual(str(p[IPv6].src), r[IPv6].dst)
94                     original_cksum = r[ICMPv6EchoRequest].cksum
95                     del r[ICMPv6EchoRequest].cksum
96                     r = r.__class__(bytes(r))
97                     self.assertEqual(original_cksum, r[ICMPv6EchoRequest].cksum)
98
99     def do_test(self, internal, external, reply_icmp_error=False):
100         """Add NPT66 binding and send packet"""
101         self.vapi.npt66_binding_add_del(
102             sw_if_index=self.pg1.sw_if_index,
103             internal=internal,
104             external=external,
105             is_add=True,
106         )
107         ## TODO use route api
108         self.vapi.cli(f"ip route add {internal} via {self.pg0.remote_ip6}")
109
110         self.send_and_verify(internal, reply_icmp_error=reply_icmp_error)
111
112         self.vapi.npt66_binding_add_del(
113             sw_if_index=self.pg1.sw_if_index,
114             internal=internal,
115             external=external,
116             is_add=False,
117         )
118
119     def test_npt66_simple(self):
120         """Send and receive a packet through NPT66"""
121
122         self.do_test("fd00:0000:0000::/48", "2001:4650:c3ed::/48")
123         self.do_test("fc00:1::/48", "2001:db8:1::/48")
124         self.do_test("fc00:1234::/32", "2001:db8:1::/32")
125         self.do_test("fc00:1234::/63", "2001:db8:1::/56")
126
127     def test_npt66_icmp6(self):
128         """Send and receive a packet through NPT66"""
129
130         # Test ICMP6 error packets
131         self.do_test(
132             "fd00:0000:0000::/48", "2001:4650:c3ed::/48", reply_icmp_error=True
133         )
134         self.do_test("fc00:1::/48", "2001:db8:1::/48", reply_icmp_error=True)
135         self.do_test("fc00:1234::/32", "2001:db8:1::/32", reply_icmp_error=True)
136         self.do_test("fc00:1234::/63", "2001:db8:1::/56", reply_icmp_error=True)
137
138
139 if __name__ == "__main__":
140     unittest.main(testRunner=VppTestRunner)