tests: Use errno value rather than a specific int
[vpp.git] / test / test_dslite.py
1 #!/usr/bin/env python3
2
3 import socket
4
5 from asfframework import tag_fixme_vpp_workers
6 from framework import VppTestCase
7
8 from scapy.layers.inet import IP, TCP, UDP, ICMP
9 from scapy.layers.inet6 import (
10     IPv6,
11     ICMPv6EchoRequest,
12     ICMPv6EchoReply,
13 )
14 from scapy.layers.l2 import Ether
15 from scapy.data import IP_PROTOS
16 from scapy.packet import Raw
17 from syslog_rfc5424_parser import SyslogMessage, ParseError
18 from syslog_rfc5424_parser.constants import SyslogSeverity
19 from vpp_ip_route import VppIpRoute, VppRoutePath
20
21
22 @tag_fixme_vpp_workers
23 class TestDSlite(VppTestCase):
24     """DS-Lite Test Cases"""
25
26     @classmethod
27     def setUpClass(cls):
28         super(TestDSlite, cls).setUpClass()
29
30         try:
31             cls.nat_addr = "10.0.0.3"
32
33             cls.create_pg_interfaces(range(3))
34             cls.pg0.admin_up()
35             cls.pg0.config_ip4()
36             cls.pg0.resolve_arp()
37             cls.pg1.admin_up()
38             cls.pg1.config_ip6()
39             cls.pg1.generate_remote_hosts(2)
40             cls.pg1.configure_ipv6_neighbors()
41             cls.pg2.admin_up()
42             cls.pg2.config_ip4()
43             cls.pg2.resolve_arp()
44
45         except Exception:
46             super(TestDSlite, cls).tearDownClass()
47             raise
48
49     @classmethod
50     def tearDownClass(cls):
51         super(TestDSlite, cls).tearDownClass()
52
53     def verify_syslog_apmadd(self, data, isaddr, isport, xsaddr, xsport, sv6enc, proto):
54         message = data.decode("utf-8")
55         try:
56             message = SyslogMessage.parse(message)
57         except ParseError as e:
58             self.logger.error(e)
59         else:
60             self.assertEqual(message.severity, SyslogSeverity.info)
61             self.assertEqual(message.appname, "NAT")
62             self.assertEqual(message.msgid, "APMADD")
63             sd_params = message.sd.get("napmap")
64             self.assertTrue(sd_params is not None)
65             self.assertEqual(sd_params.get("IATYP"), "IPv4")
66             self.assertEqual(sd_params.get("ISADDR"), isaddr)
67             self.assertEqual(sd_params.get("ISPORT"), "%d" % isport)
68             self.assertEqual(sd_params.get("XATYP"), "IPv4")
69             self.assertEqual(sd_params.get("XSADDR"), xsaddr)
70             self.assertEqual(sd_params.get("XSPORT"), "%d" % xsport)
71             self.assertEqual(sd_params.get("PROTO"), "%d" % proto)
72             self.assertTrue(sd_params.get("SSUBIX") is not None)
73             self.assertEqual(sd_params.get("SV6ENC"), sv6enc)
74
75     def test_dslite(self):
76         """Test DS-Lite"""
77         self.vapi.dslite_add_del_pool_addr_range(
78             start_addr=self.nat_addr, end_addr=self.nat_addr, is_add=1
79         )
80         aftr_ip4 = "192.0.0.1"
81         aftr_ip6 = "2001:db8:85a3::8a2e:370:1"
82         self.vapi.dslite_set_aftr_addr(ip4_addr=aftr_ip4, ip6_addr=aftr_ip6)
83         self.vapi.syslog_set_sender(self.pg2.local_ip4, self.pg2.remote_ip4)
84
85         # UDP
86         p = (
87             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
88             / IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[0].ip6)
89             / IP(dst=self.pg0.remote_ip4, src="192.168.1.1")
90             / UDP(sport=20000, dport=10000)
91         )
92         self.pg1.add_stream(p)
93         self.pg_enable_capture(self.pg_interfaces)
94         self.pg_start()
95         capture = self.pg0.get_capture(1)
96         capture = capture[0]
97         self.assertFalse(capture.haslayer(IPv6))
98         self.assertEqual(capture[IP].src, self.nat_addr)
99         self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
100         self.assertNotEqual(capture[UDP].sport, 20000)
101         self.assertEqual(capture[UDP].dport, 10000)
102         self.assert_packet_checksums_valid(capture)
103         out_port = capture[UDP].sport
104         capture = self.pg2.get_capture(1)
105         self.verify_syslog_apmadd(
106             capture[0][Raw].load,
107             "192.168.1.1",
108             20000,
109             self.nat_addr,
110             out_port,
111             self.pg1.remote_hosts[0].ip6,
112             IP_PROTOS.udp,
113         )
114
115         p = (
116             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
117             / IP(dst=self.nat_addr, src=self.pg0.remote_ip4)
118             / UDP(sport=10000, dport=out_port)
119         )
120         self.pg0.add_stream(p)
121         self.pg_enable_capture(self.pg_interfaces)
122         self.pg_start()
123         capture = self.pg1.get_capture(1)
124         capture = capture[0]
125         self.assertEqual(capture[IPv6].src, aftr_ip6)
126         self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
127         self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
128         self.assertEqual(capture[IP].dst, "192.168.1.1")
129         self.assertEqual(capture[UDP].sport, 10000)
130         self.assertEqual(capture[UDP].dport, 20000)
131         self.assert_packet_checksums_valid(capture)
132
133         # TCP
134         p = (
135             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
136             / IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6)
137             / IP(dst=self.pg0.remote_ip4, src="192.168.1.1")
138             / TCP(sport=20001, dport=10001)
139         )
140         self.pg1.add_stream(p)
141         self.pg_enable_capture(self.pg_interfaces)
142         self.pg_start()
143         capture = self.pg0.get_capture(1)
144         capture = capture[0]
145         self.assertFalse(capture.haslayer(IPv6))
146         self.assertEqual(capture[IP].src, self.nat_addr)
147         self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
148         self.assertNotEqual(capture[TCP].sport, 20001)
149         self.assertEqual(capture[TCP].dport, 10001)
150         self.assert_packet_checksums_valid(capture)
151         out_port = capture[TCP].sport
152
153         p = (
154             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
155             / IP(dst=self.nat_addr, src=self.pg0.remote_ip4)
156             / TCP(sport=10001, dport=out_port)
157         )
158         self.pg0.add_stream(p)
159         self.pg_enable_capture(self.pg_interfaces)
160         self.pg_start()
161         capture = self.pg1.get_capture(1)
162         capture = capture[0]
163         self.assertEqual(capture[IPv6].src, aftr_ip6)
164         self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
165         self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
166         self.assertEqual(capture[IP].dst, "192.168.1.1")
167         self.assertEqual(capture[TCP].sport, 10001)
168         self.assertEqual(capture[TCP].dport, 20001)
169         self.assert_packet_checksums_valid(capture)
170
171         # ICMP
172         p = (
173             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
174             / IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6)
175             / IP(dst=self.pg0.remote_ip4, src="192.168.1.1")
176             / ICMP(id=4000, type="echo-request")
177         )
178         self.pg1.add_stream(p)
179         self.pg_enable_capture(self.pg_interfaces)
180         self.pg_start()
181         capture = self.pg0.get_capture(1)
182         capture = capture[0]
183         self.assertFalse(capture.haslayer(IPv6))
184         self.assertEqual(capture[IP].src, self.nat_addr)
185         self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
186         self.assertNotEqual(capture[ICMP].id, 4000)
187         self.assert_packet_checksums_valid(capture)
188         out_id = capture[ICMP].id
189
190         p = (
191             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
192             / IP(dst=self.nat_addr, src=self.pg0.remote_ip4)
193             / ICMP(id=out_id, type="echo-reply")
194         )
195         self.pg0.add_stream(p)
196         self.pg_enable_capture(self.pg_interfaces)
197         self.pg_start()
198         capture = self.pg1.get_capture(1)
199         capture = capture[0]
200         self.assertEqual(capture[IPv6].src, aftr_ip6)
201         self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
202         self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
203         self.assertEqual(capture[IP].dst, "192.168.1.1")
204         self.assertEqual(capture[ICMP].id, 4000)
205         self.assert_packet_checksums_valid(capture)
206
207         # ping DS-Lite AFTR tunnel endpoint address
208         p = (
209             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
210             / IPv6(src=self.pg1.remote_hosts[1].ip6, dst=aftr_ip6)
211             / ICMPv6EchoRequest()
212         )
213         self.pg1.add_stream(p)
214         self.pg_enable_capture(self.pg_interfaces)
215         self.pg_start()
216         capture = self.pg1.get_capture(1)
217         capture = capture[0]
218         self.assertEqual(capture[IPv6].src, aftr_ip6)
219         self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
220         self.assertTrue(capture.haslayer(ICMPv6EchoReply))
221
222         b4s = self.statistics.get_counter("/dslite/total-b4s")
223         self.assertEqual(b4s[0][0], 2)
224         sessions = self.statistics.get_counter("/dslite/total-sessions")
225         self.assertEqual(sessions[0][0], 3)
226
227     def tearDown(self):
228         super(TestDSlite, self).tearDown()
229
230     def show_commands_at_teardown(self):
231         self.logger.info(self.vapi.cli("show dslite pool"))
232         self.logger.info(self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
233         self.logger.info(self.vapi.cli("show dslite sessions"))
234
235
236 class TestDSliteCE(VppTestCase):
237     """DS-Lite CE Test Cases"""
238
239     @classmethod
240     def setUpConstants(cls):
241         super(TestDSliteCE, cls).setUpConstants()
242         cls.vpp_cmdline.extend(["dslite", "{", "ce", "}"])
243
244     @classmethod
245     def setUpClass(cls):
246         super(TestDSliteCE, cls).setUpClass()
247
248         try:
249             cls.create_pg_interfaces(range(2))
250             cls.pg0.admin_up()
251             cls.pg0.config_ip4()
252             cls.pg0.resolve_arp()
253             cls.pg1.admin_up()
254             cls.pg1.config_ip6()
255             cls.pg1.generate_remote_hosts(1)
256             cls.pg1.configure_ipv6_neighbors()
257
258         except Exception:
259             super(TestDSliteCE, cls).tearDownClass()
260             raise
261
262     @classmethod
263     def tearDownClass(cls):
264         super(TestDSliteCE, cls).tearDownClass()
265
266     def test_dslite_ce(self):
267         """Test DS-Lite CE"""
268         b4_ip4 = "192.0.0.2"
269         b4_ip6 = "2001:db8:62aa::375e:f4c1:1"
270         self.vapi.dslite_set_b4_addr(ip4_addr=b4_ip4, ip6_addr=b4_ip6)
271
272         aftr_ip4 = "192.0.0.1"
273         aftr_ip6 = "2001:db8:85a3::8a2e:370:1"
274         aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
275         self.vapi.dslite_set_aftr_addr(ip4_addr=aftr_ip4, ip6_addr=aftr_ip6)
276
277         r1 = VppIpRoute(
278             self,
279             aftr_ip6,
280             128,
281             [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
282         )
283         r1.add_vpp_config()
284
285         # UDP encapsulation
286         p = (
287             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
288             / IP(dst=self.pg1.remote_ip4, src=self.pg0.remote_ip4)
289             / UDP(sport=10000, dport=20000)
290         )
291         self.pg0.add_stream(p)
292         self.pg_enable_capture(self.pg_interfaces)
293         self.pg_start()
294         capture = self.pg1.get_capture(1)
295         capture = capture[0]
296         self.assertEqual(capture[IPv6].src, b4_ip6)
297         self.assertEqual(capture[IPv6].dst, aftr_ip6)
298         self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
299         self.assertEqual(capture[IP].dst, self.pg1.remote_ip4)
300         self.assertEqual(capture[UDP].sport, 10000)
301         self.assertEqual(capture[UDP].dport, 20000)
302         self.assert_packet_checksums_valid(capture)
303
304         # UDP decapsulation
305         p = (
306             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
307             / IPv6(dst=b4_ip6, src=aftr_ip6)
308             / IP(dst=self.pg0.remote_ip4, src=self.pg1.remote_ip4)
309             / UDP(sport=20000, dport=10000)
310         )
311         self.pg1.add_stream(p)
312         self.pg_enable_capture(self.pg_interfaces)
313         self.pg_start()
314         capture = self.pg0.get_capture(1)
315         capture = capture[0]
316         self.assertFalse(capture.haslayer(IPv6))
317         self.assertEqual(capture[IP].src, self.pg1.remote_ip4)
318         self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
319         self.assertEqual(capture[UDP].sport, 20000)
320         self.assertEqual(capture[UDP].dport, 10000)
321         self.assert_packet_checksums_valid(capture)
322
323         # ping DS-Lite B4 tunnel endpoint address
324         p = (
325             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
326             / IPv6(src=self.pg1.remote_hosts[0].ip6, dst=b4_ip6)
327             / ICMPv6EchoRequest()
328         )
329         self.pg1.add_stream(p)
330         self.pg_enable_capture(self.pg_interfaces)
331         self.pg_start()
332         capture = self.pg1.get_capture(1)
333         capture = capture[0]
334         self.assertEqual(capture[IPv6].src, b4_ip6)
335         self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
336         self.assertTrue(capture.haslayer(ICMPv6EchoReply))
337
338     def tearDown(self):
339         super(TestDSliteCE, self).tearDown()
340
341     def show_commands_at_teardown(self):
342         self.logger.info(self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
343         self.logger.info(self.vapi.cli("show dslite b4-tunnel-endpoint-address"))