dfa7d7c7c21fd758dda817a00d209600e939c27d
[vpp.git] / src / plugins / nat / test / test_dslite.py
1 #!/usr/bin/env python3
2
3 import socket
4 import unittest
5 import struct
6 import random
7
8 from framework import VppTestCase, VppTestRunner, running_extended_tests
9
10 import scapy.compat
11 from scapy.layers.inet import IP, TCP, UDP, ICMP
12 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
13 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply, \
14     ICMPv6ND_NS, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, fragment6
15 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
16 from scapy.layers.l2 import Ether, ARP, GRE
17 from scapy.data import IP_PROTOS
18 from scapy.packet import bind_layers, Raw
19 from util import ppp
20 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
21 from time import sleep
22 from util import ip4_range
23 from vpp_papi import mac_pton
24 from syslog_rfc5424_parser import SyslogMessage, ParseError
25 from syslog_rfc5424_parser.constants import SyslogFacility, SyslogSeverity
26 from io import BytesIO
27 from vpp_papi import VppEnum
28 from vpp_ip_route import VppIpRoute, VppRoutePath, FibPathType
29 from vpp_neighbor import VppNeighbor
30 from scapy.all import bind_layers, Packet, ByteEnumField, ShortField, \
31     IPField, IntField, LongField, XByteField, FlagsField, FieldLenField, \
32     PacketListField
33 from ipaddress import IPv6Network
34
35
class TestDSlite(VppTestCase):
    """ DS-Lite Test Cases """

    # Topology used by this suite (see setUpClass):
    #   pg0 - IPv4 side, where decapsulated/translated packets are captured
    #   pg1 - IPv6 side, carrying IPv4-in-IPv6 traffic from two remote hosts
    #   pg2 - IPv4 interface on which the NAT syslog record is captured

    @classmethod
    def setUpClass(cls):
        """Create and configure the three packet-generator interfaces.

        pg0 and pg2 are given IPv4 addresses and have ARP resolved; pg1 is
        given an IPv6 address plus two remote hosts with neighbor entries.
        If any step raises, the parent tearDownClass is run before the
        exception is propagated.
        """
        super(TestDSlite, cls).setUpClass()

        try:
            # Address handed to the DS-Lite pool in test_dslite().
            cls.nat_addr = '10.0.0.3'

            cls.create_pg_interfaces(range(3))
            cls.pg0.admin_up()
            cls.pg0.config_ip4()
            cls.pg0.resolve_arp()
            cls.pg1.admin_up()
            cls.pg1.config_ip6()
            cls.pg1.generate_remote_hosts(2)
            cls.pg1.configure_ipv6_neighbors()
            cls.pg2.admin_up()
            cls.pg2.config_ip4()
            cls.pg2.resolve_arp()

        except Exception:
            super(TestDSlite, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level cleanup to the parent class."""
        super(TestDSlite, cls).tearDownClass()

    def _send_and_capture_one(self, tx_if, pkt, rx_if):
        """Send `pkt` on `tx_if` and return the single packet seen on `rx_if`.

        Capture is enabled on every pg interface before the stream is
        started, so other interfaces can still be inspected by the caller
        afterwards.

        :param tx_if: pg interface the packet is injected on
        :param pkt: packet to send
        :param rx_if: pg interface exactly one packet is expected on
        :returns: the first (and only) captured packet
        """
        tx_if.add_stream(pkt)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return rx_if.get_capture(1)[0]

    def verify_syslog_apmadd(self, data, isaddr, isport, xsaddr, xsport,
                             sv6enc, proto):
        """Assert that `data` is a NAT 'APMADD' syslog record.

        :param data: raw syslog payload (bytes, decoded here as UTF-8)
        :param isaddr: expected value of the ISADDR parameter
        :param isport: expected ISPORT, given as an integer
        :param xsaddr: expected value of the XSADDR parameter
        :param xsport: expected XSPORT, given as an integer
        :param sv6enc: expected value of the SV6ENC parameter
        :param proto: expected PROTO, given as an integer
        """
        text = data.decode('utf-8')
        try:
            message = SyslogMessage.parse(text)
        except ParseError as e:
            # A message that cannot be parsed cannot be verified; report it
            # as a test failure instead of silently skipping every check.
            self.logger.error(e)
            self.fail("unable to parse syslog message: %s" % e)

        self.assertEqual(message.severity, SyslogSeverity.info)
        self.assertEqual(message.appname, 'NAT')
        self.assertEqual(message.msgid, 'APMADD')
        sd_params = message.sd.get('napmap')
        self.assertIsNotNone(sd_params)
        self.assertEqual(sd_params.get('IATYP'), 'IPv4')
        self.assertEqual(sd_params.get('ISADDR'), isaddr)
        self.assertEqual(sd_params.get('ISPORT'), "%d" % isport)
        self.assertEqual(sd_params.get('XATYP'), 'IPv4')
        self.assertEqual(sd_params.get('XSADDR'), xsaddr)
        self.assertEqual(sd_params.get('XSPORT'), "%d" % xsport)
        self.assertEqual(sd_params.get('PROTO'), "%d" % proto)
        self.assertIsNotNone(sd_params.get('SSUBIX'))
        self.assertEqual(sd_params.get('SV6ENC'), sv6enc)

    def test_dslite(self):
        """ Test DS-Lite """
        nat_config = self.vapi.nat_show_config()
        self.assertEqual(0, nat_config.dslite_ce)

        self.vapi.dslite_add_del_pool_addr_range(start_addr=self.nat_addr,
                                                 end_addr=self.nat_addr,
                                                 is_add=1)
        aftr_ip4 = '192.0.0.1'
        aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
        self.vapi.dslite_set_aftr_addr(ip4_addr=aftr_ip4, ip6_addr=aftr_ip6)
        self.vapi.syslog_set_sender(self.pg2.local_ip4, self.pg2.remote_ip4)

        # Inner IPv4 source address used by both tunnelled hosts.
        inside_ip4 = '192.168.1.1'
        host0_ip6 = self.pg1.remote_hosts[0].ip6
        host1_ip6 = self.pg1.remote_hosts[1].ip6

        # UDP, tunnel -> IPv4: outer IPv6 header removed, source rewritten
        p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
             IPv6(dst=aftr_ip6, src=host0_ip6) /
             IP(dst=self.pg0.remote_ip4, src=inside_ip4) /
             UDP(sport=20000, dport=10000))
        capture = self._send_and_capture_one(self.pg1, p, self.pg0)
        self.assertFalse(capture.haslayer(IPv6))
        self.assertEqual(capture[IP].src, self.nat_addr)
        self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
        self.assertNotEqual(capture[UDP].sport, 20000)
        self.assertEqual(capture[UDP].dport, 10000)
        self.assert_packet_checksums_valid(capture)
        out_port = capture[UDP].sport
        # The new mapping must also have produced one syslog record on pg2.
        capture = self.pg2.get_capture(1)
        self.verify_syslog_apmadd(capture[0][Raw].load, inside_ip4,
                                  20000, self.nat_addr, out_port,
                                  host0_ip6, IP_PROTOS.udp)

        # UDP, IPv4 -> tunnel: reply is re-encapsulated towards host 0
        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
             UDP(sport=10000, dport=out_port))
        capture = self._send_and_capture_one(self.pg0, p, self.pg1)
        self.assertEqual(capture[IPv6].src, aftr_ip6)
        self.assertEqual(capture[IPv6].dst, host0_ip6)
        self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
        self.assertEqual(capture[IP].dst, inside_ip4)
        self.assertEqual(capture[UDP].sport, 10000)
        self.assertEqual(capture[UDP].dport, 20000)
        self.assert_packet_checksums_valid(capture)

        # TCP, tunnel -> IPv4 (second remote host, same inner address)
        p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
             IPv6(dst=aftr_ip6, src=host1_ip6) /
             IP(dst=self.pg0.remote_ip4, src=inside_ip4) /
             TCP(sport=20001, dport=10001))
        capture = self._send_and_capture_one(self.pg1, p, self.pg0)
        self.assertFalse(capture.haslayer(IPv6))
        self.assertEqual(capture[IP].src, self.nat_addr)
        self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
        self.assertNotEqual(capture[TCP].sport, 20001)
        self.assertEqual(capture[TCP].dport, 10001)
        self.assert_packet_checksums_valid(capture)
        out_port = capture[TCP].sport

        # TCP, IPv4 -> tunnel
        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
             TCP(sport=10001, dport=out_port))
        capture = self._send_and_capture_one(self.pg0, p, self.pg1)
        self.assertEqual(capture[IPv6].src, aftr_ip6)
        self.assertEqual(capture[IPv6].dst, host1_ip6)
        self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
        self.assertEqual(capture[IP].dst, inside_ip4)
        self.assertEqual(capture[TCP].sport, 10001)
        self.assertEqual(capture[TCP].dport, 20001)
        self.assert_packet_checksums_valid(capture)

        # ICMP echo request, tunnel -> IPv4: the ICMP id is rewritten
        p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
             IPv6(dst=aftr_ip6, src=host1_ip6) /
             IP(dst=self.pg0.remote_ip4, src=inside_ip4) /
             ICMP(id=4000, type='echo-request'))
        capture = self._send_and_capture_one(self.pg1, p, self.pg0)
        self.assertFalse(capture.haslayer(IPv6))
        self.assertEqual(capture[IP].src, self.nat_addr)
        self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
        self.assertNotEqual(capture[ICMP].id, 4000)
        self.assert_packet_checksums_valid(capture)
        out_id = capture[ICMP].id

        # ICMP echo reply, IPv4 -> tunnel: the original id is restored
        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
             ICMP(id=out_id, type='echo-reply'))
        capture = self._send_and_capture_one(self.pg0, p, self.pg1)
        self.assertEqual(capture[IPv6].src, aftr_ip6)
        self.assertEqual(capture[IPv6].dst, host1_ip6)
        self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
        self.assertEqual(capture[IP].dst, inside_ip4)
        self.assertEqual(capture[ICMP].id, 4000)
        self.assert_packet_checksums_valid(capture)

        # ping DS-Lite AFTR tunnel endpoint address
        p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
             IPv6(src=host1_ip6, dst=aftr_ip6) /
             ICMPv6EchoRequest())
        capture = self._send_and_capture_one(self.pg1, p, self.pg1)
        self.assertEqual(capture[IPv6].src, aftr_ip6)
        self.assertEqual(capture[IPv6].dst, host1_ip6)
        self.assertTrue(capture.haslayer(ICMPv6EchoReply))

        # Two distinct IPv6 sources were used, and three flows
        # (UDP, TCP, ICMP) were opened through the tunnel.
        b4s = self.statistics.get_counter('/dslite/total-b4s')
        self.assertEqual(b4s[0][0], 2)
        sessions = self.statistics.get_counter('/dslite/total-sessions')
        self.assertEqual(sessions[0][0], 3)

    def tearDown(self):
        """Delegate per-test cleanup to the parent class."""
        super(TestDSlite, self).tearDown()

    def show_commands_at_teardown(self):
        """Log the DS-Lite pool, AFTR endpoint address and sessions."""
        self.logger.info(self.vapi.cli("show dslite pool"))
        self.logger.info(
            self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
        self.logger.info(self.vapi.cli("show dslite sessions"))
232
233
class TestDSliteCE():
    """ DS-Lite CE Test Cases """

    # NOTE(review): this class declares no base class. As written, unittest
    # discovery will not collect it, and the super() calls below have no
    # parent setUpConstants/setUpClass/tearDownClass/tearDown to reach.
    # Presumably the suite is parked on purpose -- confirm before
    # re-parenting it onto VppTestCase.

    @classmethod
    def setUpConstants(cls):
        """Extend the VPP command line with the 'nat { dslite ce }' stanza."""
        super(TestDSliteCE, cls).setUpConstants()
        cls.vpp_cmdline.extend(["nat", "{", "dslite ce", "}"])

    @classmethod
    def setUpClass(cls):
        """Create and configure the two packet-generator interfaces.

        pg0 is given an IPv4 address with ARP resolved; pg1 is given an
        IPv6 address plus one remote host with a neighbor entry. If any
        step raises, the parent tearDownClass is run before the exception
        is propagated.
        """
        super(TestDSliteCE, cls).setUpClass()

        try:
            cls.create_pg_interfaces(range(2))
            cls.pg0.admin_up()
            cls.pg0.config_ip4()
            cls.pg0.resolve_arp()
            cls.pg1.admin_up()
            cls.pg1.config_ip6()
            cls.pg1.generate_remote_hosts(1)
            cls.pg1.configure_ipv6_neighbors()

        except Exception:
            super(TestDSliteCE, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level cleanup to the parent class."""
        super(TestDSliteCE, cls).tearDownClass()

    def _send_and_capture_one(self, tx_if, pkt, rx_if):
        """Send `pkt` on `tx_if` and return the single packet seen on `rx_if`.

        :param tx_if: pg interface the packet is injected on
        :param pkt: packet to send
        :param rx_if: pg interface exactly one packet is expected on
        :returns: the first (and only) captured packet
        """
        tx_if.add_stream(pkt)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return rx_if.get_capture(1)[0]

    def test_dslite_ce(self):
        """ Test DS-Lite CE """

        nat_config = self.vapi.nat_show_config()
        self.assertEqual(1, nat_config.dslite_ce)

        b4_ip4 = '192.0.0.2'
        b4_ip6 = '2001:db8:62aa::375e:f4c1:1'
        self.vapi.dslite_set_b4_addr(ip4_addr=b4_ip4, ip6_addr=b4_ip6)

        aftr_ip4 = '192.0.0.1'
        aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
        self.vapi.dslite_set_aftr_addr(ip4_addr=aftr_ip4, ip6_addr=aftr_ip6)

        # Host route so that traffic for the AFTR leaves through pg1.
        aftr_route = VppIpRoute(self, aftr_ip6, 128,
                                [VppRoutePath(self.pg1.remote_ip6,
                                              self.pg1.sw_if_index)])
        aftr_route.add_vpp_config()

        # UDP encapsulation
        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IP(dst=self.pg1.remote_ip4, src=self.pg0.remote_ip4) /
             UDP(sport=10000, dport=20000))
        capture = self._send_and_capture_one(self.pg0, p, self.pg1)
        self.assertEqual(capture[IPv6].src, b4_ip6)
        self.assertEqual(capture[IPv6].dst, aftr_ip6)
        self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
        self.assertEqual(capture[IP].dst, self.pg1.remote_ip4)
        self.assertEqual(capture[UDP].sport, 10000)
        self.assertEqual(capture[UDP].dport, 20000)
        self.assert_packet_checksums_valid(capture)

        # UDP decapsulation
        p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
             IPv6(dst=b4_ip6, src=aftr_ip6) /
             IP(dst=self.pg0.remote_ip4, src=self.pg1.remote_ip4) /
             UDP(sport=20000, dport=10000))
        capture = self._send_and_capture_one(self.pg1, p, self.pg0)
        self.assertFalse(capture.haslayer(IPv6))
        self.assertEqual(capture[IP].src, self.pg1.remote_ip4)
        self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
        self.assertEqual(capture[UDP].sport, 20000)
        self.assertEqual(capture[UDP].dport, 10000)
        self.assert_packet_checksums_valid(capture)

        # ping DS-Lite B4 tunnel endpoint address
        host_ip6 = self.pg1.remote_hosts[0].ip6
        p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
             IPv6(src=host_ip6, dst=b4_ip6) /
             ICMPv6EchoRequest())
        capture = self._send_and_capture_one(self.pg1, p, self.pg1)
        self.assertEqual(capture[IPv6].src, b4_ip6)
        self.assertEqual(capture[IPv6].dst, host_ip6)
        self.assertTrue(capture.haslayer(ICMPv6EchoReply))

    def tearDown(self):
        """Delegate per-test cleanup to the parent class."""
        super(TestDSliteCE, self).tearDown()

    def show_commands_at_teardown(self):
        """Log the configured AFTR and B4 tunnel endpoint addresses."""
        self.logger.info(
            self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
        self.logger.info(
            self.vapi.cli("show dslite b4-tunnel-endpoint-address"))