11 from framework import tag_fixme_vpp_workers
12 from framework import VppTestCase, VppTestRunner, running_extended_tests
13 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
14 from scapy.data import IP_PROTOS
15 from scapy.layers.inet import IP, TCP, UDP, ICMP
16 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
17 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
18 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply, \
19 ICMPv6ND_NS, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, fragment6
20 from scapy.layers.l2 import Ether, GRE
21 from scapy.packet import Raw
22 from syslog_rfc5424_parser import SyslogMessage, ParseError
23 from syslog_rfc5424_parser.constants import SyslogSeverity
24 from util import ppc, ppp
25 from vpp_papi import VppEnum
28 @tag_fixme_vpp_workers
29 class TestNAT64(VppTestCase):
30 """ NAT64 Test Cases """
33 def SYSLOG_SEVERITY(self):
34 return VppEnum.vl_api_syslog_severity_t
37 def config_flags(self):
38 return VppEnum.vl_api_nat_config_flags_t
42 super(TestNAT64, cls).setUpClass()
44 cls.tcp_port_in = 6303
45 cls.tcp_port_out = 6303
46 cls.udp_port_in = 6304
47 cls.udp_port_out = 6304
49 cls.icmp_id_out = 6305
50 cls.tcp_external_port = 80
51 cls.nat_addr = '10.0.0.3'
52 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
54 cls.vrf1_nat_addr = '10.0.10.3'
55 cls.ipfix_src_port = 4739
56 cls.ipfix_domain_id = 1
58 cls.create_pg_interfaces(range(6))
59 cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
60 cls.ip6_interfaces.append(cls.pg_interfaces[2])
61 cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
63 cls.vapi.ip_table_add_del(is_add=1,
64 table={'table_id': cls.vrf1_id,
67 cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
69 cls.pg0.generate_remote_hosts(2)
71 for i in cls.ip6_interfaces:
74 i.configure_ipv6_neighbors()
76 for i in cls.ip4_interfaces:
85 cls.pg3.configure_ipv6_neighbors()
91 def tearDownClass(cls):
92 super(TestNAT64, cls).tearDownClass()
95 super(TestNAT64, self).setUp()
96 self.vapi.nat64_plugin_enable_disable(enable=1,
97 bib_buckets=128, st_buckets=256)
100 super(TestNAT64, self).tearDown()
101 if not self.vpp_dead:
102 self.vapi.nat64_plugin_enable_disable(enable=0)
104 def show_commands_at_teardown(self):
105 self.logger.info(self.vapi.cli("show nat64 pool"))
106 self.logger.info(self.vapi.cli("show nat64 interfaces"))
107 self.logger.info(self.vapi.cli("show nat64 prefix"))
108 self.logger.info(self.vapi.cli("show nat64 bib all"))
109 self.logger.info(self.vapi.cli("show nat64 session table all"))
111 def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
113 Create IPv6 packet stream for inside network
115 :param in_if: Inside interface
116 :param out_if: Outside interface
117 :param ttl: Hop Limit of generated packets
118 :param pref: NAT64 prefix
119 :param plen: NAT64 prefix length
123 dst = ''.join(['64:ff9b::', out_if.remote_ip4])
125 dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
128 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
129 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
130 TCP(sport=self.tcp_port_in, dport=20))
134 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
135 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
136 UDP(sport=self.udp_port_in, dport=20))
140 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
141 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
142 ICMPv6EchoRequest(id=self.icmp_id_in))
147 def create_stream_out(self, out_if, dst_ip=None, ttl=64,
148 use_inside_ports=False):
150 Create packet stream for outside network
152 :param out_if: Outside interface
153 :param dst_ip: Destination IP address (Default use global NAT address)
154 :param ttl: TTL of generated packets
155 :param use_inside_ports: Use inside NAT ports as destination ports
156 instead of outside ports
159 dst_ip = self.nat_addr
160 if not use_inside_ports:
161 tcp_port = self.tcp_port_out
162 udp_port = self.udp_port_out
163 icmp_id = self.icmp_id_out
165 tcp_port = self.tcp_port_in
166 udp_port = self.udp_port_in
167 icmp_id = self.icmp_id_in
170 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
171 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
172 TCP(dport=tcp_port, sport=20))
176 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
177 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
178 UDP(dport=udp_port, sport=20))
182 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
183 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
184 ICMP(id=icmp_id, type='echo-reply'))
189 def verify_capture_out(self, capture, nat_ip=None, same_port=False,
190 dst_ip=None, is_ip6=False, ignore_port=False):
192 Verify captured packets on outside network
194 :param capture: Captured packets
195 :param nat_ip: Translated IP address (Default use global NAT address)
196 :param same_port: Source port number is not translated (Default False)
197 :param dst_ip: Destination IP address (Default do not verify)
198 :param is_ip6: If L3 protocol is IPv6 (Default False)
202 ICMP46 = ICMPv6EchoRequest
207 nat_ip = self.nat_addr
208 for packet in capture:
211 self.assert_packet_checksums_valid(packet)
212 self.assertEqual(packet[IP46].src, nat_ip)
213 if dst_ip is not None:
214 self.assertEqual(packet[IP46].dst, dst_ip)
215 if packet.haslayer(TCP):
219 packet[TCP].sport, self.tcp_port_in)
222 packet[TCP].sport, self.tcp_port_in)
223 self.tcp_port_out = packet[TCP].sport
224 self.assert_packet_checksums_valid(packet)
225 elif packet.haslayer(UDP):
229 packet[UDP].sport, self.udp_port_in)
232 packet[UDP].sport, self.udp_port_in)
233 self.udp_port_out = packet[UDP].sport
238 packet[ICMP46].id, self.icmp_id_in)
241 packet[ICMP46].id, self.icmp_id_in)
242 self.icmp_id_out = packet[ICMP46].id
243 self.assert_packet_checksums_valid(packet)
245 self.logger.error(ppp("Unexpected or invalid packet "
246 "(outside network):", packet))
249 def verify_capture_in_ip6(self, capture, src_ip, dst_ip):
251 Verify captured IPv6 packets on inside network
253 :param capture: Captured packets
254 :param src_ip: Source IP
255 :param dst_ip: Destination IP address
257 for packet in capture:
259 self.assertEqual(packet[IPv6].src, src_ip)
260 self.assertEqual(packet[IPv6].dst, dst_ip)
261 self.assert_packet_checksums_valid(packet)
262 if packet.haslayer(TCP):
263 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
264 elif packet.haslayer(UDP):
265 self.assertEqual(packet[UDP].dport, self.udp_port_in)
267 self.assertEqual(packet[ICMPv6EchoReply].id,
270 self.logger.error(ppp("Unexpected or invalid packet "
271 "(inside network):", packet))
274 def create_stream_frag(self, src_if, dst, sport, dport, data,
275 proto=IP_PROTOS.tcp, echo_reply=False):
277 Create fragmented packet stream
279 :param src_if: Source interface
280 :param dst: Destination IPv4 address
281 :param sport: Source port
282 :param dport: Destination port
283 :param data: Payload data
284 :param proto: protocol (TCP, UDP, ICMP)
285 :param echo_reply: use echo_reply if protocol is ICMP
288 if proto == IP_PROTOS.tcp:
289 p = (IP(src=src_if.remote_ip4, dst=dst) /
290 TCP(sport=sport, dport=dport) /
292 p = p.__class__(scapy.compat.raw(p))
293 chksum = p[TCP].chksum
294 proto_header = TCP(sport=sport, dport=dport, chksum=chksum)
295 elif proto == IP_PROTOS.udp:
296 proto_header = UDP(sport=sport, dport=dport)
297 elif proto == IP_PROTOS.icmp:
299 proto_header = ICMP(id=sport, type='echo-request')
301 proto_header = ICMP(id=sport, type='echo-reply')
303 raise Exception("Unsupported protocol")
304 id = random.randint(0, 65535)
306 if proto == IP_PROTOS.tcp:
309 raw = Raw(data[0:16])
310 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
311 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
315 if proto == IP_PROTOS.tcp:
316 raw = Raw(data[4:20])
318 raw = Raw(data[16:32])
319 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
320 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
324 if proto == IP_PROTOS.tcp:
328 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
329 IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=proto,
335 def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
336 pref=None, plen=0, frag_size=128):
338 Create fragmented packet stream
340 :param src_if: Source interface
341 :param dst: Destination IPv4 address
342 :param sport: Source TCP port
343 :param dport: Destination TCP port
344 :param data: Payload data
345 :param pref: NAT64 prefix
346 :param plen: NAT64 prefix length
347 :param fragsize: size of fragments
351 dst_ip6 = ''.join(['64:ff9b::', dst])
353 dst_ip6 = self.compose_ip6(dst, pref, plen)
355 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
356 IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
357 IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
358 TCP(sport=sport, dport=dport) /
361 return fragment6(p, frag_size)
363 def reass_frags_and_verify(self, frags, src, dst):
365 Reassemble and verify fragmented packet
367 :param frags: Captured fragments
368 :param src: Source IPv4 address to verify
369 :param dst: Destination IPv4 address to verify
371 :returns: Reassembled IPv4 packet
375 self.assertEqual(p[IP].src, src)
376 self.assertEqual(p[IP].dst, dst)
377 self.assert_ip_checksum_valid(p)
378 buffer.seek(p[IP].frag * 8)
379 buffer.write(bytes(p[IP].payload))
380 ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
381 proto=frags[0][IP].proto)
382 if ip.proto == IP_PROTOS.tcp:
383 p = (ip / TCP(buffer.getvalue()))
384 self.logger.debug(ppp("Reassembled:", p))
385 self.assert_tcp_checksum_valid(p)
386 elif ip.proto == IP_PROTOS.udp:
387 p = (ip / UDP(buffer.getvalue()[:8]) /
388 Raw(buffer.getvalue()[8:]))
389 elif ip.proto == IP_PROTOS.icmp:
390 p = (ip / ICMP(buffer.getvalue()))
393 def reass_frags_and_verify_ip6(self, frags, src, dst):
395 Reassemble and verify fragmented packet
397 :param frags: Captured fragments
398 :param src: Source IPv6 address to verify
399 :param dst: Destination IPv6 address to verify
401 :returns: Reassembled IPv6 packet
405 self.assertEqual(p[IPv6].src, src)
406 self.assertEqual(p[IPv6].dst, dst)
407 buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
408 buffer.write(bytes(p[IPv6ExtHdrFragment].payload))
409 ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
410 nh=frags[0][IPv6ExtHdrFragment].nh)
411 if ip.nh == IP_PROTOS.tcp:
412 p = (ip / TCP(buffer.getvalue()))
413 elif ip.nh == IP_PROTOS.udp:
414 p = (ip / UDP(buffer.getvalue()))
415 self.logger.debug(ppp("Reassembled:", p))
416 self.assert_packet_checksums_valid(p)
419 def verify_ipfix_max_bibs(self, data, limit):
421 Verify IPFIX maximum BIB entries exceeded event
423 :param data: Decoded IPFIX data records
424 :param limit: Number of maximum BIB entries that can be created.
426 self.assertEqual(1, len(data))
429 self.assertEqual(scapy.compat.orb(record[230]), 13)
430 # natQuotaExceededEvent
431 self.assertEqual(struct.pack("I", 2), record[466])
433 self.assertEqual(struct.pack("I", limit), record[472])
435 def verify_ipfix_bib(self, data, is_create, src_addr):
437 Verify IPFIX NAT64 BIB create and delete events
439 :param data: Decoded IPFIX data records
440 :param is_create: Create event if nonzero value otherwise delete event
441 :param src_addr: IPv6 source address
443 self.assertEqual(1, len(data))
447 self.assertEqual(scapy.compat.orb(record[230]), 10)
449 self.assertEqual(scapy.compat.orb(record[230]), 11)
451 self.assertEqual(src_addr, str(ipaddress.IPv6Address(record[27])))
452 # postNATSourceIPv4Address
453 self.assertEqual(self.nat_addr_n, record[225])
455 self.assertEqual(IP_PROTOS.tcp, scapy.compat.orb(record[4]))
457 self.assertEqual(struct.pack("!I", 0), record[234])
458 # sourceTransportPort
459 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
460 # postNAPTSourceTransportPort
461 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
463 def verify_ipfix_nat64_ses(self, data, is_create, src_addr, dst_addr,
466 Verify IPFIX NAT64 session create and delete events
468 :param data: Decoded IPFIX data records
469 :param is_create: Create event if nonzero value otherwise delete event
470 :param src_addr: IPv6 source address
471 :param dst_addr: IPv4 destination address
472 :param dst_port: destination TCP port
474 self.assertEqual(1, len(data))
478 self.assertEqual(scapy.compat.orb(record[230]), 6)
480 self.assertEqual(scapy.compat.orb(record[230]), 7)
482 self.assertEqual(src_addr, str(ipaddress.IPv6Address(record[27])))
483 # destinationIPv6Address
484 self.assertEqual(socket.inet_pton(socket.AF_INET6,
485 self.compose_ip6(dst_addr,
489 # postNATSourceIPv4Address
490 self.assertEqual(self.nat_addr_n, record[225])
491 # postNATDestinationIPv4Address
492 self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr),
495 self.assertEqual(IP_PROTOS.tcp, scapy.compat.orb(record[4]))
497 self.assertEqual(struct.pack("!I", 0), record[234])
498 # sourceTransportPort
499 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
500 # postNAPTSourceTransportPort
501 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
502 # destinationTransportPort
503 self.assertEqual(struct.pack("!H", dst_port), record[11])
504 # postNAPTDestinationTransportPort
505 self.assertEqual(struct.pack("!H", dst_port), record[228])
507 def verify_syslog_sess(self, data, is_add=True, is_ip6=False):
508 message = data.decode('utf-8')
510 message = SyslogMessage.parse(message)
511 except ParseError as e:
515 self.assertEqual(message.severity, SyslogSeverity.info)
516 self.assertEqual(message.appname, 'NAT')
517 self.assertEqual(message.msgid, 'SADD' if is_add else 'SDEL')
518 sd_params = message.sd.get('nsess')
519 self.assertTrue(sd_params is not None)
521 self.assertEqual(sd_params.get('IATYP'), 'IPv6')
522 self.assertEqual(sd_params.get('ISADDR'), self.pg0.remote_ip6)
524 self.assertEqual(sd_params.get('IATYP'), 'IPv4')
525 self.assertEqual(sd_params.get('ISADDR'), self.pg0.remote_ip4)
526 self.assertTrue(sd_params.get('SSUBIX') is not None)
527 self.assertEqual(sd_params.get('ISPORT'), "%d" % self.tcp_port_in)
528 self.assertEqual(sd_params.get('XATYP'), 'IPv4')
529 self.assertEqual(sd_params.get('XSADDR'), self.nat_addr)
530 self.assertEqual(sd_params.get('XSPORT'), "%d" % self.tcp_port_out)
531 self.assertEqual(sd_params.get('PROTO'), "%d" % IP_PROTOS.tcp)
532 self.assertEqual(sd_params.get('SVLAN'), '0')
533 self.assertEqual(sd_params.get('XDADDR'), self.pg1.remote_ip4)
534 self.assertEqual(sd_params.get('XDPORT'),
535 "%d" % self.tcp_external_port)
537 def compose_ip6(self, ip4, pref, plen):
539 Compose IPv4-embedded IPv6 addresses
541 :param ip4: IPv4 address
542 :param pref: IPv6 prefix
543 :param plen: IPv6 prefix length
544 :returns: IPv4-embedded IPv6 addresses
546 pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
547 ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
562 pref_n[10] = ip4_n[3]
566 pref_n[10] = ip4_n[2]
567 pref_n[11] = ip4_n[3]
570 pref_n[10] = ip4_n[1]
571 pref_n[11] = ip4_n[2]
572 pref_n[12] = ip4_n[3]
574 pref_n[12] = ip4_n[0]
575 pref_n[13] = ip4_n[1]
576 pref_n[14] = ip4_n[2]
577 pref_n[15] = ip4_n[3]
578 packed_pref_n = b''.join([scapy.compat.chb(x) for x in pref_n])
579 return socket.inet_ntop(socket.AF_INET6, packed_pref_n)
581 def verify_ipfix_max_sessions(self, data, limit):
583 Verify IPFIX maximum session entries exceeded event
585 :param data: Decoded IPFIX data records
586 :param limit: Number of maximum session entries that can be created.
588 self.assertEqual(1, len(data))
591 self.assertEqual(scapy.compat.orb(record[230]), 13)
592 # natQuotaExceededEvent
593 self.assertEqual(struct.pack("I", 1), record[466])
595 self.assertEqual(struct.pack("I", limit), record[471])
597 def test_nat64_inside_interface_handles_neighbor_advertisement(self):
598 """ NAT64 inside interface handles Neighbor Advertisement """
600 flags = self.config_flags.NAT_IS_INSIDE
601 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
602 sw_if_index=self.pg5.sw_if_index)
605 ping = (Ether(dst=self.pg5.local_mac, src=self.pg5.remote_mac) /
606 IPv6(src=self.pg5.remote_ip6, dst=self.pg5.local_ip6) /
609 self.pg5.add_stream(pkts)
610 self.pg_enable_capture(self.pg_interfaces)
613 # Wait for Neighbor Solicitation
614 capture = self.pg5.get_capture(len(pkts))
617 self.assertEqual(packet[IPv6].src, self.pg5.local_ip6)
618 self.assertEqual(packet.haslayer(ICMPv6ND_NS), 1)
619 tgt = packet[ICMPv6ND_NS].tgt
621 self.logger.error(ppp("Unexpected or invalid packet:", packet))
624 # Send Neighbor Advertisement
625 p = (Ether(dst=self.pg5.local_mac, src=self.pg5.remote_mac) /
626 IPv6(src=self.pg5.remote_ip6, dst=self.pg5.local_ip6) /
627 ICMPv6ND_NA(tgt=tgt) /
628 ICMPv6NDOptDstLLAddr(lladdr=self.pg5.remote_mac))
630 self.pg5.add_stream(pkts)
631 self.pg_enable_capture(self.pg_interfaces)
634 # Try to send ping again
636 self.pg5.add_stream(pkts)
637 self.pg_enable_capture(self.pg_interfaces)
640 # Wait for ping reply
641 capture = self.pg5.get_capture(len(pkts))
644 self.assertEqual(packet[IPv6].src, self.pg5.local_ip6)
645 self.assertEqual(packet[IPv6].dst, self.pg5.remote_ip6)
646 self.assertEqual(packet.haslayer(ICMPv6EchoReply), 1)
648 self.logger.error(ppp("Unexpected or invalid packet:", packet))
652 """ Add/delete address to NAT64 pool """
655 self.vapi.nat64_add_del_pool_addr_range(start_addr=nat_addr,
657 vrf_id=0xFFFFFFFF, is_add=1)
659 addresses = self.vapi.nat64_pool_addr_dump()
660 self.assertEqual(len(addresses), 1)
661 self.assertEqual(str(addresses[0].address), nat_addr)
663 self.vapi.nat64_add_del_pool_addr_range(start_addr=nat_addr,
665 vrf_id=0xFFFFFFFF, is_add=0)
667 addresses = self.vapi.nat64_pool_addr_dump()
668 self.assertEqual(len(addresses), 0)
670 def test_interface(self):
671 """ Enable/disable NAT64 feature on the interface """
672 flags = self.config_flags.NAT_IS_INSIDE
673 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
674 sw_if_index=self.pg0.sw_if_index)
675 self.vapi.nat64_add_del_interface(is_add=1, flags=0,
676 sw_if_index=self.pg1.sw_if_index)
678 interfaces = self.vapi.nat64_interface_dump()
679 self.assertEqual(len(interfaces), 2)
682 for intf in interfaces:
683 if intf.sw_if_index == self.pg0.sw_if_index:
684 self.assertEqual(intf.flags, self.config_flags.NAT_IS_INSIDE)
686 elif intf.sw_if_index == self.pg1.sw_if_index:
687 self.assertEqual(intf.flags, self.config_flags.NAT_IS_OUTSIDE)
689 self.assertTrue(pg0_found)
690 self.assertTrue(pg1_found)
692 features = self.vapi.cli("show interface features pg0")
693 self.assertIn('nat64-in2out', features)
694 features = self.vapi.cli("show interface features pg1")
695 self.assertIn('nat64-out2in', features)
697 self.vapi.nat64_add_del_interface(is_add=0, flags=flags,
698 sw_if_index=self.pg0.sw_if_index)
699 self.vapi.nat64_add_del_interface(is_add=0, flags=flags,
700 sw_if_index=self.pg1.sw_if_index)
702 interfaces = self.vapi.nat64_interface_dump()
703 self.assertEqual(len(interfaces), 0)
705 def test_static_bib(self):
706 """ Add/delete static BIB entry """
707 in_addr = '2001:db8:85a3::8a2e:370:7334'
708 out_addr = '10.1.1.3'
711 proto = IP_PROTOS.tcp
713 self.vapi.nat64_add_del_static_bib(i_addr=in_addr, o_addr=out_addr,
714 i_port=in_port, o_port=out_port,
715 proto=proto, vrf_id=0, is_add=1)
716 bib = self.vapi.nat64_bib_dump(proto=IP_PROTOS.tcp)
719 if bibe.flags & self.config_flags.NAT_IS_STATIC:
721 self.assertEqual(str(bibe.i_addr), in_addr)
722 self.assertEqual(str(bibe.o_addr), out_addr)
723 self.assertEqual(bibe.i_port, in_port)
724 self.assertEqual(bibe.o_port, out_port)
725 self.assertEqual(static_bib_num, 1)
726 bibs = self.statistics.get_counter('/nat64/total-bibs')
727 self.assertEqual(bibs[0][0], 1)
729 self.vapi.nat64_add_del_static_bib(i_addr=in_addr, o_addr=out_addr,
730 i_port=in_port, o_port=out_port,
731 proto=proto, vrf_id=0, is_add=0)
732 bib = self.vapi.nat64_bib_dump(proto=IP_PROTOS.tcp)
735 if bibe.flags & self.config_flags.NAT_IS_STATIC:
737 self.assertEqual(static_bib_num, 0)
738 bibs = self.statistics.get_counter('/nat64/total-bibs')
739 self.assertEqual(bibs[0][0], 0)
741 def test_set_timeouts(self):
742 """ Set NAT64 timeouts """
743 # verify default values
744 timeouts = self.vapi.nat64_get_timeouts()
745 self.assertEqual(timeouts.udp, 300)
746 self.assertEqual(timeouts.icmp, 60)
747 self.assertEqual(timeouts.tcp_transitory, 240)
748 self.assertEqual(timeouts.tcp_established, 7440)
750 # set and verify custom values
751 self.vapi.nat64_set_timeouts(udp=200, tcp_established=7450,
752 tcp_transitory=250, icmp=30)
753 timeouts = self.vapi.nat64_get_timeouts()
754 self.assertEqual(timeouts.udp, 200)
755 self.assertEqual(timeouts.icmp, 30)
756 self.assertEqual(timeouts.tcp_transitory, 250)
757 self.assertEqual(timeouts.tcp_established, 7450)
759 def test_dynamic(self):
760 """ NAT64 dynamic translation test """
761 self.tcp_port_in = 6303
762 self.udp_port_in = 6304
763 self.icmp_id_in = 6305
765 ses_num_start = self.nat64_get_ses_num()
767 self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
768 end_addr=self.nat_addr,
771 flags = self.config_flags.NAT_IS_INSIDE
772 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
773 sw_if_index=self.pg0.sw_if_index)
774 self.vapi.nat64_add_del_interface(is_add=1, flags=0,
775 sw_if_index=self.pg1.sw_if_index)
778 tcpn = self.statistics.get_counter('/nat64/in2out/tcp')[0]
779 udpn = self.statistics.get_counter('/nat64/in2out/udp')[0]
780 icmpn = self.statistics.get_counter('/nat64/in2out/icmp')[0]
781 drops = self.statistics.get_counter('/nat64/in2out/drops')[0]
783 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
784 self.pg0.add_stream(pkts)
785 self.pg_enable_capture(self.pg_interfaces)
787 capture = self.pg1.get_capture(len(pkts))
788 self.verify_capture_out(capture, nat_ip=self.nat_addr,
789 dst_ip=self.pg1.remote_ip4)
791 if_idx = self.pg0.sw_if_index
792 cnt = self.statistics.get_counter('/nat64/in2out/tcp')[0]
793 self.assertEqual(cnt[if_idx] - tcpn[if_idx], 1)
794 cnt = self.statistics.get_counter('/nat64/in2out/udp')[0]
795 self.assertEqual(cnt[if_idx] - udpn[if_idx], 1)
796 cnt = self.statistics.get_counter('/nat64/in2out/icmp')[0]
797 self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1)
798 cnt = self.statistics.get_counter('/nat64/in2out/drops')[0]
799 self.assertEqual(cnt[if_idx] - drops[if_idx], 0)
802 tcpn = self.statistics.get_counter('/nat64/out2in/tcp')[0]
803 udpn = self.statistics.get_counter('/nat64/out2in/udp')[0]
804 icmpn = self.statistics.get_counter('/nat64/out2in/icmp')[0]
805 drops = self.statistics.get_counter('/nat64/out2in/drops')[0]
807 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
808 self.pg1.add_stream(pkts)
809 self.pg_enable_capture(self.pg_interfaces)
811 capture = self.pg0.get_capture(len(pkts))
812 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
813 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
815 if_idx = self.pg1.sw_if_index
816 cnt = self.statistics.get_counter('/nat64/out2in/tcp')[0]
817 self.assertEqual(cnt[if_idx] - tcpn[if_idx], 2)
818 cnt = self.statistics.get_counter('/nat64/out2in/udp')[0]
819 self.assertEqual(cnt[if_idx] - udpn[if_idx], 1)
820 cnt = self.statistics.get_counter('/nat64/out2in/icmp')[0]
821 self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1)
822 cnt = self.statistics.get_counter('/nat64/out2in/drops')[0]
823 self.assertEqual(cnt[if_idx] - drops[if_idx], 0)
825 bibs = self.statistics.get_counter('/nat64/total-bibs')
826 self.assertEqual(bibs[0][0], 3)
827 sessions = self.statistics.get_counter('/nat64/total-sessions')
828 self.assertEqual(sessions[0][0], 3)
831 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
832 self.pg0.add_stream(pkts)
833 self.pg_enable_capture(self.pg_interfaces)
835 capture = self.pg1.get_capture(len(pkts))
836 self.verify_capture_out(capture, nat_ip=self.nat_addr,
837 dst_ip=self.pg1.remote_ip4)
840 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
841 self.pg1.add_stream(pkts)
842 self.pg_enable_capture(self.pg_interfaces)
844 capture = self.pg0.get_capture(len(pkts))
845 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
847 ses_num_end = self.nat64_get_ses_num()
849 self.assertEqual(ses_num_end - ses_num_start, 3)
851 # tenant with specific VRF
852 self.vapi.nat64_add_del_pool_addr_range(start_addr=self.vrf1_nat_addr,
853 end_addr=self.vrf1_nat_addr,
854 vrf_id=self.vrf1_id, is_add=1)
855 flags = self.config_flags.NAT_IS_INSIDE
856 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
857 sw_if_index=self.pg2.sw_if_index)
859 pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
860 self.pg2.add_stream(pkts)
861 self.pg_enable_capture(self.pg_interfaces)
863 capture = self.pg1.get_capture(len(pkts))
864 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
865 dst_ip=self.pg1.remote_ip4)
867 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
868 self.pg1.add_stream(pkts)
869 self.pg_enable_capture(self.pg_interfaces)
871 capture = self.pg2.get_capture(len(pkts))
872 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
874 def test_static(self):
875 """ NAT64 static translation test """
876 self.tcp_port_in = 60303
877 self.udp_port_in = 60304
878 self.icmp_id_in = 60305
879 self.tcp_port_out = 60303
880 self.udp_port_out = 60304
881 self.icmp_id_out = 60305
883 ses_num_start = self.nat64_get_ses_num()
885 self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
886 end_addr=self.nat_addr,
889 flags = self.config_flags.NAT_IS_INSIDE
890 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
891 sw_if_index=self.pg0.sw_if_index)
892 self.vapi.nat64_add_del_interface(is_add=1, flags=0,
893 sw_if_index=self.pg1.sw_if_index)
895 self.vapi.nat64_add_del_static_bib(i_addr=self.pg0.remote_ip6,
896 o_addr=self.nat_addr,
897 i_port=self.tcp_port_in,
898 o_port=self.tcp_port_out,
899 proto=IP_PROTOS.tcp, vrf_id=0,
901 self.vapi.nat64_add_del_static_bib(i_addr=self.pg0.remote_ip6,
902 o_addr=self.nat_addr,
903 i_port=self.udp_port_in,
904 o_port=self.udp_port_out,
905 proto=IP_PROTOS.udp, vrf_id=0,
907 self.vapi.nat64_add_del_static_bib(i_addr=self.pg0.remote_ip6,
908 o_addr=self.nat_addr,
909 i_port=self.icmp_id_in,
910 o_port=self.icmp_id_out,
911 proto=IP_PROTOS.icmp, vrf_id=0,
915 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
916 self.pg0.add_stream(pkts)
917 self.pg_enable_capture(self.pg_interfaces)
919 capture = self.pg1.get_capture(len(pkts))
920 self.verify_capture_out(capture, nat_ip=self.nat_addr,
921 dst_ip=self.pg1.remote_ip4, same_port=True)
924 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
925 self.pg1.add_stream(pkts)
926 self.pg_enable_capture(self.pg_interfaces)
928 capture = self.pg0.get_capture(len(pkts))
929 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
930 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
932 ses_num_end = self.nat64_get_ses_num()
934 self.assertEqual(ses_num_end - ses_num_start, 3)
936 def test_session_timeout(self):
937 """ NAT64 session timeout """
938 self.icmp_id_in = 1234
939 self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
940 end_addr=self.nat_addr,
943 flags = self.config_flags.NAT_IS_INSIDE
944 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
945 sw_if_index=self.pg0.sw_if_index)
946 self.vapi.nat64_add_del_interface(is_add=1, flags=0,
947 sw_if_index=self.pg1.sw_if_index)
948 self.vapi.nat64_set_timeouts(udp=300, tcp_established=5,
952 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
953 self.pg0.add_stream(pkts)
954 self.pg_enable_capture(self.pg_interfaces)
956 capture = self.pg1.get_capture(len(pkts))
958 ses_num_before_timeout = self.nat64_get_ses_num()
960 self.virtual_sleep(15)
962 # ICMP and TCP session after timeout
963 ses_num_after_timeout = self.nat64_get_ses_num()
964 self.assertEqual(ses_num_before_timeout - ses_num_after_timeout, 2)
966 def test_icmp_error(self):
967 """ NAT64 ICMP Error message translation """
968 self.tcp_port_in = 6303
969 self.udp_port_in = 6304
970 self.icmp_id_in = 6305
972 self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
973 end_addr=self.nat_addr,
976 flags = self.config_flags.NAT_IS_INSIDE
977 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
978 sw_if_index=self.pg0.sw_if_index)
979 self.vapi.nat64_add_del_interface(is_add=1, flags=0,
980 sw_if_index=self.pg1.sw_if_index)
982 # send some packets to create sessions
983 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
984 self.pg0.add_stream(pkts)
985 self.pg_enable_capture(self.pg_interfaces)
987 capture_ip4 = self.pg1.get_capture(len(pkts))
988 self.verify_capture_out(capture_ip4,
989 nat_ip=self.nat_addr,
990 dst_ip=self.pg1.remote_ip4)
992 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
993 self.pg1.add_stream(pkts)
994 self.pg_enable_capture(self.pg_interfaces)
996 capture_ip6 = self.pg0.get_capture(len(pkts))
997 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
998 self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
1002 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1003 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
1004 ICMPv6DestUnreach(code=1) /
1005 packet[IPv6] for packet in capture_ip6]
1006 self.pg0.add_stream(pkts)
1007 self.pg_enable_capture(self.pg_interfaces)
1009 capture = self.pg1.get_capture(len(pkts))
1010 for packet in capture:
1012 self.assertEqual(packet[IP].src, self.nat_addr)
1013 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1014 self.assertEqual(packet[ICMP].type, 3)
1015 self.assertEqual(packet[ICMP].code, 13)
1016 inner = packet[IPerror]
1017 self.assertEqual(inner.src, self.pg1.remote_ip4)
1018 self.assertEqual(inner.dst, self.nat_addr)
1019 self.assert_packet_checksums_valid(packet)
1020 if inner.haslayer(TCPerror):
1021 self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
1022 elif inner.haslayer(UDPerror):
1023 self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
1025 self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
1027 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1031 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1032 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1033 ICMP(type=3, code=13) /
1034 packet[IP] for packet in capture_ip4]
1035 self.pg1.add_stream(pkts)
1036 self.pg_enable_capture(self.pg_interfaces)
1038 capture = self.pg0.get_capture(len(pkts))
1039 for packet in capture:
1041 self.assertEqual(packet[IPv6].src, ip.src)
1042 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
1043 icmp = packet[ICMPv6DestUnreach]
1044 self.assertEqual(icmp.code, 1)
1045 inner = icmp[IPerror6]
1046 self.assertEqual(inner.src, self.pg0.remote_ip6)
1047 self.assertEqual(inner.dst, ip.src)
1048 self.assert_icmpv6_checksum_valid(packet)
1049 if inner.haslayer(TCPerror):
1050 self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
1051 elif inner.haslayer(UDPerror):
1052 self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
1054 self.assertEqual(inner[ICMPv6EchoRequest].id,
1057 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1060 def test_hairpinning(self):
1061 """ NAT64 hairpinning """
1063 client = self.pg0.remote_hosts[0]
1064 server = self.pg0.remote_hosts[1]
1065 server_tcp_in_port = 22
1066 server_tcp_out_port = 4022
1067 server_udp_in_port = 23
1068 server_udp_out_port = 4023
1069 client_tcp_in_port = 1234
1070 client_udp_in_port = 1235
1071 client_tcp_out_port = 0
1072 client_udp_out_port = 0
1073 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
1074 nat_addr_ip6 = ip.src
1076 self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1077 end_addr=self.nat_addr,
1080 flags = self.config_flags.NAT_IS_INSIDE
1081 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1082 sw_if_index=self.pg0.sw_if_index)
1083 self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1084 sw_if_index=self.pg1.sw_if_index)
1086 self.vapi.nat64_add_del_static_bib(i_addr=server.ip6n,
1087 o_addr=self.nat_addr,
1088 i_port=server_tcp_in_port,
1089 o_port=server_tcp_out_port,
1090 proto=IP_PROTOS.tcp, vrf_id=0,
1092 self.vapi.nat64_add_del_static_bib(i_addr=server.ip6n,
1093 o_addr=self.nat_addr,
1094 i_port=server_udp_in_port,
1095 o_port=server_udp_out_port,
1096 proto=IP_PROTOS.udp, vrf_id=0,
1101 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1102 IPv6(src=client.ip6, dst=nat_addr_ip6) /
1103 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
1105 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1106 IPv6(src=client.ip6, dst=nat_addr_ip6) /
1107 UDP(sport=client_udp_in_port, dport=server_udp_out_port))
1109 self.pg0.add_stream(pkts)
1110 self.pg_enable_capture(self.pg_interfaces)
1112 capture = self.pg0.get_capture(len(pkts))
1113 for packet in capture:
1115 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
1116 self.assertEqual(packet[IPv6].dst, server.ip6)
1117 self.assert_packet_checksums_valid(packet)
1118 if packet.haslayer(TCP):
1119 self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
1120 self.assertEqual(packet[TCP].dport, server_tcp_in_port)
1121 client_tcp_out_port = packet[TCP].sport
1123 self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
1124 self.assertEqual(packet[UDP].dport, server_udp_in_port)
1125 client_udp_out_port = packet[UDP].sport
1127 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1132 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1133 IPv6(src=server.ip6, dst=nat_addr_ip6) /
1134 TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
1136 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1137 IPv6(src=server.ip6, dst=nat_addr_ip6) /
1138 UDP(sport=server_udp_in_port, dport=client_udp_out_port))
1140 self.pg0.add_stream(pkts)
1141 self.pg_enable_capture(self.pg_interfaces)
1143 capture = self.pg0.get_capture(len(pkts))
1144 for packet in capture:
1146 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
1147 self.assertEqual(packet[IPv6].dst, client.ip6)
1148 self.assert_packet_checksums_valid(packet)
1149 if packet.haslayer(TCP):
1150 self.assertEqual(packet[TCP].sport, server_tcp_out_port)
1151 self.assertEqual(packet[TCP].dport, client_tcp_in_port)
1153 self.assertEqual(packet[UDP].sport, server_udp_out_port)
1154 self.assertEqual(packet[UDP].dport, client_udp_in_port)
1156 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1161 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1162 IPv6(src=client.ip6, dst=nat_addr_ip6) /
1163 ICMPv6DestUnreach(code=1) /
1164 packet[IPv6] for packet in capture]
1165 self.pg0.add_stream(pkts)
1166 self.pg_enable_capture(self.pg_interfaces)
1168 capture = self.pg0.get_capture(len(pkts))
1169 for packet in capture:
1171 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
1172 self.assertEqual(packet[IPv6].dst, server.ip6)
1173 icmp = packet[ICMPv6DestUnreach]
1174 self.assertEqual(icmp.code, 1)
1175 inner = icmp[IPerror6]
1176 self.assertEqual(inner.src, server.ip6)
1177 self.assertEqual(inner.dst, nat_addr_ip6)
1178 self.assert_packet_checksums_valid(packet)
1179 if inner.haslayer(TCPerror):
1180 self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
1181 self.assertEqual(inner[TCPerror].dport,
1182 client_tcp_out_port)
1184 self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
1185 self.assertEqual(inner[UDPerror].dport,
1186 client_udp_out_port)
1188 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1191 def test_prefix(self):
1192 """ NAT64 Network-Specific Prefix """
1194 self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1195 end_addr=self.nat_addr,
1198 flags = self.config_flags.NAT_IS_INSIDE
1199 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1200 sw_if_index=self.pg0.sw_if_index)
1201 self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1202 sw_if_index=self.pg1.sw_if_index)
1203 self.vapi.nat64_add_del_pool_addr_range(start_addr=self.vrf1_nat_addr,
1204 end_addr=self.vrf1_nat_addr,
1205 vrf_id=self.vrf1_id, is_add=1)
1206 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1207 sw_if_index=self.pg2.sw_if_index)
1210 global_pref64 = "2001:db8::"
1211 global_pref64_len = 32
1212 global_pref64_str = "{}/{}".format(global_pref64, global_pref64_len)
1213 self.vapi.nat64_add_del_prefix(prefix=global_pref64_str, vrf_id=0,
1216 prefix = self.vapi.nat64_prefix_dump()
1217 self.assertEqual(len(prefix), 1)
1218 self.assertEqual(str(prefix[0].prefix), global_pref64_str)
1219 self.assertEqual(prefix[0].vrf_id, 0)
1221 # Add tenant specific prefix
1222 vrf1_pref64 = "2001:db8:122:300::"
1223 vrf1_pref64_len = 56
1224 vrf1_pref64_str = "{}/{}".format(vrf1_pref64, vrf1_pref64_len)
1225 self.vapi.nat64_add_del_prefix(prefix=vrf1_pref64_str,
1226 vrf_id=self.vrf1_id, is_add=1)
1228 prefix = self.vapi.nat64_prefix_dump()
1229 self.assertEqual(len(prefix), 2)
1232 pkts = self.create_stream_in_ip6(self.pg0,
1235 plen=global_pref64_len)
1236 self.pg0.add_stream(pkts)
1237 self.pg_enable_capture(self.pg_interfaces)
1239 capture = self.pg1.get_capture(len(pkts))
1240 self.verify_capture_out(capture, nat_ip=self.nat_addr,
1241 dst_ip=self.pg1.remote_ip4)
1243 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
1244 self.pg1.add_stream(pkts)
1245 self.pg_enable_capture(self.pg_interfaces)
1247 capture = self.pg0.get_capture(len(pkts))
1248 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
1251 self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
1253 # Tenant specific prefix
1254 pkts = self.create_stream_in_ip6(self.pg2,
1257 plen=vrf1_pref64_len)
1258 self.pg2.add_stream(pkts)
1259 self.pg_enable_capture(self.pg_interfaces)
1261 capture = self.pg1.get_capture(len(pkts))
1262 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
1263 dst_ip=self.pg1.remote_ip4)
1265 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
1266 self.pg1.add_stream(pkts)
1267 self.pg_enable_capture(self.pg_interfaces)
1269 capture = self.pg2.get_capture(len(pkts))
1270 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
1273 self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
1275 def test_unknown_proto(self):
1276 """ NAT64 translate packet with unknown protocol """
1278 self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1279 end_addr=self.nat_addr,
1282 flags = self.config_flags.NAT_IS_INSIDE
1283 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1284 sw_if_index=self.pg0.sw_if_index)
1285 self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1286 sw_if_index=self.pg1.sw_if_index)
1287 remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
1290 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1291 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
1292 TCP(sport=self.tcp_port_in, dport=20))
1293 self.pg0.add_stream(p)
1294 self.pg_enable_capture(self.pg_interfaces)
1296 p = self.pg1.get_capture(1)
1298 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1299 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
1301 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
1302 TCP(sport=1234, dport=1234))
1303 self.pg0.add_stream(p)
1304 self.pg_enable_capture(self.pg_interfaces)
1306 p = self.pg1.get_capture(1)
1309 self.assertEqual(packet[IP].src, self.nat_addr)
1310 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1311 self.assertEqual(packet.haslayer(GRE), 1)
1312 self.assert_packet_checksums_valid(packet)
1314 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1318 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1319 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1321 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
1322 TCP(sport=1234, dport=1234))
1323 self.pg1.add_stream(p)
1324 self.pg_enable_capture(self.pg_interfaces)
1326 p = self.pg0.get_capture(1)
1329 self.assertEqual(packet[IPv6].src, remote_ip6)
1330 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
1331 self.assertEqual(packet[IPv6].nh, 47)
1333 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1336 def test_hairpinning_unknown_proto(self):
1337 """ NAT64 translate packet with unknown protocol - hairpinning """
1339 client = self.pg0.remote_hosts[0]
1340 server = self.pg0.remote_hosts[1]
1341 server_tcp_in_port = 22
1342 server_tcp_out_port = 4022
1343 client_tcp_in_port = 1234
1344 client_tcp_out_port = 1235
1345 server_nat_ip = "10.0.0.100"
1346 client_nat_ip = "10.0.0.110"
1347 server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
1348 client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
1350 self.vapi.nat64_add_del_pool_addr_range(start_addr=server_nat_ip,
1351 end_addr=client_nat_ip,
1354 flags = self.config_flags.NAT_IS_INSIDE
1355 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1356 sw_if_index=self.pg0.sw_if_index)
1357 self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1358 sw_if_index=self.pg1.sw_if_index)
1360 self.vapi.nat64_add_del_static_bib(i_addr=server.ip6n,
1361 o_addr=server_nat_ip,
1362 i_port=server_tcp_in_port,
1363 o_port=server_tcp_out_port,
1364 proto=IP_PROTOS.tcp, vrf_id=0,
1367 self.vapi.nat64_add_del_static_bib(i_addr=server.ip6n,
1368 o_addr=server_nat_ip, i_port=0,
1370 proto=IP_PROTOS.gre, vrf_id=0,
1373 self.vapi.nat64_add_del_static_bib(i_addr=client.ip6n,
1374 o_addr=client_nat_ip,
1375 i_port=client_tcp_in_port,
1376 o_port=client_tcp_out_port,
1377 proto=IP_PROTOS.tcp, vrf_id=0,
1381 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1382 IPv6(src=client.ip6, dst=server_nat_ip6) /
1383 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
1384 self.pg0.add_stream(p)
1385 self.pg_enable_capture(self.pg_interfaces)
1387 p = self.pg0.get_capture(1)
1389 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1390 IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
1392 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
1393 TCP(sport=1234, dport=1234))
1394 self.pg0.add_stream(p)
1395 self.pg_enable_capture(self.pg_interfaces)
1397 p = self.pg0.get_capture(1)
1400 self.assertEqual(packet[IPv6].src, client_nat_ip6)
1401 self.assertEqual(packet[IPv6].dst, server.ip6)
1402 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
1404 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1408 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1409 IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
1411 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
1412 TCP(sport=1234, dport=1234))
1413 self.pg0.add_stream(p)
1414 self.pg_enable_capture(self.pg_interfaces)
1416 p = self.pg0.get_capture(1)
1419 self.assertEqual(packet[IPv6].src, server_nat_ip6)
1420 self.assertEqual(packet[IPv6].dst, client.ip6)
1421 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
1423 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1426 def test_one_armed_nat64(self):
1427 """ One armed NAT64 """
1429 remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
1433 self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1434 end_addr=self.nat_addr,
1437 flags = self.config_flags.NAT_IS_INSIDE
1438 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1439 sw_if_index=self.pg3.sw_if_index)
1440 self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1441 sw_if_index=self.pg3.sw_if_index)
1444 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
1445 IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
1446 TCP(sport=12345, dport=80))
1447 self.pg3.add_stream(p)
1448 self.pg_enable_capture(self.pg_interfaces)
1450 capture = self.pg3.get_capture(1)
1455 self.assertEqual(ip.src, self.nat_addr)
1456 self.assertEqual(ip.dst, self.pg3.remote_ip4)
1457 self.assertNotEqual(tcp.sport, 12345)
1458 external_port = tcp.sport
1459 self.assertEqual(tcp.dport, 80)
1460 self.assert_packet_checksums_valid(p)
1462 self.logger.error(ppp("Unexpected or invalid packet:", p))
1466 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
1467 IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
1468 TCP(sport=80, dport=external_port))
1469 self.pg3.add_stream(p)
1470 self.pg_enable_capture(self.pg_interfaces)
1472 capture = self.pg3.get_capture(1)
1477 self.assertEqual(ip.src, remote_host_ip6)
1478 self.assertEqual(ip.dst, self.pg3.remote_ip6)
1479 self.assertEqual(tcp.sport, 80)
1480 self.assertEqual(tcp.dport, 12345)
1481 self.assert_packet_checksums_valid(p)
1483 self.logger.error(ppp("Unexpected or invalid packet:", p))
1486 def test_frag_in_order(self):
1487 """ NAT64 translate fragments arriving in order """
1488 self.tcp_port_in = random.randint(1025, 65535)
1490 self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1491 end_addr=self.nat_addr,
1494 flags = self.config_flags.NAT_IS_INSIDE
1495 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1496 sw_if_index=self.pg0.sw_if_index)
1497 self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1498 sw_if_index=self.pg1.sw_if_index)
1502 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
1503 self.tcp_port_in, 20, data)
1504 self.pg0.add_stream(pkts)
1505 self.pg_enable_capture(self.pg_interfaces)
1507 frags = self.pg1.get_capture(len(pkts))
1508 p = self.reass_frags_and_verify(frags,
1510 self.pg1.remote_ip4)
1511 self.assertEqual(p[TCP].dport, 20)
1512 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
1513 self.tcp_port_out = p[TCP].sport
1514 self.assertEqual(data, p[Raw].load)
1517 data = b"A" * 4 + b"b" * 16 + b"C" * 3
1518 pkts = self.create_stream_frag(self.pg1,
1523 self.pg1.add_stream(pkts)
1524 self.pg_enable_capture(self.pg_interfaces)
1526 frags = self.pg0.get_capture(len(pkts))
1527 self.logger.debug(ppc("Captured:", frags))
1528 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
1529 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
1530 self.assertEqual(p[TCP].sport, 20)
1531 self.assertEqual(p[TCP].dport, self.tcp_port_in)
1532 self.assertEqual(data, p[Raw].load)
1534 def test_reass_hairpinning(self):
1535 """ NAT64 fragments hairpinning """
1537 server = self.pg0.remote_hosts[1]
1538 server_in_port = random.randint(1025, 65535)
1539 server_out_port = random.randint(1025, 65535)
1540 client_in_port = random.randint(1025, 65535)
1541 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
1542 nat_addr_ip6 = ip.src
1544 self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1545 end_addr=self.nat_addr,
1548 flags = self.config_flags.NAT_IS_INSIDE
1549 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1550 sw_if_index=self.pg0.sw_if_index)
1551 self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1552 sw_if_index=self.pg1.sw_if_index)
1554 # add static BIB entry for server
1555 self.vapi.nat64_add_del_static_bib(i_addr=server.ip6n,
1556 o_addr=self.nat_addr,
1557 i_port=server_in_port,
1558 o_port=server_out_port,
1559 proto=IP_PROTOS.tcp, vrf_id=0,
1562 # send packet from host to server
1563 pkts = self.create_stream_frag_ip6(self.pg0,
1568 self.pg0.add_stream(pkts)
1569 self.pg_enable_capture(self.pg_interfaces)
1571 frags = self.pg0.get_capture(len(pkts))
1572 self.logger.debug(ppc("Captured:", frags))
1573 p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
1574 self.assertNotEqual(p[TCP].sport, client_in_port)
1575 self.assertEqual(p[TCP].dport, server_in_port)
1576 self.assertEqual(data, p[Raw].load)
1578 def test_frag_out_of_order(self):
1579 """ NAT64 translate fragments arriving out of order """
1580 self.tcp_port_in = random.randint(1025, 65535)
1582 self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1583 end_addr=self.nat_addr,
1586 flags = self.config_flags.NAT_IS_INSIDE
1587 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1588 sw_if_index=self.pg0.sw_if_index)
1589 self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1590 sw_if_index=self.pg1.sw_if_index)
1594 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
1595 self.tcp_port_in, 20, data)
1597 self.pg0.add_stream(pkts)
1598 self.pg_enable_capture(self.pg_interfaces)
1600 frags = self.pg1.get_capture(len(pkts))
1601 p = self.reass_frags_and_verify(frags,
1603 self.pg1.remote_ip4)
1604 self.assertEqual(p[TCP].dport, 20)
1605 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
1606 self.tcp_port_out = p[TCP].sport
1607 self.assertEqual(data, p[Raw].load)
1610 data = b"A" * 4 + b"B" * 16 + b"C" * 3
1611 pkts = self.create_stream_frag(self.pg1,
1617 self.pg1.add_stream(pkts)
1618 self.pg_enable_capture(self.pg_interfaces)
1620 frags = self.pg0.get_capture(len(pkts))
1621 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
1622 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
1623 self.assertEqual(p[TCP].sport, 20)
1624 self.assertEqual(p[TCP].dport, self.tcp_port_in)
1625 self.assertEqual(data, p[Raw].load)
1627 def test_interface_addr(self):
1628 """ Acquire NAT64 pool addresses from interface """
1629 self.vapi.nat64_add_del_interface_addr(
1631 sw_if_index=self.pg4.sw_if_index)
1633 # no address in NAT64 pool
1634 addresses = self.vapi.nat44_address_dump()
1635 self.assertEqual(0, len(addresses))
1637 # configure interface address and check NAT64 address pool
1638 self.pg4.config_ip4()
1639 addresses = self.vapi.nat64_pool_addr_dump()
1640 self.assertEqual(len(addresses), 1)
1642 self.assertEqual(str(addresses[0].address),
1645 # remove interface address and check NAT64 address pool
1646 self.pg4.unconfig_ip4()
1647 addresses = self.vapi.nat64_pool_addr_dump()
1648 self.assertEqual(0, len(addresses))
1650 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1651 def test_ipfix_max_bibs_sessions(self):
1652 """ IPFIX logging maximum session and BIB entries exceeded """
1655 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
1659 self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1660 end_addr=self.nat_addr,
1663 flags = self.config_flags.NAT_IS_INSIDE
1664 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1665 sw_if_index=self.pg0.sw_if_index)
1666 self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1667 sw_if_index=self.pg1.sw_if_index)
1671 for i in range(0, max_bibs):
1672 src = "fd01:aa::%x" % (i)
1673 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1674 IPv6(src=src, dst=remote_host_ip6) /
1675 TCP(sport=12345, dport=80))
1677 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1678 IPv6(src=src, dst=remote_host_ip6) /
1679 TCP(sport=12345, dport=22))
1681 self.pg0.add_stream(pkts)
1682 self.pg_enable_capture(self.pg_interfaces)
1684 self.pg1.get_capture(max_sessions)
1686 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
1687 src_address=self.pg3.local_ip4,
1689 template_interval=10)
1690 self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
1691 src_port=self.ipfix_src_port,
1694 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1695 IPv6(src=src, dst=remote_host_ip6) /
1696 TCP(sport=12345, dport=25))
1697 self.pg0.add_stream(p)
1698 self.pg_enable_capture(self.pg_interfaces)
1700 self.pg1.assert_nothing_captured()
1701 self.vapi.ipfix_flush()
1702 capture = self.pg3.get_capture(7)
1703 ipfix = IPFIXDecoder()
1704 # first load template
1706 self.assertTrue(p.haslayer(IPFIX))
1707 self.assertEqual(p[IP].src, self.pg3.local_ip4)
1708 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1709 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1710 self.assertEqual(p[UDP].dport, 4739)
1711 self.assertEqual(p[IPFIX].observationDomainID,
1712 self.ipfix_domain_id)
1713 if p.haslayer(Template):
1714 ipfix.add_template(p.getlayer(Template))
1715 # verify events in data set
1717 if p.haslayer(Data):
1718 data = ipfix.decode_data_set(p.getlayer(Set))
1719 self.verify_ipfix_max_sessions(data, max_sessions)
1721 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1722 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
1723 TCP(sport=12345, dport=80))
1724 self.pg0.add_stream(p)
1725 self.pg_enable_capture(self.pg_interfaces)
1727 self.pg1.assert_nothing_captured()
1728 self.vapi.ipfix_flush()
1729 capture = self.pg3.get_capture(1)
1730 # verify events in data set
1732 self.assertTrue(p.haslayer(IPFIX))
1733 self.assertEqual(p[IP].src, self.pg3.local_ip4)
1734 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1735 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1736 self.assertEqual(p[UDP].dport, 4739)
1737 self.assertEqual(p[IPFIX].observationDomainID,
1738 self.ipfix_domain_id)
1739 if p.haslayer(Data):
1740 data = ipfix.decode_data_set(p.getlayer(Set))
1741 self.verify_ipfix_max_bibs(data, max_bibs)
1743 def test_ipfix_bib_ses(self):
1744 """ IPFIX logging NAT64 BIB/session create and delete events """
1745 self.tcp_port_in = random.randint(1025, 65535)
1746 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
1750 self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1751 end_addr=self.nat_addr,
1754 flags = self.config_flags.NAT_IS_INSIDE
1755 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1756 sw_if_index=self.pg0.sw_if_index)
1757 self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1758 sw_if_index=self.pg1.sw_if_index)
1759 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
1760 src_address=self.pg3.local_ip4,
1762 template_interval=10)
1763 self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
1764 src_port=self.ipfix_src_port,
1768 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1769 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
1770 TCP(sport=self.tcp_port_in, dport=25))
1771 self.pg0.add_stream(p)
1772 self.pg_enable_capture(self.pg_interfaces)
1774 p = self.pg1.get_capture(1)
1775 self.tcp_port_out = p[0][TCP].sport
1776 self.vapi.ipfix_flush()
1777 capture = self.pg3.get_capture(8)
1778 ipfix = IPFIXDecoder()
1779 # first load template
1781 self.assertTrue(p.haslayer(IPFIX))
1782 self.assertEqual(p[IP].src, self.pg3.local_ip4)
1783 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1784 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1785 self.assertEqual(p[UDP].dport, 4739)
1786 self.assertEqual(p[IPFIX].observationDomainID,
1787 self.ipfix_domain_id)
1788 if p.haslayer(Template):
1789 ipfix.add_template(p.getlayer(Template))
1790 # verify events in data set
1792 if p.haslayer(Data):
1793 data = ipfix.decode_data_set(p.getlayer(Set))
1794 if scapy.compat.orb(data[0][230]) == 10:
1795 self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6)
1796 elif scapy.compat.orb(data[0][230]) == 6:
1797 self.verify_ipfix_nat64_ses(data,
1799 self.pg0.remote_ip6,
1800 self.pg1.remote_ip4,
1803 self.logger.error(ppp("Unexpected or invalid packet: ", p))
1806 self.pg_enable_capture(self.pg_interfaces)
1807 self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1808 end_addr=self.nat_addr,
1811 self.vapi.ipfix_flush()
1812 capture = self.pg3.get_capture(2)
1813 # verify events in data set
1815 self.assertTrue(p.haslayer(IPFIX))
1816 self.assertEqual(p[IP].src, self.pg3.local_ip4)
1817 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1818 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1819 self.assertEqual(p[UDP].dport, 4739)
1820 self.assertEqual(p[IPFIX].observationDomainID,
1821 self.ipfix_domain_id)
1822 if p.haslayer(Data):
1823 data = ipfix.decode_data_set(p.getlayer(Set))
1824 if scapy.compat.orb(data[0][230]) == 11:
1825 self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6)
1826 elif scapy.compat.orb(data[0][230]) == 7:
1827 self.verify_ipfix_nat64_ses(data,
1829 self.pg0.remote_ip6,
1830 self.pg1.remote_ip4,
1833 self.logger.error(ppp("Unexpected or invalid packet: ", p))
1835 def test_syslog_sess(self):
1836 """ Test syslog session creation and deletion """
1837 self.tcp_port_in = random.randint(1025, 65535)
1838 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
1842 self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1843 end_addr=self.nat_addr,
1846 flags = self.config_flags.NAT_IS_INSIDE
1847 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1848 sw_if_index=self.pg0.sw_if_index)
1849 self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1850 sw_if_index=self.pg1.sw_if_index)
1851 self.vapi.syslog_set_filter(
1852 self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_INFO)
1853 self.vapi.syslog_set_sender(self.pg3.local_ip4, self.pg3.remote_ip4)
1855 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1856 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
1857 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
1858 self.pg0.add_stream(p)
1859 self.pg_enable_capture(self.pg_interfaces)
1861 p = self.pg1.get_capture(1)
1862 self.tcp_port_out = p[0][TCP].sport
1863 capture = self.pg3.get_capture(1)
1864 self.verify_syslog_sess(capture[0][Raw].load, is_ip6=True)
1866 self.pg_enable_capture(self.pg_interfaces)
1868 self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1869 end_addr=self.nat_addr,
1872 capture = self.pg3.get_capture(1)
1873 self.verify_syslog_sess(capture[0][Raw].load, False, True)
1875 def nat64_get_ses_num(self):
1877 Return number of active NAT64 sessions.
1879 st = self.vapi.nat64_st_dump(proto=255)
1882 def clear_nat64(self):
1884 Clear NAT64 configuration.
1886 self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
1887 src_port=self.ipfix_src_port,
1889 self.ipfix_src_port = 4739
1890 self.ipfix_domain_id = 1
1892 self.vapi.syslog_set_filter(
1893 self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_EMERG)
1895 self.vapi.nat64_set_timeouts(udp=300, tcp_established=7440,
1896 tcp_transitory=240, icmp=60)
1898 interfaces = self.vapi.nat64_interface_dump()
1899 for intf in interfaces:
1900 self.vapi.nat64_add_del_interface(is_add=0, flags=intf.flags,
1901 sw_if_index=intf.sw_if_index)
1903 bib = self.vapi.nat64_bib_dump(proto=255)
1905 if bibe.flags & self.config_flags.NAT_IS_STATIC:
1906 self.vapi.nat64_add_del_static_bib(i_addr=bibe.i_addr,
1914 adresses = self.vapi.nat64_pool_addr_dump()
1915 for addr in adresses:
1916 self.vapi.nat64_add_del_pool_addr_range(start_addr=addr.address,
1917 end_addr=addr.address,
1921 prefixes = self.vapi.nat64_prefix_dump()
1922 for prefix in prefixes:
1923 self.vapi.nat64_add_del_prefix(prefix=str(prefix.prefix),
1924 vrf_id=prefix.vrf_id, is_add=0)
1926 bibs = self.statistics.get_counter('/nat64/total-bibs')
1927 self.assertEqual(bibs[0][0], 0)
1928 sessions = self.statistics.get_counter('/nat64/total-sessions')
1929 self.assertEqual(sessions[0][0], 0)
1932 if __name__ == '__main__':
1933 unittest.main(testRunner=VppTestRunner)