11 from config import config
12 from framework import tag_fixme_vpp_workers
13 from framework import VppTestCase, VppTestRunner
14 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
15 from scapy.data import IP_PROTOS
16 from scapy.layers.inet import IP, TCP, UDP, ICMP
17 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
18 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
19 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply, \
20 ICMPv6ND_NS, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, fragment6
21 from scapy.layers.l2 import Ether, GRE
22 from scapy.packet import Raw
23 from syslog_rfc5424_parser import SyslogMessage, ParseError
24 from syslog_rfc5424_parser.constants import SyslogSeverity
25 from util import ppc, ppp
26 from vpp_papi import VppEnum
29 @tag_fixme_vpp_workers
30 class TestNAT64(VppTestCase):
31 """ NAT64 Test Cases """
def SYSLOG_SEVERITY(self):
    """Return the ``vl_api_syslog_severity_t`` enumeration from VppEnum."""
    severity_enum = VppEnum.vl_api_syslog_severity_t
    return severity_enum
def config_flags(self):
    """Return the ``vl_api_nat_config_flags_t`` enumeration from VppEnum."""
    nat_flags_enum = VppEnum.vl_api_nat_config_flags_t
    return nat_flags_enum
# Class-wide fixture shared by every test in this class.
43 super(TestNAT64, cls).setUpClass()
# Default inside/outside L4 ports and ICMP id; individual tests and
# verify_capture_out() may replace them on the instance.
45 cls.tcp_port_in = 6303
46 cls.tcp_port_out = 6303
47 cls.udp_port_in = 6304
48 cls.udp_port_out = 6304
50 cls.icmp_id_out = 6305
51 cls.tcp_external_port = 80
# NAT pool address, kept both as text and in packed form; the packed form
# is what the IPFIX verification helpers compare against.
52 cls.nat_addr = '10.0.0.3'
53 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
# Second pool address, used by test_dynamic together with the vrf1_id table.
55 cls.vrf1_nat_addr = '10.0.10.3'
56 cls.ipfix_src_port = 4739
57 cls.ipfix_domain_id = 1
# Six pg interfaces: entries [0] and [2] form the IPv6 group, [1] the IPv4
# group.
59 cls.create_pg_interfaces(range(6))
60 cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
61 cls.ip6_interfaces.append(cls.pg_interfaces[2])
62 cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
# Add the table identified by vrf1_id and bind pg_interfaces[2] to it for
# IPv6.
64 cls.vapi.ip_table_add_del(is_add=1,
65 table={'table_id': cls.vrf1_id,
68 cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
# Two remote hosts on pg0; test_hairpinning uses them as client and server.
70 cls.pg0.generate_remote_hosts(2)
72 for i in cls.ip6_interfaces:
75 i.configure_ipv6_neighbors()
77 for i in cls.ip4_interfaces:
86 cls.pg3.configure_ipv6_neighbors()
def tearDownClass(cls):
    """Tear down the class-level fixture by delegating to the parent class."""
    parent = super(TestNAT64, cls)
    parent.tearDownClass()
# Per-test fixture: run the parent setUp, then enable the NAT64 plugin
# (enable=1) with the BIB/ST bucket counts given below.
96 super(TestNAT64, self).setUp()
97 self.vapi.nat64_plugin_enable_disable(enable=1,
98 bib_buckets=128, st_buckets=256)
# Per-test cleanup: run the parent tearDown, then disable the NAT64 plugin.
101 super(TestNAT64, self).tearDown()
# The API call is skipped when VPP is flagged as dead.
102 if not self.vpp_dead:
103 self.vapi.nat64_plugin_enable_disable(enable=0)
def show_commands_at_teardown(self):
    """Log the output of the NAT64 ``show`` CLI commands, one per log entry."""
    nat64_show_commands = (
        "show nat64 pool",
        "show nat64 interfaces",
        "show nat64 prefix",
        "show nat64 bib all",
        "show nat64 session table all",
    )
    # Same order as the diagnostics were always emitted in.
    for command in nat64_show_commands:
        self.logger.info(self.vapi.cli(command))
# Build the inside (IPv6) test stream: a TCP, a UDP and an ICMPv6 echo-request
# packet from in_if's remote host towards out_if's remote IPv4 host.
# NOTE(review): the docstring lists ":param ttl:" but the argument is `hlim`.
112 def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
114 Create IPv6 packet stream for inside network
116 :param in_if: Inside interface
117 :param out_if: Outside interface
118 :param ttl: Hop Limit of generated packets
119 :param pref: NAT64 prefix
120 :param plen: NAT64 prefix length
# The destination embeds the IPv4 address in an IPv6 prefix: either appended
# textually to 64:ff9b:: or composed from pref/plen by compose_ip6().
124 dst = ''.join(['64:ff9b::', out_if.remote_ip4])
126 dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
# TCP and UDP use the configured inside source port and destination port 20.
129 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
130 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
131 TCP(sport=self.tcp_port_in, dport=20))
135 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
136 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
137 UDP(sport=self.udp_port_in, dport=20))
# ICMPv6 echo request carrying the configured inside identifier.
141 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
142 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
143 ICMPv6EchoRequest(id=self.icmp_id_in))
# Build the outside (IPv4) test stream: a TCP, a UDP and an ICMP echo-reply
# packet from out_if's remote host towards dst_ip.
148 def create_stream_out(self, out_if, dst_ip=None, ttl=64,
149 use_inside_ports=False):
151 Create packet stream for outside network
153 :param out_if: Outside interface
154 :param dst_ip: Destination IP address (Default use global NAT address)
155 :param ttl: TTL of generated packets
156 :param use_inside_ports: Use inside NAT ports as destination ports
157 instead of outside ports
# Default destination is the global NAT address.
160 dst_ip = self.nat_addr
# Destination ports / ICMP id: the outside values unless use_inside_ports
# is set, in which case the inside values are used.
161 if not use_inside_ports:
162 tcp_port = self.tcp_port_out
163 udp_port = self.udp_port_out
164 icmp_id = self.icmp_id_out
166 tcp_port = self.tcp_port_in
167 udp_port = self.udp_port_in
168 icmp_id = self.icmp_id_in
# TCP and UDP are sent from source port 20 to the selected destination port.
171 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
172 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
173 TCP(dport=tcp_port, sport=20))
177 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
178 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
179 UDP(dport=udp_port, sport=20))
# ICMP echo reply carrying the selected identifier.
183 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
184 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
185 ICMP(id=icmp_id, type='echo-reply'))
# Check packets captured on the outside network: translated source address,
# optional destination address, and the per-protocol source port / ICMP id.
190 def verify_capture_out(self, capture, nat_ip=None, same_port=False,
191 dst_ip=None, is_ip6=False, ignore_port=False):
193 Verify captured packets on outside network
195 :param capture: Captured packets
196 :param nat_ip: Translated IP address (Default use global NAT address)
197 :param same_port: Source port number is not translated (Default False)
198 :param dst_ip: Destination IP address (Default do not verify)
199 :param is_ip6: If L3 protocol is IPv6 (Default False)
# ICMP46 names the ICMP layer class used for the id checks below.
203 ICMP46 = ICMPv6EchoRequest
# Default expected source is the global NAT address.
208 nat_ip = self.nat_addr
209 for packet in capture:
212 self.assert_packet_checksums_valid(packet)
213 self.assertEqual(packet[IP46].src, nat_ip)
# The destination address is only checked when the caller supplied one.
214 if dst_ip is not None:
215 self.assertEqual(packet[IP46].dst, dst_ip)
216 if packet.haslayer(TCP):
220 packet[TCP].sport, self.tcp_port_in)
223 packet[TCP].sport, self.tcp_port_in)
# Record the translated TCP source port on the instance.
224 self.tcp_port_out = packet[TCP].sport
225 self.assert_packet_checksums_valid(packet)
226 elif packet.haslayer(UDP):
230 packet[UDP].sport, self.udp_port_in)
233 packet[UDP].sport, self.udp_port_in)
# Record the translated UDP source port on the instance.
234 self.udp_port_out = packet[UDP].sport
239 packet[ICMP46].id, self.icmp_id_in)
242 packet[ICMP46].id, self.icmp_id_in)
# Record the translated ICMP identifier on the instance.
243 self.icmp_id_out = packet[ICMP46].id
244 self.assert_packet_checksums_valid(packet)
246 self.logger.error(ppp("Unexpected or invalid packet "
247 "(outside network):", packet))
# Check packets captured on the inside (IPv6) network against the expected
# addresses, the configured inside ports and the ICMPv6 echo-reply id.
250 def verify_capture_in_ip6(self, capture, src_ip, dst_ip):
252 Verify captured IPv6 packets on inside network
254 :param capture: Captured packets
255 :param src_ip: Source IP
256 :param dst_ip: Destination IP address
258 for packet in capture:
# Addresses and checksums are verified before the per-protocol checks.
260 self.assertEqual(packet[IPv6].src, src_ip)
261 self.assertEqual(packet[IPv6].dst, dst_ip)
262 self.assert_packet_checksums_valid(packet)
# TCP/UDP packets must be addressed to the configured inside port.
263 if packet.haslayer(TCP):
264 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
265 elif packet.haslayer(UDP):
266 self.assertEqual(packet[UDP].dport, self.udp_port_in)
268 self.assertEqual(packet[ICMPv6EchoReply].id,
271 self.logger.error(ppp("Unexpected or invalid packet "
272 "(inside network):", packet))
# Build an IPv4 packet split by hand into fragments at offsets 0, 3 and 5
# (fragment offsets count 8-byte units).
# NOTE(review): the local name `id` below shadows the builtin id().
275 def create_stream_frag(self, src_if, dst, sport, dport, data,
276 proto=IP_PROTOS.tcp, echo_reply=False):
278 Create fragmented packet stream
280 :param src_if: Source interface
281 :param dst: Destination IPv4 address
282 :param sport: Source port
283 :param dport: Destination port
284 :param data: Payload data
285 :param proto: protocol (TCP, UDP, ICMP)
286 :param echo_reply: use echo_reply if protocol is ICMP
# For TCP the packet is built and re-parsed from its raw bytes, and the
# resulting checksum value is copied into the header used for the first
# fragment.
289 if proto == IP_PROTOS.tcp:
290 p = (IP(src=src_if.remote_ip4, dst=dst) /
291 TCP(sport=sport, dport=dport) /
293 p = p.__class__(scapy.compat.raw(p))
294 chksum = p[TCP].chksum
295 proto_header = TCP(sport=sport, dport=dport, chksum=chksum)
296 elif proto == IP_PROTOS.udp:
297 proto_header = UDP(sport=sport, dport=dport)
# For ICMP the source "port" argument is used as the echo identifier.
298 elif proto == IP_PROTOS.icmp:
300 proto_header = ICMP(id=sport, type='echo-request')
302 proto_header = ICMP(id=sport, type='echo-reply')
304 raise Exception("Unsupported protocol")
# One random IP identification value shared by the fragments below.
305 id = random.randint(0, 65535)
307 if proto == IP_PROTOS.tcp:
310 raw = Raw(data[0:16])
# First fragment: offset 0 with the "more fragments" flag set.
311 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
312 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
316 if proto == IP_PROTOS.tcp:
317 raw = Raw(data[4:20])
319 raw = Raw(data[16:32])
# Middle fragment: offset 3 with the "more fragments" flag set.
320 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
321 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
325 if proto == IP_PROTOS.tcp:
# Fragment at offset 5; no "MF" flag is passed here.
329 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
330 IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=proto,
# Build an IPv6 TCP packet with a fragment extension header and return the
# fragments produced by scapy's fragment6().
# NOTE(review): the docstring names ":param fragsize:" but the argument is
# `frag_size`.
336 def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
337 pref=None, plen=0, frag_size=128):
339 Create fragmented packet stream
341 :param src_if: Source interface
342 :param dst: Destination IPv4 address
343 :param sport: Source TCP port
344 :param dport: Destination TCP port
345 :param data: Payload data
346 :param pref: NAT64 prefix
347 :param plen: NAT64 prefix length
348 :param fragsize: size of fragments
# The IPv6 destination embeds `dst`: either appended textually to 64:ff9b::
# or composed from pref/plen by compose_ip6().
352 dst_ip6 = ''.join(['64:ff9b::', dst])
354 dst_ip6 = self.compose_ip6(dst, pref, plen)
# The fragment header gets a random identification value.
356 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
357 IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
358 IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
359 TCP(sport=sport, dport=dport) /
362 return fragment6(p, frag_size)
# Reassemble captured IPv4 fragments into one packet, verifying addresses
# and IP checksums of each fragment on the way.
364 def reass_frags_and_verify(self, frags, src, dst):
366 Reassemble and verify fragmented packet
368 :param frags: Captured fragments
369 :param src: Source IPv4 address to verify
370 :param dst: Destination IPv4 address to verify
372 :returns: Reassembled IPv4 packet
376 self.assertEqual(p[IP].src, src)
377 self.assertEqual(p[IP].dst, dst)
378 self.assert_ip_checksum_valid(p)
# Each fragment payload is written at its byte offset (frag counts 8-byte
# units).
379 buffer.seek(p[IP].frag * 8)
380 buffer.write(bytes(p[IP].payload))
# A fresh IP header is built from the first fragment's addresses and protocol.
381 ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
382 proto=frags[0][IP].proto)
# The collected bytes are re-parsed as the matching L4 layer; the TCP
# checksum is verified on the reassembled packet.
383 if ip.proto == IP_PROTOS.tcp:
384 p = (ip / TCP(buffer.getvalue()))
385 self.logger.debug(ppp("Reassembled:", p))
386 self.assert_tcp_checksum_valid(p)
# UDP: the first 8 bytes are parsed as the header, the rest kept as Raw.
387 elif ip.proto == IP_PROTOS.udp:
388 p = (ip / UDP(buffer.getvalue()[:8]) /
389 Raw(buffer.getvalue()[8:]))
390 elif ip.proto == IP_PROTOS.icmp:
391 p = (ip / ICMP(buffer.getvalue()))
# Reassemble captured IPv6 fragments into one packet, verifying the addresses
# of each fragment and the checksums of the reassembled packet.
394 def reass_frags_and_verify_ip6(self, frags, src, dst):
396 Reassemble and verify fragmented packet
398 :param frags: Captured fragments
399 :param src: Source IPv6 address to verify
400 :param dst: Destination IPv6 address to verify
402 :returns: Reassembled IPv6 packet
406 self.assertEqual(p[IPv6].src, src)
407 self.assertEqual(p[IPv6].dst, dst)
# Each fragment payload is written at offset * 8 bytes into the buffer.
408 buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
409 buffer.write(bytes(p[IPv6ExtHdrFragment].payload))
# A fresh IPv6 header is built from the first fragment; its next-header value
# comes from the fragment extension header.
410 ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
411 nh=frags[0][IPv6ExtHdrFragment].nh)
412 if ip.nh == IP_PROTOS.tcp:
413 p = (ip / TCP(buffer.getvalue()))
414 elif ip.nh == IP_PROTOS.udp:
415 p = (ip / UDP(buffer.getvalue()))
416 self.logger.debug(ppp("Reassembled:", p))
417 self.assert_packet_checksums_valid(p)
# Check the IPFIX record reporting that the maximum number of BIB entries
# was exceeded.
420 def verify_ipfix_max_bibs(self, data, limit):
422 Verify IPFIX maximum BIB entries exceeded event
424 :param data: Decoded IPFIX data records
425 :param limit: Number of maximum BIB entries that can be created.
# Exactly one data record is expected.
427 self.assertEqual(1, len(data))
# Field 230 of the record must carry the value 13.
430 self.assertEqual(scapy.compat.orb(record[230]), 13)
431 # natQuotaExceededEvent
# NOTE(review): packed in native byte order ("I"), whereas verify_ipfix_bib
# compares against network-order ("!I"/"!H") values - confirm this is
# intended.
432 self.assertEqual(struct.pack("I", 2), record[466])
# Field 472 must equal the configured limit.
434 self.assertEqual(struct.pack("I", limit), record[472])
# Check a single IPFIX BIB create/delete record against the test's TCP
# translation parameters.
436 def verify_ipfix_bib(self, data, is_create, src_addr):
438 Verify IPFIX NAT64 BIB create and delete events
440 :param data: Decoded IPFIX data records
441 :param is_create: Create event if nonzero value otherwise delete event
442 :param src_addr: IPv6 source address
# Exactly one data record is expected.
444 self.assertEqual(1, len(data))
# Field 230 is compared with 10 or 11.
# NOTE(review): presumably 10 = create and 11 = delete - confirm.
448 self.assertEqual(scapy.compat.orb(record[230]), 10)
450 self.assertEqual(scapy.compat.orb(record[230]), 11)
# Field 27 holds the packed IPv6 source address; compared in text form.
452 self.assertEqual(src_addr, str(ipaddress.IPv6Address(record[27])))
453 # postNATSourceIPv4Address
454 self.assertEqual(self.nat_addr_n, record[225])
# Field 4 must be the TCP protocol number.
456 self.assertEqual(IP_PROTOS.tcp, scapy.compat.orb(record[4]))
# Field 234 must be a network-order 32-bit zero.
458 self.assertEqual(struct.pack("!I", 0), record[234])
459 # sourceTransportPort
460 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
461 # postNAPTSourceTransportPort
462 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
# Check a single IPFIX NAT64 session create/delete record against the
# test's TCP translation parameters and the given destination.
464 def verify_ipfix_nat64_ses(self, data, is_create, src_addr, dst_addr,
467 Verify IPFIX NAT64 session create and delete events
469 :param data: Decoded IPFIX data records
470 :param is_create: Create event if nonzero value otherwise delete event
471 :param src_addr: IPv6 source address
472 :param dst_addr: IPv4 destination address
473 :param dst_port: destination TCP port
# Exactly one data record is expected.
475 self.assertEqual(1, len(data))
# Field 230 is compared with 6 or 7.
# NOTE(review): presumably 6 = create and 7 = delete - confirm.
479 self.assertEqual(scapy.compat.orb(record[230]), 6)
481 self.assertEqual(scapy.compat.orb(record[230]), 7)
# Field 27 holds the packed IPv6 source address; compared in text form.
483 self.assertEqual(src_addr, str(ipaddress.IPv6Address(record[27])))
484 # destinationIPv6Address
485 self.assertEqual(socket.inet_pton(socket.AF_INET6,
486 self.compose_ip6(dst_addr,
490 # postNATSourceIPv4Address
491 self.assertEqual(self.nat_addr_n, record[225])
492 # postNATDestinationIPv4Address
493 self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr),
# Field 4 must be the TCP protocol number.
496 self.assertEqual(IP_PROTOS.tcp, scapy.compat.orb(record[4]))
# Field 234 must be a network-order 32-bit zero.
498 self.assertEqual(struct.pack("!I", 0), record[234])
499 # sourceTransportPort
500 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
501 # postNAPTSourceTransportPort
502 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
503 # destinationTransportPort
504 self.assertEqual(struct.pack("!H", dst_port), record[11])
505 # postNAPTDestinationTransportPort
# The same dst_port is expected before and after translation.
506 self.assertEqual(struct.pack("!H", dst_port), record[228])
# Parse a syslog message (RFC 5424 parser) and check that it describes the
# TCP session created or deleted by the test.
508 def verify_syslog_sess(self, data, is_add=True, is_ip6=False):
509 message = data.decode('utf-8')
511 message = SyslogMessage.parse(message)
512 except ParseError as e:
# Header checks: info severity, application "NAT", and message id SADD for
# an added session or SDEL for a deleted one.
516 self.assertEqual(message.severity, SyslogSeverity.info)
517 self.assertEqual(message.appname, 'NAT')
518 self.assertEqual(message.msgid, 'SADD' if is_add else 'SDEL')
# All session details live in the 'nsess' structured-data element.
519 sd_params = message.sd.get('nsess')
520 self.assertTrue(sd_params is not None)
# Inside address type and address: the IPv6 or the IPv4 variant (see is_ip6).
522 self.assertEqual(sd_params.get('IATYP'), 'IPv6')
523 self.assertEqual(sd_params.get('ISADDR'), self.pg0.remote_ip6)
525 self.assertEqual(sd_params.get('IATYP'), 'IPv4')
526 self.assertEqual(sd_params.get('ISADDR'), self.pg0.remote_ip4)
527 self.assertTrue(sd_params.get('SSUBIX') is not None)
# Numeric parameters are compared in their decimal string form.
528 self.assertEqual(sd_params.get('ISPORT'), "%d" % self.tcp_port_in)
529 self.assertEqual(sd_params.get('XATYP'), 'IPv4')
530 self.assertEqual(sd_params.get('XSADDR'), self.nat_addr)
531 self.assertEqual(sd_params.get('XSPORT'), "%d" % self.tcp_port_out)
532 self.assertEqual(sd_params.get('PROTO'), "%d" % IP_PROTOS.tcp)
533 self.assertEqual(sd_params.get('SVLAN'), '0')
534 self.assertEqual(sd_params.get('XDADDR'), self.pg1.remote_ip4)
535 self.assertEqual(sd_params.get('XDPORT'),
536 "%d" % self.tcp_external_port)
# Embed an IPv4 address into an IPv6 prefix and return the textual IPv6
# address.
# NOTE(review): the byte layout appears to follow RFC 6052 - confirm.
538 def compose_ip6(self, ip4, pref, plen):
540 Compose IPv4-embedded IPv6 addresses
542 :param ip4: IPv4 address
543 :param pref: IPv6 prefix
544 :param plen: IPv6 prefix length
545 :returns: IPv4-embedded IPv6 addresses
# Both addresses are handled as lists of byte values (16 and 4 entries).
547 pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
548 ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
# The assignments below copy IPv4 bytes into positions 10..15 of the prefix.
563 pref_n[10] = ip4_n[3]
567 pref_n[10] = ip4_n[2]
568 pref_n[11] = ip4_n[3]
571 pref_n[10] = ip4_n[1]
572 pref_n[11] = ip4_n[2]
573 pref_n[12] = ip4_n[3]
# Here the whole IPv4 address lands in the last four bytes.
575 pref_n[12] = ip4_n[0]
576 pref_n[13] = ip4_n[1]
577 pref_n[14] = ip4_n[2]
578 pref_n[15] = ip4_n[3]
# Re-pack the byte values and convert back to presentation format.
579 packed_pref_n = b''.join([scapy.compat.chb(x) for x in pref_n])
580 return socket.inet_ntop(socket.AF_INET6, packed_pref_n)
# Check the IPFIX record reporting that the maximum number of session
# entries was exceeded.
582 def verify_ipfix_max_sessions(self, data, limit):
584 Verify IPFIX maximum session entries exceeded event
586 :param data: Decoded IPFIX data records
587 :param limit: Number of maximum session entries that can be created.
# Exactly one data record is expected.
589 self.assertEqual(1, len(data))
# Field 230 of the record must carry the value 13.
592 self.assertEqual(scapy.compat.orb(record[230]), 13)
593 # natQuotaExceededEvent
# NOTE(review): packed in native byte order ("I"), whereas verify_ipfix_bib
# compares against network-order ("!I"/"!H") values - confirm this is
# intended.
594 self.assertEqual(struct.pack("I", 1), record[466])
# Field 471 must equal the configured limit.
596 self.assertEqual(struct.pack("I", limit), record[471])
# Scenario on pg5 (a NAT64 inside interface): a packet towards the local
# address triggers a Neighbor Solicitation, the test answers with a Neighbor
# Advertisement, and a second ping must then be answered with an echo reply.
598 def test_nat64_inside_interface_handles_neighbor_advertisement(self):
599 """ NAT64 inside interface handles Neighbor Advertisement """
601 flags = self.config_flags.NAT_IS_INSIDE
602 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
603 sw_if_index=self.pg5.sw_if_index)
# Packet from pg5's remote host to pg5's own IPv6 address.
606 ping = (Ether(dst=self.pg5.local_mac, src=self.pg5.remote_mac) /
607 IPv6(src=self.pg5.remote_ip6, dst=self.pg5.local_ip6) /
610 self.pg5.add_stream(pkts)
611 self.pg_enable_capture(self.pg_interfaces)
614 # Wait for Neighbor Solicitation
615 capture = self.pg5.get_capture(len(pkts))
618 self.assertEqual(packet[IPv6].src, self.pg5.local_ip6)
619 self.assertEqual(packet.haslayer(ICMPv6ND_NS), 1)
# The solicited target is echoed back in the advertisement below.
620 tgt = packet[ICMPv6ND_NS].tgt
622 self.logger.error(ppp("Unexpected or invalid packet:", packet))
625 # Send Neighbor Advertisement
626 p = (Ether(dst=self.pg5.local_mac, src=self.pg5.remote_mac) /
627 IPv6(src=self.pg5.remote_ip6, dst=self.pg5.local_ip6) /
628 ICMPv6ND_NA(tgt=tgt) /
629 ICMPv6NDOptDstLLAddr(lladdr=self.pg5.remote_mac))
631 self.pg5.add_stream(pkts)
632 self.pg_enable_capture(self.pg_interfaces)
635 # Try to send ping again
637 self.pg5.add_stream(pkts)
638 self.pg_enable_capture(self.pg_interfaces)
641 # Wait for ping reply
642 capture = self.pg5.get_capture(len(pkts))
# The reply must come from pg5's local address back to the remote host.
645 self.assertEqual(packet[IPv6].src, self.pg5.local_ip6)
646 self.assertEqual(packet[IPv6].dst, self.pg5.remote_ip6)
647 self.assertEqual(packet.haslayer(ICMPv6EchoReply), 1)
649 self.logger.error(ppp("Unexpected or invalid packet:", packet))
653 """ Add/delete address to NAT64 pool """
# Add a single-address range; the dump must then show exactly that address.
656 self.vapi.nat64_add_del_pool_addr_range(start_addr=nat_addr,
658 vrf_id=0xFFFFFFFF, is_add=1)
660 addresses = self.vapi.nat64_pool_addr_dump()
661 self.assertEqual(len(addresses), 1)
662 self.assertEqual(str(addresses[0].address), nat_addr)
# Remove the same range again; the pool must be empty afterwards.
664 self.vapi.nat64_add_del_pool_addr_range(start_addr=nat_addr,
666 vrf_id=0xFFFFFFFF, is_add=0)
668 addresses = self.vapi.nat64_pool_addr_dump()
669 self.assertEqual(len(addresses), 0)
# Enable NAT64 on pg0 (inside) and pg1 (flags=0), verify the interface dump
# and the feature arcs, then disable both again.
671 def test_interface(self):
672 """ Enable/disable NAT64 feature on the interface """
673 flags = self.config_flags.NAT_IS_INSIDE
674 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
675 sw_if_index=self.pg0.sw_if_index)
676 self.vapi.nat64_add_del_interface(is_add=1, flags=0,
677 sw_if_index=self.pg1.sw_if_index)
# Both interfaces must be reported, each with the expected flag value.
679 interfaces = self.vapi.nat64_interface_dump()
680 self.assertEqual(len(interfaces), 2)
683 for intf in interfaces:
684 if intf.sw_if_index == self.pg0.sw_if_index:
685 self.assertEqual(intf.flags, self.config_flags.NAT_IS_INSIDE)
687 elif intf.sw_if_index == self.pg1.sw_if_index:
688 self.assertEqual(intf.flags, self.config_flags.NAT_IS_OUTSIDE)
690 self.assertTrue(pg0_found)
691 self.assertTrue(pg1_found)
# The CLI feature listing must show in2out on pg0 and out2in on pg1.
693 features = self.vapi.cli("show interface features pg0")
694 self.assertIn('nat64-in2out', features)
695 features = self.vapi.cli("show interface features pg1")
696 self.assertIn('nat64-out2in', features)
# NOTE(review): pg1 is removed with the NAT_IS_INSIDE flag although it was
# added with flags=0 - confirm this is intended.
698 self.vapi.nat64_add_del_interface(is_add=0, flags=flags,
699 sw_if_index=self.pg0.sw_if_index)
700 self.vapi.nat64_add_del_interface(is_add=0, flags=flags,
701 sw_if_index=self.pg1.sw_if_index)
703 interfaces = self.vapi.nat64_interface_dump()
704 self.assertEqual(len(interfaces), 0)
# Add one static TCP BIB entry, verify it through the BIB dump and the
# total-bibs counter, then delete it and verify that it is gone.
706 def test_static_bib(self):
707 """ Add/delete static BIB entry """
708 in_addr = '2001:db8:85a3::8a2e:370:7334'
709 out_addr = '10.1.1.3'
712 proto = IP_PROTOS.tcp
714 self.vapi.nat64_add_del_static_bib(i_addr=in_addr, o_addr=out_addr,
715 i_port=in_port, o_port=out_port,
716 proto=proto, vrf_id=0, is_add=1)
717 bib = self.vapi.nat64_bib_dump(proto=IP_PROTOS.tcp)
# Entries flagged static are compared with the values configured above.
720 if bibe.flags & self.config_flags.NAT_IS_STATIC:
722 self.assertEqual(str(bibe.i_addr), in_addr)
723 self.assertEqual(str(bibe.o_addr), out_addr)
724 self.assertEqual(bibe.i_port, in_port)
725 self.assertEqual(bibe.o_port, out_port)
726 self.assertEqual(static_bib_num, 1)
# The statistics counter must agree with the dump.
727 bibs = self.statistics.get_counter('/nat64/total-bibs')
728 self.assertEqual(bibs[0][0], 1)
# Delete the entry with the same parameters and re-check dump and counter.
730 self.vapi.nat64_add_del_static_bib(i_addr=in_addr, o_addr=out_addr,
731 i_port=in_port, o_port=out_port,
732 proto=proto, vrf_id=0, is_add=0)
733 bib = self.vapi.nat64_bib_dump(proto=IP_PROTOS.tcp)
736 if bibe.flags & self.config_flags.NAT_IS_STATIC:
738 self.assertEqual(static_bib_num, 0)
739 bibs = self.statistics.get_counter('/nat64/total-bibs')
740 self.assertEqual(bibs[0][0], 0)
def test_set_timeouts(self):
    """ Set NAT64 timeouts """

    def expect_timeouts(udp, icmp, tcp_transitory, tcp_established):
        # Read the timeouts back once and compare them field by field.
        current = self.vapi.nat64_get_timeouts()
        self.assertEqual(current.udp, udp)
        self.assertEqual(current.icmp, icmp)
        self.assertEqual(current.tcp_transitory, tcp_transitory)
        self.assertEqual(current.tcp_established, tcp_established)

    # Values reported before anything has been configured.
    expect_timeouts(udp=300, icmp=60,
                    tcp_transitory=240, tcp_established=7440)

    # Configure custom values and make sure they are reported back.
    self.vapi.nat64_set_timeouts(udp=200, tcp_established=7450,
                                 tcp_transitory=250, icmp=30)
    expect_timeouts(udp=200, icmp=30,
                    tcp_transitory=250, tcp_established=7450)
# End-to-end dynamic NAT64 scenario: in2out and out2in traffic between pg0
# and pg1 with counter checks, a repeated round, a session-count check, and
# finally a tenant in the vrf1_id table on pg2.
760 def test_dynamic(self):
761 """ NAT64 dynamic translation test """
762 self.tcp_port_in = 6303
763 self.udp_port_in = 6304
764 self.icmp_id_in = 6305
# Baseline for the session-count delta asserted further down.
766 ses_num_start = self.nat64_get_ses_num()
768 self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
769 end_addr=self.nat_addr,
772 flags = self.config_flags.NAT_IS_INSIDE
773 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
774 sw_if_index=self.pg0.sw_if_index)
775 self.vapi.nat64_add_del_interface(is_add=1, flags=0,
776 sw_if_index=self.pg1.sw_if_index)
# Snapshot the in2out counters so that only this round's packets are counted.
779 tcpn = self.statistics.get_counter('/nat64/in2out/tcp')[0]
780 udpn = self.statistics.get_counter('/nat64/in2out/udp')[0]
781 icmpn = self.statistics.get_counter('/nat64/in2out/icmp')[0]
782 drops = self.statistics.get_counter('/nat64/in2out/drops')[0]
784 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
785 self.pg0.add_stream(pkts)
786 self.pg_enable_capture(self.pg_interfaces)
788 capture = self.pg1.get_capture(len(pkts))
789 self.verify_capture_out(capture, nat_ip=self.nat_addr,
790 dst_ip=self.pg1.remote_ip4)
# Counter deltas are read at pg0's interface index: one packet per protocol
# and no drops.
792 if_idx = self.pg0.sw_if_index
793 cnt = self.statistics.get_counter('/nat64/in2out/tcp')[0]
794 self.assertEqual(cnt[if_idx] - tcpn[if_idx], 1)
795 cnt = self.statistics.get_counter('/nat64/in2out/udp')[0]
796 self.assertEqual(cnt[if_idx] - udpn[if_idx], 1)
797 cnt = self.statistics.get_counter('/nat64/in2out/icmp')[0]
798 self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1)
799 cnt = self.statistics.get_counter('/nat64/in2out/drops')[0]
800 self.assertEqual(cnt[if_idx] - drops[if_idx], 0)
# Snapshot the out2in counters before sending the return traffic.
803 tcpn = self.statistics.get_counter('/nat64/out2in/tcp')[0]
804 udpn = self.statistics.get_counter('/nat64/out2in/udp')[0]
805 icmpn = self.statistics.get_counter('/nat64/out2in/icmp')[0]
806 drops = self.statistics.get_counter('/nat64/out2in/drops')[0]
808 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
809 self.pg1.add_stream(pkts)
810 self.pg_enable_capture(self.pg_interfaces)
812 capture = self.pg0.get_capture(len(pkts))
# This IPv6 layer only supplies the expected source address
# (64:ff9b:: + pg1's remote IPv4) for the checks below.
813 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
814 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
# Counter deltas are read at pg1's interface index.
816 if_idx = self.pg1.sw_if_index
817 cnt = self.statistics.get_counter('/nat64/out2in/tcp')[0]
# NOTE(review): a TCP delta of 2 is expected here although the in2out check
# above expects 1 - confirm why.
818 self.assertEqual(cnt[if_idx] - tcpn[if_idx], 2)
819 cnt = self.statistics.get_counter('/nat64/out2in/udp')[0]
820 self.assertEqual(cnt[if_idx] - udpn[if_idx], 1)
821 cnt = self.statistics.get_counter('/nat64/out2in/icmp')[0]
822 self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1)
823 cnt = self.statistics.get_counter('/nat64/out2in/drops')[0]
824 self.assertEqual(cnt[if_idx] - drops[if_idx], 0)
# Three BIB entries and three sessions are expected in total.
826 bibs = self.statistics.get_counter('/nat64/total-bibs')
827 self.assertEqual(bibs[0][0], 3)
828 sessions = self.statistics.get_counter('/nat64/total-sessions')
829 self.assertEqual(sessions[0][0], 3)
# Second round in both directions with the same streams.
832 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
833 self.pg0.add_stream(pkts)
834 self.pg_enable_capture(self.pg_interfaces)
836 capture = self.pg1.get_capture(len(pkts))
837 self.verify_capture_out(capture, nat_ip=self.nat_addr,
838 dst_ip=self.pg1.remote_ip4)
841 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
842 self.pg1.add_stream(pkts)
843 self.pg_enable_capture(self.pg_interfaces)
845 capture = self.pg0.get_capture(len(pkts))
846 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
# The session count must still be only three above the baseline.
848 ses_num_end = self.nat64_get_ses_num()
850 self.assertEqual(ses_num_end - ses_num_start, 3)
852 # tenant with specific VRF
853 self.vapi.nat64_add_del_pool_addr_range(start_addr=self.vrf1_nat_addr,
854 end_addr=self.vrf1_nat_addr,
855 vrf_id=self.vrf1_id, is_add=1)
856 flags = self.config_flags.NAT_IS_INSIDE
857 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
858 sw_if_index=self.pg2.sw_if_index)
# Traffic from pg2 must be translated to the vrf1 pool address.
860 pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
861 self.pg2.add_stream(pkts)
862 self.pg_enable_capture(self.pg_interfaces)
864 capture = self.pg1.get_capture(len(pkts))
865 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
866 dst_ip=self.pg1.remote_ip4)
868 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
869 self.pg1.add_stream(pkts)
870 self.pg_enable_capture(self.pg_interfaces)
872 capture = self.pg2.get_capture(len(pkts))
873 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
# Static NAT64 scenario: static BIB entries for TCP, UDP and ICMP map the
# inside ports/id to identical outside values, then traffic is checked in
# both directions.
875 def test_static(self):
876 """ NAT64 static translation test """
# Inside and outside values are deliberately equal for this test.
877 self.tcp_port_in = 60303
878 self.udp_port_in = 60304
879 self.icmp_id_in = 60305
880 self.tcp_port_out = 60303
881 self.udp_port_out = 60304
882 self.icmp_id_out = 60305
# Baseline for the session-count delta asserted at the end.
884 ses_num_start = self.nat64_get_ses_num()
886 self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
887 end_addr=self.nat_addr,
890 flags = self.config_flags.NAT_IS_INSIDE
891 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
892 sw_if_index=self.pg0.sw_if_index)
893 self.vapi.nat64_add_del_interface(is_add=1, flags=0,
894 sw_if_index=self.pg1.sw_if_index)
# One static BIB entry per protocol for pg0's remote host.
896 self.vapi.nat64_add_del_static_bib(i_addr=self.pg0.remote_ip6,
897 o_addr=self.nat_addr,
898 i_port=self.tcp_port_in,
899 o_port=self.tcp_port_out,
900 proto=IP_PROTOS.tcp, vrf_id=0,
902 self.vapi.nat64_add_del_static_bib(i_addr=self.pg0.remote_ip6,
903 o_addr=self.nat_addr,
904 i_port=self.udp_port_in,
905 o_port=self.udp_port_out,
906 proto=IP_PROTOS.udp, vrf_id=0,
908 self.vapi.nat64_add_del_static_bib(i_addr=self.pg0.remote_ip6,
909 o_addr=self.nat_addr,
910 i_port=self.icmp_id_in,
911 o_port=self.icmp_id_out,
912 proto=IP_PROTOS.icmp, vrf_id=0,
# Inside-to-outside traffic is verified with same_port=True.
916 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
917 self.pg0.add_stream(pkts)
918 self.pg_enable_capture(self.pg_interfaces)
920 capture = self.pg1.get_capture(len(pkts))
921 self.verify_capture_out(capture, nat_ip=self.nat_addr,
922 dst_ip=self.pg1.remote_ip4, same_port=True)
# Return traffic towards the NAT address must reach pg0's remote host.
925 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
926 self.pg1.add_stream(pkts)
927 self.pg_enable_capture(self.pg_interfaces)
929 capture = self.pg0.get_capture(len(pkts))
930 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
931 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
# Three sessions are expected on top of the baseline.
933 ses_num_end = self.nat64_get_ses_num()
935 self.assertEqual(ses_num_end - ses_num_start, 3)
# Create sessions with shortened timeouts, wait, and check that two of them
# have expired.
937 def test_session_timeout(self):
938 """ NAT64 session timeout """
939 self.icmp_id_in = 1234
940 self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
941 end_addr=self.nat_addr,
944 flags = self.config_flags.NAT_IS_INSIDE
945 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
946 sw_if_index=self.pg0.sw_if_index)
947 self.vapi.nat64_add_del_interface(is_add=1, flags=0,
948 sw_if_index=self.pg1.sw_if_index)
# UDP stays at 300 while the established-TCP timeout drops to 5.
949 self.vapi.nat64_set_timeouts(udp=300, tcp_established=5,
953 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
954 self.pg0.add_stream(pkts)
955 self.pg_enable_capture(self.pg_interfaces)
957 capture = self.pg1.get_capture(len(pkts))
959 ses_num_before_timeout = self.nat64_get_ses_num()
# Wait 15 (framework virtual_sleep) before counting sessions again.
961 self.virtual_sleep(15)
963 # ICMP and TCP session after timeout
964 ses_num_after_timeout = self.nat64_get_ses_num()
965 self.assertEqual(ses_num_before_timeout - ses_num_after_timeout, 2)
# ICMP error translation in both directions: ICMPv6 destination unreachable
# (code 1) from inside must appear as ICMPv4 type 3 code 13 outside, and the
# reverse, with the embedded original packet translated as well.
967 def test_icmp_error(self):
968 """ NAT64 ICMP Error message translation """
969 self.tcp_port_in = 6303
970 self.udp_port_in = 6304
971 self.icmp_id_in = 6305
973 self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
974 end_addr=self.nat_addr,
977 flags = self.config_flags.NAT_IS_INSIDE
978 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
979 sw_if_index=self.pg0.sw_if_index)
980 self.vapi.nat64_add_del_interface(is_add=1, flags=0,
981 sw_if_index=self.pg1.sw_if_index)
983 # send some packets to create sessions
984 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
985 self.pg0.add_stream(pkts)
986 self.pg_enable_capture(self.pg_interfaces)
# The translated IPv4 packets are kept; they are embedded in the ICMPv4
# errors sent from outside further down.
988 capture_ip4 = self.pg1.get_capture(len(pkts))
989 self.verify_capture_out(capture_ip4,
990 nat_ip=self.nat_addr,
991 dst_ip=self.pg1.remote_ip4)
993 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
994 self.pg1.add_stream(pkts)
995 self.pg_enable_capture(self.pg_interfaces)
# The translated IPv6 packets are kept; they are embedded in the ICMPv6
# errors sent from inside below.
997 capture_ip6 = self.pg0.get_capture(len(pkts))
998 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
999 self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
1000 self.pg0.remote_ip6)
# Inside to outside: one ICMPv6 destination-unreachable per captured IPv6
# packet, carrying that packet as payload.
1003 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1004 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
1005 ICMPv6DestUnreach(code=1) /
1006 packet[IPv6] for packet in capture_ip6]
1007 self.pg0.add_stream(pkts)
1008 self.pg_enable_capture(self.pg_interfaces)
1010 capture = self.pg1.get_capture(len(pkts))
1011 for packet in capture:
1013 self.assertEqual(packet[IP].src, self.nat_addr)
1014 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1015 self.assertEqual(packet[ICMP].type, 3)
1016 self.assertEqual(packet[ICMP].code, 13)
# The embedded packet must be the outside view of the original flow.
1017 inner = packet[IPerror]
1018 self.assertEqual(inner.src, self.pg1.remote_ip4)
1019 self.assertEqual(inner.dst, self.nat_addr)
1020 self.assert_packet_checksums_valid(packet)
1021 if inner.haslayer(TCPerror):
1022 self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
1023 elif inner.haslayer(UDPerror):
1024 self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
1026 self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
1028 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Outside to inside: one ICMPv4 type 3 / code 13 error per captured IPv4
# packet, carrying that packet as payload.
1032 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1033 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1034 ICMP(type=3, code=13) /
1035 packet[IP] for packet in capture_ip4]
1036 self.pg1.add_stream(pkts)
1037 self.pg_enable_capture(self.pg_interfaces)
1039 capture = self.pg0.get_capture(len(pkts))
1040 for packet in capture:
1042 self.assertEqual(packet[IPv6].src, ip.src)
1043 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
1044 icmp = packet[ICMPv6DestUnreach]
1045 self.assertEqual(icmp.code, 1)
# The embedded packet must be the inside view of the original flow.
1046 inner = icmp[IPerror6]
1047 self.assertEqual(inner.src, self.pg0.remote_ip6)
1048 self.assertEqual(inner.dst, ip.src)
1049 self.assert_icmpv6_checksum_valid(packet)
1050 if inner.haslayer(TCPerror):
1051 self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
1052 elif inner.haslayer(UDPerror):
1053 self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
1055 self.assertEqual(inner[ICMPv6EchoRequest].id,
1058 self.logger.error(ppp("Unexpected or invalid packet:", packet))
def test_hairpinning(self):
    """ NAT64 hairpinning

    Client and server are both remote hosts on pg0.  The server is
    published through static TCP and UDP BIB entries on the NAT pool
    address.  The test checks client-to-server traffic, the replies,
    and ICMPv6 errors that quote the translated replies.
    """
    client = self.pg0.remote_hosts[0]
    server = self.pg0.remote_hosts[1]
    srv_tcp_in, srv_tcp_out = 22, 4022
    srv_udp_in, srv_udp_out = 23, 4023
    cli_tcp_in, cli_udp_in = 1234, 1235
    # Learned from the first capture; 0 until then.
    cli_tcp_out = 0
    cli_udp_out = 0
    nat_addr_ip6 = IPv6(src='64:ff9b::' + self.nat_addr).src

    self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
                                            end_addr=self.nat_addr,
                                            vrf_id=0xFFFFFFFF,
                                            is_add=1)
    inside = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat64_add_del_interface(is_add=1, flags=inside,
                                      sw_if_index=self.pg0.sw_if_index)
    self.vapi.nat64_add_del_interface(is_add=1, flags=0,
                                      sw_if_index=self.pg1.sw_if_index)

    # Static BIB entries for the server: TCP first, then UDP.
    for proto, in_port, out_port in (
            (IP_PROTOS.tcp, srv_tcp_in, srv_tcp_out),
            (IP_PROTOS.udp, srv_udp_in, srv_udp_out)):
        self.vapi.nat64_add_del_static_bib(i_addr=server.ip6n,
                                           o_addr=self.nat_addr,
                                           i_port=in_port,
                                           o_port=out_port,
                                           proto=proto, vrf_id=0,
                                           is_add=1)

    # client to server
    to_server = [
        (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IPv6(src=client.ip6, dst=nat_addr_ip6) /
         TCP(sport=cli_tcp_in, dport=srv_tcp_out)),
        (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IPv6(src=client.ip6, dst=nat_addr_ip6) /
         UDP(sport=cli_udp_in, dport=srv_udp_out)),
    ]
    self.pg0.add_stream(to_server)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    for pkt in self.pg0.get_capture(len(to_server)):
        try:
            self.assertEqual(pkt[IPv6].src, nat_addr_ip6)
            self.assertEqual(pkt[IPv6].dst, server.ip6)
            self.assert_packet_checksums_valid(pkt)
            if pkt.haslayer(TCP):
                self.assertNotEqual(pkt[TCP].sport, cli_tcp_in)
                self.assertEqual(pkt[TCP].dport, srv_tcp_in)
                cli_tcp_out = pkt[TCP].sport
            else:
                self.assertNotEqual(pkt[UDP].sport, cli_udp_in)
                self.assertEqual(pkt[UDP].dport, srv_udp_in)
                cli_udp_out = pkt[UDP].sport
        except:
            self.logger.error(ppp("Unexpected or invalid packet:", pkt))
            raise

    # server to client
    to_client = [
        (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IPv6(src=server.ip6, dst=nat_addr_ip6) /
         TCP(sport=srv_tcp_in, dport=cli_tcp_out)),
        (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IPv6(src=server.ip6, dst=nat_addr_ip6) /
         UDP(sport=srv_udp_in, dport=cli_udp_out)),
    ]
    self.pg0.add_stream(to_client)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    replies = self.pg0.get_capture(len(to_client))
    for pkt in replies:
        try:
            self.assertEqual(pkt[IPv6].src, nat_addr_ip6)
            self.assertEqual(pkt[IPv6].dst, client.ip6)
            self.assert_packet_checksums_valid(pkt)
            if pkt.haslayer(TCP):
                self.assertEqual(pkt[TCP].sport, srv_tcp_out)
                self.assertEqual(pkt[TCP].dport, cli_tcp_in)
            else:
                self.assertEqual(pkt[UDP].sport, srv_udp_out)
                self.assertEqual(pkt[UDP].dport, cli_udp_in)
        except:
            self.logger.error(ppp("Unexpected or invalid packet:", pkt))
            raise

    # ICMP error: the client rejects each reply it received, quoting it.
    errors = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
              IPv6(src=client.ip6, dst=nat_addr_ip6) /
              ICMPv6DestUnreach(code=1) /
              reply[IPv6] for reply in replies]
    self.pg0.add_stream(errors)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    for pkt in self.pg0.get_capture(len(errors)):
        try:
            self.assertEqual(pkt[IPv6].src, nat_addr_ip6)
            self.assertEqual(pkt[IPv6].dst, server.ip6)
            icmp = pkt[ICMPv6DestUnreach]
            self.assertEqual(icmp.code, 1)
            quoted = icmp[IPerror6]
            self.assertEqual(quoted.src, server.ip6)
            self.assertEqual(quoted.dst, nat_addr_ip6)
            self.assert_packet_checksums_valid(pkt)
            if quoted.haslayer(TCPerror):
                self.assertEqual(quoted[TCPerror].sport, srv_tcp_in)
                self.assertEqual(quoted[TCPerror].dport, cli_tcp_out)
            else:
                self.assertEqual(quoted[UDPerror].sport, srv_udp_in)
                self.assertEqual(quoted[UDPerror].dport, cli_udp_out)
        except:
            self.logger.error(ppp("Unexpected or invalid packet:", pkt))
            raise
def test_prefix(self):
    """ NAT64 Network-Specific Prefix

    Configures a global NAT64 prefix (VRF 0) and a tenant prefix
    (``vrf1_id``), then runs in2out and out2in traffic for each and
    checks that translated IPv6 addresses use the matching prefix.
    """
    self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
                                            end_addr=self.nat_addr,
                                            vrf_id=0xFFFFFFFF,
                                            is_add=1)
    inside = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat64_add_del_interface(is_add=1, flags=inside,
                                      sw_if_index=self.pg0.sw_if_index)
    self.vapi.nat64_add_del_interface(is_add=1, flags=0,
                                      sw_if_index=self.pg1.sw_if_index)
    self.vapi.nat64_add_del_pool_addr_range(start_addr=self.vrf1_nat_addr,
                                            end_addr=self.vrf1_nat_addr,
                                            vrf_id=self.vrf1_id, is_add=1)
    self.vapi.nat64_add_del_interface(is_add=1, flags=inside,
                                      sw_if_index=self.pg2.sw_if_index)

    # Add global prefix
    global_pref64 = "2001:db8::"
    global_pref64_len = 32
    global_pref64_str = "%s/%s" % (global_pref64, global_pref64_len)
    self.vapi.nat64_add_del_prefix(prefix=global_pref64_str, vrf_id=0,
                                   is_add=1)

    dumped = self.vapi.nat64_prefix_dump()
    self.assertEqual(len(dumped), 1)
    self.assertEqual(str(dumped[0].prefix), global_pref64_str)
    self.assertEqual(dumped[0].vrf_id, 0)

    # Add tenant specific prefix
    vrf1_pref64 = "2001:db8:122:300::"
    vrf1_pref64_len = 56
    vrf1_pref64_str = "%s/%s" % (vrf1_pref64, vrf1_pref64_len)
    self.vapi.nat64_add_del_prefix(prefix=vrf1_pref64_str,
                                   vrf_id=self.vrf1_id, is_add=1)

    dumped = self.vapi.nat64_prefix_dump()
    self.assertEqual(len(dumped), 2)

    # Same traffic pattern twice: global prefix via pg0, then the
    # tenant specific prefix via pg2.
    scenarios = (
        (self.pg0, self.nat_addr, global_pref64, global_pref64_len),
        (self.pg2, self.vrf1_nat_addr, vrf1_pref64, vrf1_pref64_len),
    )
    for in_if, pool_addr, pref, plen in scenarios:
        # in2out
        pkts = self.create_stream_in_ip6(in_if,
                                         self.pg1,
                                         pref=pref,
                                         plen=plen)
        in_if.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(len(pkts))
        self.verify_capture_out(capture, nat_ip=pool_addr,
                                dst_ip=self.pg1.remote_ip4)

        # out2in
        pkts = self.create_stream_out(self.pg1, dst_ip=pool_addr)
        self.pg1.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = in_if.get_capture(len(pkts))
        dst_ip = self.compose_ip6(self.pg1.remote_ip4, pref, plen)
        self.verify_capture_in_ip6(capture, dst_ip, in_if.remote_ip6)
def test_unknown_proto(self):
    """ NAT64 translate packet with unknown protocol

    A TCP packet is sent first, then GRE is pushed through the
    translator in both directions and the outer addresses checked.
    """
    self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
                                            end_addr=self.nat_addr,
                                            vrf_id=0xFFFFFFFF,
                                            is_add=1)
    inside = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat64_add_del_interface(is_add=1, flags=inside,
                                      sw_if_index=self.pg0.sw_if_index)
    self.vapi.nat64_add_del_interface(is_add=1, flags=0,
                                      sw_if_index=self.pg1.sw_if_index)
    remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)

    # in2out: plain TCP first; only its arrival on pg1 is checked.
    tcp_pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
               IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
               TCP(sport=self.tcp_port_in, dport=20))
    self.pg0.add_stream(tcp_pkt)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg1.get_capture(1)

    gre_in = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
              IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
              GRE() /
              IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
              TCP(sport=1234, dport=1234))
    self.pg0.add_stream(gre_in)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    packet = self.pg1.get_capture(1)[0]
    try:
        self.assertEqual(packet[IP].src, self.nat_addr)
        self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
        self.assertEqual(packet.haslayer(GRE), 1)
        self.assert_packet_checksums_valid(packet)
    except:
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise

    # out2in
    gre_out = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
               IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
               GRE() /
               IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
               TCP(sport=1234, dport=1234))
    self.pg1.add_stream(gre_out)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    packet = self.pg0.get_capture(1)[0]
    try:
        self.assertEqual(packet[IPv6].src, remote_ip6)
        self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
        self.assertEqual(packet[IPv6].nh, 47)
    except:
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise
def test_hairpinning_unknown_proto(self):
    """ NAT64 translate packet with unknown protocol - hairpinning

    Client and server are both remote hosts on pg0, each mapped to
    its own pool address by static BIB entries.  GRE is sent in both
    directions and the translated IPv6 header is checked.
    """
    client = self.pg0.remote_hosts[0]
    server = self.pg0.remote_hosts[1]
    srv_tcp_in, srv_tcp_out = 22, 4022
    cli_tcp_in, cli_tcp_out = 1234, 1235
    server_nat_ip = "10.0.0.100"
    client_nat_ip = "10.0.0.110"
    server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
    client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)

    self.vapi.nat64_add_del_pool_addr_range(start_addr=server_nat_ip,
                                            end_addr=client_nat_ip,
                                            vrf_id=0xFFFFFFFF,
                                            is_add=1)
    inside = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat64_add_del_interface(is_add=1, flags=inside,
                                      sw_if_index=self.pg0.sw_if_index)
    self.vapi.nat64_add_del_interface(is_add=1, flags=0,
                                      sw_if_index=self.pg1.sw_if_index)

    # Static BIB entries, in order: server TCP, server GRE (ports are
    # 0 for this entry), client TCP.
    static_bibs = (
        (server.ip6n, server_nat_ip, srv_tcp_in, srv_tcp_out,
         IP_PROTOS.tcp),
        (server.ip6n, server_nat_ip, 0, 0, IP_PROTOS.gre),
        (client.ip6n, client_nat_ip, cli_tcp_in, cli_tcp_out,
         IP_PROTOS.tcp),
    )
    for i_addr, o_addr, i_port, o_port, proto in static_bibs:
        self.vapi.nat64_add_del_static_bib(i_addr=i_addr,
                                           o_addr=o_addr,
                                           i_port=i_port,
                                           o_port=o_port,
                                           proto=proto, vrf_id=0,
                                           is_add=1)

    # client to server
    tcp_pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
               IPv6(src=client.ip6, dst=server_nat_ip6) /
               TCP(sport=cli_tcp_in, dport=srv_tcp_out))
    self.pg0.add_stream(tcp_pkt)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg0.get_capture(1)

    gre_c2s = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
               IPv6(src=client.ip6, dst=server_nat_ip6,
                    nh=IP_PROTOS.gre) /
               GRE() /
               IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
               TCP(sport=1234, dport=1234))
    self.pg0.add_stream(gre_c2s)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    packet = self.pg0.get_capture(1)[0]
    try:
        self.assertEqual(packet[IPv6].src, client_nat_ip6)
        self.assertEqual(packet[IPv6].dst, server.ip6)
        self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
    except:
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise

    # server to client
    gre_s2c = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
               IPv6(src=server.ip6, dst=client_nat_ip6,
                    nh=IP_PROTOS.gre) /
               GRE() /
               IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
               TCP(sport=1234, dport=1234))
    self.pg0.add_stream(gre_s2c)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    packet = self.pg0.get_capture(1)[0]
    try:
        self.assertEqual(packet[IPv6].src, server_nat_ip6)
        self.assertEqual(packet[IPv6].dst, client.ip6)
        self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
    except:
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise
def test_one_armed_nat64(self):
    """ One armed NAT64

    pg3 is registered as both the inside and the outside NAT64
    interface, so translated traffic leaves on the interface it
    arrived on.
    """
    remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
                                       '64:ff9b::',
                                       96)

    self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
                                            end_addr=self.nat_addr,
                                            vrf_id=0xFFFFFFFF,
                                            is_add=1)
    for if_flags in (self.config_flags.NAT_IS_INSIDE, 0):
        self.vapi.nat64_add_del_interface(is_add=1, flags=if_flags,
                                          sw_if_index=self.pg3.sw_if_index)

    # in2out
    request = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
               IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
               TCP(sport=12345, dport=80))
    self.pg3.add_stream(request)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    p = self.pg3.get_capture(1)[0]
    try:
        ip, tcp = p[IP], p[TCP]
        self.assertEqual(ip.src, self.nat_addr)
        self.assertEqual(ip.dst, self.pg3.remote_ip4)
        self.assertNotEqual(tcp.sport, 12345)
        # Port chosen by the translator; the reply targets it.
        external_port = tcp.sport
        self.assertEqual(tcp.dport, 80)
        self.assert_packet_checksums_valid(p)
    except:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # out2in
    reply = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
             IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
             TCP(sport=80, dport=external_port))
    self.pg3.add_stream(reply)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    p = self.pg3.get_capture(1)[0]
    try:
        ip, tcp = p[IPv6], p[TCP]
        self.assertEqual(ip.src, remote_host_ip6)
        self.assertEqual(ip.dst, self.pg3.remote_ip6)
        self.assertEqual(tcp.sport, 80)
        self.assertEqual(tcp.dport, 12345)
        self.assert_packet_checksums_valid(p)
    except:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise
def test_frag_in_order(self):
    """ NAT64 translate fragments arriving in order

    Sends a fragmented TCP stream in2out, reassembles and checks the
    translated packet, then does the same out2in towards the port the
    translator picked.
    """
    self.tcp_port_in = random.randint(1025, 65535)

    self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
                                            end_addr=self.nat_addr,
                                            vrf_id=0xFFFFFFFF,
                                            is_add=1)
    inside = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat64_add_del_interface(is_add=1, flags=inside,
                                      sw_if_index=self.pg0.sw_if_index)
    self.vapi.nat64_add_del_interface(is_add=1, flags=0,
                                      sw_if_index=self.pg1.sw_if_index)

    # in2out
    payload = b'a' * 200
    in_frags = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
                                           self.tcp_port_in, 20, payload)
    self.pg0.add_stream(in_frags)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    captured = self.pg1.get_capture(len(in_frags))
    whole = self.reass_frags_and_verify(captured,
                                        self.nat_addr,
                                        self.pg1.remote_ip4)
    self.assertEqual(whole[TCP].dport, 20)
    self.assertNotEqual(whole[TCP].sport, self.tcp_port_in)
    self.tcp_port_out = whole[TCP].sport
    self.assertEqual(payload, whole[Raw].load)

    # out2in
    payload = b"A" * 4 + b"b" * 16 + b"C" * 3
    out_frags = self.create_stream_frag(self.pg1,
                                        self.nat_addr,
                                        20,
                                        self.tcp_port_out,
                                        payload)
    self.pg1.add_stream(out_frags)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    captured = self.pg0.get_capture(len(out_frags))
    self.logger.debug(ppc("Captured:", captured))
    expected_src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
    whole = self.reass_frags_and_verify_ip6(captured, expected_src,
                                            self.pg0.remote_ip6)
    self.assertEqual(whole[TCP].sport, 20)
    self.assertEqual(whole[TCP].dport, self.tcp_port_in)
    self.assertEqual(payload, whole[Raw].load)
def test_reass_hairpinning(self):
    """ NAT64 fragments hairpinning

    A fragmented TCP stream is sent from pg0 to the NAT address of a
    server that is also on pg0; the hairpinned fragments are
    reassembled and checked.
    """
    payload = b'a' * 200
    server = self.pg0.remote_hosts[1]
    server_in_port = random.randint(1025, 65535)
    server_out_port = random.randint(1025, 65535)
    client_in_port = random.randint(1025, 65535)
    nat_addr_ip6 = IPv6(src='64:ff9b::' + self.nat_addr).src

    self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
                                            end_addr=self.nat_addr,
                                            vrf_id=0xFFFFFFFF,
                                            is_add=1)
    inside = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat64_add_del_interface(is_add=1, flags=inside,
                                      sw_if_index=self.pg0.sw_if_index)
    self.vapi.nat64_add_del_interface(is_add=1, flags=0,
                                      sw_if_index=self.pg1.sw_if_index)

    # add static BIB entry for server
    self.vapi.nat64_add_del_static_bib(i_addr=server.ip6n,
                                       o_addr=self.nat_addr,
                                       i_port=server_in_port,
                                       o_port=server_out_port,
                                       proto=IP_PROTOS.tcp, vrf_id=0,
                                       is_add=1)

    # send packet from host to server
    sent = self.create_stream_frag_ip6(self.pg0,
                                       self.nat_addr,
                                       client_in_port,
                                       server_out_port,
                                       payload)
    self.pg0.add_stream(sent)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    captured = self.pg0.get_capture(len(sent))
    self.logger.debug(ppc("Captured:", captured))
    whole = self.reass_frags_and_verify_ip6(captured, nat_addr_ip6,
                                            server.ip6)
    self.assertNotEqual(whole[TCP].sport, client_in_port)
    self.assertEqual(whole[TCP].dport, server_in_port)
    self.assertEqual(payload, whole[Raw].load)
def test_frag_out_of_order(self):
    """ NAT64 translate fragments arriving out of order

    Fragment lists are reversed before being sent, in both directions,
    and the reassembled result is checked.
    """
    self.tcp_port_in = random.randint(1025, 65535)

    self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
                                            end_addr=self.nat_addr,
                                            vrf_id=0xFFFFFFFF,
                                            is_add=1)
    inside = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat64_add_del_interface(is_add=1, flags=inside,
                                      sw_if_index=self.pg0.sw_if_index)
    self.vapi.nat64_add_del_interface(is_add=1, flags=0,
                                      sw_if_index=self.pg1.sw_if_index)

    # in2out
    payload = b'a' * 200
    in_frags = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
                                           self.tcp_port_in, 20, payload)
    in_frags.reverse()
    self.pg0.add_stream(in_frags)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    captured = self.pg1.get_capture(len(in_frags))
    whole = self.reass_frags_and_verify(captured,
                                        self.nat_addr,
                                        self.pg1.remote_ip4)
    self.assertEqual(whole[TCP].dport, 20)
    self.assertNotEqual(whole[TCP].sport, self.tcp_port_in)
    self.tcp_port_out = whole[TCP].sport
    self.assertEqual(payload, whole[Raw].load)

    # out2in
    payload = b"A" * 4 + b"B" * 16 + b"C" * 3
    out_frags = self.create_stream_frag(self.pg1,
                                        self.nat_addr,
                                        20,
                                        self.tcp_port_out,
                                        payload)
    out_frags.reverse()
    self.pg1.add_stream(out_frags)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    captured = self.pg0.get_capture(len(out_frags))
    expected_src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
    whole = self.reass_frags_and_verify_ip6(captured, expected_src,
                                            self.pg0.remote_ip6)
    self.assertEqual(whole[TCP].sport, 20)
    self.assertEqual(whole[TCP].dport, self.tcp_port_in)
    self.assertEqual(payload, whole[Raw].load)
def test_interface_addr(self):
    """ Acquire NAT64 pool addresses from interface

    Registers pg4 as an address source for the NAT64 pool and checks
    that the pool follows the interface's IPv4 address: empty before
    the address is configured, one entry equal to ``pg4.local_ip4``
    afterwards, and empty again once the address is removed.
    """
    self.vapi.nat64_add_del_interface_addr(
        is_add=1,
        sw_if_index=self.pg4.sw_if_index)

    # no address in NAT64 pool
    # (query the NAT64 pool here; the NAT44 address dump would say
    # nothing about the pool this test is exercising)
    addresses = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(0, len(addresses))

    # configure interface address and check NAT64 address pool
    self.pg4.config_ip4()
    addresses = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(len(addresses), 1)

    self.assertEqual(str(addresses[0].address),
                     self.pg4.local_ip4)

    # remove interface address and check NAT64 address pool
    self.pg4.unconfig_ip4()
    addresses = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(0, len(addresses))
@unittest.skipUnless(config.extended, "part of extended tests")
def test_ipfix_max_bibs_sessions(self):
    """ IPFIX logging maximum session and BIB entries exceeded

    Sends two TCP flows from each of ``max_bibs`` source addresses,
    then enables IPFIX logging and sends further packets that must
    not be forwarded, checking the exported "max sessions" and
    "max BIB entries" events.
    """
    max_bibs = 1280
    max_sessions = 2560
    remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
                                       '64:ff9b::',
                                       96)

    def check_ipfix_header(pkt):
        # Checks shared by every packet captured on the collector side.
        self.assertTrue(pkt.haslayer(IPFIX))
        self.assertEqual(pkt[IP].src, self.pg3.local_ip4)
        self.assertEqual(pkt[IP].dst, self.pg3.remote_ip4)
        self.assertEqual(pkt[UDP].sport, self.ipfix_src_port)
        self.assertEqual(pkt[UDP].dport, 4739)
        self.assertEqual(pkt[IPFIX].observationDomainID,
                         self.ipfix_domain_id)

    self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
                                            end_addr=self.nat_addr,
                                            vrf_id=0xFFFFFFFF,
                                            is_add=1)
    inside = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat64_add_del_interface(is_add=1, flags=inside,
                                      sw_if_index=self.pg0.sw_if_index)
    self.vapi.nat64_add_del_interface(is_add=1, flags=0,
                                      sw_if_index=self.pg1.sw_if_index)

    # Two flows (dport 80 and 22) per source address.
    pkts = []
    src = ""
    for idx in range(max_bibs):
        src = "fd01:aa::%x" % (idx)
        for dport in (80, 22):
            pkts.append(
                Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                IPv6(src=src, dst=remote_host_ip6) /
                TCP(sport=12345, dport=dport))
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg1.get_capture(max_sessions)

    self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
                                 src_address=self.pg3.local_ip4,
                                 path_mtu=512,
                                 template_interval=10)
    self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
                                       src_port=self.ipfix_src_port,
                                       enable=1)

    # One more flow from the last source address used in the loop.
    extra = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
             IPv6(src=src, dst=remote_host_ip6) /
             TCP(sport=12345, dport=25))
    self.pg0.add_stream(extra)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg1.assert_nothing_captured()
    self.vapi.ipfix_flush()
    capture = self.pg3.get_capture(7)
    ipfix = IPFIXDecoder()
    # first load template
    for p in capture:
        check_ipfix_header(p)
        if p.haslayer(Template):
            ipfix.add_template(p.getlayer(Template))
    # verify events in data set
    for p in capture:
        if p.haslayer(Data):
            data = ipfix.decode_data_set(p.getlayer(Set))
            self.verify_ipfix_max_sessions(data, max_sessions)

    # A flow from a source address not used in the loop above.
    extra = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
             IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
             TCP(sport=12345, dport=80))
    self.pg0.add_stream(extra)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg1.assert_nothing_captured()
    self.vapi.ipfix_flush()
    capture = self.pg3.get_capture(1)
    # verify events in data set
    for p in capture:
        check_ipfix_header(p)
        if p.haslayer(Data):
            data = ipfix.decode_data_set(p.getlayer(Set))
            self.verify_ipfix_max_bibs(data, max_bibs)
def test_ipfix_bib_ses(self):
    """ IPFIX logging NAT64 BIB/session create and delete events

    Creates one TCP session with IPFIX logging enabled and checks the
    exported create events, then removes the pool address and checks
    the exported delete events.
    """
    self.tcp_port_in = random.randint(1025, 65535)
    remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
                                       '64:ff9b::',
                                       96)

    def check_ipfix_header(pkt):
        # Checks shared by every packet captured on the collector side.
        self.assertTrue(pkt.haslayer(IPFIX))
        self.assertEqual(pkt[IP].src, self.pg3.local_ip4)
        self.assertEqual(pkt[IP].dst, self.pg3.remote_ip4)
        self.assertEqual(pkt[UDP].sport, self.ipfix_src_port)
        self.assertEqual(pkt[UDP].dport, 4739)
        self.assertEqual(pkt[IPFIX].observationDomainID,
                         self.ipfix_domain_id)

    self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
                                            end_addr=self.nat_addr,
                                            vrf_id=0xFFFFFFFF,
                                            is_add=1)
    inside = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat64_add_del_interface(is_add=1, flags=inside,
                                      sw_if_index=self.pg0.sw_if_index)
    self.vapi.nat64_add_del_interface(is_add=1, flags=0,
                                      sw_if_index=self.pg1.sw_if_index)
    self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
                                 src_address=self.pg3.local_ip4,
                                 path_mtu=512,
                                 template_interval=10)
    self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
                                       src_port=self.ipfix_src_port,
                                       enable=1)

    # Create
    syn = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
           IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
           TCP(sport=self.tcp_port_in, dport=25))
    self.pg0.add_stream(syn)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    translated = self.pg1.get_capture(1)
    self.tcp_port_out = translated[0][TCP].sport
    self.vapi.ipfix_flush()
    capture = self.pg3.get_capture(8)
    ipfix = IPFIXDecoder()
    # first load template
    for p in capture:
        check_ipfix_header(p)
        if p.haslayer(Template):
            ipfix.add_template(p.getlayer(Template))
    # verify events in data set
    for p in capture:
        if not p.haslayer(Data):
            continue
        data = ipfix.decode_data_set(p.getlayer(Set))
        # Field 230 of the first record selects which check applies.
        event = scapy.compat.orb(data[0][230])
        if event == 10:
            self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6)
        elif event == 6:
            self.verify_ipfix_nat64_ses(data,
                                        1,
                                        self.pg0.remote_ip6,
                                        self.pg1.remote_ip4,
                                        25)
        else:
            self.logger.error(ppp("Unexpected or invalid packet: ", p))

    # Delete
    self.pg_enable_capture(self.pg_interfaces)
    self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
                                            end_addr=self.nat_addr,
                                            vrf_id=0xFFFFFFFF,
                                            is_add=0)
    self.vapi.ipfix_flush()
    capture = self.pg3.get_capture(2)
    # verify events in data set
    for p in capture:
        check_ipfix_header(p)
        if not p.haslayer(Data):
            continue
        data = ipfix.decode_data_set(p.getlayer(Set))
        event = scapy.compat.orb(data[0][230])
        if event == 11:
            self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6)
        elif event == 7:
            self.verify_ipfix_nat64_ses(data,
                                        0,
                                        self.pg0.remote_ip6,
                                        self.pg1.remote_ip4,
                                        25)
        else:
            self.logger.error(ppp("Unexpected or invalid packet: ", p))
def test_syslog_sess(self):
    """ Test syslog session creation and deletion

    Creates one TCP session with the syslog sender configured and
    checks the message captured on pg3, then removes the pool address
    and checks the message that follows.
    """
    self.tcp_port_in = random.randint(1025, 65535)
    remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
                                       '64:ff9b::',
                                       96)

    self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
                                            end_addr=self.nat_addr,
                                            vrf_id=0xFFFFFFFF,
                                            is_add=1)
    inside = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat64_add_del_interface(is_add=1, flags=inside,
                                      sw_if_index=self.pg0.sw_if_index)
    self.vapi.nat64_add_del_interface(is_add=1, flags=0,
                                      sw_if_index=self.pg1.sw_if_index)
    self.vapi.syslog_set_filter(
        self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_INFO)
    self.vapi.syslog_set_sender(self.pg3.local_ip4, self.pg3.remote_ip4)

    # Session creation
    syn = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
           IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
           TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
    self.pg0.add_stream(syn)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    translated = self.pg1.get_capture(1)
    self.tcp_port_out = translated[0][TCP].sport
    syslog_pkts = self.pg3.get_capture(1)
    self.verify_syslog_sess(syslog_pkts[0][Raw].load, is_ip6=True)

    # Session deletion, triggered by removing the pool address
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
                                            end_addr=self.nat_addr,
                                            vrf_id=0xFFFFFFFF,
                                            is_add=0)
    syslog_pkts = self.pg3.get_capture(1)
    self.verify_syslog_sess(syslog_pkts[0][Raw].load, False, True)
def nat64_get_ses_num(self):
    """
    Return number of active NAT64 sessions.

    Counts the entries of the session table dump requested with
    ``proto=255``.
    """
    return len(self.vapi.nat64_st_dump(proto=255))
def clear_nat64(self):
    """
    Clear NAT64 configuration.

    Disables IPFIX logging, resets the syslog filter and NAT64
    timeouts, removes every NAT64 interface, static BIB entry, pool
    address and prefix, then asserts that the BIB and session
    counters read zero.
    """
    self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
                                       src_port=self.ipfix_src_port,
                                       enable=0)
    self.ipfix_src_port = 4739
    self.ipfix_domain_id = 1

    self.vapi.syslog_set_filter(
        self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_EMERG)

    self.vapi.nat64_set_timeouts(udp=300, tcp_established=7440,
                                 tcp_transitory=240, icmp=60)

    for nat_if in self.vapi.nat64_interface_dump():
        self.vapi.nat64_add_del_interface(is_add=0, flags=nat_if.flags,
                                          sw_if_index=nat_if.sw_if_index)

    # Only static BIB entries are deleted explicitly.
    for entry in self.vapi.nat64_bib_dump(proto=255):
        if not (entry.flags & self.config_flags.NAT_IS_STATIC):
            continue
        self.vapi.nat64_add_del_static_bib(i_addr=entry.i_addr,
                                           o_addr=entry.o_addr,
                                           i_port=entry.i_port,
                                           o_port=entry.o_port,
                                           proto=entry.proto,
                                           vrf_id=entry.vrf_id,
                                           is_add=0)

    for pool_addr in self.vapi.nat64_pool_addr_dump():
        self.vapi.nat64_add_del_pool_addr_range(
            start_addr=pool_addr.address,
            end_addr=pool_addr.address,
            vrf_id=pool_addr.vrf_id,
            is_add=0)

    for pref in self.vapi.nat64_prefix_dump():
        self.vapi.nat64_add_del_prefix(prefix=str(pref.prefix),
                                       vrf_id=pref.vrf_id, is_add=0)

    bib_counter = self.statistics.get_counter('/nat64/total-bibs')
    self.assertEqual(bib_counter[0][0], 0)
    ses_counter = self.statistics.get_counter('/nat64/total-sessions')
    self.assertEqual(ses_counter[0][0], 0)
# When executed as a script, run the tests with the VPP test runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)