11 from config import config
12 from framework import VppTestCase
13 from asfframework import (
14 tag_fixme_vpp_workers,
19 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
20 from scapy.data import IP_PROTOS
21 from scapy.layers.inet import IP, TCP, UDP, ICMP
22 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
23 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
24 from scapy.layers.inet6 import (
33 from scapy.layers.l2 import Ether, GRE
34 from scapy.packet import Raw
35 from syslog_rfc5424_parser import SyslogMessage, ParseError
36 from syslog_rfc5424_parser.constants import SyslogSeverity
37 from util import ppc, ppp
38 from vpp_papi import VppEnum
41 @tag_fixme_vpp_workers
43 class TestNAT64(VppTestCase):
44 """NAT64 Test Cases"""
47 def SYSLOG_SEVERITY(self):
48 return VppEnum.vl_api_syslog_severity_t
51 def config_flags(self):
52 return VppEnum.vl_api_nat_config_flags_t
56 super(TestNAT64, cls).setUpClass()
58 if is_distro_ubuntu2204 == True and not hasattr(cls, "vpp"):
60 cls.tcp_port_in = 6303
61 cls.tcp_port_out = 6303
62 cls.udp_port_in = 6304
63 cls.udp_port_out = 6304
65 cls.icmp_id_out = 6305
66 cls.tcp_external_port = 80
67 cls.nat_addr = "10.0.0.3"
68 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
70 cls.vrf1_nat_addr = "10.0.10.3"
71 cls.ipfix_src_port = 4739
72 cls.ipfix_domain_id = 1
74 cls.create_pg_interfaces(range(6))
75 cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
76 cls.ip6_interfaces.append(cls.pg_interfaces[2])
77 cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
79 cls.vapi.ip_table_add_del(
80 is_add=1, table={"table_id": cls.vrf1_id, "is_ip6": 1}
83 cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
85 cls.pg0.generate_remote_hosts(2)
87 for i in cls.ip6_interfaces:
90 i.configure_ipv6_neighbors()
92 for i in cls.ip4_interfaces:
101 cls.pg3.configure_ipv6_neighbors()
107 def tearDownClass(cls):
108 super(TestNAT64, cls).tearDownClass()
111 super(TestNAT64, self).setUp()
112 self.vapi.nat64_plugin_enable_disable(enable=1, bib_buckets=128, st_buckets=256)
115 super(TestNAT64, self).tearDown()
116 if not self.vpp_dead:
117 self.vapi.nat64_plugin_enable_disable(enable=0)
119 def show_commands_at_teardown(self):
120 self.logger.info(self.vapi.cli("show nat64 pool"))
121 self.logger.info(self.vapi.cli("show nat64 interfaces"))
122 self.logger.info(self.vapi.cli("show nat64 prefix"))
123 self.logger.info(self.vapi.cli("show nat64 bib all"))
124 self.logger.info(self.vapi.cli("show nat64 session table all"))
126 def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
128 Create IPv6 packet stream for inside network
130 :param in_if: Inside interface
131 :param out_if: Outside interface
132 :param ttl: Hop Limit of generated packets
133 :param pref: NAT64 prefix
134 :param plen: NAT64 prefix length
138 dst = "".join(["64:ff9b::", out_if.remote_ip4])
140 dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
144 Ether(dst=in_if.local_mac, src=in_if.remote_mac)
145 / IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim)
146 / TCP(sport=self.tcp_port_in, dport=20)
152 Ether(dst=in_if.local_mac, src=in_if.remote_mac)
153 / IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim)
154 / UDP(sport=self.udp_port_in, dport=20)
160 Ether(dst=in_if.local_mac, src=in_if.remote_mac)
161 / IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim)
162 / ICMPv6EchoRequest(id=self.icmp_id_in)
168 def create_stream_out(self, out_if, dst_ip=None, ttl=64, use_inside_ports=False):
170 Create packet stream for outside network
172 :param out_if: Outside interface
173 :param dst_ip: Destination IP address (Default use global NAT address)
174 :param ttl: TTL of generated packets
175 :param use_inside_ports: Use inside NAT ports as destination ports
176 instead of outside ports
179 dst_ip = self.nat_addr
180 if not use_inside_ports:
181 tcp_port = self.tcp_port_out
182 udp_port = self.udp_port_out
183 icmp_id = self.icmp_id_out
185 tcp_port = self.tcp_port_in
186 udp_port = self.udp_port_in
187 icmp_id = self.icmp_id_in
191 Ether(dst=out_if.local_mac, src=out_if.remote_mac)
192 / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
193 / TCP(dport=tcp_port, sport=20)
199 Ether(dst=out_if.local_mac, src=out_if.remote_mac)
200 / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
201 / UDP(dport=udp_port, sport=20)
207 Ether(dst=out_if.local_mac, src=out_if.remote_mac)
208 / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
209 / ICMP(id=icmp_id, type="echo-reply")
215 def verify_capture_out(
225 Verify captured packets on outside network
227 :param capture: Captured packets
228 :param nat_ip: Translated IP address (Default use global NAT address)
229 :param same_port: Source port number is not translated (Default False)
230 :param dst_ip: Destination IP address (Default do not verify)
231 :param is_ip6: If L3 protocol is IPv6 (Default False)
235 ICMP46 = ICMPv6EchoRequest
240 nat_ip = self.nat_addr
241 for packet in capture:
244 self.assert_packet_checksums_valid(packet)
245 self.assertEqual(packet[IP46].src, nat_ip)
246 if dst_ip is not None:
247 self.assertEqual(packet[IP46].dst, dst_ip)
248 if packet.haslayer(TCP):
251 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
253 self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
254 self.tcp_port_out = packet[TCP].sport
255 self.assert_packet_checksums_valid(packet)
256 elif packet.haslayer(UDP):
259 self.assertEqual(packet[UDP].sport, self.udp_port_in)
261 self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
262 self.udp_port_out = packet[UDP].sport
266 self.assertEqual(packet[ICMP46].id, self.icmp_id_in)
268 self.assertNotEqual(packet[ICMP46].id, self.icmp_id_in)
269 self.icmp_id_out = packet[ICMP46].id
270 self.assert_packet_checksums_valid(packet)
273 ppp("Unexpected or invalid packet (outside network):", packet)
277 def verify_capture_in_ip6(self, capture, src_ip, dst_ip):
279 Verify captured IPv6 packets on inside network
281 :param capture: Captured packets
282 :param src_ip: Source IP
283 :param dst_ip: Destination IP address
285 for packet in capture:
287 self.assertEqual(packet[IPv6].src, src_ip)
288 self.assertEqual(packet[IPv6].dst, dst_ip)
289 self.assert_packet_checksums_valid(packet)
290 if packet.haslayer(TCP):
291 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
292 elif packet.haslayer(UDP):
293 self.assertEqual(packet[UDP].dport, self.udp_port_in)
295 self.assertEqual(packet[ICMPv6EchoReply].id, self.icmp_id_in)
298 ppp("Unexpected or invalid packet (inside network):", packet)
302 def create_stream_frag(
303 self, src_if, dst, sport, dport, data, proto=IP_PROTOS.tcp, echo_reply=False
306 Create fragmented packet stream
308 :param src_if: Source interface
309 :param dst: Destination IPv4 address
310 :param sport: Source port
311 :param dport: Destination port
312 :param data: Payload data
313 :param proto: protocol (TCP, UDP, ICMP)
314 :param echo_reply: use echo_reply if protocol is ICMP
317 if proto == IP_PROTOS.tcp:
319 IP(src=src_if.remote_ip4, dst=dst)
320 / TCP(sport=sport, dport=dport)
323 p = p.__class__(scapy.compat.raw(p))
324 chksum = p[TCP].chksum
325 proto_header = TCP(sport=sport, dport=dport, chksum=chksum)
326 elif proto == IP_PROTOS.udp:
327 proto_header = UDP(sport=sport, dport=dport)
328 elif proto == IP_PROTOS.icmp:
330 proto_header = ICMP(id=sport, type="echo-request")
332 proto_header = ICMP(id=sport, type="echo-reply")
334 raise Exception("Unsupported protocol")
335 id = random.randint(0, 65535)
337 if proto == IP_PROTOS.tcp:
340 raw = Raw(data[0:16])
342 Ether(src=src_if.remote_mac, dst=src_if.local_mac)
343 / IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id)
348 if proto == IP_PROTOS.tcp:
349 raw = Raw(data[4:20])
351 raw = Raw(data[16:32])
353 Ether(src=src_if.remote_mac, dst=src_if.local_mac)
354 / IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id, proto=proto)
358 if proto == IP_PROTOS.tcp:
363 Ether(src=src_if.remote_mac, dst=src_if.local_mac)
364 / IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=proto, id=id)
370 def create_stream_frag_ip6(
371 self, src_if, dst, sport, dport, data, pref=None, plen=0, frag_size=128
374 Create fragmented packet stream
376 :param src_if: Source interface
377 :param dst: Destination IPv4 address
378 :param sport: Source TCP port
379 :param dport: Destination TCP port
380 :param data: Payload data
381 :param pref: NAT64 prefix
382 :param plen: NAT64 prefix length
383 :param fragsize: size of fragments
387 dst_ip6 = "".join(["64:ff9b::", dst])
389 dst_ip6 = self.compose_ip6(dst, pref, plen)
392 Ether(dst=src_if.local_mac, src=src_if.remote_mac)
393 / IPv6(src=src_if.remote_ip6, dst=dst_ip6)
394 / IPv6ExtHdrFragment(id=random.randint(0, 65535))
395 / TCP(sport=sport, dport=dport)
399 return fragment6(p, frag_size)
401 def reass_frags_and_verify(self, frags, src, dst):
403 Reassemble and verify fragmented packet
405 :param frags: Captured fragments
406 :param src: Source IPv4 address to verify
407 :param dst: Destination IPv4 address to verify
409 :returns: Reassembled IPv4 packet
413 self.assertEqual(p[IP].src, src)
414 self.assertEqual(p[IP].dst, dst)
415 self.assert_ip_checksum_valid(p)
416 buffer.seek(p[IP].frag * 8)
417 buffer.write(bytes(p[IP].payload))
418 ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst, proto=frags[0][IP].proto)
419 if ip.proto == IP_PROTOS.tcp:
420 p = ip / TCP(buffer.getvalue())
421 self.logger.debug(ppp("Reassembled:", p))
422 self.assert_tcp_checksum_valid(p)
423 elif ip.proto == IP_PROTOS.udp:
424 p = ip / UDP(buffer.getvalue()[:8]) / Raw(buffer.getvalue()[8:])
425 elif ip.proto == IP_PROTOS.icmp:
426 p = ip / ICMP(buffer.getvalue())
429 def reass_frags_and_verify_ip6(self, frags, src, dst):
431 Reassemble and verify fragmented packet
433 :param frags: Captured fragments
434 :param src: Source IPv6 address to verify
435 :param dst: Destination IPv6 address to verify
437 :returns: Reassembled IPv6 packet
441 self.assertEqual(p[IPv6].src, src)
442 self.assertEqual(p[IPv6].dst, dst)
443 buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
444 buffer.write(bytes(p[IPv6ExtHdrFragment].payload))
446 src=frags[0][IPv6].src,
447 dst=frags[0][IPv6].dst,
448 nh=frags[0][IPv6ExtHdrFragment].nh,
450 if ip.nh == IP_PROTOS.tcp:
451 p = ip / TCP(buffer.getvalue())
452 elif ip.nh == IP_PROTOS.udp:
453 p = ip / UDP(buffer.getvalue())
454 self.logger.debug(ppp("Reassembled:", p))
455 self.assert_packet_checksums_valid(p)
458 def verify_ipfix_max_bibs(self, data, limit):
460 Verify IPFIX maximum BIB entries exceeded event
462 :param data: Decoded IPFIX data records
463 :param limit: Number of maximum BIB entries that can be created.
465 self.assertEqual(1, len(data))
468 self.assertEqual(scapy.compat.orb(record[230]), 13)
469 # natQuotaExceededEvent
470 self.assertEqual(struct.pack("!I", 2), record[466])
472 self.assertEqual(struct.pack("!I", limit), record[472])
475 def verify_ipfix_bib(self, data, is_create, src_addr):
477 Verify IPFIX NAT64 BIB create and delete events
479 :param data: Decoded IPFIX data records
480 :param is_create: Create event if nonzero value otherwise delete event
481 :param src_addr: IPv6 source address
483 self.assertEqual(1, len(data))
487 self.assertEqual(scapy.compat.orb(record[230]), 10)
489 self.assertEqual(scapy.compat.orb(record[230]), 11)
491 self.assertEqual(src_addr, str(ipaddress.IPv6Address(record[27])))
492 # postNATSourceIPv4Address
493 self.assertEqual(self.nat_addr_n, record[225])
495 self.assertEqual(IP_PROTOS.tcp, scapy.compat.orb(record[4]))
497 self.assertEqual(struct.pack("!I", 0), record[234])
498 # sourceTransportPort
499 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
500 # postNAPTSourceTransportPort
501 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
503 def verify_ipfix_nat64_ses(self, data, is_create, src_addr, dst_addr, dst_port):
505 Verify IPFIX NAT64 session create and delete events
507 :param data: Decoded IPFIX data records
508 :param is_create: Create event if nonzero value otherwise delete event
509 :param src_addr: IPv6 source address
510 :param dst_addr: IPv4 destination address
511 :param dst_port: destination TCP port
513 self.assertEqual(1, len(data))
517 self.assertEqual(scapy.compat.orb(record[230]), 6)
519 self.assertEqual(scapy.compat.orb(record[230]), 7)
521 self.assertEqual(src_addr, str(ipaddress.IPv6Address(record[27])))
522 # destinationIPv6Address
525 socket.AF_INET6, self.compose_ip6(dst_addr, "64:ff9b::", 96)
529 # postNATSourceIPv4Address
530 self.assertEqual(self.nat_addr_n, record[225])
531 # postNATDestinationIPv4Address
532 self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr), record[226])
534 self.assertEqual(IP_PROTOS.tcp, scapy.compat.orb(record[4]))
536 self.assertEqual(struct.pack("!I", 0), record[234])
537 # sourceTransportPort
538 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
539 # postNAPTSourceTransportPort
540 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
541 # destinationTransportPort
542 self.assertEqual(struct.pack("!H", dst_port), record[11])
543 # postNAPTDestinationTransportPort
544 self.assertEqual(struct.pack("!H", dst_port), record[228])
546 def verify_syslog_sess(self, data, is_add=True, is_ip6=False):
547 message = data.decode("utf-8")
549 message = SyslogMessage.parse(message)
550 except ParseError as e:
554 self.assertEqual(message.severity, SyslogSeverity.info)
555 self.assertEqual(message.appname, "NAT")
556 self.assertEqual(message.msgid, "SADD" if is_add else "SDEL")
557 sd_params = message.sd.get("nsess")
558 self.assertTrue(sd_params is not None)
560 self.assertEqual(sd_params.get("IATYP"), "IPv6")
561 self.assertEqual(sd_params.get("ISADDR"), self.pg0.remote_ip6)
563 self.assertEqual(sd_params.get("IATYP"), "IPv4")
564 self.assertEqual(sd_params.get("ISADDR"), self.pg0.remote_ip4)
565 self.assertTrue(sd_params.get("SSUBIX") is not None)
566 self.assertEqual(sd_params.get("ISPORT"), "%d" % self.tcp_port_in)
567 self.assertEqual(sd_params.get("XATYP"), "IPv4")
568 self.assertEqual(sd_params.get("XSADDR"), self.nat_addr)
569 self.assertEqual(sd_params.get("XSPORT"), "%d" % self.tcp_port_out)
570 self.assertEqual(sd_params.get("PROTO"), "%d" % IP_PROTOS.tcp)
571 self.assertEqual(sd_params.get("SVLAN"), "0")
572 self.assertEqual(sd_params.get("XDADDR"), self.pg1.remote_ip4)
573 self.assertEqual(sd_params.get("XDPORT"), "%d" % self.tcp_external_port)
575 def compose_ip6(self, ip4, pref, plen):
577 Compose IPv4-embedded IPv6 addresses
579 :param ip4: IPv4 address
580 :param pref: IPv6 prefix
581 :param plen: IPv6 prefix length
582 :returns: IPv4-embedded IPv6 addresses
584 pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
585 ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
600 pref_n[10] = ip4_n[3]
604 pref_n[10] = ip4_n[2]
605 pref_n[11] = ip4_n[3]
608 pref_n[10] = ip4_n[1]
609 pref_n[11] = ip4_n[2]
610 pref_n[12] = ip4_n[3]
612 pref_n[12] = ip4_n[0]
613 pref_n[13] = ip4_n[1]
614 pref_n[14] = ip4_n[2]
615 pref_n[15] = ip4_n[3]
616 packed_pref_n = b"".join([scapy.compat.chb(x) for x in pref_n])
617 return socket.inet_ntop(socket.AF_INET6, packed_pref_n)
619 def verify_ipfix_max_sessions(self, data, limit):
621 Verify IPFIX maximum session entries exceeded event
623 :param data: Decoded IPFIX data records
624 :param limit: Number of maximum session entries that can be created.
626 self.assertEqual(1, len(data))
629 self.assertEqual(scapy.compat.orb(record[230]), 13)
630 # natQuotaExceededEvent
631 self.assertEqual(struct.pack("!I", 1), record[466])
633 self.assertEqual(struct.pack("!I", limit), record[471])
636 def test_nat64_inside_interface_handles_neighbor_advertisement(self):
637 """NAT64 inside interface handles Neighbor Advertisement"""
639 flags = self.config_flags.NAT_IS_INSIDE
640 self.vapi.nat64_add_del_interface(
641 is_add=1, flags=flags, sw_if_index=self.pg5.sw_if_index
646 Ether(dst=self.pg5.local_mac, src=self.pg5.remote_mac)
647 / IPv6(src=self.pg5.remote_ip6, dst=self.pg5.local_ip6)
648 / ICMPv6EchoRequest()
651 self.pg5.add_stream(pkts)
652 self.pg_enable_capture(self.pg_interfaces)
655 # Wait for Neighbor Solicitation
656 capture = self.pg5.get_capture(len(pkts))
659 self.assertEqual(packet[IPv6].src, self.pg5.local_ip6_ll)
660 self.assertEqual(packet.haslayer(ICMPv6ND_NS), 1)
661 tgt = packet[ICMPv6ND_NS].tgt
663 self.logger.error(ppp("Unexpected or invalid packet:", packet))
666 # Send Neighbor Advertisement
668 Ether(dst=self.pg5.local_mac, src=self.pg5.remote_mac)
669 / IPv6(src=self.pg5.remote_ip6, dst=self.pg5.local_ip6)
670 / ICMPv6ND_NA(tgt=tgt)
671 / ICMPv6NDOptDstLLAddr(lladdr=self.pg5.remote_mac)
674 self.pg5.add_stream(pkts)
675 self.pg_enable_capture(self.pg_interfaces)
678 # Try to send ping again
680 self.pg5.add_stream(pkts)
681 self.pg_enable_capture(self.pg_interfaces)
684 # Wait for ping reply
685 capture = self.pg5.get_capture(len(pkts))
688 self.assertEqual(packet[IPv6].src, self.pg5.local_ip6)
689 self.assertEqual(packet[IPv6].dst, self.pg5.remote_ip6)
690 self.assertEqual(packet.haslayer(ICMPv6EchoReply), 1)
692 self.logger.error(ppp("Unexpected or invalid packet:", packet))
696 """Add/delete address to NAT64 pool"""
699 self.vapi.nat64_add_del_pool_addr_range(
700 start_addr=nat_addr, end_addr=nat_addr, vrf_id=0xFFFFFFFF, is_add=1
703 addresses = self.vapi.nat64_pool_addr_dump()
704 self.assertEqual(len(addresses), 1)
705 self.assertEqual(str(addresses[0].address), nat_addr)
707 self.vapi.nat64_add_del_pool_addr_range(
708 start_addr=nat_addr, end_addr=nat_addr, vrf_id=0xFFFFFFFF, is_add=0
711 addresses = self.vapi.nat64_pool_addr_dump()
712 self.assertEqual(len(addresses), 0)
714 def test_interface(self):
715 """Enable/disable NAT64 feature on the interface"""
716 flags = self.config_flags.NAT_IS_INSIDE
717 self.vapi.nat64_add_del_interface(
718 is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
720 self.vapi.nat64_add_del_interface(
721 is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
724 interfaces = self.vapi.nat64_interface_dump()
725 self.assertEqual(len(interfaces), 2)
728 for intf in interfaces:
729 if intf.sw_if_index == self.pg0.sw_if_index:
730 self.assertEqual(intf.flags, self.config_flags.NAT_IS_INSIDE)
732 elif intf.sw_if_index == self.pg1.sw_if_index:
733 self.assertEqual(intf.flags, self.config_flags.NAT_IS_OUTSIDE)
735 self.assertTrue(pg0_found)
736 self.assertTrue(pg1_found)
738 features = self.vapi.cli("show interface features pg0")
739 self.assertIn("nat64-in2out", features)
740 features = self.vapi.cli("show interface features pg1")
741 self.assertIn("nat64-out2in", features)
743 self.vapi.nat64_add_del_interface(
744 is_add=0, flags=flags, sw_if_index=self.pg0.sw_if_index
746 self.vapi.nat64_add_del_interface(
747 is_add=0, flags=flags, sw_if_index=self.pg1.sw_if_index
750 interfaces = self.vapi.nat64_interface_dump()
751 self.assertEqual(len(interfaces), 0)
753 def test_static_bib(self):
754 """Add/delete static BIB entry"""
755 in_addr = "2001:db8:85a3::8a2e:370:7334"
756 out_addr = "10.1.1.3"
759 proto = IP_PROTOS.tcp
761 self.vapi.nat64_add_del_static_bib(
770 bib = self.vapi.nat64_bib_dump(proto=IP_PROTOS.tcp)
773 if bibe.flags & self.config_flags.NAT_IS_STATIC:
775 self.assertEqual(str(bibe.i_addr), in_addr)
776 self.assertEqual(str(bibe.o_addr), out_addr)
777 self.assertEqual(bibe.i_port, in_port)
778 self.assertEqual(bibe.o_port, out_port)
779 self.assertEqual(static_bib_num, 1)
780 bibs = self.statistics.get_counter("/nat64/total-bibs")
781 self.assertEqual(bibs[0][0], 1)
783 self.vapi.nat64_add_del_static_bib(
792 bib = self.vapi.nat64_bib_dump(proto=IP_PROTOS.tcp)
795 if bibe.flags & self.config_flags.NAT_IS_STATIC:
797 self.assertEqual(static_bib_num, 0)
798 bibs = self.statistics.get_counter("/nat64/total-bibs")
799 self.assertEqual(bibs[0][0], 0)
801 def test_set_timeouts(self):
802 """Set NAT64 timeouts"""
803 # verify default values
804 timeouts = self.vapi.nat64_get_timeouts()
805 self.assertEqual(timeouts.udp, 300)
806 self.assertEqual(timeouts.icmp, 60)
807 self.assertEqual(timeouts.tcp_transitory, 240)
808 self.assertEqual(timeouts.tcp_established, 7440)
810 # set and verify custom values
811 self.vapi.nat64_set_timeouts(
812 udp=200, tcp_established=7450, tcp_transitory=250, icmp=30
814 timeouts = self.vapi.nat64_get_timeouts()
815 self.assertEqual(timeouts.udp, 200)
816 self.assertEqual(timeouts.icmp, 30)
817 self.assertEqual(timeouts.tcp_transitory, 250)
818 self.assertEqual(timeouts.tcp_established, 7450)
820 def test_dynamic(self):
821 """NAT64 dynamic translation test"""
822 self.tcp_port_in = 6303
823 self.udp_port_in = 6304
824 self.icmp_id_in = 6305
826 ses_num_start = self.nat64_get_ses_num()
828 self.vapi.nat64_add_del_pool_addr_range(
829 start_addr=self.nat_addr,
830 end_addr=self.nat_addr,
834 flags = self.config_flags.NAT_IS_INSIDE
835 self.vapi.nat64_add_del_interface(
836 is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
838 self.vapi.nat64_add_del_interface(
839 is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
843 tcpn = self.statistics.get_counter("/nat64/in2out/tcp")[0]
844 udpn = self.statistics.get_counter("/nat64/in2out/udp")[0]
845 icmpn = self.statistics.get_counter("/nat64/in2out/icmp")[0]
846 drops = self.statistics.get_counter("/nat64/in2out/drops")[0]
848 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
849 self.pg0.add_stream(pkts)
850 self.pg_enable_capture(self.pg_interfaces)
852 capture = self.pg1.get_capture(len(pkts))
853 self.verify_capture_out(
854 capture, nat_ip=self.nat_addr, dst_ip=self.pg1.remote_ip4
857 if_idx = self.pg0.sw_if_index
858 cnt = self.statistics.get_counter("/nat64/in2out/tcp")[0]
859 self.assertEqual(cnt[if_idx] - tcpn[if_idx], 1)
860 cnt = self.statistics.get_counter("/nat64/in2out/udp")[0]
861 self.assertEqual(cnt[if_idx] - udpn[if_idx], 1)
862 cnt = self.statistics.get_counter("/nat64/in2out/icmp")[0]
863 self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1)
864 cnt = self.statistics.get_counter("/nat64/in2out/drops")[0]
865 self.assertEqual(cnt[if_idx] - drops[if_idx], 0)
868 tcpn = self.statistics.get_counter("/nat64/out2in/tcp")[0]
869 udpn = self.statistics.get_counter("/nat64/out2in/udp")[0]
870 icmpn = self.statistics.get_counter("/nat64/out2in/icmp")[0]
871 drops = self.statistics.get_counter("/nat64/out2in/drops")[0]
873 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
874 self.pg1.add_stream(pkts)
875 self.pg_enable_capture(self.pg_interfaces)
877 capture = self.pg0.get_capture(len(pkts))
878 ip = IPv6(src="".join(["64:ff9b::", self.pg1.remote_ip4]))
879 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
881 if_idx = self.pg1.sw_if_index
882 cnt = self.statistics.get_counter("/nat64/out2in/tcp")[0]
883 self.assertEqual(cnt[if_idx] - tcpn[if_idx], 2)
884 cnt = self.statistics.get_counter("/nat64/out2in/udp")[0]
885 self.assertEqual(cnt[if_idx] - udpn[if_idx], 1)
886 cnt = self.statistics.get_counter("/nat64/out2in/icmp")[0]
887 self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1)
888 cnt = self.statistics.get_counter("/nat64/out2in/drops")[0]
889 self.assertEqual(cnt[if_idx] - drops[if_idx], 0)
891 bibs = self.statistics.get_counter("/nat64/total-bibs")
892 self.assertEqual(bibs[0][0], 3)
893 sessions = self.statistics.get_counter("/nat64/total-sessions")
894 self.assertEqual(sessions[0][0], 3)
897 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
898 self.pg0.add_stream(pkts)
899 self.pg_enable_capture(self.pg_interfaces)
901 capture = self.pg1.get_capture(len(pkts))
902 self.verify_capture_out(
903 capture, nat_ip=self.nat_addr, dst_ip=self.pg1.remote_ip4
907 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
908 self.pg1.add_stream(pkts)
909 self.pg_enable_capture(self.pg_interfaces)
911 capture = self.pg0.get_capture(len(pkts))
912 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
914 ses_num_end = self.nat64_get_ses_num()
916 self.assertEqual(ses_num_end - ses_num_start, 3)
918 # tenant with specific VRF
919 self.vapi.nat64_add_del_pool_addr_range(
920 start_addr=self.vrf1_nat_addr,
921 end_addr=self.vrf1_nat_addr,
925 flags = self.config_flags.NAT_IS_INSIDE
926 self.vapi.nat64_add_del_interface(
927 is_add=1, flags=flags, sw_if_index=self.pg2.sw_if_index
930 pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
931 self.pg2.add_stream(pkts)
932 self.pg_enable_capture(self.pg_interfaces)
934 capture = self.pg1.get_capture(len(pkts))
935 self.verify_capture_out(
936 capture, nat_ip=self.vrf1_nat_addr, dst_ip=self.pg1.remote_ip4
939 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
940 self.pg1.add_stream(pkts)
941 self.pg_enable_capture(self.pg_interfaces)
943 capture = self.pg2.get_capture(len(pkts))
944 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
946 def test_static(self):
947 """NAT64 static translation test"""
948 self.tcp_port_in = 60303
949 self.udp_port_in = 60304
950 self.icmp_id_in = 60305
951 self.tcp_port_out = 60303
952 self.udp_port_out = 60304
953 self.icmp_id_out = 60305
955 ses_num_start = self.nat64_get_ses_num()
957 self.vapi.nat64_add_del_pool_addr_range(
958 start_addr=self.nat_addr,
959 end_addr=self.nat_addr,
963 flags = self.config_flags.NAT_IS_INSIDE
964 self.vapi.nat64_add_del_interface(
965 is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
967 self.vapi.nat64_add_del_interface(
968 is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
971 self.vapi.nat64_add_del_static_bib(
972 i_addr=self.pg0.remote_ip6,
973 o_addr=self.nat_addr,
974 i_port=self.tcp_port_in,
975 o_port=self.tcp_port_out,
980 self.vapi.nat64_add_del_static_bib(
981 i_addr=self.pg0.remote_ip6,
982 o_addr=self.nat_addr,
983 i_port=self.udp_port_in,
984 o_port=self.udp_port_out,
989 self.vapi.nat64_add_del_static_bib(
990 i_addr=self.pg0.remote_ip6,
991 o_addr=self.nat_addr,
992 i_port=self.icmp_id_in,
993 o_port=self.icmp_id_out,
994 proto=IP_PROTOS.icmp,
1000 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
1001 self.pg0.add_stream(pkts)
1002 self.pg_enable_capture(self.pg_interfaces)
1004 capture = self.pg1.get_capture(len(pkts))
1005 self.verify_capture_out(
1006 capture, nat_ip=self.nat_addr, dst_ip=self.pg1.remote_ip4, same_port=True
1010 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
1011 self.pg1.add_stream(pkts)
1012 self.pg_enable_capture(self.pg_interfaces)
1014 capture = self.pg0.get_capture(len(pkts))
1015 ip = IPv6(src="".join(["64:ff9b::", self.pg1.remote_ip4]))
1016 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
1018 ses_num_end = self.nat64_get_ses_num()
1020 self.assertEqual(ses_num_end - ses_num_start, 3)
1022 def test_session_timeout(self):
1023 """NAT64 session timeout"""
1024 self.icmp_id_in = 1234
1025 self.vapi.nat64_add_del_pool_addr_range(
1026 start_addr=self.nat_addr,
1027 end_addr=self.nat_addr,
1031 flags = self.config_flags.NAT_IS_INSIDE
1032 self.vapi.nat64_add_del_interface(
1033 is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
1035 self.vapi.nat64_add_del_interface(
1036 is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
1038 self.vapi.nat64_set_timeouts(
1039 udp=300, tcp_established=5, tcp_transitory=5, icmp=5
1042 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
1043 self.pg0.add_stream(pkts)
1044 self.pg_enable_capture(self.pg_interfaces)
1046 capture = self.pg1.get_capture(len(pkts))
1048 ses_num_before_timeout = self.nat64_get_ses_num()
1050 self.virtual_sleep(15)
1052 # ICMP and TCP session after timeout
1053 ses_num_after_timeout = self.nat64_get_ses_num()
1054 self.assertEqual(ses_num_before_timeout - ses_num_after_timeout, 2)
1056 def test_icmp_error(self):
1057 """NAT64 ICMP Error message translation"""
1058 self.tcp_port_in = 6303
1059 self.udp_port_in = 6304
1060 self.icmp_id_in = 6305
1062 self.vapi.nat64_add_del_pool_addr_range(
1063 start_addr=self.nat_addr,
1064 end_addr=self.nat_addr,
1068 flags = self.config_flags.NAT_IS_INSIDE
1069 self.vapi.nat64_add_del_interface(
1070 is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
1072 self.vapi.nat64_add_del_interface(
1073 is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
1076 # send some packets to create sessions
1077 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
1078 self.pg0.add_stream(pkts)
1079 self.pg_enable_capture(self.pg_interfaces)
1081 capture_ip4 = self.pg1.get_capture(len(pkts))
1082 self.verify_capture_out(
1083 capture_ip4, nat_ip=self.nat_addr, dst_ip=self.pg1.remote_ip4
1086 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
1087 self.pg1.add_stream(pkts)
1088 self.pg_enable_capture(self.pg_interfaces)
1090 capture_ip6 = self.pg0.get_capture(len(pkts))
1091 ip = IPv6(src="".join(["64:ff9b::", self.pg1.remote_ip4]))
1092 self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src, self.pg0.remote_ip6)
1096 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1097 / IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src)
1098 / ICMPv6DestUnreach(code=1)
1100 for packet in capture_ip6
1102 self.pg0.add_stream(pkts)
1103 self.pg_enable_capture(self.pg_interfaces)
1105 capture = self.pg1.get_capture(len(pkts))
1106 for packet in capture:
1108 self.assertEqual(packet[IP].src, self.nat_addr)
1109 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1110 self.assertEqual(packet[ICMP].type, 3)
1111 self.assertEqual(packet[ICMP].code, 13)
1112 inner = packet[IPerror]
1113 self.assertEqual(inner.src, self.pg1.remote_ip4)
1114 self.assertEqual(inner.dst, self.nat_addr)
1115 self.assert_packet_checksums_valid(packet)
1116 if inner.haslayer(TCPerror):
1117 self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
1118 elif inner.haslayer(UDPerror):
1119 self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
1121 self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
1123 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1128 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1129 / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1130 / ICMP(type=3, code=13)
1132 for packet in capture_ip4
1134 self.pg1.add_stream(pkts)
1135 self.pg_enable_capture(self.pg_interfaces)
1137 capture = self.pg0.get_capture(len(pkts))
1138 for packet in capture:
1140 self.assertEqual(packet[IPv6].src, ip.src)
1141 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
1142 icmp = packet[ICMPv6DestUnreach]
1143 self.assertEqual(icmp.code, 1)
1144 inner = icmp[IPerror6]
1145 self.assertEqual(inner.src, self.pg0.remote_ip6)
1146 self.assertEqual(inner.dst, ip.src)
1147 self.assert_icmpv6_checksum_valid(packet)
1148 if inner.haslayer(TCPerror):
1149 self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
1150 elif inner.haslayer(UDPerror):
1151 self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
1153 self.assertEqual(inner[ICMPv6EchoRequest].id, self.icmp_id_in)
1155 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1158 def test_hairpinning(self):
1159 """NAT64 hairpinning"""
1161 client = self.pg0.remote_hosts[0]
1162 server = self.pg0.remote_hosts[1]
1163 server_tcp_in_port = 22
1164 server_tcp_out_port = 4022
1165 server_udp_in_port = 23
1166 server_udp_out_port = 4023
1167 client_tcp_in_port = 1234
1168 client_udp_in_port = 1235
1169 client_tcp_out_port = 0
1170 client_udp_out_port = 0
1171 ip = IPv6(src="".join(["64:ff9b::", self.nat_addr]))
1172 nat_addr_ip6 = ip.src
1174 self.vapi.nat64_add_del_pool_addr_range(
1175 start_addr=self.nat_addr,
1176 end_addr=self.nat_addr,
1180 flags = self.config_flags.NAT_IS_INSIDE
1181 self.vapi.nat64_add_del_interface(
1182 is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
1184 self.vapi.nat64_add_del_interface(
1185 is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
1188 self.vapi.nat64_add_del_static_bib(
1190 o_addr=self.nat_addr,
1191 i_port=server_tcp_in_port,
1192 o_port=server_tcp_out_port,
1193 proto=IP_PROTOS.tcp,
1197 self.vapi.nat64_add_del_static_bib(
1199 o_addr=self.nat_addr,
1200 i_port=server_udp_in_port,
1201 o_port=server_udp_out_port,
1202 proto=IP_PROTOS.udp,
1210 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1211 / IPv6(src=client.ip6, dst=nat_addr_ip6)
1212 / TCP(sport=client_tcp_in_port, dport=server_tcp_out_port)
1216 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1217 / IPv6(src=client.ip6, dst=nat_addr_ip6)
1218 / UDP(sport=client_udp_in_port, dport=server_udp_out_port)
1221 self.pg0.add_stream(pkts)
1222 self.pg_enable_capture(self.pg_interfaces)
1224 capture = self.pg0.get_capture(len(pkts))
1225 for packet in capture:
1227 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
1228 self.assertEqual(packet[IPv6].dst, server.ip6)
1229 self.assert_packet_checksums_valid(packet)
1230 if packet.haslayer(TCP):
1231 self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
1232 self.assertEqual(packet[TCP].dport, server_tcp_in_port)
1233 client_tcp_out_port = packet[TCP].sport
1235 self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
1236 self.assertEqual(packet[UDP].dport, server_udp_in_port)
1237 client_udp_out_port = packet[UDP].sport
1239 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1245 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1246 / IPv6(src=server.ip6, dst=nat_addr_ip6)
1247 / TCP(sport=server_tcp_in_port, dport=client_tcp_out_port)
1251 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1252 / IPv6(src=server.ip6, dst=nat_addr_ip6)
1253 / UDP(sport=server_udp_in_port, dport=client_udp_out_port)
1256 self.pg0.add_stream(pkts)
1257 self.pg_enable_capture(self.pg_interfaces)
1259 capture = self.pg0.get_capture(len(pkts))
1260 for packet in capture:
1262 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
1263 self.assertEqual(packet[IPv6].dst, client.ip6)
1264 self.assert_packet_checksums_valid(packet)
1265 if packet.haslayer(TCP):
1266 self.assertEqual(packet[TCP].sport, server_tcp_out_port)
1267 self.assertEqual(packet[TCP].dport, client_tcp_in_port)
1269 self.assertEqual(packet[UDP].sport, server_udp_out_port)
1270 self.assertEqual(packet[UDP].dport, client_udp_in_port)
1272 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1278 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1279 / IPv6(src=client.ip6, dst=nat_addr_ip6)
1280 / ICMPv6DestUnreach(code=1)
1282 for packet in capture
1284 self.pg0.add_stream(pkts)
1285 self.pg_enable_capture(self.pg_interfaces)
1287 capture = self.pg0.get_capture(len(pkts))
1288 for packet in capture:
1290 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
1291 self.assertEqual(packet[IPv6].dst, server.ip6)
1292 icmp = packet[ICMPv6DestUnreach]
1293 self.assertEqual(icmp.code, 1)
1294 inner = icmp[IPerror6]
1295 self.assertEqual(inner.src, server.ip6)
1296 self.assertEqual(inner.dst, nat_addr_ip6)
1297 self.assert_packet_checksums_valid(packet)
1298 if inner.haslayer(TCPerror):
1299 self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
1300 self.assertEqual(inner[TCPerror].dport, client_tcp_out_port)
1302 self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
1303 self.assertEqual(inner[UDPerror].dport, client_udp_out_port)
1305 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1308 def test_prefix(self):
1309 """NAT64 Network-Specific Prefix"""
1311 self.vapi.nat64_add_del_pool_addr_range(
1312 start_addr=self.nat_addr,
1313 end_addr=self.nat_addr,
1317 flags = self.config_flags.NAT_IS_INSIDE
1318 self.vapi.nat64_add_del_interface(
1319 is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
1321 self.vapi.nat64_add_del_interface(
1322 is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
1324 self.vapi.nat64_add_del_pool_addr_range(
1325 start_addr=self.vrf1_nat_addr,
1326 end_addr=self.vrf1_nat_addr,
1327 vrf_id=self.vrf1_id,
1330 self.vapi.nat64_add_del_interface(
1331 is_add=1, flags=flags, sw_if_index=self.pg2.sw_if_index
1335 global_pref64 = "2001:db8::"
1336 global_pref64_len = 32
1337 global_pref64_str = "{}/{}".format(global_pref64, global_pref64_len)
1338 self.vapi.nat64_add_del_prefix(prefix=global_pref64_str, vrf_id=0, is_add=1)
1340 prefix = self.vapi.nat64_prefix_dump()
1341 self.assertEqual(len(prefix), 1)
1342 self.assertEqual(str(prefix[0].prefix), global_pref64_str)
1343 self.assertEqual(prefix[0].vrf_id, 0)
1345 # Add tenant specific prefix
1346 vrf1_pref64 = "2001:db8:122:300::"
1347 vrf1_pref64_len = 56
1348 vrf1_pref64_str = "{}/{}".format(vrf1_pref64, vrf1_pref64_len)
1349 self.vapi.nat64_add_del_prefix(
1350 prefix=vrf1_pref64_str, vrf_id=self.vrf1_id, is_add=1
1353 prefix = self.vapi.nat64_prefix_dump()
1354 self.assertEqual(len(prefix), 2)
1357 pkts = self.create_stream_in_ip6(
1358 self.pg0, self.pg1, pref=global_pref64, plen=global_pref64_len
1360 self.pg0.add_stream(pkts)
1361 self.pg_enable_capture(self.pg_interfaces)
1363 capture = self.pg1.get_capture(len(pkts))
1364 self.verify_capture_out(
1365 capture, nat_ip=self.nat_addr, dst_ip=self.pg1.remote_ip4
1368 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
1369 self.pg1.add_stream(pkts)
1370 self.pg_enable_capture(self.pg_interfaces)
1372 capture = self.pg0.get_capture(len(pkts))
1373 dst_ip = self.compose_ip6(self.pg1.remote_ip4, global_pref64, global_pref64_len)
1374 self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
1376 # Tenant specific prefix
1377 pkts = self.create_stream_in_ip6(
1378 self.pg2, self.pg1, pref=vrf1_pref64, plen=vrf1_pref64_len
1380 self.pg2.add_stream(pkts)
1381 self.pg_enable_capture(self.pg_interfaces)
1383 capture = self.pg1.get_capture(len(pkts))
1384 self.verify_capture_out(
1385 capture, nat_ip=self.vrf1_nat_addr, dst_ip=self.pg1.remote_ip4
1388 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
1389 self.pg1.add_stream(pkts)
1390 self.pg_enable_capture(self.pg_interfaces)
1392 capture = self.pg2.get_capture(len(pkts))
1393 dst_ip = self.compose_ip6(self.pg1.remote_ip4, vrf1_pref64, vrf1_pref64_len)
1394 self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
1396 def test_unknown_proto(self):
1397 """NAT64 translate packet with unknown protocol"""
1399 self.vapi.nat64_add_del_pool_addr_range(
1400 start_addr=self.nat_addr,
1401 end_addr=self.nat_addr,
1405 flags = self.config_flags.NAT_IS_INSIDE
1406 self.vapi.nat64_add_del_interface(
1407 is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
1409 self.vapi.nat64_add_del_interface(
1410 is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
1412 remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, "64:ff9b::", 96)
1416 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1417 / IPv6(src=self.pg0.remote_ip6, dst=remote_ip6)
1418 / TCP(sport=self.tcp_port_in, dport=20)
1420 self.pg0.add_stream(p)
1421 self.pg_enable_capture(self.pg_interfaces)
1423 p = self.pg1.get_capture(1)
1426 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1427 / IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47)
1429 / IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4)
1430 / TCP(sport=1234, dport=1234)
1432 self.pg0.add_stream(p)
1433 self.pg_enable_capture(self.pg_interfaces)
1435 p = self.pg1.get_capture(1)
1438 self.assertEqual(packet[IP].src, self.nat_addr)
1439 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1440 self.assertEqual(packet.haslayer(GRE), 1)
1441 self.assert_packet_checksums_valid(packet)
1443 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1448 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1449 / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1451 / IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4)
1452 / TCP(sport=1234, dport=1234)
1454 self.pg1.add_stream(p)
1455 self.pg_enable_capture(self.pg_interfaces)
1457 p = self.pg0.get_capture(1)
1460 self.assertEqual(packet[IPv6].src, remote_ip6)
1461 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
1462 self.assertEqual(packet[IPv6].nh, 47)
1464 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1467 def test_hairpinning_unknown_proto(self):
1468 """NAT64 translate packet with unknown protocol - hairpinning"""
1470 client = self.pg0.remote_hosts[0]
1471 server = self.pg0.remote_hosts[1]
1472 server_tcp_in_port = 22
1473 server_tcp_out_port = 4022
1474 client_tcp_in_port = 1234
1475 client_tcp_out_port = 1235
1476 server_nat_ip = "10.0.0.100"
1477 client_nat_ip = "10.0.0.110"
1478 server_nat_ip6 = self.compose_ip6(server_nat_ip, "64:ff9b::", 96)
1479 client_nat_ip6 = self.compose_ip6(client_nat_ip, "64:ff9b::", 96)
1481 self.vapi.nat64_add_del_pool_addr_range(
1482 start_addr=server_nat_ip,
1483 end_addr=client_nat_ip,
1487 flags = self.config_flags.NAT_IS_INSIDE
1488 self.vapi.nat64_add_del_interface(
1489 is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
1491 self.vapi.nat64_add_del_interface(
1492 is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
1495 self.vapi.nat64_add_del_static_bib(
1497 o_addr=server_nat_ip,
1498 i_port=server_tcp_in_port,
1499 o_port=server_tcp_out_port,
1500 proto=IP_PROTOS.tcp,
1505 self.vapi.nat64_add_del_static_bib(
1507 o_addr=server_nat_ip,
1510 proto=IP_PROTOS.gre,
1515 self.vapi.nat64_add_del_static_bib(
1517 o_addr=client_nat_ip,
1518 i_port=client_tcp_in_port,
1519 o_port=client_tcp_out_port,
1520 proto=IP_PROTOS.tcp,
1527 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1528 / IPv6(src=client.ip6, dst=server_nat_ip6)
1529 / TCP(sport=client_tcp_in_port, dport=server_tcp_out_port)
1531 self.pg0.add_stream(p)
1532 self.pg_enable_capture(self.pg_interfaces)
1534 p = self.pg0.get_capture(1)
1537 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1538 / IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre)
1540 / IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4)
1541 / TCP(sport=1234, dport=1234)
1543 self.pg0.add_stream(p)
1544 self.pg_enable_capture(self.pg_interfaces)
1546 p = self.pg0.get_capture(1)
1549 self.assertEqual(packet[IPv6].src, client_nat_ip6)
1550 self.assertEqual(packet[IPv6].dst, server.ip6)
1551 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
1553 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1558 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1559 / IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre)
1561 / IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4)
1562 / TCP(sport=1234, dport=1234)
1564 self.pg0.add_stream(p)
1565 self.pg_enable_capture(self.pg_interfaces)
1567 p = self.pg0.get_capture(1)
1570 self.assertEqual(packet[IPv6].src, server_nat_ip6)
1571 self.assertEqual(packet[IPv6].dst, client.ip6)
1572 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
1574 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1577 def test_one_armed_nat64(self):
1578 """One armed NAT64"""
1580 remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4, "64:ff9b::", 96)
1582 self.vapi.nat64_add_del_pool_addr_range(
1583 start_addr=self.nat_addr,
1584 end_addr=self.nat_addr,
1588 flags = self.config_flags.NAT_IS_INSIDE
1589 self.vapi.nat64_add_del_interface(
1590 is_add=1, flags=flags, sw_if_index=self.pg3.sw_if_index
1592 self.vapi.nat64_add_del_interface(
1593 is_add=1, flags=0, sw_if_index=self.pg3.sw_if_index
1598 Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac)
1599 / IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6)
1600 / TCP(sport=12345, dport=80)
1602 self.pg3.add_stream(p)
1603 self.pg_enable_capture(self.pg_interfaces)
1605 capture = self.pg3.get_capture(1)
1610 self.assertEqual(ip.src, self.nat_addr)
1611 self.assertEqual(ip.dst, self.pg3.remote_ip4)
1612 self.assertNotEqual(tcp.sport, 12345)
1613 external_port = tcp.sport
1614 self.assertEqual(tcp.dport, 80)
1615 self.assert_packet_checksums_valid(p)
1617 self.logger.error(ppp("Unexpected or invalid packet:", p))
1622 Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac)
1623 / IP(src=self.pg3.remote_ip4, dst=self.nat_addr)
1624 / TCP(sport=80, dport=external_port)
1626 self.pg3.add_stream(p)
1627 self.pg_enable_capture(self.pg_interfaces)
1629 capture = self.pg3.get_capture(1)
1634 self.assertEqual(ip.src, remote_host_ip6)
1635 self.assertEqual(ip.dst, self.pg3.remote_ip6)
1636 self.assertEqual(tcp.sport, 80)
1637 self.assertEqual(tcp.dport, 12345)
1638 self.assert_packet_checksums_valid(p)
1640 self.logger.error(ppp("Unexpected or invalid packet:", p))
1643 def test_frag_in_order(self):
1644 """NAT64 translate fragments arriving in order"""
1645 self.tcp_port_in = random.randint(1025, 65535)
1647 self.vapi.nat64_add_del_pool_addr_range(
1648 start_addr=self.nat_addr,
1649 end_addr=self.nat_addr,
1653 flags = self.config_flags.NAT_IS_INSIDE
1654 self.vapi.nat64_add_del_interface(
1655 is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
1657 self.vapi.nat64_add_del_interface(
1658 is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
1663 pkts = self.create_stream_frag_ip6(
1664 self.pg0, self.pg1.remote_ip4, self.tcp_port_in, 20, data
1666 self.pg0.add_stream(pkts)
1667 self.pg_enable_capture(self.pg_interfaces)
1669 frags = self.pg1.get_capture(len(pkts))
1670 p = self.reass_frags_and_verify(frags, self.nat_addr, self.pg1.remote_ip4)
1671 self.assertEqual(p[TCP].dport, 20)
1672 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
1673 self.tcp_port_out = p[TCP].sport
1674 self.assertEqual(data, p[Raw].load)
1677 data = b"A" * 4 + b"b" * 16 + b"C" * 3
1678 pkts = self.create_stream_frag(
1679 self.pg1, self.nat_addr, 20, self.tcp_port_out, data
1681 self.pg1.add_stream(pkts)
1682 self.pg_enable_capture(self.pg_interfaces)
1684 frags = self.pg0.get_capture(len(pkts))
1685 self.logger.debug(ppc("Captured:", frags))
1686 src = self.compose_ip6(self.pg1.remote_ip4, "64:ff9b::", 96)
1687 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
1688 self.assertEqual(p[TCP].sport, 20)
1689 self.assertEqual(p[TCP].dport, self.tcp_port_in)
1690 self.assertEqual(data, p[Raw].load)
1692 def test_reass_hairpinning(self):
1693 """NAT64 fragments hairpinning"""
1695 server = self.pg0.remote_hosts[1]
1696 server_in_port = random.randint(1025, 65535)
1697 server_out_port = random.randint(1025, 65535)
1698 client_in_port = random.randint(1025, 65535)
1699 ip = IPv6(src="".join(["64:ff9b::", self.nat_addr]))
1700 nat_addr_ip6 = ip.src
1702 self.vapi.nat64_add_del_pool_addr_range(
1703 start_addr=self.nat_addr,
1704 end_addr=self.nat_addr,
1708 flags = self.config_flags.NAT_IS_INSIDE
1709 self.vapi.nat64_add_del_interface(
1710 is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
1712 self.vapi.nat64_add_del_interface(
1713 is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
1716 # add static BIB entry for server
1717 self.vapi.nat64_add_del_static_bib(
1719 o_addr=self.nat_addr,
1720 i_port=server_in_port,
1721 o_port=server_out_port,
1722 proto=IP_PROTOS.tcp,
1727 # send packet from host to server
1728 pkts = self.create_stream_frag_ip6(
1729 self.pg0, self.nat_addr, client_in_port, server_out_port, data
1731 self.pg0.add_stream(pkts)
1732 self.pg_enable_capture(self.pg_interfaces)
1734 frags = self.pg0.get_capture(len(pkts))
1735 self.logger.debug(ppc("Captured:", frags))
1736 p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
1737 self.assertNotEqual(p[TCP].sport, client_in_port)
1738 self.assertEqual(p[TCP].dport, server_in_port)
1739 self.assertEqual(data, p[Raw].load)
1741 def test_frag_out_of_order(self):
1742 """NAT64 translate fragments arriving out of order"""
1743 self.tcp_port_in = random.randint(1025, 65535)
1745 self.vapi.nat64_add_del_pool_addr_range(
1746 start_addr=self.nat_addr,
1747 end_addr=self.nat_addr,
1751 flags = self.config_flags.NAT_IS_INSIDE
1752 self.vapi.nat64_add_del_interface(
1753 is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
1755 self.vapi.nat64_add_del_interface(
1756 is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
1761 pkts = self.create_stream_frag_ip6(
1762 self.pg0, self.pg1.remote_ip4, self.tcp_port_in, 20, data
1765 self.pg0.add_stream(pkts)
1766 self.pg_enable_capture(self.pg_interfaces)
1768 frags = self.pg1.get_capture(len(pkts))
1769 p = self.reass_frags_and_verify(frags, self.nat_addr, self.pg1.remote_ip4)
1770 self.assertEqual(p[TCP].dport, 20)
1771 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
1772 self.tcp_port_out = p[TCP].sport
1773 self.assertEqual(data, p[Raw].load)
1776 data = b"A" * 4 + b"B" * 16 + b"C" * 3
1777 pkts = self.create_stream_frag(
1778 self.pg1, self.nat_addr, 20, self.tcp_port_out, data
1781 self.pg1.add_stream(pkts)
1782 self.pg_enable_capture(self.pg_interfaces)
1784 frags = self.pg0.get_capture(len(pkts))
1785 src = self.compose_ip6(self.pg1.remote_ip4, "64:ff9b::", 96)
1786 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
1787 self.assertEqual(p[TCP].sport, 20)
1788 self.assertEqual(p[TCP].dport, self.tcp_port_in)
1789 self.assertEqual(data, p[Raw].load)
1791 def test_interface_addr(self):
1792 """Acquire NAT64 pool addresses from interface"""
1793 self.vapi.nat64_add_del_interface_addr(
1794 is_add=1, sw_if_index=self.pg4.sw_if_index
1797 # no address in NAT64 pool
1798 addresses = self.vapi.nat44_address_dump()
1799 self.assertEqual(0, len(addresses))
1801 # configure interface address and check NAT64 address pool
1802 self.pg4.config_ip4()
1803 addresses = self.vapi.nat64_pool_addr_dump()
1804 self.assertEqual(len(addresses), 1)
1806 self.assertEqual(str(addresses[0].address), self.pg4.local_ip4)
1808 # remove interface address and check NAT64 address pool
1809 self.pg4.unconfig_ip4()
1810 addresses = self.vapi.nat64_pool_addr_dump()
1811 self.assertEqual(0, len(addresses))
1813 @unittest.skipUnless(config.extended, "part of extended tests")
1814 def test_ipfix_max_bibs_sessions(self):
1815 """IPFIX logging maximum session and BIB entries exceeded"""
1818 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4, "64:ff9b::", 96)
1820 self.vapi.nat64_add_del_pool_addr_range(
1821 start_addr=self.nat_addr,
1822 end_addr=self.nat_addr,
1826 flags = self.config_flags.NAT_IS_INSIDE
1827 self.vapi.nat64_add_del_interface(
1828 is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
1830 self.vapi.nat64_add_del_interface(
1831 is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
1836 for i in range(0, max_bibs):
1837 src = "fd01:aa::%x" % (i)
1839 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1840 / IPv6(src=src, dst=remote_host_ip6)
1841 / TCP(sport=12345, dport=80)
1845 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1846 / IPv6(src=src, dst=remote_host_ip6)
1847 / TCP(sport=12345, dport=22)
1850 self.pg0.add_stream(pkts)
1851 self.pg_enable_capture(self.pg_interfaces)
1853 self.pg1.get_capture(max_sessions)
1855 self.vapi.set_ipfix_exporter(
1856 collector_address=self.pg3.remote_ip4,
1857 src_address=self.pg3.local_ip4,
1859 template_interval=10,
1861 self.vapi.nat_ipfix_enable_disable(
1862 domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=1
1866 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1867 / IPv6(src=src, dst=remote_host_ip6)
1868 / TCP(sport=12345, dport=25)
1870 self.pg0.add_stream(p)
1871 self.pg_enable_capture(self.pg_interfaces)
1873 self.pg1.assert_nothing_captured()
1874 self.vapi.ipfix_flush()
1875 capture = self.pg3.get_capture(7)
1876 ipfix = IPFIXDecoder()
1877 # first load template
1879 self.assertTrue(p.haslayer(IPFIX))
1880 self.assertEqual(p[IP].src, self.pg3.local_ip4)
1881 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1882 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1883 self.assertEqual(p[UDP].dport, 4739)
1884 self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id)
1885 if p.haslayer(Template):
1886 ipfix.add_template(p.getlayer(Template))
1887 # verify events in data set
1890 if p.haslayer(Data):
1891 data = ipfix.decode_data_set(p.getlayer(Set))
1892 event_count += self.verify_ipfix_max_sessions(data, max_sessions)
1893 self.assertEqual(event_count, 1)
1896 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1897 / IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6)
1898 / TCP(sport=12345, dport=80)
1900 self.pg0.add_stream(p)
1901 self.pg_enable_capture(self.pg_interfaces)
1903 self.pg1.assert_nothing_captured()
1904 self.vapi.ipfix_flush()
1905 capture = self.pg3.get_capture(1)
1906 # verify events in data set
1909 self.assertTrue(p.haslayer(IPFIX))
1910 self.assertEqual(p[IP].src, self.pg3.local_ip4)
1911 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1912 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1913 self.assertEqual(p[UDP].dport, 4739)
1914 self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id)
1915 if p.haslayer(Data):
1916 data = ipfix.decode_data_set(p.getlayer(Set))
1917 event_count += self.verify_ipfix_max_bibs(data, max_bibs)
1918 self.assertEqual(event_count, 1)
1920 def test_ipfix_bib_ses(self):
1921 """IPFIX logging NAT64 BIB/session create and delete events"""
1922 self.tcp_port_in = random.randint(1025, 65535)
1923 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4, "64:ff9b::", 96)
1925 self.vapi.nat64_add_del_pool_addr_range(
1926 start_addr=self.nat_addr,
1927 end_addr=self.nat_addr,
1931 flags = self.config_flags.NAT_IS_INSIDE
1932 self.vapi.nat64_add_del_interface(
1933 is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
1935 self.vapi.nat64_add_del_interface(
1936 is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
1938 self.vapi.set_ipfix_exporter(
1939 collector_address=self.pg3.remote_ip4,
1940 src_address=self.pg3.local_ip4,
1942 template_interval=10,
1944 self.vapi.nat_ipfix_enable_disable(
1945 domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=1
1950 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1951 / IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6)
1952 / TCP(sport=self.tcp_port_in, dport=25)
1954 self.pg0.add_stream(p)
1955 self.pg_enable_capture(self.pg_interfaces)
1957 p = self.pg1.get_capture(1)
1958 self.tcp_port_out = p[0][TCP].sport
1959 self.vapi.ipfix_flush()
1960 capture = self.pg3.get_capture(8)
1961 ipfix = IPFIXDecoder()
1962 # first load template
1964 self.assertTrue(p.haslayer(IPFIX))
1965 self.assertEqual(p[IP].src, self.pg3.local_ip4)
1966 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1967 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1968 self.assertEqual(p[UDP].dport, 4739)
1969 self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id)
1970 if p.haslayer(Template):
1971 ipfix.add_template(p.getlayer(Template))
1972 # verify events in data set
1974 if p.haslayer(Data):
1975 data = ipfix.decode_data_set(p.getlayer(Set))
1976 if scapy.compat.orb(data[0][230]) == 10:
1977 self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6)
1978 elif scapy.compat.orb(data[0][230]) == 6:
1979 self.verify_ipfix_nat64_ses(
1980 data, 1, self.pg0.remote_ip6, self.pg1.remote_ip4, 25
1983 self.logger.error(ppp("Unexpected or invalid packet: ", p))
1986 self.pg_enable_capture(self.pg_interfaces)
1987 self.vapi.nat64_add_del_pool_addr_range(
1988 start_addr=self.nat_addr,
1989 end_addr=self.nat_addr,
1993 self.vapi.ipfix_flush()
1994 capture = self.pg3.get_capture(2)
1995 # verify events in data set
1997 self.assertTrue(p.haslayer(IPFIX))
1998 self.assertEqual(p[IP].src, self.pg3.local_ip4)
1999 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2000 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2001 self.assertEqual(p[UDP].dport, 4739)
2002 self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id)
2003 if p.haslayer(Data):
2004 data = ipfix.decode_data_set(p.getlayer(Set))
2005 if scapy.compat.orb(data[0][230]) == 11:
2006 self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6)
2007 elif scapy.compat.orb(data[0][230]) == 7:
2008 self.verify_ipfix_nat64_ses(
2009 data, 0, self.pg0.remote_ip6, self.pg1.remote_ip4, 25
2012 self.logger.error(ppp("Unexpected or invalid packet: ", p))
2014 def test_syslog_sess(self):
2015 """Test syslog session creation and deletion"""
2016 self.tcp_port_in = random.randint(1025, 65535)
2017 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4, "64:ff9b::", 96)
2019 self.vapi.nat64_add_del_pool_addr_range(
2020 start_addr=self.nat_addr,
2021 end_addr=self.nat_addr,
2025 flags = self.config_flags.NAT_IS_INSIDE
2026 self.vapi.nat64_add_del_interface(
2027 is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
2029 self.vapi.nat64_add_del_interface(
2030 is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index
2032 self.vapi.syslog_set_filter(self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_INFO)
2033 self.vapi.syslog_set_sender(self.pg3.local_ip4, self.pg3.remote_ip4)
2036 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2037 / IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6)
2038 / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port)
2040 self.pg0.add_stream(p)
2041 self.pg_enable_capture(self.pg_interfaces)
2043 p = self.pg1.get_capture(1)
2044 self.tcp_port_out = p[0][TCP].sport
2045 capture = self.pg3.get_capture(1)
2046 self.verify_syslog_sess(capture[0][Raw].load, is_ip6=True)
2048 self.pg_enable_capture(self.pg_interfaces)
2050 self.vapi.nat64_add_del_pool_addr_range(
2051 start_addr=self.nat_addr,
2052 end_addr=self.nat_addr,
2056 capture = self.pg3.get_capture(1)
2057 self.verify_syslog_sess(capture[0][Raw].load, False, True)
2059 def nat64_get_ses_num(self):
2061 Return number of active NAT64 sessions.
2063 st = self.vapi.nat64_st_dump(proto=255)
2066 def clear_nat64(self):
2068 Clear NAT64 configuration.
2070 self.vapi.nat_ipfix_enable_disable(
2071 domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=0
2073 self.ipfix_src_port = 4739
2074 self.ipfix_domain_id = 1
2076 self.vapi.syslog_set_filter(self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_EMERG)
2078 self.vapi.nat64_set_timeouts(
2079 udp=300, tcp_established=7440, tcp_transitory=240, icmp=60
2082 interfaces = self.vapi.nat64_interface_dump()
2083 for intf in interfaces:
2084 self.vapi.nat64_add_del_interface(
2085 is_add=0, flags=intf.flags, sw_if_index=intf.sw_if_index
2088 bib = self.vapi.nat64_bib_dump(proto=255)
2090 if bibe.flags & self.config_flags.NAT_IS_STATIC:
2091 self.vapi.nat64_add_del_static_bib(
2101 adresses = self.vapi.nat64_pool_addr_dump()
2102 for addr in adresses:
2103 self.vapi.nat64_add_del_pool_addr_range(
2104 start_addr=addr.address,
2105 end_addr=addr.address,
2110 prefixes = self.vapi.nat64_prefix_dump()
2111 for prefix in prefixes:
2112 self.vapi.nat64_add_del_prefix(
2113 prefix=str(prefix.prefix), vrf_id=prefix.vrf_id, is_add=0
2116 bibs = self.statistics.get_counter("/nat64/total-bibs")
2117 self.assertEqual(bibs[0][0], 0)
2118 sessions = self.statistics.get_counter("/nat64/total-sessions")
2119 self.assertEqual(sessions[0][0], 0)
2122 if __name__ == "__main__":
2123 unittest.main(testRunner=VppTestRunner)