tests: Add support for getting corefile patterns on FreeBSD
[vpp.git] / test / test_npt66.py
1 #!/usr/bin/env python3
2
3 import unittest
4 import ipaddress
5 from framework import VppTestCase
6 from asfframework import VppTestRunner
7
8 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6DestUnreach
9 from scapy.layers.l2 import Ether
10 from scapy.packet import Raw
11
12
13 class TestNPT66(VppTestCase):
14     """NPTv6 Test Case"""
15
16     extra_vpp_plugin_config = [
17         "plugin npt66_plugin.so {enable}",
18     ]
19
20     def setUp(self):
21         super(TestNPT66, self).setUp()
22
23         # create 2 pg interfaces
24         self.create_pg_interfaces(range(2))
25
26         for i in self.pg_interfaces:
27             i.admin_up()
28             i.config_ip6()
29             i.resolve_ndp()
30
31     def tearDown(self):
32         for i in self.pg_interfaces:
33             i.unconfig_ip6()
34             i.admin_down()
35         super(TestNPT66, self).tearDown()
36
37     def send_and_verify(self, internal, reply_icmp_error=False):
38         sendif = self.pg0
39         recvif = self.pg1
40         local_mac = self.pg0.local_mac
41         remote_mac = self.pg0.remote_mac
42         src = ipaddress.ip_interface(internal).ip + 1
43         dst = self.pg1.remote_ip6
44
45         p = (
46             Ether(dst=local_mac, src=remote_mac)
47             / IPv6(src=src, dst=dst)
48             / ICMPv6EchoRequest()
49             / Raw(b"Request")
50         )
51         # print('Sending packet')
52         # p.show2()
53         rxs = self.send_and_expect(sendif, p, recvif)
54         for rx in rxs:
55             # print('Received packet')
56             # rx.show2()
57             original_cksum = rx[ICMPv6EchoRequest].cksum
58             del rx[ICMPv6EchoRequest].cksum
59             rx = rx.__class__(bytes(rx))
60             self.assertEqual(original_cksum, rx[ICMPv6EchoRequest].cksum)
61
62             # Generate a replies
63             if reply_icmp_error:
64                 # print('Generating an ICMP error message')
65                 reply = (
66                     Ether(dst=rx[Ether].src, src=local_mac)
67                     / IPv6(src=rx[IPv6].dst, dst=rx[IPv6].src)
68                     / ICMPv6DestUnreach()
69                     / rx[IPv6]
70                 )
71                 # print('Sending ICMP error message reply')
72                 # reply.show2()
73                 replies = self.send_and_expect(recvif, reply, sendif)
74                 for r in replies:
75                     # print('Received ICMP error message reply on the other side')
76                     # r.show2()
77                     self.assertEqual(str(p[IPv6].src), r[IPv6].dst)
78                     original_cksum = r[ICMPv6EchoRequest].cksum
79                     del r[ICMPv6EchoRequest].cksum
80                     r = r.__class__(bytes(r))
81                     self.assertEqual(original_cksum, r[ICMPv6EchoRequest].cksum)
82
83             else:
84                 reply = (
85                     Ether(dst=rx[Ether].src, src=local_mac)
86                     / IPv6(src=rx[IPv6].dst, dst=rx[IPv6].src)
87                     / ICMPv6EchoRequest()
88                     / Raw(b"Reply")
89                 )
90
91                 replies = self.send_and_expect(recvif, reply, sendif)
92                 for r in replies:
93                     # r.show2()
94                     self.assertEqual(str(p[IPv6].src), r[IPv6].dst)
95                     original_cksum = r[ICMPv6EchoRequest].cksum
96                     del r[ICMPv6EchoRequest].cksum
97                     r = r.__class__(bytes(r))
98                     self.assertEqual(original_cksum, r[ICMPv6EchoRequest].cksum)
99
100     def do_test(self, internal, external, reply_icmp_error=False):
101         """Add NPT66 binding and send packet"""
102         self.vapi.npt66_binding_add_del(
103             sw_if_index=self.pg1.sw_if_index,
104             internal=internal,
105             external=external,
106             is_add=True,
107         )
108         ## TODO use route api
109         self.vapi.cli(f"ip route add {internal} via {self.pg0.remote_ip6}")
110
111         self.send_and_verify(internal, reply_icmp_error=reply_icmp_error)
112
113         self.vapi.npt66_binding_add_del(
114             sw_if_index=self.pg1.sw_if_index,
115             internal=internal,
116             external=external,
117             is_add=False,
118         )
119
120     def test_npt66_simple(self):
121         """Send and receive a packet through NPT66"""
122
123         self.do_test("fd00:0000:0000::/48", "2001:4650:c3ed::/48")
124         self.do_test("fc00:1::/48", "2001:db8:1::/48")
125         self.do_test("fc00:1234::/32", "2001:db8:1::/32")
126         self.do_test("fc00:1234::/63", "2001:db8:1::/56")
127
128     def test_npt66_icmp6(self):
129         """Send and receive a packet through NPT66"""
130
131         # Test ICMP6 error packets
132         self.do_test(
133             "fd00:0000:0000::/48", "2001:4650:c3ed::/48", reply_icmp_error=True
134         )
135         self.do_test("fc00:1::/48", "2001:db8:1::/48", reply_icmp_error=True)
136         self.do_test("fc00:1234::/32", "2001:db8:1::/32", reply_icmp_error=True)
137         self.do_test("fc00:1234::/63", "2001:db8:1::/56", reply_icmp_error=True)
138
139
140 if __name__ == "__main__":
141     unittest.main(testRunner=VppTestRunner)