12 from framework import tag_fixme_vpp_workers
13 from framework import VppTestCase, VppTestRunner, running_extended_tests
14 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
15 from scapy.data import IP_PROTOS
16 from scapy.layers.inet import IP, TCP, UDP, ICMP
17 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
18 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
19 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply, \
20 ICMPv6ND_NS, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, fragment6
21 from scapy.layers.l2 import Ether, GRE
22 from scapy.packet import Raw
23 from syslog_rfc5424_parser import SyslogMessage, ParseError
24 from syslog_rfc5424_parser.constants import SyslogSeverity
25 from util import ppc, ppp
26 from vpp_papi import VppEnum
29 @tag_fixme_vpp_workers
30 class TestNAT64(VppTestCase):
31 """ NAT64 Test Cases """
34 def SYSLOG_SEVERITY(self):
35 return VppEnum.vl_api_syslog_severity_t
38 def config_flags(self):
39 return VppEnum.vl_api_nat_config_flags_t
43 super(TestNAT64, cls).setUpClass()
45 cls.tcp_port_in = 6303
46 cls.tcp_port_out = 6303
47 cls.udp_port_in = 6304
48 cls.udp_port_out = 6304
50 cls.icmp_id_out = 6305
51 cls.tcp_external_port = 80
52 cls.nat_addr = '10.0.0.3'
53 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
55 cls.vrf1_nat_addr = '10.0.10.3'
56 cls.ipfix_src_port = 4739
57 cls.ipfix_domain_id = 1
59 cls.create_pg_interfaces(range(6))
60 cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
61 cls.ip6_interfaces.append(cls.pg_interfaces[2])
62 cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
64 cls.vapi.ip_table_add_del(is_add=1,
65 table={'table_id': cls.vrf1_id,
68 cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
70 cls.pg0.generate_remote_hosts(2)
72 for i in cls.ip6_interfaces:
75 i.configure_ipv6_neighbors()
77 for i in cls.ip4_interfaces:
86 cls.pg3.configure_ipv6_neighbors()
92 def tearDownClass(cls):
93 super(TestNAT64, cls).tearDownClass()
96 super(TestNAT64, self).setUp()
97 self.vapi.nat64_plugin_enable_disable(enable=1,
98 bib_buckets=128, st_buckets=256)
101 super(TestNAT64, self).tearDown()
102 if not self.vpp_dead:
103 self.vapi.nat64_plugin_enable_disable(enable=0)
105 def show_commands_at_teardown(self):
106 self.logger.info(self.vapi.cli("show nat64 pool"))
107 self.logger.info(self.vapi.cli("show nat64 interfaces"))
108 self.logger.info(self.vapi.cli("show nat64 prefix"))
109 self.logger.info(self.vapi.cli("show nat64 bib all"))
110 self.logger.info(self.vapi.cli("show nat64 session table all"))
112 def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
114 Create IPv6 packet stream for inside network
116 :param in_if: Inside interface
117 :param out_if: Outside interface
118 :param ttl: Hop Limit of generated packets
119 :param pref: NAT64 prefix
120 :param plen: NAT64 prefix length
124 dst = ''.join(['64:ff9b::', out_if.remote_ip4])
126 dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
129 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
130 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
131 TCP(sport=self.tcp_port_in, dport=20))
135 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
136 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
137 UDP(sport=self.udp_port_in, dport=20))
141 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
142 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
143 ICMPv6EchoRequest(id=self.icmp_id_in))
148 def create_stream_out(self, out_if, dst_ip=None, ttl=64,
149 use_inside_ports=False):
151 Create packet stream for outside network
153 :param out_if: Outside interface
154 :param dst_ip: Destination IP address (Default use global NAT address)
155 :param ttl: TTL of generated packets
156 :param use_inside_ports: Use inside NAT ports as destination ports
157 instead of outside ports
160 dst_ip = self.nat_addr
161 if not use_inside_ports:
162 tcp_port = self.tcp_port_out
163 udp_port = self.udp_port_out
164 icmp_id = self.icmp_id_out
166 tcp_port = self.tcp_port_in
167 udp_port = self.udp_port_in
168 icmp_id = self.icmp_id_in
171 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
172 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
173 TCP(dport=tcp_port, sport=20))
177 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
178 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
179 UDP(dport=udp_port, sport=20))
183 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
184 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
185 ICMP(id=icmp_id, type='echo-reply'))
190 def verify_capture_out(self, capture, nat_ip=None, same_port=False,
191 dst_ip=None, is_ip6=False, ignore_port=False):
193 Verify captured packets on outside network
195 :param capture: Captured packets
196 :param nat_ip: Translated IP address (Default use global NAT address)
197 :param same_port: Source port number is not translated (Default False)
198 :param dst_ip: Destination IP address (Default do not verify)
199 :param is_ip6: If L3 protocol is IPv6 (Default False)
203 ICMP46 = ICMPv6EchoRequest
208 nat_ip = self.nat_addr
209 for packet in capture:
212 self.assert_packet_checksums_valid(packet)
213 self.assertEqual(packet[IP46].src, nat_ip)
214 if dst_ip is not None:
215 self.assertEqual(packet[IP46].dst, dst_ip)
216 if packet.haslayer(TCP):
220 packet[TCP].sport, self.tcp_port_in)
223 packet[TCP].sport, self.tcp_port_in)
224 self.tcp_port_out = packet[TCP].sport
225 self.assert_packet_checksums_valid(packet)
226 elif packet.haslayer(UDP):
230 packet[UDP].sport, self.udp_port_in)
233 packet[UDP].sport, self.udp_port_in)
234 self.udp_port_out = packet[UDP].sport
239 packet[ICMP46].id, self.icmp_id_in)
242 packet[ICMP46].id, self.icmp_id_in)
243 self.icmp_id_out = packet[ICMP46].id
244 self.assert_packet_checksums_valid(packet)
246 self.logger.error(ppp("Unexpected or invalid packet "
247 "(outside network):", packet))
250 def verify_capture_in_ip6(self, capture, src_ip, dst_ip):
252 Verify captured IPv6 packets on inside network
254 :param capture: Captured packets
255 :param src_ip: Source IP
256 :param dst_ip: Destination IP address
258 for packet in capture:
260 self.assertEqual(packet[IPv6].src, src_ip)
261 self.assertEqual(packet[IPv6].dst, dst_ip)
262 self.assert_packet_checksums_valid(packet)
263 if packet.haslayer(TCP):
264 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
265 elif packet.haslayer(UDP):
266 self.assertEqual(packet[UDP].dport, self.udp_port_in)
268 self.assertEqual(packet[ICMPv6EchoReply].id,
271 self.logger.error(ppp("Unexpected or invalid packet "
272 "(inside network):", packet))
275 def create_stream_frag(self, src_if, dst, sport, dport, data,
276 proto=IP_PROTOS.tcp, echo_reply=False):
278 Create fragmented packet stream
280 :param src_if: Source interface
281 :param dst: Destination IPv4 address
282 :param sport: Source port
283 :param dport: Destination port
284 :param data: Payload data
285 :param proto: protocol (TCP, UDP, ICMP)
286 :param echo_reply: use echo_reply if protocol is ICMP
289 if proto == IP_PROTOS.tcp:
290 p = (IP(src=src_if.remote_ip4, dst=dst) /
291 TCP(sport=sport, dport=dport) /
293 p = p.__class__(scapy.compat.raw(p))
294 chksum = p[TCP].chksum
295 proto_header = TCP(sport=sport, dport=dport, chksum=chksum)
296 elif proto == IP_PROTOS.udp:
297 proto_header = UDP(sport=sport, dport=dport)
298 elif proto == IP_PROTOS.icmp:
300 proto_header = ICMP(id=sport, type='echo-request')
302 proto_header = ICMP(id=sport, type='echo-reply')
304 raise Exception("Unsupported protocol")
305 id = random.randint(0, 65535)
307 if proto == IP_PROTOS.tcp:
310 raw = Raw(data[0:16])
311 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
312 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
316 if proto == IP_PROTOS.tcp:
317 raw = Raw(data[4:20])
319 raw = Raw(data[16:32])
320 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
321 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
325 if proto == IP_PROTOS.tcp:
329 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
330 IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=proto,
336 def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
337 pref=None, plen=0, frag_size=128):
339 Create fragmented packet stream
341 :param src_if: Source interface
342 :param dst: Destination IPv4 address
343 :param sport: Source TCP port
344 :param dport: Destination TCP port
345 :param data: Payload data
346 :param pref: NAT64 prefix
347 :param plen: NAT64 prefix length
348 :param fragsize: size of fragments
352 dst_ip6 = ''.join(['64:ff9b::', dst])
354 dst_ip6 = self.compose_ip6(dst, pref, plen)
356 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
357 IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
358 IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
359 TCP(sport=sport, dport=dport) /
362 return fragment6(p, frag_size)
364 def reass_frags_and_verify(self, frags, src, dst):
366 Reassemble and verify fragmented packet
368 :param frags: Captured fragments
369 :param src: Source IPv4 address to verify
370 :param dst: Destination IPv4 address to verify
372 :returns: Reassembled IPv4 packet
376 self.assertEqual(p[IP].src, src)
377 self.assertEqual(p[IP].dst, dst)
378 self.assert_ip_checksum_valid(p)
379 buffer.seek(p[IP].frag * 8)
380 buffer.write(bytes(p[IP].payload))
381 ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
382 proto=frags[0][IP].proto)
383 if ip.proto == IP_PROTOS.tcp:
384 p = (ip / TCP(buffer.getvalue()))
385 self.logger.debug(ppp("Reassembled:", p))
386 self.assert_tcp_checksum_valid(p)
387 elif ip.proto == IP_PROTOS.udp:
388 p = (ip / UDP(buffer.getvalue()[:8]) /
389 Raw(buffer.getvalue()[8:]))
390 elif ip.proto == IP_PROTOS.icmp:
391 p = (ip / ICMP(buffer.getvalue()))
394 def reass_frags_and_verify_ip6(self, frags, src, dst):
396 Reassemble and verify fragmented packet
398 :param frags: Captured fragments
399 :param src: Source IPv6 address to verify
400 :param dst: Destination IPv6 address to verify
402 :returns: Reassembled IPv6 packet
406 self.assertEqual(p[IPv6].src, src)
407 self.assertEqual(p[IPv6].dst, dst)
408 buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
409 buffer.write(bytes(p[IPv6ExtHdrFragment].payload))
410 ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
411 nh=frags[0][IPv6ExtHdrFragment].nh)
412 if ip.nh == IP_PROTOS.tcp:
413 p = (ip / TCP(buffer.getvalue()))
414 elif ip.nh == IP_PROTOS.udp:
415 p = (ip / UDP(buffer.getvalue()))
416 self.logger.debug(ppp("Reassembled:", p))
417 self.assert_packet_checksums_valid(p)
420 def verify_ipfix_max_bibs(self, data, limit):
422 Verify IPFIX maximum BIB entries exceeded event
424 :param data: Decoded IPFIX data records
425 :param limit: Number of maximum BIB entries that can be created.
427 self.assertEqual(1, len(data))
430 self.assertEqual(scapy.compat.orb(record[230]), 13)
431 # natQuotaExceededEvent
432 self.assertEqual(struct.pack("I", 2), record[466])
434 self.assertEqual(struct.pack("I", limit), record[472])
436 def verify_ipfix_bib(self, data, is_create, src_addr):
438 Verify IPFIX NAT64 BIB create and delete events
440 :param data: Decoded IPFIX data records
441 :param is_create: Create event if nonzero value otherwise delete event
442 :param src_addr: IPv6 source address
444 self.assertEqual(1, len(data))
448 self.assertEqual(scapy.compat.orb(record[230]), 10)
450 self.assertEqual(scapy.compat.orb(record[230]), 11)
452 self.assertEqual(src_addr, str(ipaddress.IPv6Address(record[27])))
453 # postNATSourceIPv4Address
454 self.assertEqual(self.nat_addr_n, record[225])
456 self.assertEqual(IP_PROTOS.tcp, scapy.compat.orb(record[4]))
458 self.assertEqual(struct.pack("!I", 0), record[234])
459 # sourceTransportPort
460 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
461 # postNAPTSourceTransportPort
462 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
464 def verify_ipfix_nat64_ses(self, data, is_create, src_addr, dst_addr,
467 Verify IPFIX NAT64 session create and delete events
469 :param data: Decoded IPFIX data records
470 :param is_create: Create event if nonzero value otherwise delete event
471 :param src_addr: IPv6 source address
472 :param dst_addr: IPv4 destination address
473 :param dst_port: destination TCP port
475 self.assertEqual(1, len(data))
479 self.assertEqual(scapy.compat.orb(record[230]), 6)
481 self.assertEqual(scapy.compat.orb(record[230]), 7)
483 self.assertEqual(src_addr, str(ipaddress.IPv6Address(record[27])))
484 # destinationIPv6Address
485 self.assertEqual(socket.inet_pton(socket.AF_INET6,
486 self.compose_ip6(dst_addr,
490 # postNATSourceIPv4Address
491 self.assertEqual(self.nat_addr_n, record[225])
492 # postNATDestinationIPv4Address
493 self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr),
496 self.assertEqual(IP_PROTOS.tcp, scapy.compat.orb(record[4]))
498 self.assertEqual(struct.pack("!I", 0), record[234])
499 # sourceTransportPort
500 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
501 # postNAPTSourceTransportPort
502 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
503 # destinationTransportPort
504 self.assertEqual(struct.pack("!H", dst_port), record[11])
505 # postNAPTDestinationTransportPort
506 self.assertEqual(struct.pack("!H", dst_port), record[228])
508 def verify_syslog_sess(self, data, is_add=True, is_ip6=False):
509 message = data.decode('utf-8')
511 message = SyslogMessage.parse(message)
512 except ParseError as e:
516 self.assertEqual(message.severity, SyslogSeverity.info)
517 self.assertEqual(message.appname, 'NAT')
518 self.assertEqual(message.msgid, 'SADD' if is_add else 'SDEL')
519 sd_params = message.sd.get('nsess')
520 self.assertTrue(sd_params is not None)
522 self.assertEqual(sd_params.get('IATYP'), 'IPv6')
523 self.assertEqual(sd_params.get('ISADDR'), self.pg0.remote_ip6)
525 self.assertEqual(sd_params.get('IATYP'), 'IPv4')
526 self.assertEqual(sd_params.get('ISADDR'), self.pg0.remote_ip4)
527 self.assertTrue(sd_params.get('SSUBIX') is not None)
528 self.assertEqual(sd_params.get('ISPORT'), "%d" % self.tcp_port_in)
529 self.assertEqual(sd_params.get('XATYP'), 'IPv4')
530 self.assertEqual(sd_params.get('XSADDR'), self.nat_addr)
531 self.assertEqual(sd_params.get('XSPORT'), "%d" % self.tcp_port_out)
532 self.assertEqual(sd_params.get('PROTO'), "%d" % IP_PROTOS.tcp)
533 self.assertEqual(sd_params.get('SVLAN'), '0')
534 self.assertEqual(sd_params.get('XDADDR'), self.pg1.remote_ip4)
535 self.assertEqual(sd_params.get('XDPORT'),
536 "%d" % self.tcp_external_port)
538 def compose_ip6(self, ip4, pref, plen):
540 Compose IPv4-embedded IPv6 addresses
542 :param ip4: IPv4 address
543 :param pref: IPv6 prefix
544 :param plen: IPv6 prefix length
545 :returns: IPv4-embedded IPv6 addresses
547 pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
548 ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
563 pref_n[10] = ip4_n[3]
567 pref_n[10] = ip4_n[2]
568 pref_n[11] = ip4_n[3]
571 pref_n[10] = ip4_n[1]
572 pref_n[11] = ip4_n[2]
573 pref_n[12] = ip4_n[3]
575 pref_n[12] = ip4_n[0]
576 pref_n[13] = ip4_n[1]
577 pref_n[14] = ip4_n[2]
578 pref_n[15] = ip4_n[3]
579 packed_pref_n = b''.join([scapy.compat.chb(x) for x in pref_n])
580 return socket.inet_ntop(socket.AF_INET6, packed_pref_n)
582 def verify_ipfix_max_sessions(self, data, limit):
584 Verify IPFIX maximum session entries exceeded event
586 :param data: Decoded IPFIX data records
587 :param limit: Number of maximum session entries that can be created.
589 self.assertEqual(1, len(data))
592 self.assertEqual(scapy.compat.orb(record[230]), 13)
593 # natQuotaExceededEvent
594 self.assertEqual(struct.pack("I", 1), record[466])
596 self.assertEqual(struct.pack("I", limit), record[471])
598 def test_nat64_inside_interface_handles_neighbor_advertisement(self):
599 """ NAT64 inside interface handles Neighbor Advertisement """
601 flags = self.config_flags.NAT_IS_INSIDE
602 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
603 sw_if_index=self.pg5.sw_if_index)
606 ping = (Ether(dst=self.pg5.local_mac, src=self.pg5.remote_mac) /
607 IPv6(src=self.pg5.remote_ip6, dst=self.pg5.local_ip6) /
610 self.pg5.add_stream(pkts)
611 self.pg_enable_capture(self.pg_interfaces)
614 # Wait for Neighbor Solicitation
615 capture = self.pg5.get_capture(len(pkts))
618 self.assertEqual(packet[IPv6].src, self.pg5.local_ip6)
619 self.assertEqual(packet.haslayer(ICMPv6ND_NS), 1)
620 tgt = packet[ICMPv6ND_NS].tgt
622 self.logger.error(ppp("Unexpected or invalid packet:", packet))
625 # Send Neighbor Advertisement
626 p = (Ether(dst=self.pg5.local_mac, src=self.pg5.remote_mac) /
627 IPv6(src=self.pg5.remote_ip6, dst=self.pg5.local_ip6) /
628 ICMPv6ND_NA(tgt=tgt) /
629 ICMPv6NDOptDstLLAddr(lladdr=self.pg5.remote_mac))
631 self.pg5.add_stream(pkts)
632 self.pg_enable_capture(self.pg_interfaces)
635 # Try to send ping again
637 self.pg5.add_stream(pkts)
638 self.pg_enable_capture(self.pg_interfaces)
641 # Wait for ping reply
642 capture = self.pg5.get_capture(len(pkts))
645 self.assertEqual(packet[IPv6].src, self.pg5.local_ip6)
646 self.assertEqual(packet[IPv6].dst, self.pg5.remote_ip6)
647 self.assertEqual(packet.haslayer(ICMPv6EchoReply), 1)
649 self.logger.error(ppp("Unexpected or invalid packet:", packet))
653 """ Add/delete address to NAT64 pool """
656 self.vapi.nat64_add_del_pool_addr_range(start_addr=nat_addr,
658 vrf_id=0xFFFFFFFF, is_add=1)
660 addresses = self.vapi.nat64_pool_addr_dump()
661 self.assertEqual(len(addresses), 1)
662 self.assertEqual(str(addresses[0].address), nat_addr)
664 self.vapi.nat64_add_del_pool_addr_range(start_addr=nat_addr,
666 vrf_id=0xFFFFFFFF, is_add=0)
668 addresses = self.vapi.nat64_pool_addr_dump()
669 self.assertEqual(len(addresses), 0)
671 def test_interface(self):
672 """ Enable/disable NAT64 feature on the interface """
673 flags = self.config_flags.NAT_IS_INSIDE
674 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
675 sw_if_index=self.pg0.sw_if_index)
676 self.vapi.nat64_add_del_interface(is_add=1, flags=0,
677 sw_if_index=self.pg1.sw_if_index)
679 interfaces = self.vapi.nat64_interface_dump()
680 self.assertEqual(len(interfaces), 2)
683 for intf in interfaces:
684 if intf.sw_if_index == self.pg0.sw_if_index:
685 self.assertEqual(intf.flags, self.config_flags.NAT_IS_INSIDE)
687 elif intf.sw_if_index == self.pg1.sw_if_index:
688 self.assertEqual(intf.flags, self.config_flags.NAT_IS_OUTSIDE)
690 self.assertTrue(pg0_found)
691 self.assertTrue(pg1_found)
693 features = self.vapi.cli("show interface features pg0")
694 self.assertIn('nat64-in2out', features)
695 features = self.vapi.cli("show interface features pg1")
696 self.assertIn('nat64-out2in', features)
698 self.vapi.nat64_add_del_interface(is_add=0, flags=flags,
699 sw_if_index=self.pg0.sw_if_index)
700 self.vapi.nat64_add_del_interface(is_add=0, flags=flags,
701 sw_if_index=self.pg1.sw_if_index)
703 interfaces = self.vapi.nat64_interface_dump()
704 self.assertEqual(len(interfaces), 0)
706 def test_static_bib(self):
707 """ Add/delete static BIB entry """
708 in_addr = '2001:db8:85a3::8a2e:370:7334'
709 out_addr = '10.1.1.3'
712 proto = IP_PROTOS.tcp
714 self.vapi.nat64_add_del_static_bib(i_addr=in_addr, o_addr=out_addr,
715 i_port=in_port, o_port=out_port,
716 proto=proto, vrf_id=0, is_add=1)
717 bib = self.vapi.nat64_bib_dump(proto=IP_PROTOS.tcp)
720 if bibe.flags & self.config_flags.NAT_IS_STATIC:
722 self.assertEqual(str(bibe.i_addr), in_addr)
723 self.assertEqual(str(bibe.o_addr), out_addr)
724 self.assertEqual(bibe.i_port, in_port)
725 self.assertEqual(bibe.o_port, out_port)
726 self.assertEqual(static_bib_num, 1)
727 bibs = self.statistics.get_counter('/nat64/total-bibs')
728 self.assertEqual(bibs[0][0], 1)
730 self.vapi.nat64_add_del_static_bib(i_addr=in_addr, o_addr=out_addr,
731 i_port=in_port, o_port=out_port,
732 proto=proto, vrf_id=0, is_add=0)
733 bib = self.vapi.nat64_bib_dump(proto=IP_PROTOS.tcp)
736 if bibe.flags & self.config_flags.NAT_IS_STATIC:
738 self.assertEqual(static_bib_num, 0)
739 bibs = self.statistics.get_counter('/nat64/total-bibs')
740 self.assertEqual(bibs[0][0], 0)
742 def test_set_timeouts(self):
743 """ Set NAT64 timeouts """
744 # verify default values
745 timeouts = self.vapi.nat64_get_timeouts()
746 self.assertEqual(timeouts.udp, 300)
747 self.assertEqual(timeouts.icmp, 60)
748 self.assertEqual(timeouts.tcp_transitory, 240)
749 self.assertEqual(timeouts.tcp_established, 7440)
751 # set and verify custom values
752 self.vapi.nat64_set_timeouts(udp=200, tcp_established=7450,
753 tcp_transitory=250, icmp=30)
754 timeouts = self.vapi.nat64_get_timeouts()
755 self.assertEqual(timeouts.udp, 200)
756 self.assertEqual(timeouts.icmp, 30)
757 self.assertEqual(timeouts.tcp_transitory, 250)
758 self.assertEqual(timeouts.tcp_established, 7450)
760 def test_dynamic(self):
761 """ NAT64 dynamic translation test """
762 self.tcp_port_in = 6303
763 self.udp_port_in = 6304
764 self.icmp_id_in = 6305
766 ses_num_start = self.nat64_get_ses_num()
768 self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
769 end_addr=self.nat_addr,
772 flags = self.config_flags.NAT_IS_INSIDE
773 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
774 sw_if_index=self.pg0.sw_if_index)
775 self.vapi.nat64_add_del_interface(is_add=1, flags=0,
776 sw_if_index=self.pg1.sw_if_index)
779 tcpn = self.statistics.get_counter('/nat64/in2out/tcp')[0]
780 udpn = self.statistics.get_counter('/nat64/in2out/udp')[0]
781 icmpn = self.statistics.get_counter('/nat64/in2out/icmp')[0]
782 drops = self.statistics.get_counter('/nat64/in2out/drops')[0]
784 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
785 self.pg0.add_stream(pkts)
786 self.pg_enable_capture(self.pg_interfaces)
788 capture = self.pg1.get_capture(len(pkts))
789 self.verify_capture_out(capture, nat_ip=self.nat_addr,
790 dst_ip=self.pg1.remote_ip4)
792 if_idx = self.pg0.sw_if_index
793 cnt = self.statistics.get_counter('/nat64/in2out/tcp')[0]
794 self.assertEqual(cnt[if_idx] - tcpn[if_idx], 1)
795 cnt = self.statistics.get_counter('/nat64/in2out/udp')[0]
796 self.assertEqual(cnt[if_idx] - udpn[if_idx], 1)
797 cnt = self.statistics.get_counter('/nat64/in2out/icmp')[0]
798 self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1)
799 cnt = self.statistics.get_counter('/nat64/in2out/drops')[0]
800 self.assertEqual(cnt[if_idx] - drops[if_idx], 0)
803 tcpn = self.statistics.get_counter('/nat64/out2in/tcp')[0]
804 udpn = self.statistics.get_counter('/nat64/out2in/udp')[0]
805 icmpn = self.statistics.get_counter('/nat64/out2in/icmp')[0]
806 drops = self.statistics.get_counter('/nat64/out2in/drops')[0]
808 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
809 self.pg1.add_stream(pkts)
810 self.pg_enable_capture(self.pg_interfaces)
812 capture = self.pg0.get_capture(len(pkts))
813 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
814 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
816 if_idx = self.pg1.sw_if_index
817 cnt = self.statistics.get_counter('/nat64/out2in/tcp')[0]
818 self.assertEqual(cnt[if_idx] - tcpn[if_idx], 2)
819 cnt = self.statistics.get_counter('/nat64/out2in/udp')[0]
820 self.assertEqual(cnt[if_idx] - udpn[if_idx], 1)
821 cnt = self.statistics.get_counter('/nat64/out2in/icmp')[0]
822 self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1)
823 cnt = self.statistics.get_counter('/nat64/out2in/drops')[0]
824 self.assertEqual(cnt[if_idx] - drops[if_idx], 0)
826 bibs = self.statistics.get_counter('/nat64/total-bibs')
827 self.assertEqual(bibs[0][0], 3)
828 sessions = self.statistics.get_counter('/nat64/total-sessions')
829 self.assertEqual(sessions[0][0], 3)
832 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
833 self.pg0.add_stream(pkts)
834 self.pg_enable_capture(self.pg_interfaces)
836 capture = self.pg1.get_capture(len(pkts))
837 self.verify_capture_out(capture, nat_ip=self.nat_addr,
838 dst_ip=self.pg1.remote_ip4)
841 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
842 self.pg1.add_stream(pkts)
843 self.pg_enable_capture(self.pg_interfaces)
845 capture = self.pg0.get_capture(len(pkts))
846 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
848 ses_num_end = self.nat64_get_ses_num()
850 self.assertEqual(ses_num_end - ses_num_start, 3)
852 # tenant with specific VRF
853 self.vapi.nat64_add_del_pool_addr_range(start_addr=self.vrf1_nat_addr,
854 end_addr=self.vrf1_nat_addr,
855 vrf_id=self.vrf1_id, is_add=1)
856 flags = self.config_flags.NAT_IS_INSIDE
857 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
858 sw_if_index=self.pg2.sw_if_index)
860 pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
861 self.pg2.add_stream(pkts)
862 self.pg_enable_capture(self.pg_interfaces)
864 capture = self.pg1.get_capture(len(pkts))
865 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
866 dst_ip=self.pg1.remote_ip4)
868 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
869 self.pg1.add_stream(pkts)
870 self.pg_enable_capture(self.pg_interfaces)
872 capture = self.pg2.get_capture(len(pkts))
873 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
875 def test_static(self):
876 """ NAT64 static translation test """
877 self.tcp_port_in = 60303
878 self.udp_port_in = 60304
879 self.icmp_id_in = 60305
880 self.tcp_port_out = 60303
881 self.udp_port_out = 60304
882 self.icmp_id_out = 60305
884 ses_num_start = self.nat64_get_ses_num()
886 self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
887 end_addr=self.nat_addr,
890 flags = self.config_flags.NAT_IS_INSIDE
891 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
892 sw_if_index=self.pg0.sw_if_index)
893 self.vapi.nat64_add_del_interface(is_add=1, flags=0,
894 sw_if_index=self.pg1.sw_if_index)
896 self.vapi.nat64_add_del_static_bib(i_addr=self.pg0.remote_ip6,
897 o_addr=self.nat_addr,
898 i_port=self.tcp_port_in,
899 o_port=self.tcp_port_out,
900 proto=IP_PROTOS.tcp, vrf_id=0,
902 self.vapi.nat64_add_del_static_bib(i_addr=self.pg0.remote_ip6,
903 o_addr=self.nat_addr,
904 i_port=self.udp_port_in,
905 o_port=self.udp_port_out,
906 proto=IP_PROTOS.udp, vrf_id=0,
908 self.vapi.nat64_add_del_static_bib(i_addr=self.pg0.remote_ip6,
909 o_addr=self.nat_addr,
910 i_port=self.icmp_id_in,
911 o_port=self.icmp_id_out,
912 proto=IP_PROTOS.icmp, vrf_id=0,
916 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
917 self.pg0.add_stream(pkts)
918 self.pg_enable_capture(self.pg_interfaces)
920 capture = self.pg1.get_capture(len(pkts))
921 self.verify_capture_out(capture, nat_ip=self.nat_addr,
922 dst_ip=self.pg1.remote_ip4, same_port=True)
925 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
926 self.pg1.add_stream(pkts)
927 self.pg_enable_capture(self.pg_interfaces)
929 capture = self.pg0.get_capture(len(pkts))
930 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
931 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
933 ses_num_end = self.nat64_get_ses_num()
935 self.assertEqual(ses_num_end - ses_num_start, 3)
937 @unittest.skipUnless(running_extended_tests, "part of extended tests")
938 def test_session_timeout(self):
939 """ NAT64 session timeout """
940 self.icmp_id_in = 1234
941 self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
942 end_addr=self.nat_addr,
945 flags = self.config_flags.NAT_IS_INSIDE
946 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
947 sw_if_index=self.pg0.sw_if_index)
948 self.vapi.nat64_add_del_interface(is_add=1, flags=0,
949 sw_if_index=self.pg1.sw_if_index)
950 self.vapi.nat64_set_timeouts(udp=300, tcp_established=5,
954 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
955 self.pg0.add_stream(pkts)
956 self.pg_enable_capture(self.pg_interfaces)
958 capture = self.pg1.get_capture(len(pkts))
960 ses_num_before_timeout = self.nat64_get_ses_num()
964 # ICMP and TCP session after timeout
965 ses_num_after_timeout = self.nat64_get_ses_num()
966 self.assertEqual(ses_num_before_timeout - ses_num_after_timeout, 2)
968 def test_icmp_error(self):
969 """ NAT64 ICMP Error message translation """
970 self.tcp_port_in = 6303
971 self.udp_port_in = 6304
972 self.icmp_id_in = 6305
974 self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
975 end_addr=self.nat_addr,
978 flags = self.config_flags.NAT_IS_INSIDE
979 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
980 sw_if_index=self.pg0.sw_if_index)
981 self.vapi.nat64_add_del_interface(is_add=1, flags=0,
982 sw_if_index=self.pg1.sw_if_index)
984 # send some packets to create sessions
985 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
986 self.pg0.add_stream(pkts)
987 self.pg_enable_capture(self.pg_interfaces)
989 capture_ip4 = self.pg1.get_capture(len(pkts))
990 self.verify_capture_out(capture_ip4,
991 nat_ip=self.nat_addr,
992 dst_ip=self.pg1.remote_ip4)
994 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
995 self.pg1.add_stream(pkts)
996 self.pg_enable_capture(self.pg_interfaces)
998 capture_ip6 = self.pg0.get_capture(len(pkts))
999 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
1000 self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
1001 self.pg0.remote_ip6)
1004 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1005 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
1006 ICMPv6DestUnreach(code=1) /
1007 packet[IPv6] for packet in capture_ip6]
1008 self.pg0.add_stream(pkts)
1009 self.pg_enable_capture(self.pg_interfaces)
1011 capture = self.pg1.get_capture(len(pkts))
1012 for packet in capture:
1014 self.assertEqual(packet[IP].src, self.nat_addr)
1015 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1016 self.assertEqual(packet[ICMP].type, 3)
1017 self.assertEqual(packet[ICMP].code, 13)
1018 inner = packet[IPerror]
1019 self.assertEqual(inner.src, self.pg1.remote_ip4)
1020 self.assertEqual(inner.dst, self.nat_addr)
1021 self.assert_packet_checksums_valid(packet)
1022 if inner.haslayer(TCPerror):
1023 self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
1024 elif inner.haslayer(UDPerror):
1025 self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
1027 self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
1029 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1033 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1034 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1035 ICMP(type=3, code=13) /
1036 packet[IP] for packet in capture_ip4]
1037 self.pg1.add_stream(pkts)
1038 self.pg_enable_capture(self.pg_interfaces)
1040 capture = self.pg0.get_capture(len(pkts))
1041 for packet in capture:
1043 self.assertEqual(packet[IPv6].src, ip.src)
1044 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
1045 icmp = packet[ICMPv6DestUnreach]
1046 self.assertEqual(icmp.code, 1)
1047 inner = icmp[IPerror6]
1048 self.assertEqual(inner.src, self.pg0.remote_ip6)
1049 self.assertEqual(inner.dst, ip.src)
1050 self.assert_icmpv6_checksum_valid(packet)
1051 if inner.haslayer(TCPerror):
1052 self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
1053 elif inner.haslayer(UDPerror):
1054 self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
1056 self.assertEqual(inner[ICMPv6EchoRequest].id,
1059 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1062 def test_hairpinning(self):
1063 """ NAT64 hairpinning """
1065 client = self.pg0.remote_hosts[0]
1066 server = self.pg0.remote_hosts[1]
1067 server_tcp_in_port = 22
1068 server_tcp_out_port = 4022
1069 server_udp_in_port = 23
1070 server_udp_out_port = 4023
1071 client_tcp_in_port = 1234
1072 client_udp_in_port = 1235
1073 client_tcp_out_port = 0
1074 client_udp_out_port = 0
1075 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
1076 nat_addr_ip6 = ip.src
1078 self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1079 end_addr=self.nat_addr,
1082 flags = self.config_flags.NAT_IS_INSIDE
1083 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1084 sw_if_index=self.pg0.sw_if_index)
1085 self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1086 sw_if_index=self.pg1.sw_if_index)
1088 self.vapi.nat64_add_del_static_bib(i_addr=server.ip6n,
1089 o_addr=self.nat_addr,
1090 i_port=server_tcp_in_port,
1091 o_port=server_tcp_out_port,
1092 proto=IP_PROTOS.tcp, vrf_id=0,
1094 self.vapi.nat64_add_del_static_bib(i_addr=server.ip6n,
1095 o_addr=self.nat_addr,
1096 i_port=server_udp_in_port,
1097 o_port=server_udp_out_port,
1098 proto=IP_PROTOS.udp, vrf_id=0,
1103 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1104 IPv6(src=client.ip6, dst=nat_addr_ip6) /
1105 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
1107 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1108 IPv6(src=client.ip6, dst=nat_addr_ip6) /
1109 UDP(sport=client_udp_in_port, dport=server_udp_out_port))
1111 self.pg0.add_stream(pkts)
1112 self.pg_enable_capture(self.pg_interfaces)
1114 capture = self.pg0.get_capture(len(pkts))
1115 for packet in capture:
1117 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
1118 self.assertEqual(packet[IPv6].dst, server.ip6)
1119 self.assert_packet_checksums_valid(packet)
1120 if packet.haslayer(TCP):
1121 self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
1122 self.assertEqual(packet[TCP].dport, server_tcp_in_port)
1123 client_tcp_out_port = packet[TCP].sport
1125 self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
1126 self.assertEqual(packet[UDP].dport, server_udp_in_port)
1127 client_udp_out_port = packet[UDP].sport
1129 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1134 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1135 IPv6(src=server.ip6, dst=nat_addr_ip6) /
1136 TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
1138 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1139 IPv6(src=server.ip6, dst=nat_addr_ip6) /
1140 UDP(sport=server_udp_in_port, dport=client_udp_out_port))
1142 self.pg0.add_stream(pkts)
1143 self.pg_enable_capture(self.pg_interfaces)
1145 capture = self.pg0.get_capture(len(pkts))
1146 for packet in capture:
1148 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
1149 self.assertEqual(packet[IPv6].dst, client.ip6)
1150 self.assert_packet_checksums_valid(packet)
1151 if packet.haslayer(TCP):
1152 self.assertEqual(packet[TCP].sport, server_tcp_out_port)
1153 self.assertEqual(packet[TCP].dport, client_tcp_in_port)
1155 self.assertEqual(packet[UDP].sport, server_udp_out_port)
1156 self.assertEqual(packet[UDP].dport, client_udp_in_port)
1158 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1163 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1164 IPv6(src=client.ip6, dst=nat_addr_ip6) /
1165 ICMPv6DestUnreach(code=1) /
1166 packet[IPv6] for packet in capture]
1167 self.pg0.add_stream(pkts)
1168 self.pg_enable_capture(self.pg_interfaces)
1170 capture = self.pg0.get_capture(len(pkts))
1171 for packet in capture:
1173 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
1174 self.assertEqual(packet[IPv6].dst, server.ip6)
1175 icmp = packet[ICMPv6DestUnreach]
1176 self.assertEqual(icmp.code, 1)
1177 inner = icmp[IPerror6]
1178 self.assertEqual(inner.src, server.ip6)
1179 self.assertEqual(inner.dst, nat_addr_ip6)
1180 self.assert_packet_checksums_valid(packet)
1181 if inner.haslayer(TCPerror):
1182 self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
1183 self.assertEqual(inner[TCPerror].dport,
1184 client_tcp_out_port)
1186 self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
1187 self.assertEqual(inner[UDPerror].dport,
1188 client_udp_out_port)
1190 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1193 def test_prefix(self):
1194 """ NAT64 Network-Specific Prefix """
1196 self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1197 end_addr=self.nat_addr,
1200 flags = self.config_flags.NAT_IS_INSIDE
1201 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1202 sw_if_index=self.pg0.sw_if_index)
1203 self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1204 sw_if_index=self.pg1.sw_if_index)
1205 self.vapi.nat64_add_del_pool_addr_range(start_addr=self.vrf1_nat_addr,
1206 end_addr=self.vrf1_nat_addr,
1207 vrf_id=self.vrf1_id, is_add=1)
1208 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1209 sw_if_index=self.pg2.sw_if_index)
1212 global_pref64 = "2001:db8::"
1213 global_pref64_len = 32
1214 global_pref64_str = "{}/{}".format(global_pref64, global_pref64_len)
1215 self.vapi.nat64_add_del_prefix(prefix=global_pref64_str, vrf_id=0,
1218 prefix = self.vapi.nat64_prefix_dump()
1219 self.assertEqual(len(prefix), 1)
1220 self.assertEqual(str(prefix[0].prefix), global_pref64_str)
1221 self.assertEqual(prefix[0].vrf_id, 0)
1223 # Add tenant specific prefix
1224 vrf1_pref64 = "2001:db8:122:300::"
1225 vrf1_pref64_len = 56
1226 vrf1_pref64_str = "{}/{}".format(vrf1_pref64, vrf1_pref64_len)
1227 self.vapi.nat64_add_del_prefix(prefix=vrf1_pref64_str,
1228 vrf_id=self.vrf1_id, is_add=1)
1230 prefix = self.vapi.nat64_prefix_dump()
1231 self.assertEqual(len(prefix), 2)
1234 pkts = self.create_stream_in_ip6(self.pg0,
1237 plen=global_pref64_len)
1238 self.pg0.add_stream(pkts)
1239 self.pg_enable_capture(self.pg_interfaces)
1241 capture = self.pg1.get_capture(len(pkts))
1242 self.verify_capture_out(capture, nat_ip=self.nat_addr,
1243 dst_ip=self.pg1.remote_ip4)
1245 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
1246 self.pg1.add_stream(pkts)
1247 self.pg_enable_capture(self.pg_interfaces)
1249 capture = self.pg0.get_capture(len(pkts))
1250 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
1253 self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
1255 # Tenant specific prefix
1256 pkts = self.create_stream_in_ip6(self.pg2,
1259 plen=vrf1_pref64_len)
1260 self.pg2.add_stream(pkts)
1261 self.pg_enable_capture(self.pg_interfaces)
1263 capture = self.pg1.get_capture(len(pkts))
1264 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
1265 dst_ip=self.pg1.remote_ip4)
1267 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
1268 self.pg1.add_stream(pkts)
1269 self.pg_enable_capture(self.pg_interfaces)
1271 capture = self.pg2.get_capture(len(pkts))
1272 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
1275 self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
1277 def test_unknown_proto(self):
1278 """ NAT64 translate packet with unknown protocol """
1280 self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1281 end_addr=self.nat_addr,
1284 flags = self.config_flags.NAT_IS_INSIDE
1285 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1286 sw_if_index=self.pg0.sw_if_index)
1287 self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1288 sw_if_index=self.pg1.sw_if_index)
1289 remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
1292 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1293 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
1294 TCP(sport=self.tcp_port_in, dport=20))
1295 self.pg0.add_stream(p)
1296 self.pg_enable_capture(self.pg_interfaces)
1298 p = self.pg1.get_capture(1)
1300 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1301 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
1303 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
1304 TCP(sport=1234, dport=1234))
1305 self.pg0.add_stream(p)
1306 self.pg_enable_capture(self.pg_interfaces)
1308 p = self.pg1.get_capture(1)
1311 self.assertEqual(packet[IP].src, self.nat_addr)
1312 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1313 self.assertEqual(packet.haslayer(GRE), 1)
1314 self.assert_packet_checksums_valid(packet)
1316 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1320 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1321 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1323 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
1324 TCP(sport=1234, dport=1234))
1325 self.pg1.add_stream(p)
1326 self.pg_enable_capture(self.pg_interfaces)
1328 p = self.pg0.get_capture(1)
1331 self.assertEqual(packet[IPv6].src, remote_ip6)
1332 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
1333 self.assertEqual(packet[IPv6].nh, 47)
1335 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1338 def test_hairpinning_unknown_proto(self):
1339 """ NAT64 translate packet with unknown protocol - hairpinning """
1341 client = self.pg0.remote_hosts[0]
1342 server = self.pg0.remote_hosts[1]
1343 server_tcp_in_port = 22
1344 server_tcp_out_port = 4022
1345 client_tcp_in_port = 1234
1346 client_tcp_out_port = 1235
1347 server_nat_ip = "10.0.0.100"
1348 client_nat_ip = "10.0.0.110"
1349 server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
1350 client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
1352 self.vapi.nat64_add_del_pool_addr_range(start_addr=server_nat_ip,
1353 end_addr=client_nat_ip,
1356 flags = self.config_flags.NAT_IS_INSIDE
1357 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1358 sw_if_index=self.pg0.sw_if_index)
1359 self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1360 sw_if_index=self.pg1.sw_if_index)
1362 self.vapi.nat64_add_del_static_bib(i_addr=server.ip6n,
1363 o_addr=server_nat_ip,
1364 i_port=server_tcp_in_port,
1365 o_port=server_tcp_out_port,
1366 proto=IP_PROTOS.tcp, vrf_id=0,
1369 self.vapi.nat64_add_del_static_bib(i_addr=server.ip6n,
1370 o_addr=server_nat_ip, i_port=0,
1372 proto=IP_PROTOS.gre, vrf_id=0,
1375 self.vapi.nat64_add_del_static_bib(i_addr=client.ip6n,
1376 o_addr=client_nat_ip,
1377 i_port=client_tcp_in_port,
1378 o_port=client_tcp_out_port,
1379 proto=IP_PROTOS.tcp, vrf_id=0,
1383 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1384 IPv6(src=client.ip6, dst=server_nat_ip6) /
1385 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
1386 self.pg0.add_stream(p)
1387 self.pg_enable_capture(self.pg_interfaces)
1389 p = self.pg0.get_capture(1)
1391 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1392 IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
1394 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
1395 TCP(sport=1234, dport=1234))
1396 self.pg0.add_stream(p)
1397 self.pg_enable_capture(self.pg_interfaces)
1399 p = self.pg0.get_capture(1)
1402 self.assertEqual(packet[IPv6].src, client_nat_ip6)
1403 self.assertEqual(packet[IPv6].dst, server.ip6)
1404 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
1406 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1410 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1411 IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
1413 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
1414 TCP(sport=1234, dport=1234))
1415 self.pg0.add_stream(p)
1416 self.pg_enable_capture(self.pg_interfaces)
1418 p = self.pg0.get_capture(1)
1421 self.assertEqual(packet[IPv6].src, server_nat_ip6)
1422 self.assertEqual(packet[IPv6].dst, client.ip6)
1423 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
1425 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1428 def test_one_armed_nat64(self):
1429 """ One armed NAT64 """
1431 remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
1435 self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1436 end_addr=self.nat_addr,
1439 flags = self.config_flags.NAT_IS_INSIDE
1440 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1441 sw_if_index=self.pg3.sw_if_index)
1442 self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1443 sw_if_index=self.pg3.sw_if_index)
1446 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
1447 IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
1448 TCP(sport=12345, dport=80))
1449 self.pg3.add_stream(p)
1450 self.pg_enable_capture(self.pg_interfaces)
1452 capture = self.pg3.get_capture(1)
1457 self.assertEqual(ip.src, self.nat_addr)
1458 self.assertEqual(ip.dst, self.pg3.remote_ip4)
1459 self.assertNotEqual(tcp.sport, 12345)
1460 external_port = tcp.sport
1461 self.assertEqual(tcp.dport, 80)
1462 self.assert_packet_checksums_valid(p)
1464 self.logger.error(ppp("Unexpected or invalid packet:", p))
1468 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
1469 IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
1470 TCP(sport=80, dport=external_port))
1471 self.pg3.add_stream(p)
1472 self.pg_enable_capture(self.pg_interfaces)
1474 capture = self.pg3.get_capture(1)
1479 self.assertEqual(ip.src, remote_host_ip6)
1480 self.assertEqual(ip.dst, self.pg3.remote_ip6)
1481 self.assertEqual(tcp.sport, 80)
1482 self.assertEqual(tcp.dport, 12345)
1483 self.assert_packet_checksums_valid(p)
1485 self.logger.error(ppp("Unexpected or invalid packet:", p))
1488 def test_frag_in_order(self):
1489 """ NAT64 translate fragments arriving in order """
1490 self.tcp_port_in = random.randint(1025, 65535)
1492 self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1493 end_addr=self.nat_addr,
1496 flags = self.config_flags.NAT_IS_INSIDE
1497 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1498 sw_if_index=self.pg0.sw_if_index)
1499 self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1500 sw_if_index=self.pg1.sw_if_index)
1504 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
1505 self.tcp_port_in, 20, data)
1506 self.pg0.add_stream(pkts)
1507 self.pg_enable_capture(self.pg_interfaces)
1509 frags = self.pg1.get_capture(len(pkts))
1510 p = self.reass_frags_and_verify(frags,
1512 self.pg1.remote_ip4)
1513 self.assertEqual(p[TCP].dport, 20)
1514 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
1515 self.tcp_port_out = p[TCP].sport
1516 self.assertEqual(data, p[Raw].load)
1519 data = b"A" * 4 + b"b" * 16 + b"C" * 3
1520 pkts = self.create_stream_frag(self.pg1,
1525 self.pg1.add_stream(pkts)
1526 self.pg_enable_capture(self.pg_interfaces)
1528 frags = self.pg0.get_capture(len(pkts))
1529 self.logger.debug(ppc("Captured:", frags))
1530 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
1531 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
1532 self.assertEqual(p[TCP].sport, 20)
1533 self.assertEqual(p[TCP].dport, self.tcp_port_in)
1534 self.assertEqual(data, p[Raw].load)
1536 def test_reass_hairpinning(self):
1537 """ NAT64 fragments hairpinning """
1539 server = self.pg0.remote_hosts[1]
1540 server_in_port = random.randint(1025, 65535)
1541 server_out_port = random.randint(1025, 65535)
1542 client_in_port = random.randint(1025, 65535)
1543 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
1544 nat_addr_ip6 = ip.src
1546 self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1547 end_addr=self.nat_addr,
1550 flags = self.config_flags.NAT_IS_INSIDE
1551 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1552 sw_if_index=self.pg0.sw_if_index)
1553 self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1554 sw_if_index=self.pg1.sw_if_index)
1556 # add static BIB entry for server
1557 self.vapi.nat64_add_del_static_bib(i_addr=server.ip6n,
1558 o_addr=self.nat_addr,
1559 i_port=server_in_port,
1560 o_port=server_out_port,
1561 proto=IP_PROTOS.tcp, vrf_id=0,
1564 # send packet from host to server
1565 pkts = self.create_stream_frag_ip6(self.pg0,
1570 self.pg0.add_stream(pkts)
1571 self.pg_enable_capture(self.pg_interfaces)
1573 frags = self.pg0.get_capture(len(pkts))
1574 self.logger.debug(ppc("Captured:", frags))
1575 p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
1576 self.assertNotEqual(p[TCP].sport, client_in_port)
1577 self.assertEqual(p[TCP].dport, server_in_port)
1578 self.assertEqual(data, p[Raw].load)
1580 def test_frag_out_of_order(self):
1581 """ NAT64 translate fragments arriving out of order """
1582 self.tcp_port_in = random.randint(1025, 65535)
1584 self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1585 end_addr=self.nat_addr,
1588 flags = self.config_flags.NAT_IS_INSIDE
1589 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1590 sw_if_index=self.pg0.sw_if_index)
1591 self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1592 sw_if_index=self.pg1.sw_if_index)
1596 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
1597 self.tcp_port_in, 20, data)
1599 self.pg0.add_stream(pkts)
1600 self.pg_enable_capture(self.pg_interfaces)
1602 frags = self.pg1.get_capture(len(pkts))
1603 p = self.reass_frags_and_verify(frags,
1605 self.pg1.remote_ip4)
1606 self.assertEqual(p[TCP].dport, 20)
1607 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
1608 self.tcp_port_out = p[TCP].sport
1609 self.assertEqual(data, p[Raw].load)
1612 data = b"A" * 4 + b"B" * 16 + b"C" * 3
1613 pkts = self.create_stream_frag(self.pg1,
1619 self.pg1.add_stream(pkts)
1620 self.pg_enable_capture(self.pg_interfaces)
1622 frags = self.pg0.get_capture(len(pkts))
1623 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
1624 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
1625 self.assertEqual(p[TCP].sport, 20)
1626 self.assertEqual(p[TCP].dport, self.tcp_port_in)
1627 self.assertEqual(data, p[Raw].load)
1629 def test_interface_addr(self):
1630 """ Acquire NAT64 pool addresses from interface """
1631 self.vapi.nat64_add_del_interface_addr(
1633 sw_if_index=self.pg4.sw_if_index)
1635 # no address in NAT64 pool
1636 addresses = self.vapi.nat44_address_dump()
1637 self.assertEqual(0, len(addresses))
1639 # configure interface address and check NAT64 address pool
1640 self.pg4.config_ip4()
1641 addresses = self.vapi.nat64_pool_addr_dump()
1642 self.assertEqual(len(addresses), 1)
1644 self.assertEqual(str(addresses[0].address),
1647 # remove interface address and check NAT64 address pool
1648 self.pg4.unconfig_ip4()
1649 addresses = self.vapi.nat64_pool_addr_dump()
1650 self.assertEqual(0, len(addresses))
1652 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1653 def test_ipfix_max_bibs_sessions(self):
1654 """ IPFIX logging maximum session and BIB entries exceeded """
1657 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
1661 self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1662 end_addr=self.nat_addr,
1665 flags = self.config_flags.NAT_IS_INSIDE
1666 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1667 sw_if_index=self.pg0.sw_if_index)
1668 self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1669 sw_if_index=self.pg1.sw_if_index)
1673 for i in range(0, max_bibs):
1674 src = "fd01:aa::%x" % (i)
1675 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1676 IPv6(src=src, dst=remote_host_ip6) /
1677 TCP(sport=12345, dport=80))
1679 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1680 IPv6(src=src, dst=remote_host_ip6) /
1681 TCP(sport=12345, dport=22))
1683 self.pg0.add_stream(pkts)
1684 self.pg_enable_capture(self.pg_interfaces)
1686 self.pg1.get_capture(max_sessions)
1688 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
1689 src_address=self.pg3.local_ip4,
1691 template_interval=10)
1692 self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
1693 src_port=self.ipfix_src_port,
1696 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1697 IPv6(src=src, dst=remote_host_ip6) /
1698 TCP(sport=12345, dport=25))
1699 self.pg0.add_stream(p)
1700 self.pg_enable_capture(self.pg_interfaces)
1702 self.pg1.assert_nothing_captured()
1704 self.vapi.ipfix_flush()
1705 capture = self.pg3.get_capture(7)
1706 ipfix = IPFIXDecoder()
1707 # first load template
1709 self.assertTrue(p.haslayer(IPFIX))
1710 self.assertEqual(p[IP].src, self.pg3.local_ip4)
1711 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1712 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1713 self.assertEqual(p[UDP].dport, 4739)
1714 self.assertEqual(p[IPFIX].observationDomainID,
1715 self.ipfix_domain_id)
1716 if p.haslayer(Template):
1717 ipfix.add_template(p.getlayer(Template))
1718 # verify events in data set
1720 if p.haslayer(Data):
1721 data = ipfix.decode_data_set(p.getlayer(Set))
1722 self.verify_ipfix_max_sessions(data, max_sessions)
1724 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1725 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
1726 TCP(sport=12345, dport=80))
1727 self.pg0.add_stream(p)
1728 self.pg_enable_capture(self.pg_interfaces)
1730 self.pg1.assert_nothing_captured()
1732 self.vapi.ipfix_flush()
1733 capture = self.pg3.get_capture(1)
1734 # verify events in data set
1736 self.assertTrue(p.haslayer(IPFIX))
1737 self.assertEqual(p[IP].src, self.pg3.local_ip4)
1738 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1739 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1740 self.assertEqual(p[UDP].dport, 4739)
1741 self.assertEqual(p[IPFIX].observationDomainID,
1742 self.ipfix_domain_id)
1743 if p.haslayer(Data):
1744 data = ipfix.decode_data_set(p.getlayer(Set))
1745 self.verify_ipfix_max_bibs(data, max_bibs)
1747 def test_ipfix_bib_ses(self):
1748 """ IPFIX logging NAT64 BIB/session create and delete events """
1749 self.tcp_port_in = random.randint(1025, 65535)
1750 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
1754 self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1755 end_addr=self.nat_addr,
1758 flags = self.config_flags.NAT_IS_INSIDE
1759 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1760 sw_if_index=self.pg0.sw_if_index)
1761 self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1762 sw_if_index=self.pg1.sw_if_index)
1763 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
1764 src_address=self.pg3.local_ip4,
1766 template_interval=10)
1767 self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
1768 src_port=self.ipfix_src_port,
1772 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1773 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
1774 TCP(sport=self.tcp_port_in, dport=25))
1775 self.pg0.add_stream(p)
1776 self.pg_enable_capture(self.pg_interfaces)
1778 p = self.pg1.get_capture(1)
1779 self.tcp_port_out = p[0][TCP].sport
1780 self.vapi.ipfix_flush()
1781 capture = self.pg3.get_capture(8)
1782 ipfix = IPFIXDecoder()
1783 # first load template
1785 self.assertTrue(p.haslayer(IPFIX))
1786 self.assertEqual(p[IP].src, self.pg3.local_ip4)
1787 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1788 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1789 self.assertEqual(p[UDP].dport, 4739)
1790 self.assertEqual(p[IPFIX].observationDomainID,
1791 self.ipfix_domain_id)
1792 if p.haslayer(Template):
1793 ipfix.add_template(p.getlayer(Template))
1794 # verify events in data set
1796 if p.haslayer(Data):
1797 data = ipfix.decode_data_set(p.getlayer(Set))
1798 if scapy.compat.orb(data[0][230]) == 10:
1799 self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6)
1800 elif scapy.compat.orb(data[0][230]) == 6:
1801 self.verify_ipfix_nat64_ses(data,
1803 self.pg0.remote_ip6,
1804 self.pg1.remote_ip4,
1807 self.logger.error(ppp("Unexpected or invalid packet: ", p))
1810 self.pg_enable_capture(self.pg_interfaces)
1811 self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1812 end_addr=self.nat_addr,
1815 self.vapi.ipfix_flush()
1816 capture = self.pg3.get_capture(2)
1817 # verify events in data set
1819 self.assertTrue(p.haslayer(IPFIX))
1820 self.assertEqual(p[IP].src, self.pg3.local_ip4)
1821 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1822 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1823 self.assertEqual(p[UDP].dport, 4739)
1824 self.assertEqual(p[IPFIX].observationDomainID,
1825 self.ipfix_domain_id)
1826 if p.haslayer(Data):
1827 data = ipfix.decode_data_set(p.getlayer(Set))
1828 if scapy.compat.orb(data[0][230]) == 11:
1829 self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6)
1830 elif scapy.compat.orb(data[0][230]) == 7:
1831 self.verify_ipfix_nat64_ses(data,
1833 self.pg0.remote_ip6,
1834 self.pg1.remote_ip4,
1837 self.logger.error(ppp("Unexpected or invalid packet: ", p))
1839 def test_syslog_sess(self):
1840 """ Test syslog session creation and deletion """
1841 self.tcp_port_in = random.randint(1025, 65535)
1842 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
1846 self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1847 end_addr=self.nat_addr,
1850 flags = self.config_flags.NAT_IS_INSIDE
1851 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1852 sw_if_index=self.pg0.sw_if_index)
1853 self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1854 sw_if_index=self.pg1.sw_if_index)
1855 self.vapi.syslog_set_filter(
1856 self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_INFO)
1857 self.vapi.syslog_set_sender(self.pg3.local_ip4, self.pg3.remote_ip4)
1859 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1860 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
1861 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
1862 self.pg0.add_stream(p)
1863 self.pg_enable_capture(self.pg_interfaces)
1865 p = self.pg1.get_capture(1)
1866 self.tcp_port_out = p[0][TCP].sport
1867 capture = self.pg3.get_capture(1)
1868 self.verify_syslog_sess(capture[0][Raw].load, is_ip6=True)
1870 self.pg_enable_capture(self.pg_interfaces)
1872 self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1873 end_addr=self.nat_addr,
1876 capture = self.pg3.get_capture(1)
1877 self.verify_syslog_sess(capture[0][Raw].load, False, True)
1879 def nat64_get_ses_num(self):
1881 Return number of active NAT64 sessions.
1883 st = self.vapi.nat64_st_dump(proto=255)
1886 def clear_nat64(self):
1888 Clear NAT64 configuration.
1890 self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
1891 src_port=self.ipfix_src_port,
1893 self.ipfix_src_port = 4739
1894 self.ipfix_domain_id = 1
1896 self.vapi.syslog_set_filter(
1897 self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_EMERG)
1899 self.vapi.nat64_set_timeouts(udp=300, tcp_established=7440,
1900 tcp_transitory=240, icmp=60)
1902 interfaces = self.vapi.nat64_interface_dump()
1903 for intf in interfaces:
1904 self.vapi.nat64_add_del_interface(is_add=0, flags=intf.flags,
1905 sw_if_index=intf.sw_if_index)
1907 bib = self.vapi.nat64_bib_dump(proto=255)
1909 if bibe.flags & self.config_flags.NAT_IS_STATIC:
1910 self.vapi.nat64_add_del_static_bib(i_addr=bibe.i_addr,
1918 adresses = self.vapi.nat64_pool_addr_dump()
1919 for addr in adresses:
1920 self.vapi.nat64_add_del_pool_addr_range(start_addr=addr.address,
1921 end_addr=addr.address,
1925 prefixes = self.vapi.nat64_prefix_dump()
1926 for prefix in prefixes:
1927 self.vapi.nat64_add_del_prefix(prefix=str(prefix.prefix),
1928 vrf_id=prefix.vrf_id, is_add=0)
1930 bibs = self.statistics.get_counter('/nat64/total-bibs')
1931 self.assertEqual(bibs[0][0], 0)
1932 sessions = self.statistics.get_counter('/nat64/total-sessions')
1933 self.assertEqual(sessions[0][0], 0)
1936 if __name__ == '__main__':
1937 unittest.main(testRunner=VppTestRunner)