8f5995e61fa1c2af2bd990498474839bdb7c2c9f
[vpp.git] / test / test_dslite.py
1 #!/usr/bin/env python3
2
3 import socket
4 import unittest
5 import struct
6 import random
7
8 from framework import tag_fixme_vpp_workers
9 from framework import VppTestCase, VppTestRunner
10
11 import scapy.compat
12 from scapy.layers.inet import IP, TCP, UDP, ICMP
13 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
14 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply, \
15     ICMPv6ND_NS, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, fragment6
16 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
17 from scapy.layers.l2 import Ether, ARP, GRE
18 from scapy.data import IP_PROTOS
19 from scapy.packet import bind_layers, Raw
20 from util import ppp
21 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
22 from time import sleep
23 from util import ip4_range
24 from vpp_papi import mac_pton
25 from syslog_rfc5424_parser import SyslogMessage, ParseError
26 from syslog_rfc5424_parser.constants import SyslogFacility, SyslogSeverity
27 from io import BytesIO
28 from vpp_papi import VppEnum
29 from vpp_ip_route import VppIpRoute, VppRoutePath, FibPathType
30 from vpp_neighbor import VppNeighbor
31 from scapy.all import bind_layers, Packet, ByteEnumField, ShortField, \
32     IPField, IntField, LongField, XByteField, FlagsField, FieldLenField, \
33     PacketListField
34 from ipaddress import IPv6Network
35
36
@tag_fixme_vpp_workers
class TestDSlite(VppTestCase):
    """ DS-Lite Test Cases """

    @classmethod
    def setUpClass(cls):
        """Create and bring up the three packet-generator interfaces.

        pg0 and pg2 are configured with IPv4 (addresses plus resolved ARP);
        pg1 is configured with IPv6, two generated remote hosts and their
        IPv6 neighbor entries.  ``cls.nat_addr`` is the address the test
        later adds to the DS-Lite pool.
        """
        super(TestDSlite, cls).setUpClass()

        try:
            cls.nat_addr = '10.0.0.3'

            cls.create_pg_interfaces(range(3))

            # IPv4 interfaces
            for pg_if in (cls.pg0,):
                pg_if.admin_up()
                pg_if.config_ip4()
                pg_if.resolve_arp()

            # IPv6 interface carrying the tunnelled traffic
            cls.pg1.admin_up()
            cls.pg1.config_ip6()
            cls.pg1.generate_remote_hosts(2)
            cls.pg1.configure_ipv6_neighbors()

            # IPv4 interface on which syslog messages are captured
            cls.pg2.admin_up()
            cls.pg2.config_ip4()
            cls.pg2.resolve_arp()

        except Exception:
            # Setup failed part-way: run the class teardown, then re-raise
            # so the original error is still reported.
            super(TestDSlite, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level teardown to the base class."""
        super(TestDSlite, cls).tearDownClass()

    def _send_and_capture_one(self, tx_if, packet, rx_if):
        """Send ``packet`` on ``tx_if`` and return one packet from ``rx_if``.

        Adds the packet as a stream on ``tx_if``, enables capture on all
        pg interfaces, starts the packet generator and returns element 0 of
        ``rx_if.get_capture(1)``.
        """
        tx_if.add_stream(packet)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return rx_if.get_capture(1)[0]

    def verify_syslog_apmadd(self, data, isaddr, isport, xsaddr, xsport,
                             sv6enc, proto):
        """Assert that ``data`` is the expected NAT 'APMADD' syslog message.

        :param data: raw syslog payload (bytes, decoded here as UTF-8)
        :param isaddr: expected 'ISADDR' value
        :param isport: expected 'ISPORT' value (formatted with ``%d``)
        :param xsaddr: expected 'XSADDR' value
        :param xsport: expected 'XSPORT' value (formatted with ``%d``)
        :param sv6enc: expected 'SV6ENC' value
        :param proto: expected 'PROTO' value (formatted with ``%d``)

        The test fails if the payload cannot be parsed as a syslog message
        or if any checked field differs.
        """
        text = data.decode('utf-8')
        try:
            message = SyslogMessage.parse(text)
        except ParseError as e:
            # An unparseable message must not let the test pass silently
            # with every check below skipped.
            self.logger.error(e)
            self.fail("unable to parse syslog message: %s" % e)

        self.assertEqual(message.severity, SyslogSeverity.info)
        self.assertEqual(message.appname, 'NAT')
        self.assertEqual(message.msgid, 'APMADD')

        sd_params = message.sd.get('napmap')
        self.assertIsNotNone(sd_params)
        self.assertEqual(sd_params.get('IATYP'), 'IPv4')
        self.assertEqual(sd_params.get('ISADDR'), isaddr)
        self.assertEqual(sd_params.get('ISPORT'), "%d" % isport)
        self.assertEqual(sd_params.get('XATYP'), 'IPv4')
        self.assertEqual(sd_params.get('XSADDR'), xsaddr)
        self.assertEqual(sd_params.get('XSPORT'), "%d" % xsport)
        self.assertEqual(sd_params.get('PROTO'), "%d" % proto)
        self.assertIsNotNone(sd_params.get('SSUBIX'))
        self.assertEqual(sd_params.get('SV6ENC'), sv6enc)

    def test_dslite(self):
        """ Test DS-Lite """
        nat_config = self.vapi.nat_show_config()
        self.assertEqual(0, nat_config.dslite_ce)

        self.vapi.dslite_add_del_pool_addr_range(start_addr=self.nat_addr,
                                                 end_addr=self.nat_addr,
                                                 is_add=1)
        aftr_ip4 = '192.0.0.1'
        aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
        self.vapi.dslite_set_aftr_addr(ip4_addr=aftr_ip4, ip6_addr=aftr_ip6)
        self.vapi.syslog_set_sender(self.pg2.local_ip4, self.pg2.remote_ip4)

        # Inner IPv4 source address used by every tunnelled packet below.
        inside_ip4 = '192.168.1.1'
        host0_ip6 = self.pg1.remote_hosts[0].ip6
        host1_ip6 = self.pg1.remote_hosts[1].ip6

        # UDP
        p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
             IPv6(dst=aftr_ip6, src=host0_ip6) /
             IP(dst=self.pg0.remote_ip4, src=inside_ip4) /
             UDP(sport=20000, dport=10000))
        capture = self._send_and_capture_one(self.pg1, p, self.pg0)
        self.assertFalse(capture.haslayer(IPv6))
        self.assertEqual(capture[IP].src, self.nat_addr)
        self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
        self.assertNotEqual(capture[UDP].sport, 20000)
        self.assertEqual(capture[UDP].dport, 10000)
        self.assert_packet_checksums_valid(capture)
        out_port = capture[UDP].sport
        # The same send must also have produced one syslog packet on pg2.
        capture = self.pg2.get_capture(1)
        self.verify_syslog_apmadd(capture[0][Raw].load, inside_ip4,
                                  20000, self.nat_addr, out_port,
                                  host0_ip6, IP_PROTOS.udp)

        # UDP reply towards the translated address/port
        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
             UDP(sport=10000, dport=out_port))
        capture = self._send_and_capture_one(self.pg0, p, self.pg1)
        self.assertEqual(capture[IPv6].src, aftr_ip6)
        self.assertEqual(capture[IPv6].dst, host0_ip6)
        self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
        self.assertEqual(capture[IP].dst, inside_ip4)
        self.assertEqual(capture[UDP].sport, 10000)
        self.assertEqual(capture[UDP].dport, 20000)
        self.assert_packet_checksums_valid(capture)

        # TCP
        p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
             IPv6(dst=aftr_ip6, src=host1_ip6) /
             IP(dst=self.pg0.remote_ip4, src=inside_ip4) /
             TCP(sport=20001, dport=10001))
        capture = self._send_and_capture_one(self.pg1, p, self.pg0)
        self.assertFalse(capture.haslayer(IPv6))
        self.assertEqual(capture[IP].src, self.nat_addr)
        self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
        self.assertNotEqual(capture[TCP].sport, 20001)
        self.assertEqual(capture[TCP].dport, 10001)
        self.assert_packet_checksums_valid(capture)
        out_port = capture[TCP].sport

        # TCP reply towards the translated address/port
        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
             TCP(sport=10001, dport=out_port))
        capture = self._send_and_capture_one(self.pg0, p, self.pg1)
        self.assertEqual(capture[IPv6].src, aftr_ip6)
        self.assertEqual(capture[IPv6].dst, host1_ip6)
        self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
        self.assertEqual(capture[IP].dst, inside_ip4)
        self.assertEqual(capture[TCP].sport, 10001)
        self.assertEqual(capture[TCP].dport, 20001)
        self.assert_packet_checksums_valid(capture)

        # ICMP
        p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
             IPv6(dst=aftr_ip6, src=host1_ip6) /
             IP(dst=self.pg0.remote_ip4, src=inside_ip4) /
             ICMP(id=4000, type='echo-request'))
        capture = self._send_and_capture_one(self.pg1, p, self.pg0)
        self.assertFalse(capture.haslayer(IPv6))
        self.assertEqual(capture[IP].src, self.nat_addr)
        self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
        self.assertNotEqual(capture[ICMP].id, 4000)
        self.assert_packet_checksums_valid(capture)
        out_id = capture[ICMP].id

        # ICMP echo reply carrying the translated identifier
        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
             ICMP(id=out_id, type='echo-reply'))
        capture = self._send_and_capture_one(self.pg0, p, self.pg1)
        self.assertEqual(capture[IPv6].src, aftr_ip6)
        self.assertEqual(capture[IPv6].dst, host1_ip6)
        self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
        self.assertEqual(capture[IP].dst, inside_ip4)
        self.assertEqual(capture[ICMP].id, 4000)
        self.assert_packet_checksums_valid(capture)

        # ping DS-Lite AFTR tunnel endpoint address
        p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
             IPv6(src=host1_ip6, dst=aftr_ip6) /
             ICMPv6EchoRequest())
        capture = self._send_and_capture_one(self.pg1, p, self.pg1)
        self.assertEqual(capture[IPv6].src, aftr_ip6)
        self.assertEqual(capture[IPv6].dst, host1_ip6)
        self.assertTrue(capture.haslayer(ICMPv6EchoReply))

        # Counters: two distinct IPv6 sources were used above, and three
        # flows (UDP, TCP, ICMP) were translated.
        b4s = self.statistics.get_counter('/dslite/total-b4s')
        self.assertEqual(b4s[0][0], 2)
        sessions = self.statistics.get_counter('/dslite/total-sessions')
        self.assertEqual(sessions[0][0], 3)

    def tearDown(self):
        """Delegate per-test teardown to the base class."""
        super(TestDSlite, self).tearDown()

    def show_commands_at_teardown(self):
        """Log the output of the DS-Lite 'show' CLI commands."""
        for command in ("show dslite pool",
                        "show dslite aftr-tunnel-endpoint-address",
                        "show dslite sessions"):
            self.logger.info(self.vapi.cli(command))
234
235
class TestDSliteCE(VppTestCase):
    """ DS-Lite CE Test Cases """

    @classmethod
    def setUpConstants(cls):
        """Extend the VPP command line with the ``dslite { ce }`` section."""
        super(TestDSliteCE, cls).setUpConstants()
        cls.vpp_cmdline.extend(["dslite", "{", "ce", "}"])

    @classmethod
    def setUpClass(cls):
        """Create and bring up the two packet-generator interfaces.

        pg0 is configured with IPv4 (address plus resolved ARP); pg1 is
        configured with IPv6, one generated remote host and its IPv6
        neighbor entry.
        """
        super(TestDSliteCE, cls).setUpClass()

        try:
            cls.create_pg_interfaces(range(2))

            # IPv4 interface
            cls.pg0.admin_up()
            cls.pg0.config_ip4()
            cls.pg0.resolve_arp()

            # IPv6 interface carrying the tunnelled traffic
            cls.pg1.admin_up()
            cls.pg1.config_ip6()
            cls.pg1.generate_remote_hosts(1)
            cls.pg1.configure_ipv6_neighbors()

        except Exception:
            # Setup failed part-way: run the class teardown, then re-raise
            # so the original error is still reported.
            super(TestDSliteCE, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level teardown to the base class."""
        super(TestDSliteCE, cls).tearDownClass()

    def _send_and_capture_one(self, tx_if, packet, rx_if):
        """Send ``packet`` on ``tx_if`` and return one packet from ``rx_if``.

        Adds the packet as a stream on ``tx_if``, enables capture on all
        pg interfaces, starts the packet generator and returns element 0 of
        ``rx_if.get_capture(1)``.
        """
        tx_if.add_stream(packet)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return rx_if.get_capture(1)[0]

    def test_dslite_ce(self):
        """ Test DS-Lite CE """

        # TODO: add message to retrieve dslite config
        # nat_config = self.vapi.nat_show_config()
        # self.assertEqual(1, nat_config.dslite_ce)

        b4_ip4 = '192.0.0.2'
        b4_ip6 = '2001:db8:62aa::375e:f4c1:1'
        self.vapi.dslite_set_b4_addr(ip4_addr=b4_ip4, ip6_addr=b4_ip6)

        aftr_ip4 = '192.0.0.1'
        aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
        self.vapi.dslite_set_aftr_addr(ip4_addr=aftr_ip4, ip6_addr=aftr_ip6)

        # Host route for the AFTR address via pg1, so encapsulated packets
        # are captured on pg1.
        r1 = VppIpRoute(self, aftr_ip6, 128,
                        [VppRoutePath(self.pg1.remote_ip6,
                                      self.pg1.sw_if_index)])
        r1.add_vpp_config()

        # UDP encapsulation
        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IP(dst=self.pg1.remote_ip4, src=self.pg0.remote_ip4) /
             UDP(sport=10000, dport=20000))
        capture = self._send_and_capture_one(self.pg0, p, self.pg1)
        self.assertEqual(capture[IPv6].src, b4_ip6)
        self.assertEqual(capture[IPv6].dst, aftr_ip6)
        self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
        self.assertEqual(capture[IP].dst, self.pg1.remote_ip4)
        self.assertEqual(capture[UDP].sport, 10000)
        self.assertEqual(capture[UDP].dport, 20000)
        self.assert_packet_checksums_valid(capture)

        # UDP decapsulation
        p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
             IPv6(dst=b4_ip6, src=aftr_ip6) /
             IP(dst=self.pg0.remote_ip4, src=self.pg1.remote_ip4) /
             UDP(sport=20000, dport=10000))
        capture = self._send_and_capture_one(self.pg1, p, self.pg0)
        self.assertFalse(capture.haslayer(IPv6))
        self.assertEqual(capture[IP].src, self.pg1.remote_ip4)
        self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
        self.assertEqual(capture[UDP].sport, 20000)
        self.assertEqual(capture[UDP].dport, 10000)
        self.assert_packet_checksums_valid(capture)

        # ping DS-Lite B4 tunnel endpoint address
        host_ip6 = self.pg1.remote_hosts[0].ip6
        p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
             IPv6(src=host_ip6, dst=b4_ip6) /
             ICMPv6EchoRequest())
        capture = self._send_and_capture_one(self.pg1, p, self.pg1)
        self.assertEqual(capture[IPv6].src, b4_ip6)
        self.assertEqual(capture[IPv6].dst, host_ip6)
        self.assertTrue(capture.haslayer(ICMPv6EchoReply))

    def tearDown(self):
        """Delegate per-test teardown to the base class."""
        super(TestDSliteCE, self).tearDown()

    def show_commands_at_teardown(self):
        """Log the output of the DS-Lite tunnel-endpoint 'show' commands."""
        for command in ("show dslite aftr-tunnel-endpoint-address",
                        "show dslite b4-tunnel-endpoint-address"):
            self.logger.info(self.vapi.cli(command))