tests: replace pycodestyle with black
[vpp.git] / test / test_dslite.py
1 #!/usr/bin/env python3
2
3 import socket
4 import unittest
5 import struct
6 import random
7
8 from framework import tag_fixme_vpp_workers
9 from framework import VppTestCase, VppTestRunner
10
11 import scapy.compat
12 from scapy.layers.inet import IP, TCP, UDP, ICMP
13 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
14 from scapy.layers.inet6 import (
15     IPv6,
16     ICMPv6EchoRequest,
17     ICMPv6EchoReply,
18     ICMPv6ND_NS,
19     ICMPv6ND_NA,
20     ICMPv6NDOptDstLLAddr,
21     fragment6,
22 )
23 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
24 from scapy.layers.l2 import Ether, ARP, GRE
25 from scapy.data import IP_PROTOS
26 from scapy.packet import bind_layers, Raw
27 from util import ppp
28 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
29 from time import sleep
30 from util import ip4_range
31 from vpp_papi import mac_pton
32 from syslog_rfc5424_parser import SyslogMessage, ParseError
33 from syslog_rfc5424_parser.constants import SyslogFacility, SyslogSeverity
34 from io import BytesIO
35 from vpp_papi import VppEnum
36 from vpp_ip_route import VppIpRoute, VppRoutePath, FibPathType
37 from vpp_neighbor import VppNeighbor
38 from scapy.all import (
39     bind_layers,
40     Packet,
41     ByteEnumField,
42     ShortField,
43     IPField,
44     IntField,
45     LongField,
46     XByteField,
47     FlagsField,
48     FieldLenField,
49     PacketListField,
50 )
51 from ipaddress import IPv6Network
52
53
@tag_fixme_vpp_workers
class TestDSlite(VppTestCase):
    """DS-Lite Test Cases"""

    @classmethod
    def setUpClass(cls):
        """Create and configure the three packet-generator interfaces.

        pg0 and pg2 are brought up with IPv4 addresses and ARP resolution;
        pg1 is brought up with an IPv6 address, two generated remote hosts
        and IPv6 neighbor configuration.  If any step raises, the parent
        class teardown is invoked before the exception is re-raised.
        """
        super(TestDSlite, cls).setUpClass()

        try:
            # Address placed in the DS-Lite pool by test_dslite().
            cls.nat_addr = "10.0.0.3"

            cls.create_pg_interfaces(range(3))
            cls.pg0.admin_up()
            cls.pg0.config_ip4()
            cls.pg0.resolve_arp()
            cls.pg1.admin_up()
            cls.pg1.config_ip6()
            cls.pg1.generate_remote_hosts(2)
            cls.pg1.configure_ipv6_neighbors()
            cls.pg2.admin_up()
            cls.pg2.config_ip4()
            cls.pg2.resolve_arp()

        except Exception:
            super(TestDSlite, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level teardown to the parent test case."""
        super(TestDSlite, cls).tearDownClass()

    def verify_syslog_apmadd(self, data, isaddr, isport, xsaddr, xsport, sv6enc, proto):
        """Assert that a syslog payload is the expected NAT "APMADD" message.

        :param data: raw syslog payload as bytes (decoded here as UTF-8).
        :param isaddr: expected value of the ISADDR parameter.
        :param isport: expected ISPORT, given as an integer.
        :param xsaddr: expected value of the XSADDR parameter.
        :param xsport: expected XSPORT, given as an integer.
        :param sv6enc: expected value of the SV6ENC parameter.
        :param proto: expected PROTO, given as an integer.

        The test fails if the payload cannot be parsed as an RFC 5424
        message or if any checked field differs from the expected value.
        """
        text = data.decode("utf-8")
        try:
            parsed = SyslogMessage.parse(text)
        except ParseError as e:
            self.logger.error(e)
            # A message that cannot be parsed means nothing was verified, so
            # report it as a failure instead of silently passing the check.
            self.fail("syslog message could not be parsed: %s" % e)

        self.assertEqual(parsed.severity, SyslogSeverity.info)
        self.assertEqual(parsed.appname, "NAT")
        self.assertEqual(parsed.msgid, "APMADD")
        sd_params = parsed.sd.get("napmap")
        self.assertIsNotNone(sd_params)
        self.assertEqual(sd_params.get("IATYP"), "IPv4")
        self.assertEqual(sd_params.get("ISADDR"), isaddr)
        self.assertEqual(sd_params.get("ISPORT"), "%d" % isport)
        self.assertEqual(sd_params.get("XATYP"), "IPv4")
        self.assertEqual(sd_params.get("XSADDR"), xsaddr)
        self.assertEqual(sd_params.get("XSPORT"), "%d" % xsport)
        self.assertEqual(sd_params.get("PROTO"), "%d" % proto)
        self.assertIsNotNone(sd_params.get("SSUBIX"))
        self.assertEqual(sd_params.get("SV6ENC"), sv6enc)

    def _send_and_capture_single(self, tx_if, packet, rx_if):
        """Send one packet on tx_if and return the one packet captured on rx_if."""
        tx_if.add_stream(packet)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return rx_if.get_capture(1)[0]

    def test_dslite(self):
        """Test DS-Lite"""
        self.vapi.dslite_add_del_pool_addr_range(
            start_addr=self.nat_addr, end_addr=self.nat_addr, is_add=1
        )
        aftr_ip4 = "192.0.0.1"
        aftr_ip6 = "2001:db8:85a3::8a2e:370:1"
        self.vapi.dslite_set_aftr_addr(ip4_addr=aftr_ip4, ip6_addr=aftr_ip6)
        self.vapi.syslog_set_sender(self.pg2.local_ip4, self.pg2.remote_ip4)

        # IPv4 source address carried inside the IPv6 tunnel packets.
        inner_ip4 = "192.168.1.1"
        # Tunnel source addresses: host 0 is used for UDP, host 1 for the rest.
        host0_ip6 = self.pg1.remote_hosts[0].ip6
        host1_ip6 = self.pg1.remote_hosts[1].ip6

        # UDP
        p = (
            Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
            / IPv6(dst=aftr_ip6, src=host0_ip6)
            / IP(dst=self.pg0.remote_ip4, src=inner_ip4)
            / UDP(sport=20000, dport=10000)
        )
        capture = self._send_and_capture_single(self.pg1, p, self.pg0)
        self.assertFalse(capture.haslayer(IPv6))
        self.assertEqual(capture[IP].src, self.nat_addr)
        self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
        self.assertNotEqual(capture[UDP].sport, 20000)
        self.assertEqual(capture[UDP].dport, 10000)
        self.assert_packet_checksums_valid(capture)
        out_port = capture[UDP].sport
        # Exactly one syslog packet is expected on pg2 for the new mapping.
        syslog_capture = self.pg2.get_capture(1)
        self.verify_syslog_apmadd(
            syslog_capture[0][Raw].load,
            inner_ip4,
            20000,
            self.nat_addr,
            out_port,
            host0_ip6,
            IP_PROTOS.udp,
        )

        p = (
            Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
            / IP(dst=self.nat_addr, src=self.pg0.remote_ip4)
            / UDP(sport=10000, dport=out_port)
        )
        capture = self._send_and_capture_single(self.pg0, p, self.pg1)
        self.assertEqual(capture[IPv6].src, aftr_ip6)
        self.assertEqual(capture[IPv6].dst, host0_ip6)
        self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
        self.assertEqual(capture[IP].dst, inner_ip4)
        self.assertEqual(capture[UDP].sport, 10000)
        self.assertEqual(capture[UDP].dport, 20000)
        self.assert_packet_checksums_valid(capture)

        # TCP
        p = (
            Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
            / IPv6(dst=aftr_ip6, src=host1_ip6)
            / IP(dst=self.pg0.remote_ip4, src=inner_ip4)
            / TCP(sport=20001, dport=10001)
        )
        capture = self._send_and_capture_single(self.pg1, p, self.pg0)
        self.assertFalse(capture.haslayer(IPv6))
        self.assertEqual(capture[IP].src, self.nat_addr)
        self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
        self.assertNotEqual(capture[TCP].sport, 20001)
        self.assertEqual(capture[TCP].dport, 10001)
        self.assert_packet_checksums_valid(capture)
        out_port = capture[TCP].sport

        p = (
            Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
            / IP(dst=self.nat_addr, src=self.pg0.remote_ip4)
            / TCP(sport=10001, dport=out_port)
        )
        capture = self._send_and_capture_single(self.pg0, p, self.pg1)
        self.assertEqual(capture[IPv6].src, aftr_ip6)
        self.assertEqual(capture[IPv6].dst, host1_ip6)
        self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
        self.assertEqual(capture[IP].dst, inner_ip4)
        self.assertEqual(capture[TCP].sport, 10001)
        self.assertEqual(capture[TCP].dport, 20001)
        self.assert_packet_checksums_valid(capture)

        # ICMP
        p = (
            Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
            / IPv6(dst=aftr_ip6, src=host1_ip6)
            / IP(dst=self.pg0.remote_ip4, src=inner_ip4)
            / ICMP(id=4000, type="echo-request")
        )
        capture = self._send_and_capture_single(self.pg1, p, self.pg0)
        self.assertFalse(capture.haslayer(IPv6))
        self.assertEqual(capture[IP].src, self.nat_addr)
        self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
        self.assertNotEqual(capture[ICMP].id, 4000)
        self.assert_packet_checksums_valid(capture)
        out_id = capture[ICMP].id

        p = (
            Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
            / IP(dst=self.nat_addr, src=self.pg0.remote_ip4)
            / ICMP(id=out_id, type="echo-reply")
        )
        capture = self._send_and_capture_single(self.pg0, p, self.pg1)
        self.assertEqual(capture[IPv6].src, aftr_ip6)
        self.assertEqual(capture[IPv6].dst, host1_ip6)
        self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
        self.assertEqual(capture[IP].dst, inner_ip4)
        self.assertEqual(capture[ICMP].id, 4000)
        self.assert_packet_checksums_valid(capture)

        # ping DS-Lite AFTR tunnel endpoint address
        p = (
            Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
            / IPv6(src=host1_ip6, dst=aftr_ip6)
            / ICMPv6EchoRequest()
        )
        # The echo reply is expected back on the interface it was sent from.
        capture = self._send_and_capture_single(self.pg1, p, self.pg1)
        self.assertEqual(capture[IPv6].src, aftr_ip6)
        self.assertEqual(capture[IPv6].dst, host1_ip6)
        self.assertTrue(capture.haslayer(ICMPv6EchoReply))

        # NOTE(review): presumably one B4 per distinct tunnel source address
        # (two pg1 remote hosts were used) and one session per UDP/TCP/ICMP
        # flow above -- confirm against the dslite counter semantics.
        b4s = self.statistics.get_counter("/dslite/total-b4s")
        self.assertEqual(b4s[0][0], 2)
        sessions = self.statistics.get_counter("/dslite/total-sessions")
        self.assertEqual(sessions[0][0], 3)

    def tearDown(self):
        """Delegate per-test teardown to the parent test case."""
        super(TestDSlite, self).tearDown()

    def show_commands_at_teardown(self):
        """Log the output of the DS-Lite pool, AFTR address and session CLIs."""
        self.logger.info(self.vapi.cli("show dslite pool"))
        self.logger.info(self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
        self.logger.info(self.vapi.cli("show dslite sessions"))
266
267
class TestDSliteCE(VppTestCase):
    """DS-Lite CE Test Cases"""

    @classmethod
    def setUpConstants(cls):
        """Extend the VPP command line with a ``dslite { ce }`` section."""
        super(TestDSliteCE, cls).setUpConstants()
        cls.vpp_cmdline.extend(["dslite", "{", "ce", "}"])

    @classmethod
    def setUpClass(cls):
        """Create and configure the two packet-generator interfaces.

        pg0 is brought up with an IPv4 address and ARP resolution; pg1 is
        brought up with an IPv6 address, one generated remote host and IPv6
        neighbor configuration.  If any step raises, the parent class
        teardown is invoked before the exception is re-raised.
        """
        super(TestDSliteCE, cls).setUpClass()

        try:
            cls.create_pg_interfaces(range(2))
            cls.pg0.admin_up()
            cls.pg0.config_ip4()
            cls.pg0.resolve_arp()
            cls.pg1.admin_up()
            cls.pg1.config_ip6()
            cls.pg1.generate_remote_hosts(1)
            cls.pg1.configure_ipv6_neighbors()

        except Exception:
            super(TestDSliteCE, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level teardown to the parent test case."""
        super(TestDSliteCE, cls).tearDownClass()

    def _send_and_capture_single(self, tx_if, packet, rx_if):
        """Send one packet on tx_if and return the one packet captured on rx_if."""
        tx_if.add_stream(packet)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return rx_if.get_capture(1)[0]

    def test_dslite_ce(self):
        """Test DS-Lite CE"""
        b4_ip4 = "192.0.0.2"
        b4_ip6 = "2001:db8:62aa::375e:f4c1:1"
        self.vapi.dslite_set_b4_addr(ip4_addr=b4_ip4, ip6_addr=b4_ip6)

        aftr_ip4 = "192.0.0.1"
        aftr_ip6 = "2001:db8:85a3::8a2e:370:1"
        self.vapi.dslite_set_aftr_addr(ip4_addr=aftr_ip4, ip6_addr=aftr_ip6)

        # Host route (/128) for the AFTR address via pg1's remote IPv6 address.
        r1 = VppIpRoute(
            self,
            aftr_ip6,
            128,
            [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
        )
        r1.add_vpp_config()

        # UDP encapsulation
        p = (
            Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
            / IP(dst=self.pg1.remote_ip4, src=self.pg0.remote_ip4)
            / UDP(sport=10000, dport=20000)
        )
        capture = self._send_and_capture_single(self.pg0, p, self.pg1)
        self.assertEqual(capture[IPv6].src, b4_ip6)
        self.assertEqual(capture[IPv6].dst, aftr_ip6)
        self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
        self.assertEqual(capture[IP].dst, self.pg1.remote_ip4)
        self.assertEqual(capture[UDP].sport, 10000)
        self.assertEqual(capture[UDP].dport, 20000)
        self.assert_packet_checksums_valid(capture)

        # UDP decapsulation
        p = (
            Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
            / IPv6(dst=b4_ip6, src=aftr_ip6)
            / IP(dst=self.pg0.remote_ip4, src=self.pg1.remote_ip4)
            / UDP(sport=20000, dport=10000)
        )
        capture = self._send_and_capture_single(self.pg1, p, self.pg0)
        self.assertFalse(capture.haslayer(IPv6))
        self.assertEqual(capture[IP].src, self.pg1.remote_ip4)
        self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
        self.assertEqual(capture[UDP].sport, 20000)
        self.assertEqual(capture[UDP].dport, 10000)
        self.assert_packet_checksums_valid(capture)

        # ping DS-Lite B4 tunnel endpoint address
        pinger_ip6 = self.pg1.remote_hosts[0].ip6
        p = (
            Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
            / IPv6(src=pinger_ip6, dst=b4_ip6)
            / ICMPv6EchoRequest()
        )
        # The echo reply is expected back on the interface it was sent from.
        capture = self._send_and_capture_single(self.pg1, p, self.pg1)
        self.assertEqual(capture[IPv6].src, b4_ip6)
        self.assertEqual(capture[IPv6].dst, pinger_ip6)
        self.assertTrue(capture.haslayer(ICMPv6EchoReply))

    def tearDown(self):
        """Delegate per-test teardown to the parent test case."""
        super(TestDSliteCE, self).tearDown()

    def show_commands_at_teardown(self):
        """Log the output of the AFTR and B4 tunnel endpoint address CLIs."""
        self.logger.info(self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
        self.logger.info(self.vapi.cli("show dslite b4-tunnel-endpoint-address"))