8 from framework import tag_fixme_vpp_workers
9 from framework import VppTestCase, VppTestRunner
12 from scapy.layers.inet import IP, TCP, UDP, ICMP
13 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
14 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply, \
15 ICMPv6ND_NS, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, fragment6
16 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
17 from scapy.layers.l2 import Ether, ARP, GRE
18 from scapy.data import IP_PROTOS
19 from scapy.packet import bind_layers, Raw
21 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
22 from time import sleep
23 from util import ip4_range
24 from vpp_papi import mac_pton
25 from syslog_rfc5424_parser import SyslogMessage, ParseError
26 from syslog_rfc5424_parser.constants import SyslogFacility, SyslogSeverity
27 from io import BytesIO
28 from vpp_papi import VppEnum
29 from vpp_ip_route import VppIpRoute, VppRoutePath, FibPathType
30 from vpp_neighbor import VppNeighbor
31 from scapy.all import bind_layers, Packet, ByteEnumField, ShortField, \
32 IPField, IntField, LongField, XByteField, FlagsField, FieldLenField, \
34 from ipaddress import IPv6Network
# DS-Lite test case class. The visible test sends IPv4-in-IPv6 traffic from
# pg1 towards the AFTR address and checks what is captured on pg0, pg1 and pg2.
# NOTE(review): the embedded original line numbers skip values (e.g. 39 to 43),
# so some statements (method headers, continuation lines) are not visible in
# this listing; the comments below describe only what is shown.
37 @tag_fixme_vpp_workers
38 class TestDSlite(VppTestCase):
39 """ DS-Lite Test Cases """
# Class-level setup (header not shown; implied by the super().setUpClass()
# call): stores the pool address in cls.nat_addr, creates three pg interfaces,
# and on pg1 generates two remote hosts and configures their IPv6 neighbors.
43 super(TestDSlite, cls).setUpClass()
46 cls.nat_addr = '10.0.0.3'
48 cls.create_pg_interfaces(range(3))
54 cls.pg1.generate_remote_hosts(2)
55 cls.pg1.configure_ipv6_neighbors()
# NOTE(review): tearDownClass() is called here, ahead of its definition below;
# presumably a cleanup path of the setup whose guard is not shown - confirm.
61 super(TestDSlite, cls).tearDownClass()
# Class-level teardown: delegates to the parent class.
65 def tearDownClass(cls):
66 super(TestDSlite, cls).tearDownClass()
# Check one syslog payload: decode it as UTF-8, parse it with the RFC 5424
# parser and compare the NAT 'APMADD' record against the expected values.
#   data            - raw payload bytes
#   isaddr, isport  - expected ISADDR / ISPORT values
#   xsaddr, xsport  - expected XSADDR / XSPORT values
# The body also reads 'sv6enc' (expected SV6ENC) and 'proto' (expected PROTO);
# NOTE(review): the rest of the signature is not shown - the call in
# test_dslite passes them after xsport in the order sv6enc, proto.
# NOTE(review): the statement opening the 'except' clause and the handler
# body are not visible here, so behaviour on ParseError cannot be documented.
68 def verify_syslog_apmadd(self, data, isaddr, isport, xsaddr, xsport,
70 message = data.decode('utf-8')
72 message = SyslogMessage.parse(message)
73 except ParseError as e:
76 self.assertEqual(message.severity, SyslogSeverity.info)
77 self.assertEqual(message.appname, 'NAT')
78 self.assertEqual(message.msgid, 'APMADD')
# All remaining checks read the 'napmap' structured-data element.
79 sd_params = message.sd.get('napmap')
80 self.assertTrue(sd_params is not None)
81 self.assertEqual(sd_params.get('IATYP'), 'IPv4')
82 self.assertEqual(sd_params.get('ISADDR'), isaddr)
# Ports and protocol are compared as decimal strings ("%d" formatting).
83 self.assertEqual(sd_params.get('ISPORT'), "%d" % isport)
84 self.assertEqual(sd_params.get('XATYP'), 'IPv4')
85 self.assertEqual(sd_params.get('XSADDR'), xsaddr)
86 self.assertEqual(sd_params.get('XSPORT'), "%d" % xsport)
87 self.assertEqual(sd_params.get('PROTO'), "%d" % proto)
# SSUBIX only has to be present; its value is not checked.
88 self.assertTrue(sd_params.get('SSUBIX') is not None)
89 self.assertEqual(sd_params.get('SV6ENC'), sv6enc)
# End-to-end DS-Lite test: UDP, TCP and ICMP flows through the AFTR in both
# directions, a ping of the AFTR address itself, then the dslite counters.
91 def test_dslite(self):
# Configure a pool whose start and end are both nat_addr.
# NOTE(review): the closing argument(s) of this call are not shown.
93 self.vapi.dslite_add_del_pool_addr_range(start_addr=self.nat_addr,
94 end_addr=self.nat_addr,
96 aftr_ip4 = '192.0.0.1'
97 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
98 self.vapi.dslite_set_aftr_addr(ip4_addr=aftr_ip4, ip6_addr=aftr_ip6)
# Syslog sender is set up with the pg2 addresses; pg2 is read back later for
# the APMADD message.
99 self.vapi.syslog_set_sender(self.pg2.local_ip4, self.pg2.remote_ip4)
# UDP, inbound on pg1: IPv4/UDP carried inside IPv6 from the first pg1 remote
# host to the AFTR IPv6 address.
102 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
103 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[0].ip6) /
104 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
105 UDP(sport=20000, dport=10000))
106 self.pg1.add_stream(p)
107 self.pg_enable_capture(self.pg_interfaces)
# NOTE(review): throughout this method the statements between enabling the
# capture and reading it (and any selection of the single captured packet)
# are not visible in this listing.
109 capture = self.pg0.get_capture(1)
# Expected on pg0: no IPv6 layer, source rewritten to nat_addr, source port
# changed from 20000, destination untouched, valid checksums.
111 self.assertFalse(capture.haslayer(IPv6))
112 self.assertEqual(capture[IP].src, self.nat_addr)
113 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
114 self.assertNotEqual(capture[UDP].sport, 20000)
115 self.assertEqual(capture[UDP].dport, 10000)
116 self.assert_packet_checksums_valid(capture)
# Remember the translated port for the syslog check and the return packet.
117 out_port = capture[UDP].sport
# The syslog payload captured on pg2 must describe the mapping just created.
118 capture = self.pg2.get_capture(1)
119 self.verify_syslog_apmadd(capture[0][Raw].load, '192.168.1.1',
120 20000, self.nat_addr, out_port,
121 self.pg1.remote_hosts[0].ip6, IP_PROTOS.udp)
# UDP, return direction: plain IPv4 from pg0 addressed to nat_addr:out_port.
123 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
124 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
125 UDP(sport=10000, dport=out_port))
126 self.pg0.add_stream(p)
127 self.pg_enable_capture(self.pg_interfaces)
129 capture = self.pg1.get_capture(1)
# Expected on pg1: IPv6 from the AFTR address to the first remote host, with
# the inner destination restored to 192.168.1.1:20000.
131 self.assertEqual(capture[IPv6].src, aftr_ip6)
132 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
133 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
134 self.assertEqual(capture[IP].dst, '192.168.1.1')
135 self.assertEqual(capture[UDP].sport, 10000)
136 self.assertEqual(capture[UDP].dport, 20000)
137 self.assert_packet_checksums_valid(capture)
# TCP, inbound on pg1: same inner source address, but sent from the second
# pg1 remote host.
140 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
141 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
142 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
143 TCP(sport=20001, dport=10001))
144 self.pg1.add_stream(p)
145 self.pg_enable_capture(self.pg_interfaces)
147 capture = self.pg0.get_capture(1)
149 self.assertFalse(capture.haslayer(IPv6))
150 self.assertEqual(capture[IP].src, self.nat_addr)
151 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
152 self.assertNotEqual(capture[TCP].sport, 20001)
153 self.assertEqual(capture[TCP].dport, 10001)
154 self.assert_packet_checksums_valid(capture)
155 out_port = capture[TCP].sport
# TCP, return direction: must come back encapsulated to the second host.
157 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
158 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
159 TCP(sport=10001, dport=out_port))
160 self.pg0.add_stream(p)
161 self.pg_enable_capture(self.pg_interfaces)
163 capture = self.pg1.get_capture(1)
165 self.assertEqual(capture[IPv6].src, aftr_ip6)
166 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
167 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
168 self.assertEqual(capture[IP].dst, '192.168.1.1')
169 self.assertEqual(capture[TCP].sport, 10001)
170 self.assertEqual(capture[TCP].dport, 20001)
171 self.assert_packet_checksums_valid(capture)
# ICMP echo request, inbound on pg1 from the second host; the ICMP id is
# expected to be changed from 4000 on the pg0 side.
174 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
175 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
176 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
177 ICMP(id=4000, type='echo-request'))
178 self.pg1.add_stream(p)
179 self.pg_enable_capture(self.pg_interfaces)
181 capture = self.pg0.get_capture(1)
183 self.assertFalse(capture.haslayer(IPv6))
184 self.assertEqual(capture[IP].src, self.nat_addr)
185 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
186 self.assertNotEqual(capture[ICMP].id, 4000)
187 self.assert_packet_checksums_valid(capture)
188 out_id = capture[ICMP].id
# ICMP echo reply using the translated id; the original id 4000 must be
# restored on the pg1 side.
190 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
191 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
192 ICMP(id=out_id, type='echo-reply'))
193 self.pg0.add_stream(p)
194 self.pg_enable_capture(self.pg_interfaces)
196 capture = self.pg1.get_capture(1)
198 self.assertEqual(capture[IPv6].src, aftr_ip6)
199 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
200 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
201 self.assertEqual(capture[IP].dst, '192.168.1.1')
202 self.assertEqual(capture[ICMP].id, 4000)
203 self.assert_packet_checksums_valid(capture)
# NOTE(review): the payload layer that ends this packet expression is not
# shown; the check below expects an ICMPv6EchoReply sourced from aftr_ip6.
205 # ping DS-Lite AFTR tunnel endpoint address
206 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
207 IPv6(src=self.pg1.remote_hosts[1].ip6, dst=aftr_ip6) /
209 self.pg1.add_stream(p)
210 self.pg_enable_capture(self.pg_interfaces)
212 capture = self.pg1.get_capture(1)
214 self.assertEqual(capture[IPv6].src, aftr_ip6)
215 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
216 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
# Counter checks: 2 B4s and 3 sessions are expected. This is consistent with
# the two pg1 remote hosts and the three flows (UDP, TCP, ICMP) used above.
218 b4s = self.statistics.get_counter('/dslite/total-b4s')
219 self.assertEqual(b4s[0][0], 2)
220 sessions = self.statistics.get_counter('/dslite/total-sessions')
221 self.assertEqual(sessions[0][0], 3)
# Per-test teardown (header not shown): delegates to the parent class.
224 super(TestDSlite, self).tearDown()
# Log the output of the DS-Lite "show" CLI commands at teardown.
# NOTE(review): the opening of the call that wraps the second command is not
# visible in this listing.
226 def show_commands_at_teardown(self):
227 self.logger.info(self.vapi.cli("show dslite pool"))
229 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
230 self.logger.info(self.vapi.cli("show dslite sessions"))
# DS-Lite CE test case class. The visible test sends plain IPv4 in on pg0 and
# expects it IPv6-encapsulated on pg1, then the reverse, then pings the B4
# address.
# NOTE(review): the embedded original line numbers skip values (e.g. 239 to
# 243), so some statements (method headers, call openers) are not visible in
# this listing; the comments below describe only what is shown.
233 class TestDSliteCE(VppTestCase):
234 """ DS-Lite CE Test Cases """
# Extend the VPP command line with a "dslite { ce }" section after the parent
# class has populated it.
237 def setUpConstants(cls):
238 super(TestDSliteCE, cls).setUpConstants()
239 cls.vpp_cmdline.extend(["dslite", "{", "ce", "}"])
# Class-level setup (header not shown; implied by the super().setUpClass()
# call): creates two pg interfaces, resolves ARP on pg0, and on pg1 generates
# one remote host and configures its IPv6 neighbors.
243 super(TestDSliteCE, cls).setUpClass()
246 cls.create_pg_interfaces(range(2))
249 cls.pg0.resolve_arp()
252 cls.pg1.generate_remote_hosts(1)
253 cls.pg1.configure_ipv6_neighbors()
# NOTE(review): tearDownClass() is called here, ahead of its definition below;
# presumably a cleanup path of the setup whose guard is not shown - confirm.
256 super(TestDSliteCE, cls).tearDownClass()
# Class-level teardown: delegates to the parent class.
260 def tearDownClass(cls):
261 super(TestDSliteCE, cls).tearDownClass()
# CE test: configure the B4 and AFTR addresses, add a route to the AFTR, then
# check encapsulation (pg0 to pg1), decapsulation (pg1 to pg0) and a ping of
# the B4 address.
263 def test_dslite_ce(self):
264 """ Test DS-Lite CE """
# NOTE(review): b4_ip4 is used below but its assignment is not visible in
# this listing.
266 b4_ip6 = '2001:db8:62aa::375e:f4c1:1'
267 self.vapi.dslite_set_b4_addr(ip4_addr=b4_ip4, ip6_addr=b4_ip6)
269 aftr_ip4 = '192.0.0.1'
270 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
# NOTE(review): aftr_ip6_n (packed form of aftr_ip6) is not referenced by any
# other visible line.
271 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
272 self.vapi.dslite_set_aftr_addr(ip4_addr=aftr_ip4, ip6_addr=aftr_ip6)
# /128 route for the AFTR address via pg1's remote IPv6 address.
# NOTE(review): any statement that installs r1 is not visible here.
274 r1 = VppIpRoute(self, aftr_ip6, 128,
275 [VppRoutePath(self.pg1.remote_ip6,
276 self.pg1.sw_if_index)])
# Plain IPv4/UDP sent in on pg0.
280 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
281 IP(dst=self.pg1.remote_ip4, src=self.pg0.remote_ip4) /
282 UDP(sport=10000, dport=20000))
283 self.pg0.add_stream(p)
284 self.pg_enable_capture(self.pg_interfaces)
# NOTE(review): throughout this method the statements between enabling the
# capture and reading it (and any selection of the single captured packet)
# are not visible in this listing.
286 capture = self.pg1.get_capture(1)
# Expected on pg1: IPv6 from the B4 address to the AFTR address, with the
# inner IPv4 addresses and UDP ports unchanged and valid checksums.
288 self.assertEqual(capture[IPv6].src, b4_ip6)
289 self.assertEqual(capture[IPv6].dst, aftr_ip6)
290 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
291 self.assertEqual(capture[IP].dst, self.pg1.remote_ip4)
292 self.assertEqual(capture[UDP].sport, 10000)
293 self.assertEqual(capture[UDP].dport, 20000)
294 self.assert_packet_checksums_valid(capture)
# Reverse direction: IPv4/UDP inside IPv6 from the AFTR address to the B4
# address, sent in on pg1.
297 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
298 IPv6(dst=b4_ip6, src=aftr_ip6) /
299 IP(dst=self.pg0.remote_ip4, src=self.pg1.remote_ip4) /
300 UDP(sport=20000, dport=10000))
301 self.pg1.add_stream(p)
302 self.pg_enable_capture(self.pg_interfaces)
304 capture = self.pg0.get_capture(1)
# Expected on pg0: no IPv6 layer, inner addresses and ports unchanged.
306 self.assertFalse(capture.haslayer(IPv6))
307 self.assertEqual(capture[IP].src, self.pg1.remote_ip4)
308 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
309 self.assertEqual(capture[UDP].sport, 20000)
310 self.assertEqual(capture[UDP].dport, 10000)
311 self.assert_packet_checksums_valid(capture)
# NOTE(review): the payload layer that ends this packet expression is not
# shown; the check below expects an ICMPv6EchoReply sourced from b4_ip6.
313 # ping DS-Lite B4 tunnel endpoint address
314 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
315 IPv6(src=self.pg1.remote_hosts[0].ip6, dst=b4_ip6) /
317 self.pg1.add_stream(p)
318 self.pg_enable_capture(self.pg_interfaces)
320 capture = self.pg1.get_capture(1)
322 self.assertEqual(capture[IPv6].src, b4_ip6)
323 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
324 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
# Per-test teardown (header not shown): delegates to the parent class.
327 super(TestDSliteCE, self).tearDown()
# Run the AFTR and B4 tunnel-endpoint "show" CLI commands at teardown.
# NOTE(review): the openers of the calls wrapping these two commands are not
# visible in this listing.
329 def show_commands_at_teardown(self):
331 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
333 self.vapi.cli("show dslite b4-tunnel-endpoint-address"))