11 from config import config
12 from framework import tag_fixme_vpp_workers
13 from framework import VppTestCase, VppTestRunner
14 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
15 from scapy.data import IP_PROTOS
16 from scapy.layers.inet import IP, TCP, UDP, ICMP
17 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
18 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
19 from scapy.layers.inet6 import (
28 from scapy.layers.l2 import Ether, GRE
29 from scapy.packet import Raw
30 from syslog_rfc5424_parser import SyslogMessage, ParseError
31 from syslog_rfc5424_parser.constants import SyslogSeverity
32 from util import ppc, ppp
33 from vpp_papi import VppEnum
36 @tag_fixme_vpp_workers
37 class TestNAT64(VppTestCase):
38 """NAT64 Test Cases"""
41 def SYSLOG_SEVERITY(self):
42 return VppEnum.vl_api_syslog_severity_t
45 def config_flags(self):
46 return VppEnum.vl_api_nat_config_flags_t
50 super(TestNAT64, cls).setUpClass()
52 cls.tcp_port_in = 6303
53 cls.tcp_port_out = 6303
54 cls.udp_port_in = 6304
55 cls.udp_port_out = 6304
57 cls.icmp_id_out = 6305
58 cls.tcp_external_port = 80
59 cls.nat_addr = "10.0.0.3"
60 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
62 cls.vrf1_nat_addr = "10.0.10.3"
63 cls.ipfix_src_port = 4739
64 cls.ipfix_domain_id = 1
66 cls.create_pg_interfaces(range(6))
67 cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
68 cls.ip6_interfaces.append(cls.pg_interfaces[2])
69 cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
71 cls.vapi.ip_table_add_del(
72 is_add=1, table={"table_id": cls.vrf1_id, "is_ip6": 1}
75 cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
77 cls.pg0.generate_remote_hosts(2)
79 for i in cls.ip6_interfaces:
82 i.configure_ipv6_neighbors()
84 for i in cls.ip4_interfaces:
93 cls.pg3.configure_ipv6_neighbors()
99 def tearDownClass(cls):
100 super(TestNAT64, cls).tearDownClass()
103 super(TestNAT64, self).setUp()
104 self.vapi.nat64_plugin_enable_disable(enable=1, bib_buckets=128, st_buckets=256)
107 super(TestNAT64, self).tearDown()
108 if not self.vpp_dead:
109 self.vapi.nat64_plugin_enable_disable(enable=0)
111 def show_commands_at_teardown(self):
112 self.logger.info(self.vapi.cli("show nat64 pool"))
113 self.logger.info(self.vapi.cli("show nat64 interfaces"))
114 self.logger.info(self.vapi.cli("show nat64 prefix"))
115 self.logger.info(self.vapi.cli("show nat64 bib all"))
116 self.logger.info(self.vapi.cli("show nat64 session table all"))
118 def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
120 Create IPv6 packet stream for inside network
122 :param in_if: Inside interface
123 :param out_if: Outside interface
124 :param ttl: Hop Limit of generated packets
125 :param pref: NAT64 prefix
126 :param plen: NAT64 prefix length
130 dst = "".join(["64:ff9b::", out_if.remote_ip4])
132 dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
136 Ether(dst=in_if.local_mac, src=in_if.remote_mac)
137 / IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim)
138 / TCP(sport=self.tcp_port_in, dport=20)
144 Ether(dst=in_if.local_mac, src=in_if.remote_mac)
145 / IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim)
146 / UDP(sport=self.udp_port_in, dport=20)
152 Ether(dst=in_if.local_mac, src=in_if.remote_mac)
153 / IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim)
154 / ICMPv6EchoRequest(id=self.icmp_id_in)
160 def create_stream_out(self, out_if, dst_ip=None, ttl=64, use_inside_ports=False):
162 Create packet stream for outside network
164 :param out_if: Outside interface
165 :param dst_ip: Destination IP address (Default use global NAT address)
166 :param ttl: TTL of generated packets
167 :param use_inside_ports: Use inside NAT ports as destination ports
168 instead of outside ports
171 dst_ip = self.nat_addr
172 if not use_inside_ports:
173 tcp_port = self.tcp_port_out
174 udp_port = self.udp_port_out
175 icmp_id = self.icmp_id_out
177 tcp_port = self.tcp_port_in
178 udp_port = self.udp_port_in
179 icmp_id = self.icmp_id_in
183 Ether(dst=out_if.local_mac, src=out_if.remote_mac)
184 / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
185 / TCP(dport=tcp_port, sport=20)
191 Ether(dst=out_if.local_mac, src=out_if.remote_mac)
192 / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
193 / UDP(dport=udp_port, sport=20)
199 Ether(dst=out_if.local_mac, src=out_if.remote_mac)
200 / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
201 / ICMP(id=icmp_id, type="echo-reply")
207 def verify_capture_out(
217 Verify captured packets on outside network
219 :param capture: Captured packets
220 :param nat_ip: Translated IP address (Default use global NAT address)
221 :param same_port: Source port number is not translated (Default False)
222 :param dst_ip: Destination IP address (Default do not verify)
223 :param is_ip6: If L3 protocol is IPv6 (Default False)
227 ICMP46 = ICMPv6EchoRequest
232 nat_ip = self.nat_addr
233 for packet in capture:
236 self.assert_packet_checksums_valid(packet)
237 self.assertEqual(packet[IP46].src, nat_ip)
238 if dst_ip is not None:
239 self.assertEqual(packet[IP46].dst, dst_ip)
240 if packet.haslayer(TCP):
243 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
245 self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
246 self.tcp_port_out = packet[TCP].sport
247 self.assert_packet_checksums_valid(packet)
248 elif packet.haslayer(UDP):
251 self.assertEqual(packet[UDP].sport, self.udp_port_in)
253 self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
254 self.udp_port_out = packet[UDP].sport
258 self.assertEqual(packet[ICMP46].id, self.icmp_id_in)
260 self.assertNotEqual(packet[ICMP46].id, self.icmp_id_in)
261 self.icmp_id_out = packet[ICMP46].id
262 self.assert_packet_checksums_valid(packet)
265 ppp("Unexpected or invalid packet (outside network):", packet)
269 def verify_capture_in_ip6(self, capture, src_ip, dst_ip):
271 Verify captured IPv6 packets on inside network
273 :param capture: Captured packets
274 :param src_ip: Source IP
275 :param dst_ip: Destination IP address
277 for packet in capture:
279 self.assertEqual(packet[IPv6].src, src_ip)
280 self.assertEqual(packet[IPv6].dst, dst_ip)
281 self.assert_packet_checksums_valid(packet)
282 if packet.haslayer(TCP):
283 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
284 elif packet.haslayer(UDP):
285 self.assertEqual(packet[UDP].dport, self.udp_port_in)
287 self.assertEqual(packet[ICMPv6EchoReply].id, self.icmp_id_in)
290 ppp("Unexpected or invalid packet (inside network):", packet)
294 def create_stream_frag(
295 self, src_if, dst, sport, dport, data, proto=IP_PROTOS.tcp, echo_reply=False
298 Create fragmented packet stream
300 :param src_if: Source interface
301 :param dst: Destination IPv4 address
302 :param sport: Source port
303 :param dport: Destination port
304 :param data: Payload data
305 :param proto: protocol (TCP, UDP, ICMP)
306 :param echo_reply: use echo_reply if protocol is ICMP
309 if proto == IP_PROTOS.tcp:
311 IP(src=src_if.remote_ip4, dst=dst)
312 / TCP(sport=sport, dport=dport)
315 p = p.__class__(scapy.compat.raw(p))
316 chksum = p[TCP].chksum
317 proto_header = TCP(sport=sport, dport=dport, chksum=chksum)
318 elif proto == IP_PROTOS.udp:
319 proto_header = UDP(sport=sport, dport=dport)
320 elif proto == IP_PROTOS.icmp:
322 proto_header = ICMP(id=sport, type="echo-request")
324 proto_header = ICMP(id=sport, type="echo-reply")
326 raise Exception("Unsupported protocol")
327 id = random.randint(0, 65535)
329 if proto == IP_PROTOS.tcp:
332 raw = Raw(data[0:16])
334 Ether(src=src_if.remote_mac, dst=src_if.local_mac)
335 / IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id)
340 if proto == IP_PROTOS.tcp:
341 raw = Raw(data[4:20])
343 raw = Raw(data[16:32])
345 Ether(src=src_if.remote_mac, dst=src_if.local_mac)
346 / IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id, proto=proto)
350 if proto == IP_PROTOS.tcp:
355 Ether(src=src_if.remote_mac, dst=src_if.local_mac)
356 / IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=proto, id=id)
362 def create_stream_frag_ip6(
363 self, src_if, dst, sport, dport, data, pref=None, plen=0, frag_size=128
366 Create fragmented packet stream
368 :param src_if: Source interface
369 :param dst: Destination IPv4 address
370 :param sport: Source TCP port
371 :param dport: Destination TCP port
372 :param data: Payload data
373 :param pref: NAT64 prefix
374 :param plen: NAT64 prefix length
375 :param fragsize: size of fragments
379 dst_ip6 = "".join(["64:ff9b::", dst])
381 dst_ip6 = self.compose_ip6(dst, pref, plen)
384 Ether(dst=src_if.local_mac, src=src_if.remote_mac)
385 / IPv6(src=src_if.remote_ip6, dst=dst_ip6)
386 / IPv6ExtHdrFragment(id=random.randint(0, 65535))
387 / TCP(sport=sport, dport=dport)
391 return fragment6(p, frag_size)
393 def reass_frags_and_verify(self, frags, src, dst):
395 Reassemble and verify fragmented packet
397 :param frags: Captured fragments
398 :param src: Source IPv4 address to verify
399 :param dst: Destination IPv4 address to verify
401 :returns: Reassembled IPv4 packet
405 self.assertEqual(p[IP].src, src)
406 self.assertEqual(p[IP].dst, dst)
407 self.assert_ip_checksum_valid(p)
408 buffer.seek(p[IP].frag * 8)
409 buffer.write(bytes(p[IP].payload))
410 ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst, proto=frags[0][IP].proto)
411 if ip.proto == IP_PROTOS.tcp:
412 p = ip / TCP(buffer.getvalue())
413 self.logger.debug(ppp("Reassembled:", p))
414 self.assert_tcp_checksum_valid(p)
415 elif ip.proto == IP_PROTOS.udp:
416 p = ip / UDP(buffer.getvalue()[:8]) / Raw(buffer.getvalue()[8:])
417 elif ip.proto == IP_PROTOS.icmp:
418 p = ip / ICMP(buffer.getvalue())
421 def reass_frags_and_verify_ip6(self, frags, src, dst):
423 Reassemble and verify fragmented packet
425 :param frags: Captured fragments
426 :param src: Source IPv6 address to verify
427 :param dst: Destination IPv6 address to verify
429 :returns: Reassembled IPv6 packet
433 self.assertEqual(p[IPv6].src, src)
434 self.assertEqual(p[IPv6].dst, dst)
435 buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
436 buffer.write(bytes(p[IPv6ExtHdrFragment].payload))
438 src=frags[0][IPv6].src,
439 dst=frags[0][IPv6].dst,
440 nh=frags[0][IPv6ExtHdrFragment].nh,
442 if ip.nh == IP_PROTOS.tcp:
443 p = ip / TCP(buffer.getvalue())
444 elif ip.nh == IP_PROTOS.udp:
445 p = ip / UDP(buffer.getvalue())
446 self.logger.debug(ppp("Reassembled:", p))
447 self.assert_packet_checksums_valid(p)
450 def verify_ipfix_max_bibs(self, data, limit):
452 Verify IPFIX maximum BIB entries exceeded event
454 :param data: Decoded IPFIX data records
455 :param limit: Number of maximum BIB entries that can be created.
457 self.assertEqual(1, len(data))
460 self.assertEqual(scapy.compat.orb(record[230]), 13)
461 # natQuotaExceededEvent
462 self.assertEqual(struct.pack("I", 2), record[466])
464 self.assertEqual(struct.pack("I", limit), record[472])
466 def verify_ipfix_bib(self, data, is_create, src_addr):
468 Verify IPFIX NAT64 BIB create and delete events
470 :param data: Decoded IPFIX data records
471 :param is_create: Create event if nonzero value otherwise delete event
472 :param src_addr: IPv6 source address
474 self.assertEqual(1, len(data))
478 self.assertEqual(scapy.compat.orb(record[230]), 10)
480 self.assertEqual(scapy.compat.orb(record[230]), 11)
482 self.assertEqual(src_addr, str(ipaddress.IPv6Address(record[27])))
483 # postNATSourceIPv4Address
484 self.assertEqual(self.nat_addr_n, record[225])
486 self.assertEqual(IP_PROTOS.tcp, scapy.compat.orb(record[4]))
488 self.assertEqual(struct.pack("!I", 0), record[234])
489 # sourceTransportPort
490 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
491 # postNAPTSourceTransportPort
492 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
494 def verify_ipfix_nat64_ses(self, data, is_create, src_addr, dst_addr, dst_port):
496 Verify IPFIX NAT64 session create and delete events
498 :param data: Decoded IPFIX data records
499 :param is_create: Create event if nonzero value otherwise delete event
500 :param src_addr: IPv6 source address
501 :param dst_addr: IPv4 destination address
502 :param dst_port: destination TCP port
504 self.assertEqual(1, len(data))
508 self.assertEqual(scapy.compat.orb(record[230]), 6)
510 self.assertEqual(scapy.compat.orb(record[230]), 7)
512 self.assertEqual(src_addr, str(ipaddress.IPv6Address(record[27])))
513 # destinationIPv6Address
516 socket.AF_INET6, self.compose_ip6(dst_addr, "64:ff9b::", 96)
520 # postNATSourceIPv4Address
521 self.assertEqual(self.nat_addr_n, record[225])
522 # postNATDestinationIPv4Address
523 self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr), record[226])
525 self.assertEqual(IP_PROTOS.tcp, scapy.compat.orb(record[4]))
527 self.assertEqual(struct.pack("!I", 0), record[234])
528 # sourceTransportPort
529 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
530 # postNAPTSourceTransportPort
531 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
532 # destinationTransportPort
533 self.assertEqual(struct.pack("!H", dst_port), record[11])
534 # postNAPTDestinationTransportPort
535 self.assertEqual(struct.pack("!H", dst_port), record[228])
537 def verify_syslog_sess(self, data, is_add=True, is_ip6=False):
538 message = data.decode("utf-8")
540 message = SyslogMessage.parse(message)
541 except ParseError as e:
545 self.assertEqual(message.severity, SyslogSeverity.info)
546 self.assertEqual(message.appname, "NAT")
547 self.assertEqual(message.msgid, "SADD" if is_add else "SDEL")
548 sd_params = message.sd.get("nsess")
549 self.assertTrue(sd_params is not None)
551 self.assertEqual(sd_params.get("IATYP"), "IPv6")
552 self.assertEqual(sd_params.get("ISADDR"), self.pg0.remote_ip6)
554 self.assertEqual(sd_params.get("IATYP"), "IPv4")
555 self.assertEqual(sd_params.get("ISADDR"), self.pg0.remote_ip4)
556 self.assertTrue(sd_params.get("SSUBIX") is not None)
557 self.assertEqual(sd_params.get("ISPORT"), "%d" % self.tcp_port_in)
558 self.assertEqual(sd_params.get("XATYP"), "IPv4")
559 self.assertEqual(sd_params.get("XSADDR"), self.nat_addr)
560 self.assertEqual(sd_params.get("XSPORT"), "%d" % self.tcp_port_out)
561 self.assertEqual(sd_params.get("PROTO"), "%d" % IP_PROTOS.tcp)
562 self.assertEqual(sd_params.get("SVLAN"), "0")
563 self.assertEqual(sd_params.get("XDADDR"), self.pg1.remote_ip4)
564 self.assertEqual(sd_params.get("XDPORT"), "%d" % self.tcp_external_port)
566 def compose_ip6(self, ip4, pref, plen):
568 Compose IPv4-embedded IPv6 addresses
570 :param ip4: IPv4 address
571 :param pref: IPv6 prefix
572 :param plen: IPv6 prefix length
573 :returns: IPv4-embedded IPv6 addresses
575 pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
576 ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
591 pref_n[10] = ip4_n[3]
595 pref_n[10] = ip4_n[2]
596 pref_n[11] = ip4_n[3]
599 pref_n[10] = ip4_n[1]
600 pref_n[11] = ip4_n[2]
601 pref_n[12] = ip4_n[3]
603 pref_n[12] = ip4_n[0]
604 pref_n[13] = ip4_n[1]
605 pref_n[14] = ip4_n[2]
606 pref_n[15] = ip4_n[3]
607 packed_pref_n = b"".join([scapy.compat.chb(x) for x in pref_n])
608 return socket.inet_ntop(socket.AF_INET6, packed_pref_n)
610 def verify_ipfix_max_sessions(self, data, limit):
612 Verify IPFIX maximum session entries exceeded event
614 :param data: Decoded IPFIX data records
615 :param limit: Number of maximum session entries that can be created.
617 self.assertEqual(1, len(data))
620 self.assertEqual(scapy.compat.orb(record[230]), 13)
621 # natQuotaExceededEvent
622 self.assertEqual(struct.pack("I", 1), record[466])
624 self.assertEqual(struct.pack("I", limit), record[471])
626 def test_nat64_inside_interface_handles_neighbor_advertisement(self):
627 """NAT64 inside interface handles Neighbor Advertisement"""
629 flags = self.config_flags.NAT_IS_INSIDE
630 self.vapi.nat64_add_del_interface(
631 is_add=1, flags=flags, sw_if_index=self.pg5.sw_if_index
636 Ether(dst=self.pg5.local_mac, src=self.pg5.remote_mac)
637 / IPv6(src=self.pg5.remote_ip6, dst=self.pg5.local_ip6)
638 / ICMPv6EchoRequest()
641 self.pg5.add_stream(pkts)
642 self.pg_enable_capture(self.pg_interfaces)
645 # Wait for Neighbor Solicitation
646 capture = self.pg5.get_capture(len(pkts))
649 self.assertEqual(packet[IPv6].src, self.pg5.local_ip6)
650 self.assertEqual(packet.haslayer(ICMPv6ND_NS), 1)
651 tgt = packet[ICMPv6ND_NS].tgt
653 self.logger.error(ppp("Unexpected or invalid packet:", packet))
656 # Send Neighbor Advertisement
658 Ether(dst=self.pg5.local_mac, src=self.pg5.remote_mac)
659 / IPv6(src=self.pg5.remote_ip6, dst=self.pg5.local_ip6)
660 / ICMPv6ND_NA(tgt=tgt)
661 / ICMPv6NDOptDstLLAddr(lladdr=self.pg5.remote_mac)
664 self.pg5.add_stream(pkts)
665 self.pg_enable_capture(self.pg_interfaces)
668 # Try to send ping again
670 self.pg5.add_stream(pkts)
671 self.pg_enable_capture(self.pg_interfaces)
674 # Wait for ping reply
675 capture = self.pg5.get_capture(len(pkts))
678 self.assertEqual(packet[IPv6].src, self.pg5.local_ip6)
679 self.assertEqual(packet[IPv6].dst, self.pg5.remote_ip6)
680 self.assertEqual(packet.haslayer(ICMPv6EchoReply), 1)
682 self.logger.error(ppp("Unexpected or invalid packet:", packet))
686 """Add/delete address to NAT64 pool"""
689 self.vapi.nat64_add_del_pool_addr_range(
690 start_addr=nat_addr, end_addr=nat_addr, vrf_id=0xFFFFFFFF, is_add=1
693 addresses = self.vapi.nat64_pool_addr_dump()
694 self.assertEqual(len(addresses), 1)
695 self.assertEqual(str(addresses[0].address), nat_addr)
697 self.vapi.nat64_add_del_pool_addr_range(
698 start_addr=nat_addr, end_addr=nat_addr, vrf_id=0xFFFFFFFF, is_add=0
701 addresses = self.vapi.nat64_pool_addr_dump()
702 self.assertEqual(len(addresses), 0)
704 def test_interface(self):
705 """Enable/disable NAT64 feature on the interface"""
706 flags = self.config_flags.NAT_IS_INSIDE
707 self.vapi.nat64_add_del_interface(
708 is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
710 self.vapi.nat64_add_del_interface(
711 is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
714 interfaces = self.vapi.nat64_interface_dump()
715 self.assertEqual(len(interfaces), 2)
718 for intf in interfaces:
719 if intf.sw_if_index == self.pg0.sw_if_index:
720 self.assertEqual(intf.flags, self.config_flags.NAT_IS_INSIDE)
722 elif intf.sw_if_index == self.pg1.sw_if_index:
723 self.assertEqual(intf.flags, self.config_flags.NAT_IS_OUTSIDE)
725 self.assertTrue(pg0_found)
726 self.assertTrue(pg1_found)
728 features = self.vapi.cli("show interface features pg0")
729 self.assertIn("nat64-in2out", features)
730 features = self.vapi.cli("show interface features pg1")
731 self.assertIn("nat64-out2in", features)
733 self.vapi.nat64_add_del_interface(
734 is_add=0, flags=flags, sw_if_index=self.pg0.sw_if_index
736 self.vapi.nat64_add_del_interface(
737 is_add=0, flags=flags, sw_if_index=self.pg1.sw_if_index
740 interfaces = self.vapi.nat64_interface_dump()
741 self.assertEqual(len(interfaces), 0)
743 def test_static_bib(self):
744 """Add/delete static BIB entry"""
745 in_addr = "2001:db8:85a3::8a2e:370:7334"
746 out_addr = "10.1.1.3"
749 proto = IP_PROTOS.tcp
751 self.vapi.nat64_add_del_static_bib(
760 bib = self.vapi.nat64_bib_dump(proto=IP_PROTOS.tcp)
763 if bibe.flags & self.config_flags.NAT_IS_STATIC:
765 self.assertEqual(str(bibe.i_addr), in_addr)
766 self.assertEqual(str(bibe.o_addr), out_addr)
767 self.assertEqual(bibe.i_port, in_port)
768 self.assertEqual(bibe.o_port, out_port)
769 self.assertEqual(static_bib_num, 1)
770 bibs = self.statistics.get_counter("/nat64/total-bibs")
771 self.assertEqual(bibs[0][0], 1)
773 self.vapi.nat64_add_del_static_bib(
782 bib = self.vapi.nat64_bib_dump(proto=IP_PROTOS.tcp)
785 if bibe.flags & self.config_flags.NAT_IS_STATIC:
787 self.assertEqual(static_bib_num, 0)
788 bibs = self.statistics.get_counter("/nat64/total-bibs")
789 self.assertEqual(bibs[0][0], 0)
791 def test_set_timeouts(self):
792 """Set NAT64 timeouts"""
793 # verify default values
794 timeouts = self.vapi.nat64_get_timeouts()
795 self.assertEqual(timeouts.udp, 300)
796 self.assertEqual(timeouts.icmp, 60)
797 self.assertEqual(timeouts.tcp_transitory, 240)
798 self.assertEqual(timeouts.tcp_established, 7440)
800 # set and verify custom values
801 self.vapi.nat64_set_timeouts(
802 udp=200, tcp_established=7450, tcp_transitory=250, icmp=30
804 timeouts = self.vapi.nat64_get_timeouts()
805 self.assertEqual(timeouts.udp, 200)
806 self.assertEqual(timeouts.icmp, 30)
807 self.assertEqual(timeouts.tcp_transitory, 250)
808 self.assertEqual(timeouts.tcp_established, 7450)
810 def test_dynamic(self):
811 """NAT64 dynamic translation test"""
812 self.tcp_port_in = 6303
813 self.udp_port_in = 6304
814 self.icmp_id_in = 6305
816 ses_num_start = self.nat64_get_ses_num()
818 self.vapi.nat64_add_del_pool_addr_range(
819 start_addr=self.nat_addr,
820 end_addr=self.nat_addr,
824 flags = self.config_flags.NAT_IS_INSIDE
825 self.vapi.nat64_add_del_interface(
826 is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
828 self.vapi.nat64_add_del_interface(
829 is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
833 tcpn = self.statistics.get_counter("/nat64/in2out/tcp")[0]
834 udpn = self.statistics.get_counter("/nat64/in2out/udp")[0]
835 icmpn = self.statistics.get_counter("/nat64/in2out/icmp")[0]
836 drops = self.statistics.get_counter("/nat64/in2out/drops")[0]
838 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
839 self.pg0.add_stream(pkts)
840 self.pg_enable_capture(self.pg_interfaces)
842 capture = self.pg1.get_capture(len(pkts))
843 self.verify_capture_out(
844 capture, nat_ip=self.nat_addr, dst_ip=self.pg1.remote_ip4
847 if_idx = self.pg0.sw_if_index
848 cnt = self.statistics.get_counter("/nat64/in2out/tcp")[0]
849 self.assertEqual(cnt[if_idx] - tcpn[if_idx], 1)
850 cnt = self.statistics.get_counter("/nat64/in2out/udp")[0]
851 self.assertEqual(cnt[if_idx] - udpn[if_idx], 1)
852 cnt = self.statistics.get_counter("/nat64/in2out/icmp")[0]
853 self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1)
854 cnt = self.statistics.get_counter("/nat64/in2out/drops")[0]
855 self.assertEqual(cnt[if_idx] - drops[if_idx], 0)
858 tcpn = self.statistics.get_counter("/nat64/out2in/tcp")[0]
859 udpn = self.statistics.get_counter("/nat64/out2in/udp")[0]
860 icmpn = self.statistics.get_counter("/nat64/out2in/icmp")[0]
861 drops = self.statistics.get_counter("/nat64/out2in/drops")[0]
863 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
864 self.pg1.add_stream(pkts)
865 self.pg_enable_capture(self.pg_interfaces)
867 capture = self.pg0.get_capture(len(pkts))
868 ip = IPv6(src="".join(["64:ff9b::", self.pg1.remote_ip4]))
869 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
871 if_idx = self.pg1.sw_if_index
872 cnt = self.statistics.get_counter("/nat64/out2in/tcp")[0]
873 self.assertEqual(cnt[if_idx] - tcpn[if_idx], 2)
874 cnt = self.statistics.get_counter("/nat64/out2in/udp")[0]
875 self.assertEqual(cnt[if_idx] - udpn[if_idx], 1)
876 cnt = self.statistics.get_counter("/nat64/out2in/icmp")[0]
877 self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1)
878 cnt = self.statistics.get_counter("/nat64/out2in/drops")[0]
879 self.assertEqual(cnt[if_idx] - drops[if_idx], 0)
881 bibs = self.statistics.get_counter("/nat64/total-bibs")
882 self.assertEqual(bibs[0][0], 3)
883 sessions = self.statistics.get_counter("/nat64/total-sessions")
884 self.assertEqual(sessions[0][0], 3)
887 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
888 self.pg0.add_stream(pkts)
889 self.pg_enable_capture(self.pg_interfaces)
891 capture = self.pg1.get_capture(len(pkts))
892 self.verify_capture_out(
893 capture, nat_ip=self.nat_addr, dst_ip=self.pg1.remote_ip4
897 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
898 self.pg1.add_stream(pkts)
899 self.pg_enable_capture(self.pg_interfaces)
901 capture = self.pg0.get_capture(len(pkts))
902 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
904 ses_num_end = self.nat64_get_ses_num()
906 self.assertEqual(ses_num_end - ses_num_start, 3)
908 # tenant with specific VRF
909 self.vapi.nat64_add_del_pool_addr_range(
910 start_addr=self.vrf1_nat_addr,
911 end_addr=self.vrf1_nat_addr,
915 flags = self.config_flags.NAT_IS_INSIDE
916 self.vapi.nat64_add_del_interface(
917 is_add=1, flags=flags, sw_if_index=self.pg2.sw_if_index
920 pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
921 self.pg2.add_stream(pkts)
922 self.pg_enable_capture(self.pg_interfaces)
924 capture = self.pg1.get_capture(len(pkts))
925 self.verify_capture_out(
926 capture, nat_ip=self.vrf1_nat_addr, dst_ip=self.pg1.remote_ip4
929 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
930 self.pg1.add_stream(pkts)
931 self.pg_enable_capture(self.pg_interfaces)
933 capture = self.pg2.get_capture(len(pkts))
934 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
936 def test_static(self):
937 """NAT64 static translation test"""
938 self.tcp_port_in = 60303
939 self.udp_port_in = 60304
940 self.icmp_id_in = 60305
941 self.tcp_port_out = 60303
942 self.udp_port_out = 60304
943 self.icmp_id_out = 60305
945 ses_num_start = self.nat64_get_ses_num()
947 self.vapi.nat64_add_del_pool_addr_range(
948 start_addr=self.nat_addr,
949 end_addr=self.nat_addr,
953 flags = self.config_flags.NAT_IS_INSIDE
954 self.vapi.nat64_add_del_interface(
955 is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
957 self.vapi.nat64_add_del_interface(
958 is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
961 self.vapi.nat64_add_del_static_bib(
962 i_addr=self.pg0.remote_ip6,
963 o_addr=self.nat_addr,
964 i_port=self.tcp_port_in,
965 o_port=self.tcp_port_out,
970 self.vapi.nat64_add_del_static_bib(
971 i_addr=self.pg0.remote_ip6,
972 o_addr=self.nat_addr,
973 i_port=self.udp_port_in,
974 o_port=self.udp_port_out,
979 self.vapi.nat64_add_del_static_bib(
980 i_addr=self.pg0.remote_ip6,
981 o_addr=self.nat_addr,
982 i_port=self.icmp_id_in,
983 o_port=self.icmp_id_out,
984 proto=IP_PROTOS.icmp,
990 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
991 self.pg0.add_stream(pkts)
992 self.pg_enable_capture(self.pg_interfaces)
994 capture = self.pg1.get_capture(len(pkts))
995 self.verify_capture_out(
996 capture, nat_ip=self.nat_addr, dst_ip=self.pg1.remote_ip4, same_port=True
1000 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
1001 self.pg1.add_stream(pkts)
1002 self.pg_enable_capture(self.pg_interfaces)
1004 capture = self.pg0.get_capture(len(pkts))
1005 ip = IPv6(src="".join(["64:ff9b::", self.pg1.remote_ip4]))
1006 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
1008 ses_num_end = self.nat64_get_ses_num()
1010 self.assertEqual(ses_num_end - ses_num_start, 3)
1012 def test_session_timeout(self):
1013 """NAT64 session timeout"""
1014 self.icmp_id_in = 1234
1015 self.vapi.nat64_add_del_pool_addr_range(
1016 start_addr=self.nat_addr,
1017 end_addr=self.nat_addr,
1021 flags = self.config_flags.NAT_IS_INSIDE
1022 self.vapi.nat64_add_del_interface(
1023 is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
1025 self.vapi.nat64_add_del_interface(
1026 is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
1028 self.vapi.nat64_set_timeouts(
1029 udp=300, tcp_established=5, tcp_transitory=5, icmp=5
1032 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
1033 self.pg0.add_stream(pkts)
1034 self.pg_enable_capture(self.pg_interfaces)
1036 capture = self.pg1.get_capture(len(pkts))
1038 ses_num_before_timeout = self.nat64_get_ses_num()
1040 self.virtual_sleep(15)
1042 # ICMP and TCP session after timeout
1043 ses_num_after_timeout = self.nat64_get_ses_num()
1044 self.assertEqual(ses_num_before_timeout - ses_num_after_timeout, 2)
1046 def test_icmp_error(self):
1047 """NAT64 ICMP Error message translation"""
1048 self.tcp_port_in = 6303
1049 self.udp_port_in = 6304
1050 self.icmp_id_in = 6305
1052 self.vapi.nat64_add_del_pool_addr_range(
1053 start_addr=self.nat_addr,
1054 end_addr=self.nat_addr,
1058 flags = self.config_flags.NAT_IS_INSIDE
1059 self.vapi.nat64_add_del_interface(
1060 is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
1062 self.vapi.nat64_add_del_interface(
1063 is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
1066 # send some packets to create sessions
1067 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
1068 self.pg0.add_stream(pkts)
1069 self.pg_enable_capture(self.pg_interfaces)
1071 capture_ip4 = self.pg1.get_capture(len(pkts))
1072 self.verify_capture_out(
1073 capture_ip4, nat_ip=self.nat_addr, dst_ip=self.pg1.remote_ip4
1076 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
1077 self.pg1.add_stream(pkts)
1078 self.pg_enable_capture(self.pg_interfaces)
1080 capture_ip6 = self.pg0.get_capture(len(pkts))
1081 ip = IPv6(src="".join(["64:ff9b::", self.pg1.remote_ip4]))
1082 self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src, self.pg0.remote_ip6)
1086 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1087 / IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src)
1088 / ICMPv6DestUnreach(code=1)
1090 for packet in capture_ip6
1092 self.pg0.add_stream(pkts)
1093 self.pg_enable_capture(self.pg_interfaces)
1095 capture = self.pg1.get_capture(len(pkts))
1096 for packet in capture:
1098 self.assertEqual(packet[IP].src, self.nat_addr)
1099 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1100 self.assertEqual(packet[ICMP].type, 3)
1101 self.assertEqual(packet[ICMP].code, 13)
1102 inner = packet[IPerror]
1103 self.assertEqual(inner.src, self.pg1.remote_ip4)
1104 self.assertEqual(inner.dst, self.nat_addr)
1105 self.assert_packet_checksums_valid(packet)
1106 if inner.haslayer(TCPerror):
1107 self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
1108 elif inner.haslayer(UDPerror):
1109 self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
1111 self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
1113 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1118 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1119 / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1120 / ICMP(type=3, code=13)
1122 for packet in capture_ip4
1124 self.pg1.add_stream(pkts)
1125 self.pg_enable_capture(self.pg_interfaces)
1127 capture = self.pg0.get_capture(len(pkts))
1128 for packet in capture:
1130 self.assertEqual(packet[IPv6].src, ip.src)
1131 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
1132 icmp = packet[ICMPv6DestUnreach]
1133 self.assertEqual(icmp.code, 1)
1134 inner = icmp[IPerror6]
1135 self.assertEqual(inner.src, self.pg0.remote_ip6)
1136 self.assertEqual(inner.dst, ip.src)
1137 self.assert_icmpv6_checksum_valid(packet)
1138 if inner.haslayer(TCPerror):
1139 self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
1140 elif inner.haslayer(UDPerror):
1141 self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
1143 self.assertEqual(inner[ICMPv6EchoRequest].id, self.icmp_id_in)
1145 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1148 def test_hairpinning(self):
1149 """NAT64 hairpinning"""
1151 client = self.pg0.remote_hosts[0]
1152 server = self.pg0.remote_hosts[1]
1153 server_tcp_in_port = 22
1154 server_tcp_out_port = 4022
1155 server_udp_in_port = 23
1156 server_udp_out_port = 4023
1157 client_tcp_in_port = 1234
1158 client_udp_in_port = 1235
1159 client_tcp_out_port = 0
1160 client_udp_out_port = 0
1161 ip = IPv6(src="".join(["64:ff9b::", self.nat_addr]))
1162 nat_addr_ip6 = ip.src
1164 self.vapi.nat64_add_del_pool_addr_range(
1165 start_addr=self.nat_addr,
1166 end_addr=self.nat_addr,
1170 flags = self.config_flags.NAT_IS_INSIDE
1171 self.vapi.nat64_add_del_interface(
1172 is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
1174 self.vapi.nat64_add_del_interface(
1175 is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
1178 self.vapi.nat64_add_del_static_bib(
1180 o_addr=self.nat_addr,
1181 i_port=server_tcp_in_port,
1182 o_port=server_tcp_out_port,
1183 proto=IP_PROTOS.tcp,
1187 self.vapi.nat64_add_del_static_bib(
1189 o_addr=self.nat_addr,
1190 i_port=server_udp_in_port,
1191 o_port=server_udp_out_port,
1192 proto=IP_PROTOS.udp,
1200 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1201 / IPv6(src=client.ip6, dst=nat_addr_ip6)
1202 / TCP(sport=client_tcp_in_port, dport=server_tcp_out_port)
1206 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1207 / IPv6(src=client.ip6, dst=nat_addr_ip6)
1208 / UDP(sport=client_udp_in_port, dport=server_udp_out_port)
1211 self.pg0.add_stream(pkts)
1212 self.pg_enable_capture(self.pg_interfaces)
1214 capture = self.pg0.get_capture(len(pkts))
1215 for packet in capture:
1217 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
1218 self.assertEqual(packet[IPv6].dst, server.ip6)
1219 self.assert_packet_checksums_valid(packet)
1220 if packet.haslayer(TCP):
1221 self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
1222 self.assertEqual(packet[TCP].dport, server_tcp_in_port)
1223 client_tcp_out_port = packet[TCP].sport
1225 self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
1226 self.assertEqual(packet[UDP].dport, server_udp_in_port)
1227 client_udp_out_port = packet[UDP].sport
1229 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1235 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1236 / IPv6(src=server.ip6, dst=nat_addr_ip6)
1237 / TCP(sport=server_tcp_in_port, dport=client_tcp_out_port)
1241 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1242 / IPv6(src=server.ip6, dst=nat_addr_ip6)
1243 / UDP(sport=server_udp_in_port, dport=client_udp_out_port)
1246 self.pg0.add_stream(pkts)
1247 self.pg_enable_capture(self.pg_interfaces)
1249 capture = self.pg0.get_capture(len(pkts))
1250 for packet in capture:
1252 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
1253 self.assertEqual(packet[IPv6].dst, client.ip6)
1254 self.assert_packet_checksums_valid(packet)
1255 if packet.haslayer(TCP):
1256 self.assertEqual(packet[TCP].sport, server_tcp_out_port)
1257 self.assertEqual(packet[TCP].dport, client_tcp_in_port)
1259 self.assertEqual(packet[UDP].sport, server_udp_out_port)
1260 self.assertEqual(packet[UDP].dport, client_udp_in_port)
1262 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1268 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1269 / IPv6(src=client.ip6, dst=nat_addr_ip6)
1270 / ICMPv6DestUnreach(code=1)
1272 for packet in capture
1274 self.pg0.add_stream(pkts)
1275 self.pg_enable_capture(self.pg_interfaces)
1277 capture = self.pg0.get_capture(len(pkts))
1278 for packet in capture:
1280 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
1281 self.assertEqual(packet[IPv6].dst, server.ip6)
1282 icmp = packet[ICMPv6DestUnreach]
1283 self.assertEqual(icmp.code, 1)
1284 inner = icmp[IPerror6]
1285 self.assertEqual(inner.src, server.ip6)
1286 self.assertEqual(inner.dst, nat_addr_ip6)
1287 self.assert_packet_checksums_valid(packet)
1288 if inner.haslayer(TCPerror):
1289 self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
1290 self.assertEqual(inner[TCPerror].dport, client_tcp_out_port)
1292 self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
1293 self.assertEqual(inner[UDPerror].dport, client_udp_out_port)
1295 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1298 def test_prefix(self):
1299 """NAT64 Network-Specific Prefix"""
1301 self.vapi.nat64_add_del_pool_addr_range(
1302 start_addr=self.nat_addr,
1303 end_addr=self.nat_addr,
1307 flags = self.config_flags.NAT_IS_INSIDE
1308 self.vapi.nat64_add_del_interface(
1309 is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
1311 self.vapi.nat64_add_del_interface(
1312 is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
1314 self.vapi.nat64_add_del_pool_addr_range(
1315 start_addr=self.vrf1_nat_addr,
1316 end_addr=self.vrf1_nat_addr,
1317 vrf_id=self.vrf1_id,
1320 self.vapi.nat64_add_del_interface(
1321 is_add=1, flags=flags, sw_if_index=self.pg2.sw_if_index
1325 global_pref64 = "2001:db8::"
1326 global_pref64_len = 32
1327 global_pref64_str = "{}/{}".format(global_pref64, global_pref64_len)
1328 self.vapi.nat64_add_del_prefix(prefix=global_pref64_str, vrf_id=0, is_add=1)
1330 prefix = self.vapi.nat64_prefix_dump()
1331 self.assertEqual(len(prefix), 1)
1332 self.assertEqual(str(prefix[0].prefix), global_pref64_str)
1333 self.assertEqual(prefix[0].vrf_id, 0)
1335 # Add tenant specific prefix
1336 vrf1_pref64 = "2001:db8:122:300::"
1337 vrf1_pref64_len = 56
1338 vrf1_pref64_str = "{}/{}".format(vrf1_pref64, vrf1_pref64_len)
1339 self.vapi.nat64_add_del_prefix(
1340 prefix=vrf1_pref64_str, vrf_id=self.vrf1_id, is_add=1
1343 prefix = self.vapi.nat64_prefix_dump()
1344 self.assertEqual(len(prefix), 2)
1347 pkts = self.create_stream_in_ip6(
1348 self.pg0, self.pg1, pref=global_pref64, plen=global_pref64_len
1350 self.pg0.add_stream(pkts)
1351 self.pg_enable_capture(self.pg_interfaces)
1353 capture = self.pg1.get_capture(len(pkts))
1354 self.verify_capture_out(
1355 capture, nat_ip=self.nat_addr, dst_ip=self.pg1.remote_ip4
1358 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
1359 self.pg1.add_stream(pkts)
1360 self.pg_enable_capture(self.pg_interfaces)
1362 capture = self.pg0.get_capture(len(pkts))
1363 dst_ip = self.compose_ip6(self.pg1.remote_ip4, global_pref64, global_pref64_len)
1364 self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
1366 # Tenant specific prefix
1367 pkts = self.create_stream_in_ip6(
1368 self.pg2, self.pg1, pref=vrf1_pref64, plen=vrf1_pref64_len
1370 self.pg2.add_stream(pkts)
1371 self.pg_enable_capture(self.pg_interfaces)
1373 capture = self.pg1.get_capture(len(pkts))
1374 self.verify_capture_out(
1375 capture, nat_ip=self.vrf1_nat_addr, dst_ip=self.pg1.remote_ip4
1378 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
1379 self.pg1.add_stream(pkts)
1380 self.pg_enable_capture(self.pg_interfaces)
1382 capture = self.pg2.get_capture(len(pkts))
1383 dst_ip = self.compose_ip6(self.pg1.remote_ip4, vrf1_pref64, vrf1_pref64_len)
1384 self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
1386 def test_unknown_proto(self):
1387 """NAT64 translate packet with unknown protocol"""
1389 self.vapi.nat64_add_del_pool_addr_range(
1390 start_addr=self.nat_addr,
1391 end_addr=self.nat_addr,
1395 flags = self.config_flags.NAT_IS_INSIDE
1396 self.vapi.nat64_add_del_interface(
1397 is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
1399 self.vapi.nat64_add_del_interface(
1400 is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
1402 remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, "64:ff9b::", 96)
1406 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1407 / IPv6(src=self.pg0.remote_ip6, dst=remote_ip6)
1408 / TCP(sport=self.tcp_port_in, dport=20)
1410 self.pg0.add_stream(p)
1411 self.pg_enable_capture(self.pg_interfaces)
1413 p = self.pg1.get_capture(1)
1416 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1417 / IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47)
1419 / IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4)
1420 / TCP(sport=1234, dport=1234)
1422 self.pg0.add_stream(p)
1423 self.pg_enable_capture(self.pg_interfaces)
1425 p = self.pg1.get_capture(1)
1428 self.assertEqual(packet[IP].src, self.nat_addr)
1429 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1430 self.assertEqual(packet.haslayer(GRE), 1)
1431 self.assert_packet_checksums_valid(packet)
1433 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1438 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1439 / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1441 / IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4)
1442 / TCP(sport=1234, dport=1234)
1444 self.pg1.add_stream(p)
1445 self.pg_enable_capture(self.pg_interfaces)
1447 p = self.pg0.get_capture(1)
1450 self.assertEqual(packet[IPv6].src, remote_ip6)
1451 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
1452 self.assertEqual(packet[IPv6].nh, 47)
1454 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1457 def test_hairpinning_unknown_proto(self):
1458 """NAT64 translate packet with unknown protocol - hairpinning"""
1460 client = self.pg0.remote_hosts[0]
1461 server = self.pg0.remote_hosts[1]
1462 server_tcp_in_port = 22
1463 server_tcp_out_port = 4022
1464 client_tcp_in_port = 1234
1465 client_tcp_out_port = 1235
1466 server_nat_ip = "10.0.0.100"
1467 client_nat_ip = "10.0.0.110"
1468 server_nat_ip6 = self.compose_ip6(server_nat_ip, "64:ff9b::", 96)
1469 client_nat_ip6 = self.compose_ip6(client_nat_ip, "64:ff9b::", 96)
1471 self.vapi.nat64_add_del_pool_addr_range(
1472 start_addr=server_nat_ip,
1473 end_addr=client_nat_ip,
1477 flags = self.config_flags.NAT_IS_INSIDE
1478 self.vapi.nat64_add_del_interface(
1479 is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
1481 self.vapi.nat64_add_del_interface(
1482 is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
1485 self.vapi.nat64_add_del_static_bib(
1487 o_addr=server_nat_ip,
1488 i_port=server_tcp_in_port,
1489 o_port=server_tcp_out_port,
1490 proto=IP_PROTOS.tcp,
1495 self.vapi.nat64_add_del_static_bib(
1497 o_addr=server_nat_ip,
1500 proto=IP_PROTOS.gre,
1505 self.vapi.nat64_add_del_static_bib(
1507 o_addr=client_nat_ip,
1508 i_port=client_tcp_in_port,
1509 o_port=client_tcp_out_port,
1510 proto=IP_PROTOS.tcp,
1517 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1518 / IPv6(src=client.ip6, dst=server_nat_ip6)
1519 / TCP(sport=client_tcp_in_port, dport=server_tcp_out_port)
1521 self.pg0.add_stream(p)
1522 self.pg_enable_capture(self.pg_interfaces)
1524 p = self.pg0.get_capture(1)
1527 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1528 / IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre)
1530 / IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4)
1531 / TCP(sport=1234, dport=1234)
1533 self.pg0.add_stream(p)
1534 self.pg_enable_capture(self.pg_interfaces)
1536 p = self.pg0.get_capture(1)
1539 self.assertEqual(packet[IPv6].src, client_nat_ip6)
1540 self.assertEqual(packet[IPv6].dst, server.ip6)
1541 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
1543 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1548 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1549 / IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre)
1551 / IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4)
1552 / TCP(sport=1234, dport=1234)
1554 self.pg0.add_stream(p)
1555 self.pg_enable_capture(self.pg_interfaces)
1557 p = self.pg0.get_capture(1)
1560 self.assertEqual(packet[IPv6].src, server_nat_ip6)
1561 self.assertEqual(packet[IPv6].dst, client.ip6)
1562 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
1564 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1567 def test_one_armed_nat64(self):
1568 """One armed NAT64"""
1570 remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4, "64:ff9b::", 96)
1572 self.vapi.nat64_add_del_pool_addr_range(
1573 start_addr=self.nat_addr,
1574 end_addr=self.nat_addr,
1578 flags = self.config_flags.NAT_IS_INSIDE
1579 self.vapi.nat64_add_del_interface(
1580 is_add=1, flags=flags, sw_if_index=self.pg3.sw_if_index
1582 self.vapi.nat64_add_del_interface(
1583 is_add=1, flags=0, sw_if_index=self.pg3.sw_if_index
1588 Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac)
1589 / IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6)
1590 / TCP(sport=12345, dport=80)
1592 self.pg3.add_stream(p)
1593 self.pg_enable_capture(self.pg_interfaces)
1595 capture = self.pg3.get_capture(1)
1600 self.assertEqual(ip.src, self.nat_addr)
1601 self.assertEqual(ip.dst, self.pg3.remote_ip4)
1602 self.assertNotEqual(tcp.sport, 12345)
1603 external_port = tcp.sport
1604 self.assertEqual(tcp.dport, 80)
1605 self.assert_packet_checksums_valid(p)
1607 self.logger.error(ppp("Unexpected or invalid packet:", p))
1612 Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac)
1613 / IP(src=self.pg3.remote_ip4, dst=self.nat_addr)
1614 / TCP(sport=80, dport=external_port)
1616 self.pg3.add_stream(p)
1617 self.pg_enable_capture(self.pg_interfaces)
1619 capture = self.pg3.get_capture(1)
1624 self.assertEqual(ip.src, remote_host_ip6)
1625 self.assertEqual(ip.dst, self.pg3.remote_ip6)
1626 self.assertEqual(tcp.sport, 80)
1627 self.assertEqual(tcp.dport, 12345)
1628 self.assert_packet_checksums_valid(p)
1630 self.logger.error(ppp("Unexpected or invalid packet:", p))
1633 def test_frag_in_order(self):
1634 """NAT64 translate fragments arriving in order"""
1635 self.tcp_port_in = random.randint(1025, 65535)
1637 self.vapi.nat64_add_del_pool_addr_range(
1638 start_addr=self.nat_addr,
1639 end_addr=self.nat_addr,
1643 flags = self.config_flags.NAT_IS_INSIDE
1644 self.vapi.nat64_add_del_interface(
1645 is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
1647 self.vapi.nat64_add_del_interface(
1648 is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
1653 pkts = self.create_stream_frag_ip6(
1654 self.pg0, self.pg1.remote_ip4, self.tcp_port_in, 20, data
1656 self.pg0.add_stream(pkts)
1657 self.pg_enable_capture(self.pg_interfaces)
1659 frags = self.pg1.get_capture(len(pkts))
1660 p = self.reass_frags_and_verify(frags, self.nat_addr, self.pg1.remote_ip4)
1661 self.assertEqual(p[TCP].dport, 20)
1662 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
1663 self.tcp_port_out = p[TCP].sport
1664 self.assertEqual(data, p[Raw].load)
1667 data = b"A" * 4 + b"b" * 16 + b"C" * 3
1668 pkts = self.create_stream_frag(
1669 self.pg1, self.nat_addr, 20, self.tcp_port_out, data
1671 self.pg1.add_stream(pkts)
1672 self.pg_enable_capture(self.pg_interfaces)
1674 frags = self.pg0.get_capture(len(pkts))
1675 self.logger.debug(ppc("Captured:", frags))
1676 src = self.compose_ip6(self.pg1.remote_ip4, "64:ff9b::", 96)
1677 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
1678 self.assertEqual(p[TCP].sport, 20)
1679 self.assertEqual(p[TCP].dport, self.tcp_port_in)
1680 self.assertEqual(data, p[Raw].load)
1682 def test_reass_hairpinning(self):
1683 """NAT64 fragments hairpinning"""
1685 server = self.pg0.remote_hosts[1]
1686 server_in_port = random.randint(1025, 65535)
1687 server_out_port = random.randint(1025, 65535)
1688 client_in_port = random.randint(1025, 65535)
1689 ip = IPv6(src="".join(["64:ff9b::", self.nat_addr]))
1690 nat_addr_ip6 = ip.src
1692 self.vapi.nat64_add_del_pool_addr_range(
1693 start_addr=self.nat_addr,
1694 end_addr=self.nat_addr,
1698 flags = self.config_flags.NAT_IS_INSIDE
1699 self.vapi.nat64_add_del_interface(
1700 is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
1702 self.vapi.nat64_add_del_interface(
1703 is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
1706 # add static BIB entry for server
1707 self.vapi.nat64_add_del_static_bib(
1709 o_addr=self.nat_addr,
1710 i_port=server_in_port,
1711 o_port=server_out_port,
1712 proto=IP_PROTOS.tcp,
1717 # send packet from host to server
1718 pkts = self.create_stream_frag_ip6(
1719 self.pg0, self.nat_addr, client_in_port, server_out_port, data
1721 self.pg0.add_stream(pkts)
1722 self.pg_enable_capture(self.pg_interfaces)
1724 frags = self.pg0.get_capture(len(pkts))
1725 self.logger.debug(ppc("Captured:", frags))
1726 p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
1727 self.assertNotEqual(p[TCP].sport, client_in_port)
1728 self.assertEqual(p[TCP].dport, server_in_port)
1729 self.assertEqual(data, p[Raw].load)
1731 def test_frag_out_of_order(self):
1732 """NAT64 translate fragments arriving out of order"""
1733 self.tcp_port_in = random.randint(1025, 65535)
1735 self.vapi.nat64_add_del_pool_addr_range(
1736 start_addr=self.nat_addr,
1737 end_addr=self.nat_addr,
1741 flags = self.config_flags.NAT_IS_INSIDE
1742 self.vapi.nat64_add_del_interface(
1743 is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
1745 self.vapi.nat64_add_del_interface(
1746 is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
1751 pkts = self.create_stream_frag_ip6(
1752 self.pg0, self.pg1.remote_ip4, self.tcp_port_in, 20, data
1755 self.pg0.add_stream(pkts)
1756 self.pg_enable_capture(self.pg_interfaces)
1758 frags = self.pg1.get_capture(len(pkts))
1759 p = self.reass_frags_and_verify(frags, self.nat_addr, self.pg1.remote_ip4)
1760 self.assertEqual(p[TCP].dport, 20)
1761 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
1762 self.tcp_port_out = p[TCP].sport
1763 self.assertEqual(data, p[Raw].load)
1766 data = b"A" * 4 + b"B" * 16 + b"C" * 3
1767 pkts = self.create_stream_frag(
1768 self.pg1, self.nat_addr, 20, self.tcp_port_out, data
1771 self.pg1.add_stream(pkts)
1772 self.pg_enable_capture(self.pg_interfaces)
1774 frags = self.pg0.get_capture(len(pkts))
1775 src = self.compose_ip6(self.pg1.remote_ip4, "64:ff9b::", 96)
1776 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
1777 self.assertEqual(p[TCP].sport, 20)
1778 self.assertEqual(p[TCP].dport, self.tcp_port_in)
1779 self.assertEqual(data, p[Raw].load)
1781 def test_interface_addr(self):
1782 """Acquire NAT64 pool addresses from interface"""
1783 self.vapi.nat64_add_del_interface_addr(
1784 is_add=1, sw_if_index=self.pg4.sw_if_index
1787 # no address in NAT64 pool
1788 addresses = self.vapi.nat44_address_dump()
1789 self.assertEqual(0, len(addresses))
1791 # configure interface address and check NAT64 address pool
1792 self.pg4.config_ip4()
1793 addresses = self.vapi.nat64_pool_addr_dump()
1794 self.assertEqual(len(addresses), 1)
1796 self.assertEqual(str(addresses[0].address), self.pg4.local_ip4)
1798 # remove interface address and check NAT64 address pool
1799 self.pg4.unconfig_ip4()
1800 addresses = self.vapi.nat64_pool_addr_dump()
1801 self.assertEqual(0, len(addresses))
1803 @unittest.skipUnless(config.extended, "part of extended tests")
1804 def test_ipfix_max_bibs_sessions(self):
1805 """IPFIX logging maximum session and BIB entries exceeded"""
1808 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4, "64:ff9b::", 96)
1810 self.vapi.nat64_add_del_pool_addr_range(
1811 start_addr=self.nat_addr,
1812 end_addr=self.nat_addr,
1816 flags = self.config_flags.NAT_IS_INSIDE
1817 self.vapi.nat64_add_del_interface(
1818 is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
1820 self.vapi.nat64_add_del_interface(
1821 is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
1826 for i in range(0, max_bibs):
1827 src = "fd01:aa::%x" % (i)
1829 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1830 / IPv6(src=src, dst=remote_host_ip6)
1831 / TCP(sport=12345, dport=80)
1835 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1836 / IPv6(src=src, dst=remote_host_ip6)
1837 / TCP(sport=12345, dport=22)
1840 self.pg0.add_stream(pkts)
1841 self.pg_enable_capture(self.pg_interfaces)
1843 self.pg1.get_capture(max_sessions)
1845 self.vapi.set_ipfix_exporter(
1846 collector_address=self.pg3.remote_ip4,
1847 src_address=self.pg3.local_ip4,
1849 template_interval=10,
1851 self.vapi.nat_ipfix_enable_disable(
1852 domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=1
1856 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1857 / IPv6(src=src, dst=remote_host_ip6)
1858 / TCP(sport=12345, dport=25)
1860 self.pg0.add_stream(p)
1861 self.pg_enable_capture(self.pg_interfaces)
1863 self.pg1.assert_nothing_captured()
1864 self.vapi.ipfix_flush()
1865 capture = self.pg3.get_capture(7)
1866 ipfix = IPFIXDecoder()
1867 # first load template
1869 self.assertTrue(p.haslayer(IPFIX))
1870 self.assertEqual(p[IP].src, self.pg3.local_ip4)
1871 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1872 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1873 self.assertEqual(p[UDP].dport, 4739)
1874 self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id)
1875 if p.haslayer(Template):
1876 ipfix.add_template(p.getlayer(Template))
1877 # verify events in data set
1879 if p.haslayer(Data):
1880 data = ipfix.decode_data_set(p.getlayer(Set))
1881 self.verify_ipfix_max_sessions(data, max_sessions)
1884 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1885 / IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6)
1886 / TCP(sport=12345, dport=80)
1888 self.pg0.add_stream(p)
1889 self.pg_enable_capture(self.pg_interfaces)
1891 self.pg1.assert_nothing_captured()
1892 self.vapi.ipfix_flush()
1893 capture = self.pg3.get_capture(1)
1894 # verify events in data set
1896 self.assertTrue(p.haslayer(IPFIX))
1897 self.assertEqual(p[IP].src, self.pg3.local_ip4)
1898 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1899 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1900 self.assertEqual(p[UDP].dport, 4739)
1901 self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id)
1902 if p.haslayer(Data):
1903 data = ipfix.decode_data_set(p.getlayer(Set))
1904 self.verify_ipfix_max_bibs(data, max_bibs)
1906 def test_ipfix_bib_ses(self):
1907 """IPFIX logging NAT64 BIB/session create and delete events"""
1908 self.tcp_port_in = random.randint(1025, 65535)
1909 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4, "64:ff9b::", 96)
1911 self.vapi.nat64_add_del_pool_addr_range(
1912 start_addr=self.nat_addr,
1913 end_addr=self.nat_addr,
1917 flags = self.config_flags.NAT_IS_INSIDE
1918 self.vapi.nat64_add_del_interface(
1919 is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
1921 self.vapi.nat64_add_del_interface(
1922 is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
1924 self.vapi.set_ipfix_exporter(
1925 collector_address=self.pg3.remote_ip4,
1926 src_address=self.pg3.local_ip4,
1928 template_interval=10,
1930 self.vapi.nat_ipfix_enable_disable(
1931 domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=1
1936 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1937 / IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6)
1938 / TCP(sport=self.tcp_port_in, dport=25)
1940 self.pg0.add_stream(p)
1941 self.pg_enable_capture(self.pg_interfaces)
1943 p = self.pg1.get_capture(1)
1944 self.tcp_port_out = p[0][TCP].sport
1945 self.vapi.ipfix_flush()
1946 capture = self.pg3.get_capture(8)
1947 ipfix = IPFIXDecoder()
1948 # first load template
1950 self.assertTrue(p.haslayer(IPFIX))
1951 self.assertEqual(p[IP].src, self.pg3.local_ip4)
1952 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1953 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1954 self.assertEqual(p[UDP].dport, 4739)
1955 self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id)
1956 if p.haslayer(Template):
1957 ipfix.add_template(p.getlayer(Template))
1958 # verify events in data set
1960 if p.haslayer(Data):
1961 data = ipfix.decode_data_set(p.getlayer(Set))
1962 if scapy.compat.orb(data[0][230]) == 10:
1963 self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6)
1964 elif scapy.compat.orb(data[0][230]) == 6:
1965 self.verify_ipfix_nat64_ses(
1966 data, 1, self.pg0.remote_ip6, self.pg1.remote_ip4, 25
1969 self.logger.error(ppp("Unexpected or invalid packet: ", p))
1972 self.pg_enable_capture(self.pg_interfaces)
1973 self.vapi.nat64_add_del_pool_addr_range(
1974 start_addr=self.nat_addr,
1975 end_addr=self.nat_addr,
1979 self.vapi.ipfix_flush()
1980 capture = self.pg3.get_capture(2)
1981 # verify events in data set
1983 self.assertTrue(p.haslayer(IPFIX))
1984 self.assertEqual(p[IP].src, self.pg3.local_ip4)
1985 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1986 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1987 self.assertEqual(p[UDP].dport, 4739)
1988 self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id)
1989 if p.haslayer(Data):
1990 data = ipfix.decode_data_set(p.getlayer(Set))
1991 if scapy.compat.orb(data[0][230]) == 11:
1992 self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6)
1993 elif scapy.compat.orb(data[0][230]) == 7:
1994 self.verify_ipfix_nat64_ses(
1995 data, 0, self.pg0.remote_ip6, self.pg1.remote_ip4, 25
1998 self.logger.error(ppp("Unexpected or invalid packet: ", p))
2000 def test_syslog_sess(self):
2001 """Test syslog session creation and deletion"""
2002 self.tcp_port_in = random.randint(1025, 65535)
2003 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4, "64:ff9b::", 96)
2005 self.vapi.nat64_add_del_pool_addr_range(
2006 start_addr=self.nat_addr,
2007 end_addr=self.nat_addr,
2011 flags = self.config_flags.NAT_IS_INSIDE
2012 self.vapi.nat64_add_del_interface(
2013 is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
2015 self.vapi.nat64_add_del_interface(
2016 is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
2018 self.vapi.syslog_set_filter(self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_INFO)
2019 self.vapi.syslog_set_sender(self.pg3.local_ip4, self.pg3.remote_ip4)
2022 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2023 / IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6)
2024 / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port)
2026 self.pg0.add_stream(p)
2027 self.pg_enable_capture(self.pg_interfaces)
2029 p = self.pg1.get_capture(1)
2030 self.tcp_port_out = p[0][TCP].sport
2031 capture = self.pg3.get_capture(1)
2032 self.verify_syslog_sess(capture[0][Raw].load, is_ip6=True)
2034 self.pg_enable_capture(self.pg_interfaces)
2036 self.vapi.nat64_add_del_pool_addr_range(
2037 start_addr=self.nat_addr,
2038 end_addr=self.nat_addr,
2042 capture = self.pg3.get_capture(1)
2043 self.verify_syslog_sess(capture[0][Raw].load, False, True)
2045 def nat64_get_ses_num(self):
2047 Return number of active NAT64 sessions.
2049 st = self.vapi.nat64_st_dump(proto=255)
2052 def clear_nat64(self):
2054 Clear NAT64 configuration.
2056 self.vapi.nat_ipfix_enable_disable(
2057 domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=0
2059 self.ipfix_src_port = 4739
2060 self.ipfix_domain_id = 1
2062 self.vapi.syslog_set_filter(self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_EMERG)
2064 self.vapi.nat64_set_timeouts(
2065 udp=300, tcp_established=7440, tcp_transitory=240, icmp=60
2068 interfaces = self.vapi.nat64_interface_dump()
2069 for intf in interfaces:
2070 self.vapi.nat64_add_del_interface(
2071 is_add=0, flags=intf.flags, sw_if_index=intf.sw_if_index
2074 bib = self.vapi.nat64_bib_dump(proto=255)
2076 if bibe.flags & self.config_flags.NAT_IS_STATIC:
2077 self.vapi.nat64_add_del_static_bib(
2087 adresses = self.vapi.nat64_pool_addr_dump()
2088 for addr in adresses:
2089 self.vapi.nat64_add_del_pool_addr_range(
2090 start_addr=addr.address,
2091 end_addr=addr.address,
2096 prefixes = self.vapi.nat64_prefix_dump()
2097 for prefix in prefixes:
2098 self.vapi.nat64_add_del_prefix(
2099 prefix=str(prefix.prefix), vrf_id=prefix.vrf_id, is_add=0
2102 bibs = self.statistics.get_counter("/nat64/total-bibs")
2103 self.assertEqual(bibs[0][0], 0)
2104 sessions = self.statistics.get_counter("/nat64/total-sessions")
2105 self.assertEqual(sessions[0][0], 0)
2108 if __name__ == "__main__":
2109 unittest.main(testRunner=VppTestRunner)