tests: tag the tests that do not work with multi-worker configuration
[vpp.git] / src / plugins / nat / test / test_dslite.py
1 #!/usr/bin/env python3
2
3 import socket
4 import unittest
5 import struct
6 import random
7
8 from framework import tag_fixme_vpp_workers
9 from framework import VppTestCase, VppTestRunner, running_extended_tests
10
11 import scapy.compat
12 from scapy.layers.inet import IP, TCP, UDP, ICMP
13 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
14 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply, \
15     ICMPv6ND_NS, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, fragment6
16 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
17 from scapy.layers.l2 import Ether, ARP, GRE
18 from scapy.data import IP_PROTOS
19 from scapy.packet import bind_layers, Raw
20 from util import ppp
21 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
22 from time import sleep
23 from util import ip4_range
24 from vpp_papi import mac_pton
25 from syslog_rfc5424_parser import SyslogMessage, ParseError
26 from syslog_rfc5424_parser.constants import SyslogFacility, SyslogSeverity
27 from io import BytesIO
28 from vpp_papi import VppEnum
29 from vpp_ip_route import VppIpRoute, VppRoutePath, FibPathType
30 from vpp_neighbor import VppNeighbor
31 from scapy.all import bind_layers, Packet, ByteEnumField, ShortField, \
32     IPField, IntField, LongField, XByteField, FlagsField, FieldLenField, \
33     PacketListField
34 from ipaddress import IPv6Network
35
36
37 @tag_fixme_vpp_workers
38 class TestDSlite(VppTestCase):
39     """ DS-Lite Test Cases """
40
41     @classmethod
42     def setUpClass(cls):
43         super(TestDSlite, cls).setUpClass()
44
45         try:
46             cls.nat_addr = '10.0.0.3'
47
48             cls.create_pg_interfaces(range(3))
49             cls.pg0.admin_up()
50             cls.pg0.config_ip4()
51             cls.pg0.resolve_arp()
52             cls.pg1.admin_up()
53             cls.pg1.config_ip6()
54             cls.pg1.generate_remote_hosts(2)
55             cls.pg1.configure_ipv6_neighbors()
56             cls.pg2.admin_up()
57             cls.pg2.config_ip4()
58             cls.pg2.resolve_arp()
59
60         except Exception:
61             super(TestDSlite, cls).tearDownClass()
62             raise
63
64     @classmethod
65     def tearDownClass(cls):
66         super(TestDSlite, cls).tearDownClass()
67
68     def verify_syslog_apmadd(self, data, isaddr, isport, xsaddr, xsport,
69                              sv6enc, proto):
70         message = data.decode('utf-8')
71         try:
72             message = SyslogMessage.parse(message)
73         except ParseError as e:
74             self.logger.error(e)
75         else:
76             self.assertEqual(message.severity, SyslogSeverity.info)
77             self.assertEqual(message.appname, 'NAT')
78             self.assertEqual(message.msgid, 'APMADD')
79             sd_params = message.sd.get('napmap')
80             self.assertTrue(sd_params is not None)
81             self.assertEqual(sd_params.get('IATYP'), 'IPv4')
82             self.assertEqual(sd_params.get('ISADDR'), isaddr)
83             self.assertEqual(sd_params.get('ISPORT'), "%d" % isport)
84             self.assertEqual(sd_params.get('XATYP'), 'IPv4')
85             self.assertEqual(sd_params.get('XSADDR'), xsaddr)
86             self.assertEqual(sd_params.get('XSPORT'), "%d" % xsport)
87             self.assertEqual(sd_params.get('PROTO'), "%d" % proto)
88             self.assertTrue(sd_params.get('SSUBIX') is not None)
89             self.assertEqual(sd_params.get('SV6ENC'), sv6enc)
90
91     def test_dslite(self):
92         """ Test DS-Lite """
93         nat_config = self.vapi.nat_show_config()
94         self.assertEqual(0, nat_config.dslite_ce)
95
96         self.vapi.dslite_add_del_pool_addr_range(start_addr=self.nat_addr,
97                                                  end_addr=self.nat_addr,
98                                                  is_add=1)
99         aftr_ip4 = '192.0.0.1'
100         aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
101         self.vapi.dslite_set_aftr_addr(ip4_addr=aftr_ip4, ip6_addr=aftr_ip6)
102         self.vapi.syslog_set_sender(self.pg2.local_ip4, self.pg2.remote_ip4)
103
104         # UDP
105         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
106              IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[0].ip6) /
107              IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
108              UDP(sport=20000, dport=10000))
109         self.pg1.add_stream(p)
110         self.pg_enable_capture(self.pg_interfaces)
111         self.pg_start()
112         capture = self.pg0.get_capture(1)
113         capture = capture[0]
114         self.assertFalse(capture.haslayer(IPv6))
115         self.assertEqual(capture[IP].src, self.nat_addr)
116         self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
117         self.assertNotEqual(capture[UDP].sport, 20000)
118         self.assertEqual(capture[UDP].dport, 10000)
119         self.assert_packet_checksums_valid(capture)
120         out_port = capture[UDP].sport
121         capture = self.pg2.get_capture(1)
122         self.verify_syslog_apmadd(capture[0][Raw].load, '192.168.1.1',
123                                   20000, self.nat_addr, out_port,
124                                   self.pg1.remote_hosts[0].ip6, IP_PROTOS.udp)
125
126         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
127              IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
128              UDP(sport=10000, dport=out_port))
129         self.pg0.add_stream(p)
130         self.pg_enable_capture(self.pg_interfaces)
131         self.pg_start()
132         capture = self.pg1.get_capture(1)
133         capture = capture[0]
134         self.assertEqual(capture[IPv6].src, aftr_ip6)
135         self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
136         self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
137         self.assertEqual(capture[IP].dst, '192.168.1.1')
138         self.assertEqual(capture[UDP].sport, 10000)
139         self.assertEqual(capture[UDP].dport, 20000)
140         self.assert_packet_checksums_valid(capture)
141
142         # TCP
143         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
144              IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
145              IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
146              TCP(sport=20001, dport=10001))
147         self.pg1.add_stream(p)
148         self.pg_enable_capture(self.pg_interfaces)
149         self.pg_start()
150         capture = self.pg0.get_capture(1)
151         capture = capture[0]
152         self.assertFalse(capture.haslayer(IPv6))
153         self.assertEqual(capture[IP].src, self.nat_addr)
154         self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
155         self.assertNotEqual(capture[TCP].sport, 20001)
156         self.assertEqual(capture[TCP].dport, 10001)
157         self.assert_packet_checksums_valid(capture)
158         out_port = capture[TCP].sport
159
160         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
161              IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
162              TCP(sport=10001, dport=out_port))
163         self.pg0.add_stream(p)
164         self.pg_enable_capture(self.pg_interfaces)
165         self.pg_start()
166         capture = self.pg1.get_capture(1)
167         capture = capture[0]
168         self.assertEqual(capture[IPv6].src, aftr_ip6)
169         self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
170         self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
171         self.assertEqual(capture[IP].dst, '192.168.1.1')
172         self.assertEqual(capture[TCP].sport, 10001)
173         self.assertEqual(capture[TCP].dport, 20001)
174         self.assert_packet_checksums_valid(capture)
175
176         # ICMP
177         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
178              IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
179              IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
180              ICMP(id=4000, type='echo-request'))
181         self.pg1.add_stream(p)
182         self.pg_enable_capture(self.pg_interfaces)
183         self.pg_start()
184         capture = self.pg0.get_capture(1)
185         capture = capture[0]
186         self.assertFalse(capture.haslayer(IPv6))
187         self.assertEqual(capture[IP].src, self.nat_addr)
188         self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
189         self.assertNotEqual(capture[ICMP].id, 4000)
190         self.assert_packet_checksums_valid(capture)
191         out_id = capture[ICMP].id
192
193         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
194              IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
195              ICMP(id=out_id, type='echo-reply'))
196         self.pg0.add_stream(p)
197         self.pg_enable_capture(self.pg_interfaces)
198         self.pg_start()
199         capture = self.pg1.get_capture(1)
200         capture = capture[0]
201         self.assertEqual(capture[IPv6].src, aftr_ip6)
202         self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
203         self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
204         self.assertEqual(capture[IP].dst, '192.168.1.1')
205         self.assertEqual(capture[ICMP].id, 4000)
206         self.assert_packet_checksums_valid(capture)
207
208         # ping DS-Lite AFTR tunnel endpoint address
209         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
210              IPv6(src=self.pg1.remote_hosts[1].ip6, dst=aftr_ip6) /
211              ICMPv6EchoRequest())
212         self.pg1.add_stream(p)
213         self.pg_enable_capture(self.pg_interfaces)
214         self.pg_start()
215         capture = self.pg1.get_capture(1)
216         capture = capture[0]
217         self.assertEqual(capture[IPv6].src, aftr_ip6)
218         self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
219         self.assertTrue(capture.haslayer(ICMPv6EchoReply))
220
221         b4s = self.statistics.get_counter('/dslite/total-b4s')
222         self.assertEqual(b4s[0][0], 2)
223         sessions = self.statistics.get_counter('/dslite/total-sessions')
224         self.assertEqual(sessions[0][0], 3)
225
226     def tearDown(self):
227         super(TestDSlite, self).tearDown()
228
229     def show_commands_at_teardown(self):
230         self.logger.info(self.vapi.cli("show dslite pool"))
231         self.logger.info(
232             self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
233         self.logger.info(self.vapi.cli("show dslite sessions"))
234
235
236 class TestDSliteCE(VppTestCase):
237     """ DS-Lite CE Test Cases """
238
239     @classmethod
240     def setUpConstants(cls):
241         super(TestDSliteCE, cls).setUpConstants()
242         cls.vpp_cmdline.extend(["dslite", "{", "ce", "}"])
243
244     @classmethod
245     def setUpClass(cls):
246         super(TestDSliteCE, cls).setUpClass()
247
248         try:
249             cls.create_pg_interfaces(range(2))
250             cls.pg0.admin_up()
251             cls.pg0.config_ip4()
252             cls.pg0.resolve_arp()
253             cls.pg1.admin_up()
254             cls.pg1.config_ip6()
255             cls.pg1.generate_remote_hosts(1)
256             cls.pg1.configure_ipv6_neighbors()
257
258         except Exception:
259             super(TestDSliteCE, cls).tearDownClass()
260             raise
261
262     @classmethod
263     def tearDownClass(cls):
264         super(TestDSliteCE, cls).tearDownClass()
265
266     def test_dslite_ce(self):
267         """ Test DS-Lite CE """
268
269         # TODO: add message to retrieve dslite config
270         # nat_config = self.vapi.nat_show_config()
271         # self.assertEqual(1, nat_config.dslite_ce)
272
273         b4_ip4 = '192.0.0.2'
274         b4_ip6 = '2001:db8:62aa::375e:f4c1:1'
275         self.vapi.dslite_set_b4_addr(ip4_addr=b4_ip4, ip6_addr=b4_ip6)
276
277         aftr_ip4 = '192.0.0.1'
278         aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
279         aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
280         self.vapi.dslite_set_aftr_addr(ip4_addr=aftr_ip4, ip6_addr=aftr_ip6)
281
282         r1 = VppIpRoute(self, aftr_ip6, 128,
283                         [VppRoutePath(self.pg1.remote_ip6,
284                                       self.pg1.sw_if_index)])
285         r1.add_vpp_config()
286
287         # UDP encapsulation
288         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
289              IP(dst=self.pg1.remote_ip4, src=self.pg0.remote_ip4) /
290              UDP(sport=10000, dport=20000))
291         self.pg0.add_stream(p)
292         self.pg_enable_capture(self.pg_interfaces)
293         self.pg_start()
294         capture = self.pg1.get_capture(1)
295         capture = capture[0]
296         self.assertEqual(capture[IPv6].src, b4_ip6)
297         self.assertEqual(capture[IPv6].dst, aftr_ip6)
298         self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
299         self.assertEqual(capture[IP].dst, self.pg1.remote_ip4)
300         self.assertEqual(capture[UDP].sport, 10000)
301         self.assertEqual(capture[UDP].dport, 20000)
302         self.assert_packet_checksums_valid(capture)
303
304         # UDP decapsulation
305         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
306              IPv6(dst=b4_ip6, src=aftr_ip6) /
307              IP(dst=self.pg0.remote_ip4, src=self.pg1.remote_ip4) /
308              UDP(sport=20000, dport=10000))
309         self.pg1.add_stream(p)
310         self.pg_enable_capture(self.pg_interfaces)
311         self.pg_start()
312         capture = self.pg0.get_capture(1)
313         capture = capture[0]
314         self.assertFalse(capture.haslayer(IPv6))
315         self.assertEqual(capture[IP].src, self.pg1.remote_ip4)
316         self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
317         self.assertEqual(capture[UDP].sport, 20000)
318         self.assertEqual(capture[UDP].dport, 10000)
319         self.assert_packet_checksums_valid(capture)
320
321         # ping DS-Lite B4 tunnel endpoint address
322         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
323              IPv6(src=self.pg1.remote_hosts[0].ip6, dst=b4_ip6) /
324              ICMPv6EchoRequest())
325         self.pg1.add_stream(p)
326         self.pg_enable_capture(self.pg_interfaces)
327         self.pg_start()
328         capture = self.pg1.get_capture(1)
329         capture = capture[0]
330         self.assertEqual(capture[IPv6].src, b4_ip6)
331         self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
332         self.assertTrue(capture.haslayer(ICMPv6EchoReply))
333
334     def tearDown(self):
335         super(TestDSliteCE, self).tearDown()
336
337     def show_commands_at_teardown(self):
338         self.logger.info(
339             self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
340         self.logger.info(
341             self.vapi.cli("show dslite b4-tunnel-endpoint-address"))