nat: nat44-ed cleanup & fixes
[vpp.git] / test / test_dslite.py
1 #!/usr/bin/env python3
2
3 import socket
4 import unittest
5 import struct
6 import random
7
8 from framework import tag_fixme_vpp_workers
9 from framework import VppTestCase, VppTestRunner
10
11 import scapy.compat
12 from scapy.layers.inet import IP, TCP, UDP, ICMP
13 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
14 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply, \
15     ICMPv6ND_NS, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, fragment6
16 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
17 from scapy.layers.l2 import Ether, ARP, GRE
18 from scapy.data import IP_PROTOS
19 from scapy.packet import bind_layers, Raw
20 from util import ppp
21 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
22 from time import sleep
23 from util import ip4_range
24 from vpp_papi import mac_pton
25 from syslog_rfc5424_parser import SyslogMessage, ParseError
26 from syslog_rfc5424_parser.constants import SyslogFacility, SyslogSeverity
27 from io import BytesIO
28 from vpp_papi import VppEnum
29 from vpp_ip_route import VppIpRoute, VppRoutePath, FibPathType
30 from vpp_neighbor import VppNeighbor
31 from scapy.all import bind_layers, Packet, ByteEnumField, ShortField, \
32     IPField, IntField, LongField, XByteField, FlagsField, FieldLenField, \
33     PacketListField
34 from ipaddress import IPv6Network
35
36
37 @tag_fixme_vpp_workers
38 class TestDSlite(VppTestCase):
39     """ DS-Lite Test Cases """
40
41     @classmethod
42     def setUpClass(cls):
43         super(TestDSlite, cls).setUpClass()
44
45         try:
46             cls.nat_addr = '10.0.0.3'
47
48             cls.create_pg_interfaces(range(3))
49             cls.pg0.admin_up()
50             cls.pg0.config_ip4()
51             cls.pg0.resolve_arp()
52             cls.pg1.admin_up()
53             cls.pg1.config_ip6()
54             cls.pg1.generate_remote_hosts(2)
55             cls.pg1.configure_ipv6_neighbors()
56             cls.pg2.admin_up()
57             cls.pg2.config_ip4()
58             cls.pg2.resolve_arp()
59
60         except Exception:
61             super(TestDSlite, cls).tearDownClass()
62             raise
63
64     @classmethod
65     def tearDownClass(cls):
66         super(TestDSlite, cls).tearDownClass()
67
68     def verify_syslog_apmadd(self, data, isaddr, isport, xsaddr, xsport,
69                              sv6enc, proto):
70         message = data.decode('utf-8')
71         try:
72             message = SyslogMessage.parse(message)
73         except ParseError as e:
74             self.logger.error(e)
75         else:
76             self.assertEqual(message.severity, SyslogSeverity.info)
77             self.assertEqual(message.appname, 'NAT')
78             self.assertEqual(message.msgid, 'APMADD')
79             sd_params = message.sd.get('napmap')
80             self.assertTrue(sd_params is not None)
81             self.assertEqual(sd_params.get('IATYP'), 'IPv4')
82             self.assertEqual(sd_params.get('ISADDR'), isaddr)
83             self.assertEqual(sd_params.get('ISPORT'), "%d" % isport)
84             self.assertEqual(sd_params.get('XATYP'), 'IPv4')
85             self.assertEqual(sd_params.get('XSADDR'), xsaddr)
86             self.assertEqual(sd_params.get('XSPORT'), "%d" % xsport)
87             self.assertEqual(sd_params.get('PROTO'), "%d" % proto)
88             self.assertTrue(sd_params.get('SSUBIX') is not None)
89             self.assertEqual(sd_params.get('SV6ENC'), sv6enc)
90
91     def test_dslite(self):
92         """ Test DS-Lite """
93         self.vapi.dslite_add_del_pool_addr_range(start_addr=self.nat_addr,
94                                                  end_addr=self.nat_addr,
95                                                  is_add=1)
96         aftr_ip4 = '192.0.0.1'
97         aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
98         self.vapi.dslite_set_aftr_addr(ip4_addr=aftr_ip4, ip6_addr=aftr_ip6)
99         self.vapi.syslog_set_sender(self.pg2.local_ip4, self.pg2.remote_ip4)
100
101         # UDP
102         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
103              IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[0].ip6) /
104              IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
105              UDP(sport=20000, dport=10000))
106         self.pg1.add_stream(p)
107         self.pg_enable_capture(self.pg_interfaces)
108         self.pg_start()
109         capture = self.pg0.get_capture(1)
110         capture = capture[0]
111         self.assertFalse(capture.haslayer(IPv6))
112         self.assertEqual(capture[IP].src, self.nat_addr)
113         self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
114         self.assertNotEqual(capture[UDP].sport, 20000)
115         self.assertEqual(capture[UDP].dport, 10000)
116         self.assert_packet_checksums_valid(capture)
117         out_port = capture[UDP].sport
118         capture = self.pg2.get_capture(1)
119         self.verify_syslog_apmadd(capture[0][Raw].load, '192.168.1.1',
120                                   20000, self.nat_addr, out_port,
121                                   self.pg1.remote_hosts[0].ip6, IP_PROTOS.udp)
122
123         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
124              IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
125              UDP(sport=10000, dport=out_port))
126         self.pg0.add_stream(p)
127         self.pg_enable_capture(self.pg_interfaces)
128         self.pg_start()
129         capture = self.pg1.get_capture(1)
130         capture = capture[0]
131         self.assertEqual(capture[IPv6].src, aftr_ip6)
132         self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
133         self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
134         self.assertEqual(capture[IP].dst, '192.168.1.1')
135         self.assertEqual(capture[UDP].sport, 10000)
136         self.assertEqual(capture[UDP].dport, 20000)
137         self.assert_packet_checksums_valid(capture)
138
139         # TCP
140         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
141              IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
142              IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
143              TCP(sport=20001, dport=10001))
144         self.pg1.add_stream(p)
145         self.pg_enable_capture(self.pg_interfaces)
146         self.pg_start()
147         capture = self.pg0.get_capture(1)
148         capture = capture[0]
149         self.assertFalse(capture.haslayer(IPv6))
150         self.assertEqual(capture[IP].src, self.nat_addr)
151         self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
152         self.assertNotEqual(capture[TCP].sport, 20001)
153         self.assertEqual(capture[TCP].dport, 10001)
154         self.assert_packet_checksums_valid(capture)
155         out_port = capture[TCP].sport
156
157         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
158              IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
159              TCP(sport=10001, dport=out_port))
160         self.pg0.add_stream(p)
161         self.pg_enable_capture(self.pg_interfaces)
162         self.pg_start()
163         capture = self.pg1.get_capture(1)
164         capture = capture[0]
165         self.assertEqual(capture[IPv6].src, aftr_ip6)
166         self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
167         self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
168         self.assertEqual(capture[IP].dst, '192.168.1.1')
169         self.assertEqual(capture[TCP].sport, 10001)
170         self.assertEqual(capture[TCP].dport, 20001)
171         self.assert_packet_checksums_valid(capture)
172
173         # ICMP
174         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
175              IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
176              IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
177              ICMP(id=4000, type='echo-request'))
178         self.pg1.add_stream(p)
179         self.pg_enable_capture(self.pg_interfaces)
180         self.pg_start()
181         capture = self.pg0.get_capture(1)
182         capture = capture[0]
183         self.assertFalse(capture.haslayer(IPv6))
184         self.assertEqual(capture[IP].src, self.nat_addr)
185         self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
186         self.assertNotEqual(capture[ICMP].id, 4000)
187         self.assert_packet_checksums_valid(capture)
188         out_id = capture[ICMP].id
189
190         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
191              IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
192              ICMP(id=out_id, type='echo-reply'))
193         self.pg0.add_stream(p)
194         self.pg_enable_capture(self.pg_interfaces)
195         self.pg_start()
196         capture = self.pg1.get_capture(1)
197         capture = capture[0]
198         self.assertEqual(capture[IPv6].src, aftr_ip6)
199         self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
200         self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
201         self.assertEqual(capture[IP].dst, '192.168.1.1')
202         self.assertEqual(capture[ICMP].id, 4000)
203         self.assert_packet_checksums_valid(capture)
204
205         # ping DS-Lite AFTR tunnel endpoint address
206         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
207              IPv6(src=self.pg1.remote_hosts[1].ip6, dst=aftr_ip6) /
208              ICMPv6EchoRequest())
209         self.pg1.add_stream(p)
210         self.pg_enable_capture(self.pg_interfaces)
211         self.pg_start()
212         capture = self.pg1.get_capture(1)
213         capture = capture[0]
214         self.assertEqual(capture[IPv6].src, aftr_ip6)
215         self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
216         self.assertTrue(capture.haslayer(ICMPv6EchoReply))
217
218         b4s = self.statistics.get_counter('/dslite/total-b4s')
219         self.assertEqual(b4s[0][0], 2)
220         sessions = self.statistics.get_counter('/dslite/total-sessions')
221         self.assertEqual(sessions[0][0], 3)
222
223     def tearDown(self):
224         super(TestDSlite, self).tearDown()
225
226     def show_commands_at_teardown(self):
227         self.logger.info(self.vapi.cli("show dslite pool"))
228         self.logger.info(
229             self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
230         self.logger.info(self.vapi.cli("show dslite sessions"))
231
232
233 class TestDSliteCE(VppTestCase):
234     """ DS-Lite CE Test Cases """
235
236     @classmethod
237     def setUpConstants(cls):
238         super(TestDSliteCE, cls).setUpConstants()
239         cls.vpp_cmdline.extend(["dslite", "{", "ce", "}"])
240
241     @classmethod
242     def setUpClass(cls):
243         super(TestDSliteCE, cls).setUpClass()
244
245         try:
246             cls.create_pg_interfaces(range(2))
247             cls.pg0.admin_up()
248             cls.pg0.config_ip4()
249             cls.pg0.resolve_arp()
250             cls.pg1.admin_up()
251             cls.pg1.config_ip6()
252             cls.pg1.generate_remote_hosts(1)
253             cls.pg1.configure_ipv6_neighbors()
254
255         except Exception:
256             super(TestDSliteCE, cls).tearDownClass()
257             raise
258
259     @classmethod
260     def tearDownClass(cls):
261         super(TestDSliteCE, cls).tearDownClass()
262
263     def test_dslite_ce(self):
264         """ Test DS-Lite CE """
265         b4_ip4 = '192.0.0.2'
266         b4_ip6 = '2001:db8:62aa::375e:f4c1:1'
267         self.vapi.dslite_set_b4_addr(ip4_addr=b4_ip4, ip6_addr=b4_ip6)
268
269         aftr_ip4 = '192.0.0.1'
270         aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
271         aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
272         self.vapi.dslite_set_aftr_addr(ip4_addr=aftr_ip4, ip6_addr=aftr_ip6)
273
274         r1 = VppIpRoute(self, aftr_ip6, 128,
275                         [VppRoutePath(self.pg1.remote_ip6,
276                                       self.pg1.sw_if_index)])
277         r1.add_vpp_config()
278
279         # UDP encapsulation
280         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
281              IP(dst=self.pg1.remote_ip4, src=self.pg0.remote_ip4) /
282              UDP(sport=10000, dport=20000))
283         self.pg0.add_stream(p)
284         self.pg_enable_capture(self.pg_interfaces)
285         self.pg_start()
286         capture = self.pg1.get_capture(1)
287         capture = capture[0]
288         self.assertEqual(capture[IPv6].src, b4_ip6)
289         self.assertEqual(capture[IPv6].dst, aftr_ip6)
290         self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
291         self.assertEqual(capture[IP].dst, self.pg1.remote_ip4)
292         self.assertEqual(capture[UDP].sport, 10000)
293         self.assertEqual(capture[UDP].dport, 20000)
294         self.assert_packet_checksums_valid(capture)
295
296         # UDP decapsulation
297         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
298              IPv6(dst=b4_ip6, src=aftr_ip6) /
299              IP(dst=self.pg0.remote_ip4, src=self.pg1.remote_ip4) /
300              UDP(sport=20000, dport=10000))
301         self.pg1.add_stream(p)
302         self.pg_enable_capture(self.pg_interfaces)
303         self.pg_start()
304         capture = self.pg0.get_capture(1)
305         capture = capture[0]
306         self.assertFalse(capture.haslayer(IPv6))
307         self.assertEqual(capture[IP].src, self.pg1.remote_ip4)
308         self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
309         self.assertEqual(capture[UDP].sport, 20000)
310         self.assertEqual(capture[UDP].dport, 10000)
311         self.assert_packet_checksums_valid(capture)
312
313         # ping DS-Lite B4 tunnel endpoint address
314         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
315              IPv6(src=self.pg1.remote_hosts[0].ip6, dst=b4_ip6) /
316              ICMPv6EchoRequest())
317         self.pg1.add_stream(p)
318         self.pg_enable_capture(self.pg_interfaces)
319         self.pg_start()
320         capture = self.pg1.get_capture(1)
321         capture = capture[0]
322         self.assertEqual(capture[IPv6].src, b4_ip6)
323         self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
324         self.assertTrue(capture.haslayer(ICMPv6EchoReply))
325
326     def tearDown(self):
327         super(TestDSliteCE, self).tearDown()
328
329     def show_commands_at_teardown(self):
330         self.logger.info(
331             self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
332         self.logger.info(
333             self.vapi.cli("show dslite b4-tunnel-endpoint-address"))