12 from framework import VppTestCase, VppTestRunner, running_extended_tests
13 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
14 from scapy.data import IP_PROTOS
15 from scapy.layers.inet import IP, TCP, UDP, ICMP
16 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
17 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
18 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply, \
19 ICMPv6ND_NS, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, fragment6
20 from scapy.layers.l2 import Ether, GRE
21 from scapy.packet import Raw
22 from syslog_rfc5424_parser import SyslogMessage, ParseError
23 from syslog_rfc5424_parser.constants import SyslogSeverity
24 from util import ppc, ppp
25 from vpp_papi import VppEnum
28 class TestNAT64(VppTestCase):
29 """ NAT64 Test Cases """
def SYSLOG_SEVERITY(self):
    """Return the VPP API syslog severity enum (``vl_api_syslog_severity_t``).

    NOTE(review): no caller is visible in this part of the file; confirm
    whether callers invoke this as a method or read it as an attribute.
    """
    severity_enum = VppEnum.vl_api_syslog_severity_t
    return severity_enum
@property
def config_flags(self):
    """NAT configuration flag enum (``vl_api_nat_config_flags_t``).

    Exposed as a read-only property because the tests in this class read it
    as an attribute (``self.config_flags.NAT_IS_INSIDE``); on a plain method
    that expression would raise ``AttributeError`` on the bound method.

    :returns: ``VppEnum.vl_api_nat_config_flags_t``
    """
    return VppEnum.vl_api_nat_config_flags_t
# Class-wide fixture statements; the super().setUpClass() call below indicates
# they belong to the setUpClass override.
# NOTE(review): the embedded line numbers jump (39-40, 42, 47, 52, 56, 61,
# 64-65, 71-72, 76-83, ...), so the method header and several statements are
# absent from this extract -- confirm against the complete file before
# changing anything here.
41 super(TestNAT64, cls).setUpClass()
# Default port numbers / ICMP id shared by the stream builders and verifiers.
43 cls.tcp_port_in = 6303
44 cls.tcp_port_out = 6303
45 cls.udp_port_in = 6304
46 cls.udp_port_out = 6304
48 cls.icmp_id_out = 6305
49 cls.tcp_external_port = 80
# NAT pool address, kept both as text and in packed (inet_pton) form.
50 cls.nat_addr = '10.0.0.3'
51 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
53 cls.vrf1_nat_addr = '10.0.10.3'
54 cls.ipfix_src_port = 4739
55 cls.ipfix_domain_id = 1
# Six packet-generator interfaces: entries 0 and 2 form the IPv6 list,
# entry 1 forms the IPv4 list.
57 cls.create_pg_interfaces(range(6))
58 cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
59 cls.ip6_interfaces.append(cls.pg_interfaces[2])
60 cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
# Add a table with id vrf1_id (remaining arguments of this call are not
# visible here), then call set_table_ip6(vrf1_id) on interface 2.
62 cls.vapi.ip_table_add_del(is_add=1,
63 table={'table_id': cls.vrf1_id,
66 cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
# Two remote hosts on pg0.
68 cls.pg0.generate_remote_hosts(2)
# Per-interface bring-up loops; most of their bodies are not visible here.
70 for i in cls.ip6_interfaces:
73 i.configure_ipv6_neighbors()
75 for i in cls.ip4_interfaces:
84 cls.pg3.configure_ipv6_neighbors()
@classmethod
def tearDownClass(cls):
    """Class-level teardown: delegate to the parent class implementation.

    Declared as a classmethod because unittest invokes ``tearDownClass`` on
    the class without arguments; as a plain function that call would raise
    ``TypeError`` for the missing ``cls`` argument.
    """
    super(TestNAT64, cls).tearDownClass()
# Per-test setup (the super().setUp() call shows this is the setUp override;
# its `def` line is absent from this extract): run the parent setup, then
# enable the NAT64 plugin with bib_buckets=128 and st_buckets=256.
94 super(TestNAT64, self).setUp()
95 self.vapi.nat64_plugin_enable_disable(enable=1,
96 bib_buckets=128, st_buckets=256)
# Per-test teardown (the super().tearDown() call shows this is the tearDown
# override; its `def` line is absent from this extract): run the parent
# teardown, then disable the NAT64 plugin unless self.vpp_dead is set.
99 super(TestNAT64, self).tearDown()
100 if not self.vpp_dead:
101 self.vapi.nat64_plugin_enable_disable(enable=0)
def show_commands_at_teardown(self):
    """Log the output of the NAT64 ``show`` CLI commands.

    Each command is run through ``self.vapi.cli`` and its output is written
    to ``self.logger`` at info level, in the order listed below.
    """
    nat64_show_commands = (
        "show nat64 pool",
        "show nat64 interfaces",
        "show nat64 prefix",
        "show nat64 bib all",
        "show nat64 session table all",
    )
    for command in nat64_show_commands:
        output = self.vapi.cli(command)
        self.logger.info(output)
# Builds the inside-network IPv6 packets (TCP, UDP and ICMPv6 echo request)
# sourced from in_if's remote MAC/IPv6 address. The destination is either the
# literal '64:ff9b::' prefix joined with out_if.remote_ip4 or the result of
# compose_ip6(out_if.remote_ip4, pref, plen).
# NOTE(review): the docstring lists `ttl`, but the parameter is named `hlim`.
# NOTE(review): the embedded numbering skips lines in this block (e.g. 119-121,
# 123, 125-126, 130-132, 142-145), so the condition choosing the destination
# form, the packet list handling and the return are not visible here.
110 def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
112 Create IPv6 packet stream for inside network
114 :param in_if: Inside interface
115 :param out_if: Outside interface
116 :param ttl: Hop Limit of generated packets
117 :param pref: NAT64 prefix
118 :param plen: NAT64 prefix length
122 dst = ''.join(['64:ff9b::', out_if.remote_ip4])
124 dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
# TCP packet from the inside TCP port to destination port 20.
127 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
128 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
129 TCP(sport=self.tcp_port_in, dport=20))
# UDP packet from the inside UDP port to destination port 20.
133 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
134 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
135 UDP(sport=self.udp_port_in, dport=20))
# ICMPv6 echo request carrying the inside ICMP identifier.
139 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
140 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
141 ICMPv6EchoRequest(id=self.icmp_id_in))
# Builds the outside-network IPv4 packets (TCP, UDP and ICMP echo reply)
# sourced from out_if's remote MAC/IPv4 address and addressed to dst_ip.
# NOTE(review): the embedded numbering skips lines in this block (e.g. 156-157,
# 163, 167-168, 172-174, 184-187), so the dst_ip default check, the `else`
# branch header, the packet list handling and the return are not visible here.
146 def create_stream_out(self, out_if, dst_ip=None, ttl=64,
147 use_inside_ports=False):
149 Create packet stream for outside network
151 :param out_if: Outside interface
152 :param dst_ip: Destination IP address (Default use global NAT address)
153 :param ttl: TTL of generated packets
154 :param use_inside_ports: Use inside NAT ports as destination ports
155 instead of outside ports
158 dst_ip = self.nat_addr
# Destination ports / ICMP id: the outside (translated) values unless
# use_inside_ports is set, in which case the inside values are used.
159 if not use_inside_ports:
160 tcp_port = self.tcp_port_out
161 udp_port = self.udp_port_out
162 icmp_id = self.icmp_id_out
164 tcp_port = self.tcp_port_in
165 udp_port = self.udp_port_in
166 icmp_id = self.icmp_id_in
# TCP packet from source port 20 to the selected TCP port.
169 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
170 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
171 TCP(dport=tcp_port, sport=20))
# UDP packet from source port 20 to the selected UDP port.
175 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
176 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
177 UDP(dport=udp_port, sport=20))
# ICMP echo reply carrying the selected identifier.
181 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
182 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
183 ICMP(id=icmp_id, type='echo-reply'))
# Checks every captured outside-network packet: source address equals nat_ip
# (self.nat_addr is the fallback), destination equals dst_ip when given, and
# the TCP/UDP source port or ICMP id is compared with the inside value. The
# observed port/id is stored in self.tcp_port_out / self.udp_port_out /
# self.icmp_id_out so later outside streams can target it.
# NOTE(review): the `ignore_port` parameter is not described in the docstring.
# NOTE(review): the embedded numbering skips lines in this block (e.g. 198-200,
# 202-205, 208-209, 215-217, 219-220, 233-236, 243), so the IP46 selection,
# the same_port/ignore_port branching and the assertion calls whose argument
# lines appear below are not visible here.
188 def verify_capture_out(self, capture, nat_ip=None, same_port=False,
189 dst_ip=None, is_ip6=False, ignore_port=False):
191 Verify captured packets on outside network
193 :param capture: Captured packets
194 :param nat_ip: Translated IP address (Default use global NAT address)
195 :param same_port: Source port number is not translated (Default False)
196 :param dst_ip: Destination IP address (Default do not verify)
197 :param is_ip6: If L3 protocol is IPv6 (Default False)
201 ICMP46 = ICMPv6EchoRequest
206 nat_ip = self.nat_addr
207 for packet in capture:
210 self.assert_packet_checksums_valid(packet)
211 self.assertEqual(packet[IP46].src, nat_ip)
212 if dst_ip is not None:
213 self.assertEqual(packet[IP46].dst, dst_ip)
214 if packet.haslayer(TCP):
218 packet[TCP].sport, self.tcp_port_in)
221 packet[TCP].sport, self.tcp_port_in)
# Remember the observed TCP source port.
222 self.tcp_port_out = packet[TCP].sport
223 self.assert_packet_checksums_valid(packet)
224 elif packet.haslayer(UDP):
228 packet[UDP].sport, self.udp_port_in)
231 packet[UDP].sport, self.udp_port_in)
# Remember the observed UDP source port.
232 self.udp_port_out = packet[UDP].sport
237 packet[ICMP46].id, self.icmp_id_in)
240 packet[ICMP46].id, self.icmp_id_in)
# Remember the observed ICMP identifier.
241 self.icmp_id_out = packet[ICMP46].id
242 self.assert_packet_checksums_valid(packet)
244 self.logger.error(ppp("Unexpected or invalid packet "
245 "(outside network):", packet))
# Checks every captured inside-network IPv6 packet: source/destination
# addresses match src_ip/dst_ip, checksums are valid, and the TCP/UDP
# destination port (or the ICMPv6 echo reply id) is compared with the inside
# value stored on the test case.
# NOTE(review): the embedded numbering skips lines in this block (e.g. 257,
# 265, 267-268, 271), so the error-handling wrapper around the logger call and
# the second argument of the echo-reply assertion are not visible here.
248 def verify_capture_in_ip6(self, capture, src_ip, dst_ip):
250 Verify captured IPv6 packets on inside network
252 :param capture: Captured packets
253 :param src_ip: Source IP
254 :param dst_ip: Destination IP address
256 for packet in capture:
258 self.assertEqual(packet[IPv6].src, src_ip)
259 self.assertEqual(packet[IPv6].dst, dst_ip)
260 self.assert_packet_checksums_valid(packet)
261 if packet.haslayer(TCP):
262 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
263 elif packet.haslayer(UDP):
264 self.assertEqual(packet[UDP].dport, self.udp_port_in)
266 self.assertEqual(packet[ICMPv6EchoReply].id,
269 self.logger.error(ppp("Unexpected or invalid packet "
270 "(inside network):", packet))
# Builds three IPv4 fragments (fragment offsets 0, 3 and 5; the first two
# carry the MF flag) that share one randomly chosen IP id. For TCP a complete
# packet is built first and re-parsed from its raw bytes so that the computed
# TCP checksum can be copied into the header used for the fragments.
# Raises Exception("Unsupported protocol") for any other protocol value.
# NOTE(review): the local name `id` shadows the builtin of the same name.
# NOTE(review): the embedded numbering skips lines in this block (e.g. 290,
# 297, 299, 301, 304, 306-307, 311-313, 324-326, 329-333), so parts of the
# packet expressions, the echo_reply branching and the return are not visible.
273 def create_stream_frag(self, src_if, dst, sport, dport, data,
274 proto=IP_PROTOS.tcp, echo_reply=False):
276 Create fragmented packet stream
278 :param src_if: Source interface
279 :param dst: Destination IPv4 address
280 :param sport: Source port
281 :param dport: Destination port
282 :param data: Payload data
283 :param proto: protocol (TCP, UDP, ICMP)
284 :param echo_reply: use echo_reply if protocol is ICMP
287 if proto == IP_PROTOS.tcp:
288 p = (IP(src=src_if.remote_ip4, dst=dst) /
289 TCP(sport=sport, dport=dport) /
291 p = p.__class__(scapy.compat.raw(p))
292 chksum = p[TCP].chksum
293 proto_header = TCP(sport=sport, dport=dport, chksum=chksum)
294 elif proto == IP_PROTOS.udp:
295 proto_header = UDP(sport=sport, dport=dport)
296 elif proto == IP_PROTOS.icmp:
298 proto_header = ICMP(id=sport, type='echo-request')
300 proto_header = ICMP(id=sport, type='echo-reply')
302 raise Exception("Unsupported protocol")
303 id = random.randint(0, 65535)
# First fragment (offset 0, more-fragments flag set).
305 if proto == IP_PROTOS.tcp:
308 raw = Raw(data[0:16])
309 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
310 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
# Second fragment (offset 3, more-fragments flag set).
314 if proto == IP_PROTOS.tcp:
315 raw = Raw(data[4:20])
317 raw = Raw(data[16:32])
318 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
319 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
# Last fragment (offset 5, no MF flag).
323 if proto == IP_PROTOS.tcp:
327 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
328 IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=proto,
# Builds one IPv6/TCP packet with a fragment extension header (random id)
# and returns the result of scapy's fragment6(p, frag_size). The destination
# is either '64:ff9b::' joined with dst or compose_ip6(dst, pref, plen).
# NOTE(review): the docstring says `fragsize`; the parameter is `frag_size`.
# NOTE(review): the embedded numbering skips lines in this block (e.g. 347-349,
# 351, 353, 358-359), so the condition choosing the destination form and the
# tail of the packet expression are not visible here.
334 def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
335 pref=None, plen=0, frag_size=128):
337 Create fragmented packet stream
339 :param src_if: Source interface
340 :param dst: Destination IPv4 address
341 :param sport: Source TCP port
342 :param dport: Destination TCP port
343 :param data: Payload data
344 :param pref: NAT64 prefix
345 :param plen: NAT64 prefix length
346 :param fragsize: size of fragments
350 dst_ip6 = ''.join(['64:ff9b::', dst])
352 dst_ip6 = self.compose_ip6(dst, pref, plen)
354 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
355 IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
356 IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
357 TCP(sport=sport, dport=dport) /
360 return fragment6(p, frag_size)
# Reassembles captured IPv4 fragments: each fragment's addresses and IP
# checksum are asserted, its payload is written into a buffer at byte offset
# frag * 8, and the buffer is then re-parsed behind an IP header copied from
# the first fragment (TCP, UDP or ICMP depending on the protocol field).
# For TCP the reassembled packet's TCP checksum is asserted as well.
# NOTE(review): the embedded numbering skips lines in this block (e.g. 369,
# 371-373, 390-391), so the buffer creation, the loop header and the return
# are not visible here.
362 def reass_frags_and_verify(self, frags, src, dst):
364 Reassemble and verify fragmented packet
366 :param frags: Captured fragments
367 :param src: Source IPv4 address to verify
368 :param dst: Destination IPv4 address to verify
370 :returns: Reassembled IPv4 packet
374 self.assertEqual(p[IP].src, src)
375 self.assertEqual(p[IP].dst, dst)
376 self.assert_ip_checksum_valid(p)
377 buffer.seek(p[IP].frag * 8)
378 buffer.write(bytes(p[IP].payload))
379 ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
380 proto=frags[0][IP].proto)
381 if ip.proto == IP_PROTOS.tcp:
382 p = (ip / TCP(buffer.getvalue()))
383 self.logger.debug(ppp("Reassembled:", p))
384 self.assert_tcp_checksum_valid(p)
385 elif ip.proto == IP_PROTOS.udp:
# The first 8 bytes are parsed as the UDP header, the rest as raw payload.
386 p = (ip / UDP(buffer.getvalue()[:8]) /
387 Raw(buffer.getvalue()[8:]))
388 elif ip.proto == IP_PROTOS.icmp:
389 p = (ip / ICMP(buffer.getvalue()))
# Reassembles captured IPv6 fragments: each fragment's addresses are asserted,
# the payload behind its fragment header is written into a buffer at byte
# offset `offset * 8`, and the buffer is re-parsed as TCP or UDP behind an
# IPv6 header whose next-header value comes from the first fragment header.
# The reassembled packet is logged and its checksums are asserted.
# NOTE(review): the embedded numbering skips lines in this block (e.g. 399,
# 401-403, 416-417), so the buffer creation, the loop header and the return
# are not visible here.
392 def reass_frags_and_verify_ip6(self, frags, src, dst):
394 Reassemble and verify fragmented packet
396 :param frags: Captured fragments
397 :param src: Source IPv6 address to verify
398 :param dst: Destination IPv6 address to verify
400 :returns: Reassembled IPv6 packet
404 self.assertEqual(p[IPv6].src, src)
405 self.assertEqual(p[IPv6].dst, dst)
406 buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
407 buffer.write(bytes(p[IPv6ExtHdrFragment].payload))
408 ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
409 nh=frags[0][IPv6ExtHdrFragment].nh)
410 if ip.nh == IP_PROTOS.tcp:
411 p = (ip / TCP(buffer.getvalue()))
412 elif ip.nh == IP_PROTOS.udp:
413 p = (ip / UDP(buffer.getvalue()))
414 self.logger.debug(ppp("Reassembled:", p))
415 self.assert_packet_checksums_valid(p)
# Expects exactly one decoded IPFIX record and checks three of its fields:
# field 230 must decode to 13, field 466 must equal a packed 2 and field 472
# must equal the packed `limit`.
# NOTE(review): struct.pack("I", ...) uses native byte order, whereas other
# verifiers in this class pack with "!" (network order) -- confirm intended.
# NOTE(review): the embedded numbering skips lines in this block (e.g. 426-427,
# 431), so the assignment of `record` and some field-name comments are not
# visible here; presumably record is data[0] -- confirm.
418 def verify_ipfix_max_bibs(self, data, limit):
420 Verify IPFIX maximum BIB entries exceeded event
422 :param data: Decoded IPFIX data records
423 :param limit: Number of maximum BIB entries that can be created.
425 self.assertEqual(1, len(data))
428 self.assertEqual(scapy.compat.orb(record[230]), 13)
429 # natQuotaExceededEvent
430 self.assertEqual(struct.pack("I", 2), record[466])
432 self.assertEqual(struct.pack("I", limit), record[472])
# Expects exactly one decoded IPFIX record describing a BIB create/delete
# event: field 230 is compared with 10 or 11, field 27 (rendered as an IPv6
# address) with src_addr, field 225 with the packed NAT address, field 4 with
# the TCP protocol number, field 234 with a packed 0, and fields 7 / 227 with
# the inside / outside TCP ports.
# NOTE(review): the embedded numbering skips lines in this block (e.g. 443-445,
# 447, 449, 453, 455), so the assignment of `record`, the is_create branching
# that selects 10 vs 11 and some field-name comments are not visible here.
434 def verify_ipfix_bib(self, data, is_create, src_addr):
436 Verify IPFIX NAT64 BIB create and delete events
438 :param data: Decoded IPFIX data records
439 :param is_create: Create event if nonzero value otherwise delete event
440 :param src_addr: IPv6 source address
442 self.assertEqual(1, len(data))
446 self.assertEqual(scapy.compat.orb(record[230]), 10)
448 self.assertEqual(scapy.compat.orb(record[230]), 11)
450 self.assertEqual(src_addr, str(ipaddress.IPv6Address(record[27])))
451 # postNATSourceIPv4Address
452 self.assertEqual(self.nat_addr_n, record[225])
454 self.assertEqual(IP_PROTOS.tcp, scapy.compat.orb(record[4]))
456 self.assertEqual(struct.pack("!I", 0), record[234])
457 # sourceTransportPort
458 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
459 # postNAPTSourceTransportPort
460 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
# Expects exactly one decoded IPFIX record describing a NAT64 session
# create/delete event: field 230 is compared with 6 or 7, and the record's
# source/destination addresses, NAT address, protocol (TCP), field 234 (packed
# 0) and the source/destination ports before and after translation are checked
# against the arguments and the ports stored on the test case.
# NOTE(review): the embedded numbering skips lines in this block (e.g. 463,
# 474-476, 478, 480, 485-487, 492-493, 495), so the end of the signature, the
# assignment of `record`, the is_create branching and the right-hand sides of
# two address assertions are not visible here.
462 def verify_ipfix_nat64_ses(self, data, is_create, src_addr, dst_addr,
465 Verify IPFIX NAT64 session create and delete events
467 :param data: Decoded IPFIX data records
468 :param is_create: Create event if nonzero value otherwise delete event
469 :param src_addr: IPv6 source address
470 :param dst_addr: IPv4 destination address
471 :param dst_port: destination TCP port
473 self.assertEqual(1, len(data))
477 self.assertEqual(scapy.compat.orb(record[230]), 6)
479 self.assertEqual(scapy.compat.orb(record[230]), 7)
481 self.assertEqual(src_addr, str(ipaddress.IPv6Address(record[27])))
482 # destinationIPv6Address
483 self.assertEqual(socket.inet_pton(socket.AF_INET6,
484 self.compose_ip6(dst_addr,
488 # postNATSourceIPv4Address
489 self.assertEqual(self.nat_addr_n, record[225])
490 # postNATDestinationIPv4Address
491 self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr),
494 self.assertEqual(IP_PROTOS.tcp, scapy.compat.orb(record[4]))
496 self.assertEqual(struct.pack("!I", 0), record[234])
497 # sourceTransportPort
498 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
499 # postNAPTSourceTransportPort
500 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
501 # destinationTransportPort
502 self.assertEqual(struct.pack("!H", dst_port), record[11])
503 # postNAPTDestinationTransportPort
504 self.assertEqual(struct.pack("!H", dst_port), record[228])
# Decodes `data` as UTF-8, parses it as an RFC 5424 syslog message and checks
# a NAT session add/delete log entry: severity info, appname 'NAT', msgid
# 'SADD' or 'SDEL' (per is_add), and the 'nsess' structured-data parameters
# (inside address type/address/port, translated address/port, protocol, VLAN,
# external destination address and port).
# NOTE(review): the embedded numbering skips lines in this block (e.g. 508,
# 511-513, 519, 522), so the try/except framing around the parse and the
# is_ip6 branching between the IPv6 and IPv4 checks are not visible here.
506 def verify_syslog_sess(self, data, is_add=True, is_ip6=False):
507 message = data.decode('utf-8')
509 message = SyslogMessage.parse(message)
510 except ParseError as e:
514 self.assertEqual(message.severity, SyslogSeverity.info)
515 self.assertEqual(message.appname, 'NAT')
516 self.assertEqual(message.msgid, 'SADD' if is_add else 'SDEL')
517 sd_params = message.sd.get('nsess')
518 self.assertTrue(sd_params is not None)
520 self.assertEqual(sd_params.get('IATYP'), 'IPv6')
521 self.assertEqual(sd_params.get('ISADDR'), self.pg0.remote_ip6)
523 self.assertEqual(sd_params.get('IATYP'), 'IPv4')
524 self.assertEqual(sd_params.get('ISADDR'), self.pg0.remote_ip4)
525 self.assertTrue(sd_params.get('SSUBIX') is not None)
# Numeric parameters are compared in their decimal string form.
526 self.assertEqual(sd_params.get('ISPORT'), "%d" % self.tcp_port_in)
527 self.assertEqual(sd_params.get('XATYP'), 'IPv4')
528 self.assertEqual(sd_params.get('XSADDR'), self.nat_addr)
529 self.assertEqual(sd_params.get('XSPORT'), "%d" % self.tcp_port_out)
530 self.assertEqual(sd_params.get('PROTO'), "%d" % IP_PROTOS.tcp)
531 self.assertEqual(sd_params.get('SVLAN'), '0')
532 self.assertEqual(sd_params.get('XDADDR'), self.pg1.remote_ip4)
533 self.assertEqual(sd_params.get('XDPORT'),
534 "%d" % self.tcp_external_port)
# Converts the IPv6 prefix and the IPv4 address to lists of byte values,
# overwrites selected bytes of the prefix with the IPv4 bytes, then re-packs
# the list and returns it as IPv6 address text.
# NOTE(review): the embedded numbering skips lines 547-560 and several later
# ones (562-564, 567-568, 572), so the plen-dependent branch headers and the
# assignments for the shorter prefix lengths are not visible here; only the
# tail of each branch appears below -- confirm against the full file.
536 def compose_ip6(self, ip4, pref, plen):
538 Compose IPv4-embedded IPv6 addresses
540 :param ip4: IPv4 address
541 :param pref: IPv6 prefix
542 :param plen: IPv6 prefix length
543 :returns: IPv4-embedded IPv6 addresses
545 pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
546 ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
561 pref_n[10] = ip4_n[3]
565 pref_n[10] = ip4_n[2]
566 pref_n[11] = ip4_n[3]
569 pref_n[10] = ip4_n[1]
570 pref_n[11] = ip4_n[2]
571 pref_n[12] = ip4_n[3]
# Visible branch that places all four IPv4 bytes in the last four bytes.
573 pref_n[12] = ip4_n[0]
574 pref_n[13] = ip4_n[1]
575 pref_n[14] = ip4_n[2]
576 pref_n[15] = ip4_n[3]
# Re-pack the byte values and convert back to text form.
577 packed_pref_n = b''.join([scapy.compat.chb(x) for x in pref_n])
578 return socket.inet_ntop(socket.AF_INET6, packed_pref_n)
# Expects exactly one decoded IPFIX record and checks three of its fields:
# field 230 must decode to 13, field 466 must equal a packed 1 and field 471
# must equal the packed `limit`.
# NOTE(review): struct.pack("I", ...) uses native byte order, whereas other
# verifiers in this class pack with "!" (network order) -- confirm intended.
# NOTE(review): the embedded numbering skips lines in this block (e.g. 588-589,
# 593), so the assignment of `record` and some field-name comments are not
# visible here; presumably record is data[0] -- confirm.
580 def verify_ipfix_max_sessions(self, data, limit):
582 Verify IPFIX maximum session entries exceeded event
584 :param data: Decoded IPFIX data records
585 :param limit: Number of maximum session entries that can be created.
587 self.assertEqual(1, len(data))
590 self.assertEqual(scapy.compat.orb(record[230]), 13)
591 # natQuotaExceededEvent
592 self.assertEqual(struct.pack("I", 1), record[466])
594 self.assertEqual(struct.pack("I", limit), record[471])
# Marks pg5 as a NAT64 inside interface and checks neighbor discovery still
# works on it: a first ping towards pg5's local address must produce a
# Neighbor Solicitation, the test answers with a Neighbor Advertisement for
# the solicited target, and a second ping must then yield an echo reply.
# NOTE(review): the embedded numbering skips lines in this block (e.g. 602-603,
# 606-607, 610-611, 614-615, 619, 621, 628, 631-632, 634, 637-638, 641-642),
# so the packet-list assignments, pg_start calls and try/except framing are
# not visible here.
596 def test_nat64_inside_interface_handles_neighbor_advertisement(self):
597 """ NAT64 inside interface handles Neighbor Advertisement """
599 flags = self.config_flags.NAT_IS_INSIDE
600 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
601 sw_if_index=self.pg5.sw_if_index)
604 ping = (Ether(dst=self.pg5.local_mac, src=self.pg5.remote_mac) /
605 IPv6(src=self.pg5.remote_ip6, dst=self.pg5.local_ip6) /
608 self.pg5.add_stream(pkts)
609 self.pg_enable_capture(self.pg_interfaces)
612 # Wait for Neighbor Solicitation
613 capture = self.pg5.get_capture(len(pkts))
616 self.assertEqual(packet[IPv6].src, self.pg5.local_ip6)
617 self.assertEqual(packet.haslayer(ICMPv6ND_NS), 1)
# Keep the solicited target so the advertisement below can answer for it.
618 tgt = packet[ICMPv6ND_NS].tgt
620 self.logger.error(ppp("Unexpected or invalid packet:", packet))
623 # Send Neighbor Advertisement
624 p = (Ether(dst=self.pg5.local_mac, src=self.pg5.remote_mac) /
625 IPv6(src=self.pg5.remote_ip6, dst=self.pg5.local_ip6) /
626 ICMPv6ND_NA(tgt=tgt) /
627 ICMPv6NDOptDstLLAddr(lladdr=self.pg5.remote_mac))
629 self.pg5.add_stream(pkts)
630 self.pg_enable_capture(self.pg_interfaces)
633 # Try to send ping again
635 self.pg5.add_stream(pkts)
636 self.pg_enable_capture(self.pg_interfaces)
639 # Wait for ping reply
640 capture = self.pg5.get_capture(len(pkts))
643 self.assertEqual(packet[IPv6].src, self.pg5.local_ip6)
644 self.assertEqual(packet[IPv6].dst, self.pg5.remote_ip6)
645 self.assertEqual(packet.haslayer(ICMPv6EchoReply), 1)
647 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Pool add/delete test body: adds one address range to the NAT64 pool, checks
# the dump holds exactly that address, removes it and checks the dump is empty.
# NOTE(review): the embedded numbering skips lines in this block (650, 652-653,
# 655, 657, 663, 665), so the `def` line, the assignment of `nat_addr` and the
# end_addr arguments of both calls are not visible here.
651 """ Add/delete address to NAT64 pool """
654 self.vapi.nat64_add_del_pool_addr_range(start_addr=nat_addr,
656 vrf_id=0xFFFFFFFF, is_add=1)
658 addresses = self.vapi.nat64_pool_addr_dump()
659 self.assertEqual(len(addresses), 1)
660 self.assertEqual(str(addresses[0].address), nat_addr)
662 self.vapi.nat64_add_del_pool_addr_range(start_addr=nat_addr,
664 vrf_id=0xFFFFFFFF, is_add=0)
666 addresses = self.vapi.nat64_pool_addr_dump()
667 self.assertEqual(len(addresses), 0)
# Adds pg0 with the NAT_IS_INSIDE flag and pg1 with flags=0, then checks the
# interface dump (two entries, pg0 reported as NAT_IS_INSIDE and pg1 as
# NAT_IS_OUTSIDE), the per-interface feature listing ('nat64-in2out' on pg0,
# 'nat64-out2in' on pg1), and finally that removing both empties the dump.
# NOTE(review): pg1 is added with flags=0 but removed with flags=flags
# (NAT_IS_INSIDE) -- confirm this is intended.
# NOTE(review): the embedded numbering skips lines in this block (e.g. 679-680,
# 684, 687), so the initialisation and setting of pg0_found / pg1_found are
# not visible here.
669 def test_interface(self):
670 """ Enable/disable NAT64 feature on the interface """
671 flags = self.config_flags.NAT_IS_INSIDE
672 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
673 sw_if_index=self.pg0.sw_if_index)
674 self.vapi.nat64_add_del_interface(is_add=1, flags=0,
675 sw_if_index=self.pg1.sw_if_index)
677 interfaces = self.vapi.nat64_interface_dump()
678 self.assertEqual(len(interfaces), 2)
681 for intf in interfaces:
682 if intf.sw_if_index == self.pg0.sw_if_index:
683 self.assertEqual(intf.flags, self.config_flags.NAT_IS_INSIDE)
685 elif intf.sw_if_index == self.pg1.sw_if_index:
686 self.assertEqual(intf.flags, self.config_flags.NAT_IS_OUTSIDE)
688 self.assertTrue(pg0_found)
689 self.assertTrue(pg1_found)
# The CLI feature listing must show the matching NAT64 node per interface.
691 features = self.vapi.cli("show interface features pg0")
692 self.assertIn('nat64-in2out', features)
693 features = self.vapi.cli("show interface features pg1")
694 self.assertIn('nat64-out2in', features)
# Remove both interfaces again and expect an empty dump.
696 self.vapi.nat64_add_del_interface(is_add=0, flags=flags,
697 sw_if_index=self.pg0.sw_if_index)
698 self.vapi.nat64_add_del_interface(is_add=0, flags=flags,
699 sw_if_index=self.pg1.sw_if_index)
701 interfaces = self.vapi.nat64_interface_dump()
702 self.assertEqual(len(interfaces), 0)
# Adds one static TCP BIB entry, checks that the TCP BIB dump contains exactly
# one entry flagged NAT_IS_STATIC with the configured addresses and ports and
# that the '/nat64/total-bibs' counter reads 1; then deletes the entry and
# checks that no static entry remains and the counter reads 0.
# NOTE(review): the embedded numbering skips lines in this block (e.g. 708-709,
# 711, 716-717, 719, 732-733, 735), so the in_port / out_port assignments, the
# loops over `bib` and the static_bib_num bookkeeping are not visible here.
704 def test_static_bib(self):
705 """ Add/delete static BIB entry """
706 in_addr = '2001:db8:85a3::8a2e:370:7334'
707 out_addr = '10.1.1.3'
710 proto = IP_PROTOS.tcp
712 self.vapi.nat64_add_del_static_bib(i_addr=in_addr, o_addr=out_addr,
713 i_port=in_port, o_port=out_port,
714 proto=proto, vrf_id=0, is_add=1)
715 bib = self.vapi.nat64_bib_dump(proto=IP_PROTOS.tcp)
718 if bibe.flags & self.config_flags.NAT_IS_STATIC:
720 self.assertEqual(str(bibe.i_addr), in_addr)
721 self.assertEqual(str(bibe.o_addr), out_addr)
722 self.assertEqual(bibe.i_port, in_port)
723 self.assertEqual(bibe.o_port, out_port)
724 self.assertEqual(static_bib_num, 1)
725 bibs = self.statistics.get_counter('/nat64/total-bibs')
726 self.assertEqual(bibs[0][0], 1)
# Delete the same entry and re-check the dump and the counter.
728 self.vapi.nat64_add_del_static_bib(i_addr=in_addr, o_addr=out_addr,
729 i_port=in_port, o_port=out_port,
730 proto=proto, vrf_id=0, is_add=0)
731 bib = self.vapi.nat64_bib_dump(proto=IP_PROTOS.tcp)
734 if bibe.flags & self.config_flags.NAT_IS_STATIC:
736 self.assertEqual(static_bib_num, 0)
737 bibs = self.statistics.get_counter('/nat64/total-bibs')
738 self.assertEqual(bibs[0][0], 0)
740 def test_set_timeouts(self):
741 """ Set NAT64 timeouts """
742 # verify default values
743 timeouts = self.vapi.nat64_get_timeouts()
744 self.assertEqual(timeouts.udp, 300)
745 self.assertEqual(timeouts.icmp, 60)
746 self.assertEqual(timeouts.tcp_transitory, 240)
747 self.assertEqual(timeouts.tcp_established, 7440)
749 # set and verify custom values
750 self.vapi.nat64_set_timeouts(udp=200, tcp_established=7450,
751 tcp_transitory=250, icmp=30)
752 timeouts = self.vapi.nat64_get_timeouts()
753 self.assertEqual(timeouts.udp, 200)
754 self.assertEqual(timeouts.icmp, 30)
755 self.assertEqual(timeouts.tcp_transitory, 250)
756 self.assertEqual(timeouts.tcp_established, 7450)
# Dynamic NAT64 translation: configures the pool address, pg0 as inside and
# pg1 with flags=0, then sends traffic in both directions twice, verifying the
# translated packets, the per-interface in2out/out2in statistics counters and
# the BIB/session totals. The same exchange is then repeated for pg2 using the
# VRF-specific pool address.
# NOTE(review): the embedded numbering skips lines in this block (e.g. 768-769,
# 775-776, 785, 799-800, 809, 828-829, 833, 837-838, 842, 861, 869), so the
# tail of the first pool call, the pg_start calls and the section comments are
# not visible here.
758 def test_dynamic(self):
759 """ NAT64 dynamic translation test """
760 self.tcp_port_in = 6303
761 self.udp_port_in = 6304
762 self.icmp_id_in = 6305
# Session count before any traffic; compared with the count further below.
764 ses_num_start = self.nat64_get_ses_num()
766 self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
767 end_addr=self.nat_addr,
770 flags = self.config_flags.NAT_IS_INSIDE
771 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
772 sw_if_index=self.pg0.sw_if_index)
773 self.vapi.nat64_add_del_interface(is_add=1, flags=0,
774 sw_if_index=self.pg1.sw_if_index)
# Inside-to-outside direction: snapshot the counters, send the inside stream
# on pg0 and verify the translated packets captured on pg1.
777 tcpn = self.statistics.get_counter('/nat64/in2out/tcp')[0]
778 udpn = self.statistics.get_counter('/nat64/in2out/udp')[0]
779 icmpn = self.statistics.get_counter('/nat64/in2out/icmp')[0]
780 drops = self.statistics.get_counter('/nat64/in2out/drops')[0]
782 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
783 self.pg0.add_stream(pkts)
784 self.pg_enable_capture(self.pg_interfaces)
786 capture = self.pg1.get_capture(len(pkts))
787 self.verify_capture_out(capture, nat_ip=self.nat_addr,
788 dst_ip=self.pg1.remote_ip4)
# Counter deltas for pg0: one each for tcp/udp/icmp, none for drops.
790 if_idx = self.pg0.sw_if_index
791 cnt = self.statistics.get_counter('/nat64/in2out/tcp')[0]
792 self.assertEqual(cnt[if_idx] - tcpn[if_idx], 1)
793 cnt = self.statistics.get_counter('/nat64/in2out/udp')[0]
794 self.assertEqual(cnt[if_idx] - udpn[if_idx], 1)
795 cnt = self.statistics.get_counter('/nat64/in2out/icmp')[0]
796 self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1)
797 cnt = self.statistics.get_counter('/nat64/in2out/drops')[0]
798 self.assertEqual(cnt[if_idx] - drops[if_idx], 0)
# Outside-to-inside direction: snapshot the counters, send the outside stream
# on pg1 and verify the translated packets captured on pg0.
801 tcpn = self.statistics.get_counter('/nat64/out2in/tcp')[0]
802 udpn = self.statistics.get_counter('/nat64/out2in/udp')[0]
803 icmpn = self.statistics.get_counter('/nat64/out2in/icmp')[0]
804 drops = self.statistics.get_counter('/nat64/out2in/drops')[0]
806 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
807 self.pg1.add_stream(pkts)
808 self.pg_enable_capture(self.pg_interfaces)
810 capture = self.pg0.get_capture(len(pkts))
# Let scapy normalise the textual form of the expected IPv6 source address.
811 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
812 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
# Counter deltas for pg1; note the expected tcp delta here is 2.
814 if_idx = self.pg1.sw_if_index
815 cnt = self.statistics.get_counter('/nat64/out2in/tcp')[0]
816 self.assertEqual(cnt[if_idx] - tcpn[if_idx], 2)
817 cnt = self.statistics.get_counter('/nat64/out2in/udp')[0]
818 self.assertEqual(cnt[if_idx] - udpn[if_idx], 1)
819 cnt = self.statistics.get_counter('/nat64/out2in/icmp')[0]
820 self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1)
821 cnt = self.statistics.get_counter('/nat64/out2in/drops')[0]
822 self.assertEqual(cnt[if_idx] - drops[if_idx], 0)
# Three BIB entries and three sessions are expected at this point.
824 bibs = self.statistics.get_counter('/nat64/total-bibs')
825 self.assertEqual(bibs[0][0], 3)
826 sessions = self.statistics.get_counter('/nat64/total-sessions')
827 self.assertEqual(sessions[0][0], 3)
# Second inside-to-outside pass with the same stream.
830 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
831 self.pg0.add_stream(pkts)
832 self.pg_enable_capture(self.pg_interfaces)
834 capture = self.pg1.get_capture(len(pkts))
835 self.verify_capture_out(capture, nat_ip=self.nat_addr,
836 dst_ip=self.pg1.remote_ip4)
# Second outside-to-inside pass.
839 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
840 self.pg1.add_stream(pkts)
841 self.pg_enable_capture(self.pg_interfaces)
843 capture = self.pg0.get_capture(len(pkts))
844 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
# The repeated traffic must not have raised the session count beyond 3.
846 ses_num_end = self.nat64_get_ses_num()
848 self.assertEqual(ses_num_end - ses_num_start, 3)
850 # tenant with specific VRF
851 self.vapi.nat64_add_del_pool_addr_range(start_addr=self.vrf1_nat_addr,
852 end_addr=self.vrf1_nat_addr,
853 vrf_id=self.vrf1_id, is_add=1)
854 flags = self.config_flags.NAT_IS_INSIDE
855 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
856 sw_if_index=self.pg2.sw_if_index)
858 pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
859 self.pg2.add_stream(pkts)
860 self.pg_enable_capture(self.pg_interfaces)
862 capture = self.pg1.get_capture(len(pkts))
863 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
864 dst_ip=self.pg1.remote_ip4)
866 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
867 self.pg1.add_stream(pkts)
868 self.pg_enable_capture(self.pg_interfaces)
870 capture = self.pg2.get_capture(len(pkts))
871 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
# Static NAT64 translation: sets identical inside/outside ports and ICMP id,
# configures the pool address and the pg0/pg1 interfaces, installs static BIB
# entries for TCP, UDP and ICMP, then verifies traffic in both directions
# (with same_port=True on the outside capture) and that three sessions were
# created.
# NOTE(review): the embedded numbering skips lines in this block (e.g. 886-887,
# 899, 905, 911-913, 917, 921-922, 926), so the tails of the pool and static
# BIB calls and the pg_start calls are not visible here.
873 def test_static(self):
874 """ NAT64 static translation test """
875 self.tcp_port_in = 60303
876 self.udp_port_in = 60304
877 self.icmp_id_in = 60305
878 self.tcp_port_out = 60303
879 self.udp_port_out = 60304
880 self.icmp_id_out = 60305
882 ses_num_start = self.nat64_get_ses_num()
884 self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
885 end_addr=self.nat_addr,
888 flags = self.config_flags.NAT_IS_INSIDE
889 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
890 sw_if_index=self.pg0.sw_if_index)
891 self.vapi.nat64_add_del_interface(is_add=1, flags=0,
892 sw_if_index=self.pg1.sw_if_index)
# One static BIB entry per protocol (TCP, UDP, ICMP) for pg0's remote host.
894 self.vapi.nat64_add_del_static_bib(i_addr=self.pg0.remote_ip6,
895 o_addr=self.nat_addr,
896 i_port=self.tcp_port_in,
897 o_port=self.tcp_port_out,
898 proto=IP_PROTOS.tcp, vrf_id=0,
900 self.vapi.nat64_add_del_static_bib(i_addr=self.pg0.remote_ip6,
901 o_addr=self.nat_addr,
902 i_port=self.udp_port_in,
903 o_port=self.udp_port_out,
904 proto=IP_PROTOS.udp, vrf_id=0,
906 self.vapi.nat64_add_del_static_bib(i_addr=self.pg0.remote_ip6,
907 o_addr=self.nat_addr,
908 i_port=self.icmp_id_in,
909 o_port=self.icmp_id_out,
910 proto=IP_PROTOS.icmp, vrf_id=0,
# Inside-to-outside traffic.
914 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
915 self.pg0.add_stream(pkts)
916 self.pg_enable_capture(self.pg_interfaces)
918 capture = self.pg1.get_capture(len(pkts))
919 self.verify_capture_out(capture, nat_ip=self.nat_addr,
920 dst_ip=self.pg1.remote_ip4, same_port=True)
# Outside-to-inside traffic.
923 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
924 self.pg1.add_stream(pkts)
925 self.pg_enable_capture(self.pg_interfaces)
927 capture = self.pg0.get_capture(len(pkts))
928 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
929 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
931 ses_num_end = self.nat64_get_ses_num()
933 self.assertEqual(ses_num_end - ses_num_start, 3)
# Runs only when running_extended_tests is truthy. Configures the pool and
# interfaces, sets the NAT64 timeouts (udp=300, tcp_established=5; remaining
# arguments not visible), creates sessions with one inside stream and then
# expects the session count to have dropped by 2 (per the in-code comment:
# the ICMP and TCP sessions).
# NOTE(review): the embedded numbering skips lines in this block (e.g. 941-942,
# 949-951, 955, 957, 959-961), so the tail of the timeout call, the pg_start
# call and whatever waits for the timeout to elapse are not visible here.
935 @unittest.skipUnless(running_extended_tests, "part of extended tests")
936 def test_session_timeout(self):
937 """ NAT64 session timeout """
938 self.icmp_id_in = 1234
939 self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
940 end_addr=self.nat_addr,
943 flags = self.config_flags.NAT_IS_INSIDE
944 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
945 sw_if_index=self.pg0.sw_if_index)
946 self.vapi.nat64_add_del_interface(is_add=1, flags=0,
947 sw_if_index=self.pg1.sw_if_index)
948 self.vapi.nat64_set_timeouts(udp=300, tcp_established=5,
952 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
953 self.pg0.add_stream(pkts)
954 self.pg_enable_capture(self.pg_interfaces)
956 capture = self.pg1.get_capture(len(pkts))
958 ses_num_before_timeout = self.nat64_get_ses_num()
962 # ICMP and TCP session after timeout
963 ses_num_after_timeout = self.nat64_get_ses_num()
964 self.assertEqual(ses_num_before_timeout - ses_num_after_timeout, 2)
# ICMP error translation in both directions. After creating sessions with
# regular traffic, the test (1) wraps each packet captured on pg0 in an ICMPv6
# Destination Unreachable (code 1) sent from the inside and expects an IPv4
# ICMP type 3 / code 13 on pg1 whose embedded packet carries the translated
# addresses and ports, and (2) wraps each packet captured on pg1 in an IPv4
# ICMP type 3 / code 13 sent from the outside and expects an ICMPv6
# Destination Unreachable (code 1) on pg0 with the inside addresses and ports.
# NOTE(review): the embedded numbering skips lines in this block (e.g. 974-975,
# 986, 995, 999-1001, 1008, 1011, 1024, 1026, 1028-1030, 1037, 1040, 1053,
# 1055-1056), so the pg_start calls, try/except framing, `else` headers and
# some call arguments are not visible here.
966 def test_icmp_error(self):
967 """ NAT64 ICMP Error message translation """
968 self.tcp_port_in = 6303
969 self.udp_port_in = 6304
970 self.icmp_id_in = 6305
972 self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
973 end_addr=self.nat_addr,
976 flags = self.config_flags.NAT_IS_INSIDE
977 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
978 sw_if_index=self.pg0.sw_if_index)
979 self.vapi.nat64_add_del_interface(is_add=1, flags=0,
980 sw_if_index=self.pg1.sw_if_index)
982 # send some packets to create sessions
983 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
984 self.pg0.add_stream(pkts)
985 self.pg_enable_capture(self.pg_interfaces)
987 capture_ip4 = self.pg1.get_capture(len(pkts))
988 self.verify_capture_out(capture_ip4,
989 nat_ip=self.nat_addr,
990 dst_ip=self.pg1.remote_ip4)
992 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
993 self.pg1.add_stream(pkts)
994 self.pg_enable_capture(self.pg_interfaces)
996 capture_ip6 = self.pg0.get_capture(len(pkts))
997 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
998 self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
# Inside-to-outside errors: one ICMPv6 DestUnreach per packet captured on pg0,
# each embedding that packet's IPv6 layer.
1002 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1003 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
1004 ICMPv6DestUnreach(code=1) /
1005 packet[IPv6] for packet in capture_ip6]
1006 self.pg0.add_stream(pkts)
1007 self.pg_enable_capture(self.pg_interfaces)
1009 capture = self.pg1.get_capture(len(pkts))
1010 for packet in capture:
1012 self.assertEqual(packet[IP].src, self.nat_addr)
1013 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1014 self.assertEqual(packet[ICMP].type, 3)
1015 self.assertEqual(packet[ICMP].code, 13)
1016 inner = packet[IPerror]
1017 self.assertEqual(inner.src, self.pg1.remote_ip4)
1018 self.assertEqual(inner.dst, self.nat_addr)
1019 self.assert_packet_checksums_valid(packet)
1020 if inner.haslayer(TCPerror):
1021 self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
1022 elif inner.haslayer(UDPerror):
1023 self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
1025 self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
1027 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Outside-to-inside errors: one IPv4 ICMP type 3 / code 13 per packet captured
# on pg1, each embedding that packet's IP layer.
1031 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1032 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1033 ICMP(type=3, code=13) /
1034 packet[IP] for packet in capture_ip4]
1035 self.pg1.add_stream(pkts)
1036 self.pg_enable_capture(self.pg_interfaces)
1038 capture = self.pg0.get_capture(len(pkts))
1039 for packet in capture:
1041 self.assertEqual(packet[IPv6].src, ip.src)
1042 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
1043 icmp = packet[ICMPv6DestUnreach]
1044 self.assertEqual(icmp.code, 1)
1045 inner = icmp[IPerror6]
1046 self.assertEqual(inner.src, self.pg0.remote_ip6)
1047 self.assertEqual(inner.dst, ip.src)
1048 self.assert_icmpv6_checksum_valid(packet)
1049 if inner.haslayer(TCPerror):
1050 self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
1051 elif inner.haslayer(UDPerror):
1052 self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
1054 self.assertEqual(inner[ICMPv6EchoRequest].id,
1057 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1060 def test_hairpinning(self):
1061 """ NAT64 hairpinning """
# Client and server are two remote hosts on pg0; every stream in this test
# is added on pg0 and every capture is read back from pg0.
1063 client = self.pg0.remote_hosts[0]
1064 server = self.pg0.remote_hosts[1]
1065 server_tcp_in_port = 22
1066 server_tcp_out_port = 4022
1067 server_udp_in_port = 23
1068 server_udp_out_port = 4023
1069 client_tcp_in_port = 1234
1070 client_udp_in_port = 1235
# Placeholders: replaced below by the source ports observed on the
# translated client -> server packets.
1071 client_tcp_out_port = 0
1072 client_udp_out_port = 0
# IPv6 form of the pool address: '64:ff9b::' concatenated with the IPv4
# NAT address, read back through scapy's IPv6 layer.
1073 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
1074 nat_addr_ip6 = ip.src
1076 self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1077 end_addr=self.nat_addr,
# pg0 is registered with NAT_IS_INSIDE, pg1 with no flags.
1080 flags = self.config_flags.NAT_IS_INSIDE
1081 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1082 sw_if_index=self.pg0.sw_if_index)
1083 self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1084 sw_if_index=self.pg1.sw_if_index)
# Static BIB entries: server inside TCP/UDP ports -> outside ports on the
# NAT address.
1086 self.vapi.nat64_add_del_static_bib(i_addr=server.ip6n,
1087 o_addr=self.nat_addr,
1088 i_port=server_tcp_in_port,
1089 o_port=server_tcp_out_port,
1090 proto=IP_PROTOS.tcp, vrf_id=0,
1092 self.vapi.nat64_add_del_static_bib(i_addr=server.ip6n,
1093 o_addr=self.nat_addr,
1094 i_port=server_udp_in_port,
1095 o_port=server_udp_out_port,
1096 proto=IP_PROTOS.udp, vrf_id=0,
# client -> server: destination is the NAT address and the server's
# outside ports.
1101 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1102 IPv6(src=client.ip6, dst=nat_addr_ip6) /
1103 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
1105 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1106 IPv6(src=client.ip6, dst=nat_addr_ip6) /
1107 UDP(sport=client_udp_in_port, dport=server_udp_out_port))
1109 self.pg0.add_stream(pkts)
1110 self.pg_enable_capture(self.pg_interfaces)
1112 capture = self.pg0.get_capture(len(pkts))
# Expect the source rewritten to the NAT address with a new source port, and
# the destination rewritten to the server's inside address and port.
1113 for packet in capture:
1115 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
1116 self.assertEqual(packet[IPv6].dst, server.ip6)
1117 self.assert_packet_checksums_valid(packet)
1118 if packet.haslayer(TCP):
1119 self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
1120 self.assertEqual(packet[TCP].dport, server_tcp_in_port)
1121 client_tcp_out_port = packet[TCP].sport
1123 self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
1124 self.assertEqual(packet[UDP].dport, server_udp_in_port)
1125 client_udp_out_port = packet[UDP].sport
1127 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# server -> client: destination ports are the translated client ports
# recorded from the previous capture.
1132 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1133 IPv6(src=server.ip6, dst=nat_addr_ip6) /
1134 TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
1136 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1137 IPv6(src=server.ip6, dst=nat_addr_ip6) /
1138 UDP(sport=server_udp_in_port, dport=client_udp_out_port))
1140 self.pg0.add_stream(pkts)
1141 self.pg_enable_capture(self.pg_interfaces)
1143 capture = self.pg0.get_capture(len(pkts))
# Expect the server's outside ports as source and the client's original
# address and ports as destination.
1144 for packet in capture:
1146 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
1147 self.assertEqual(packet[IPv6].dst, client.ip6)
1148 self.assert_packet_checksums_valid(packet)
1149 if packet.haslayer(TCP):
1150 self.assertEqual(packet[TCP].sport, server_tcp_out_port)
1151 self.assertEqual(packet[TCP].dport, client_tcp_in_port)
1153 self.assertEqual(packet[UDP].sport, server_udp_out_port)
1154 self.assertEqual(packet[UDP].dport, client_udp_in_port)
1156 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# ICMPv6 Destination Unreachable (code 1) from the client, embedding the
# IPv6 layer of each packet captured in the previous step.
1161 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1162 IPv6(src=client.ip6, dst=nat_addr_ip6) /
1163 ICMPv6DestUnreach(code=1) /
1164 packet[IPv6] for packet in capture]
1165 self.pg0.add_stream(pkts)
1166 self.pg_enable_capture(self.pg_interfaces)
1168 capture = self.pg0.get_capture(len(pkts))
# The error must reach the server from the NAT address, with the embedded
# packet's addresses and ports checked as well.
1169 for packet in capture:
1171 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
1172 self.assertEqual(packet[IPv6].dst, server.ip6)
1173 icmp = packet[ICMPv6DestUnreach]
1174 self.assertEqual(icmp.code, 1)
1175 inner = icmp[IPerror6]
1176 self.assertEqual(inner.src, server.ip6)
1177 self.assertEqual(inner.dst, nat_addr_ip6)
1178 self.assert_packet_checksums_valid(packet)
1179 if inner.haslayer(TCPerror):
1180 self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
1181 self.assertEqual(inner[TCPerror].dport,
1182 client_tcp_out_port)
1184 self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
1185 self.assertEqual(inner[UDPerror].dport,
1186 client_udp_out_port)
1188 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1191 def test_prefix(self):
1192 """ NAT64 Network-Specific Prefix """
# Exercises a /32 prefix configured with vrf_id=0 and a /56 prefix bound to
# vrf1, checking translation in both directions for each.
# Pool address for the first case; pg0 is added with NAT_IS_INSIDE and pg1
# with no flags.
1194 self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1195 end_addr=self.nat_addr,
1198 flags = self.config_flags.NAT_IS_INSIDE
1199 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1200 sw_if_index=self.pg0.sw_if_index)
1201 self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1202 sw_if_index=self.pg1.sw_if_index)
# Second pool address bound to vrf1; pg2 is added with NAT_IS_INSIDE too.
1203 self.vapi.nat64_add_del_pool_addr_range(start_addr=self.vrf1_nat_addr,
1204 end_addr=self.vrf1_nat_addr,
1205 vrf_id=self.vrf1_id, is_add=1)
1206 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1207 sw_if_index=self.pg2.sw_if_index)
# Prefix configured with vrf_id=0.
1210 global_pref64 = "2001:db8::"
1211 global_pref64_len = 32
1212 global_pref64_str = "{}/{}".format(global_pref64, global_pref64_len)
1213 self.vapi.nat64_add_del_prefix(prefix=global_pref64_str, vrf_id=0,
# Only that one prefix should be reported so far.
1216 prefix = self.vapi.nat64_prefix_dump()
1217 self.assertEqual(len(prefix), 1)
1218 self.assertEqual(str(prefix[0].prefix), global_pref64_str)
1219 self.assertEqual(prefix[0].vrf_id, 0)
1221 # Add tenant specific prefix
1222 vrf1_pref64 = "2001:db8:122:300::"
1223 vrf1_pref64_len = 56
1224 vrf1_pref64_str = "{}/{}".format(vrf1_pref64, vrf1_pref64_len)
1225 self.vapi.nat64_add_del_prefix(prefix=vrf1_pref64_str,
1226 vrf_id=self.vrf1_id, is_add=1)
# Both prefixes should now be reported.
1228 prefix = self.vapi.nat64_prefix_dump()
1229 self.assertEqual(len(prefix), 2)
# pg0 -> pg1 using the /32 prefix length; packets must leave with the first
# pool address as NAT IP.
1232 pkts = self.create_stream_in_ip6(self.pg0,
1235 plen=global_pref64_len)
1236 self.pg0.add_stream(pkts)
1237 self.pg_enable_capture(self.pg_interfaces)
1239 capture = self.pg1.get_capture(len(pkts))
1240 self.verify_capture_out(capture, nat_ip=self.nat_addr,
1241 dst_ip=self.pg1.remote_ip4)
# pg1 -> pg0: traffic addressed to the first pool address.
1243 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
1244 self.pg1.add_stream(pkts)
1245 self.pg_enable_capture(self.pg_interfaces)
1247 capture = self.pg0.get_capture(len(pkts))
1248 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
1251 self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
1253 # Tenant specific prefix
1254 pkts = self.create_stream_in_ip6(self.pg2,
1257 plen=vrf1_pref64_len)
1258 self.pg2.add_stream(pkts)
1259 self.pg_enable_capture(self.pg_interfaces)
1261 capture = self.pg1.get_capture(len(pkts))
1262 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
1263 dst_ip=self.pg1.remote_ip4)
# pg1 -> pg2: traffic addressed to the vrf1 pool address.
1265 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
1266 self.pg1.add_stream(pkts)
1267 self.pg_enable_capture(self.pg_interfaces)
1269 capture = self.pg2.get_capture(len(pkts))
1270 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
1273 self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
1275 def test_unknown_proto(self):
1276 """ NAT64 translate packet with unknown protocol """
# A TCP packet is sent first; the test then sends a packet with IPv6 next
# header 47 from pg0 and a packet from pg1 back to the NAT address, and
# checks the translated outer headers.
1278 self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1279 end_addr=self.nat_addr,
1282 flags = self.config_flags.NAT_IS_INSIDE
1283 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1284 sw_if_index=self.pg0.sw_if_index)
1285 self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1286 sw_if_index=self.pg1.sw_if_index)
# IPv6 representation of pg1's remote IPv4 address under 64:ff9b::/96.
1287 remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
# Plain TCP packet pg0 -> pg1; one packet is expected on pg1.
1290 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1291 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
1292 TCP(sport=self.tcp_port_in, dport=20))
1293 self.pg0.add_stream(p)
1294 self.pg_enable_capture(self.pg_interfaces)
1296 p = self.pg1.get_capture(1)
# pg0 -> pg1 with next header 47, carrying an inner IPv4/TCP packet.
1298 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1299 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
1301 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
1302 TCP(sport=1234, dport=1234))
1303 self.pg0.add_stream(p)
1304 self.pg_enable_capture(self.pg_interfaces)
1306 p = self.pg1.get_capture(1)
# The outer IPv4 header must use the NAT address as source and the packet
# must contain a GRE layer.
1309 self.assertEqual(packet[IP].src, self.nat_addr)
1310 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1311 self.assertEqual(packet.haslayer(GRE), 1)
1312 self.assert_packet_checksums_valid(packet)
1314 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# pg1 -> pg0: packet addressed to the NAT address.
1318 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1319 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1321 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
1322 TCP(sport=1234, dport=1234))
1323 self.pg1.add_stream(p)
1324 self.pg_enable_capture(self.pg_interfaces)
1326 p = self.pg0.get_capture(1)
# The IPv6 result must come from the mapped remote address, go to pg0's
# remote host and carry next header 47.
1329 self.assertEqual(packet[IPv6].src, remote_ip6)
1330 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
1331 self.assertEqual(packet[IPv6].nh, 47)
1333 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1336 def test_hairpinning_unknown_proto(self):
1337 """ NAT64 translate packet with unknown protocol - hairpinning """
# Client and server are both remote hosts on pg0, each with its own NAT
# address; all streams are sent and captured on pg0.
1339 client = self.pg0.remote_hosts[0]
1340 server = self.pg0.remote_hosts[1]
1341 server_tcp_in_port = 22
1342 server_tcp_out_port = 4022
1343 client_tcp_in_port = 1234
1344 client_tcp_out_port = 1235
1345 server_nat_ip = "10.0.0.100"
1346 client_nat_ip = "10.0.0.110"
1347 server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
1348 client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
# The pool range starts at the server NAT address and ends at the client
# NAT address.
1350 self.vapi.nat64_add_del_pool_addr_range(start_addr=server_nat_ip,
1351 end_addr=client_nat_ip,
1354 flags = self.config_flags.NAT_IS_INSIDE
1355 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1356 sw_if_index=self.pg0.sw_if_index)
1357 self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1358 sw_if_index=self.pg1.sw_if_index)
# Static BIB entries: server TCP, server GRE (i_port=0) and client TCP.
1360 self.vapi.nat64_add_del_static_bib(i_addr=server.ip6n,
1361 o_addr=server_nat_ip,
1362 i_port=server_tcp_in_port,
1363 o_port=server_tcp_out_port,
1364 proto=IP_PROTOS.tcp, vrf_id=0,
1367 self.vapi.nat64_add_del_static_bib(i_addr=server.ip6n,
1368 o_addr=server_nat_ip, i_port=0,
1370 proto=IP_PROTOS.gre, vrf_id=0,
1373 self.vapi.nat64_add_del_static_bib(i_addr=client.ip6n,
1374 o_addr=client_nat_ip,
1375 i_port=client_tcp_in_port,
1376 o_port=client_tcp_out_port,
1377 proto=IP_PROTOS.tcp, vrf_id=0,
# client -> server over TCP; one packet is expected back on pg0.
1381 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1382 IPv6(src=client.ip6, dst=server_nat_ip6) /
1383 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
1384 self.pg0.add_stream(p)
1385 self.pg_enable_capture(self.pg_interfaces)
1387 p = self.pg0.get_capture(1)
# client -> server with next header GRE.
1389 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1390 IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
1392 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
1393 TCP(sport=1234, dport=1234))
1394 self.pg0.add_stream(p)
1395 self.pg_enable_capture(self.pg_interfaces)
1397 p = self.pg0.get_capture(1)
# Source must become the client's NAT address in IPv6 form; destination is
# the server's own IPv6 address.
1400 self.assertEqual(packet[IPv6].src, client_nat_ip6)
1401 self.assertEqual(packet[IPv6].dst, server.ip6)
1402 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
1404 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# server -> client with next header GRE.
1408 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1409 IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
1411 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
1412 TCP(sport=1234, dport=1234))
1413 self.pg0.add_stream(p)
1414 self.pg_enable_capture(self.pg_interfaces)
1416 p = self.pg0.get_capture(1)
# Source must become the server's NAT address in IPv6 form; destination is
# the client's own IPv6 address.
1419 self.assertEqual(packet[IPv6].src, server_nat_ip6)
1420 self.assertEqual(packet[IPv6].dst, client.ip6)
1421 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
1423 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1426 def test_one_armed_nat64(self):
1427 """ One armed NAT64 """
# pg3 is registered twice - once with NAT_IS_INSIDE and once with no flags -
# and all packets are sent and captured on pg3.
1429 remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
1433 self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1434 end_addr=self.nat_addr,
1437 flags = self.config_flags.NAT_IS_INSIDE
1438 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1439 sw_if_index=self.pg3.sw_if_index)
1440 self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1441 sw_if_index=self.pg3.sw_if_index)
# IPv6 TCP packet from pg3's remote host towards remote_host_ip6.
1444 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
1445 IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
1446 TCP(sport=12345, dport=80))
1447 self.pg3.add_stream(p)
1448 self.pg_enable_capture(self.pg_interfaces)
1450 capture = self.pg3.get_capture(1)
# IPv4 result: NAT address as source, a source port other than 12345
# (remembered as external_port) and destination port 80.
1455 self.assertEqual(ip.src, self.nat_addr)
1456 self.assertEqual(ip.dst, self.pg3.remote_ip4)
1457 self.assertNotEqual(tcp.sport, 12345)
1458 external_port = tcp.sport
1459 self.assertEqual(tcp.dport, 80)
1460 self.assert_packet_checksums_valid(p)
1462 self.logger.error(ppp("Unexpected or invalid packet:", p))
# IPv4 reply addressed to the NAT address and the remembered external port.
1466 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
1467 IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
1468 TCP(sport=80, dport=external_port))
1469 self.pg3.add_stream(p)
1470 self.pg_enable_capture(self.pg_interfaces)
1472 capture = self.pg3.get_capture(1)
# IPv6 result: the original port 12345 is expected as destination port.
1477 self.assertEqual(ip.src, remote_host_ip6)
1478 self.assertEqual(ip.dst, self.pg3.remote_ip6)
1479 self.assertEqual(tcp.sport, 80)
1480 self.assertEqual(tcp.dport, 12345)
1481 self.assert_packet_checksums_valid(p)
1483 self.logger.error(ppp("Unexpected or invalid packet:", p))
1486 def test_frag_in_order(self):
1487 """ NAT64 translate fragments arriving in order """
# Sends fragmented traffic pg0 -> pg1 and then pg1 -> pg0, and checks the
# reassembled ports and payload in each direction.
# Random inside TCP port for this run.
1488 self.tcp_port_in = random.randint(1025, 65535)
1490 self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1491 end_addr=self.nat_addr,
1494 flags = self.config_flags.NAT_IS_INSIDE
1495 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1496 sw_if_index=self.pg0.sw_if_index)
1497 self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1498 sw_if_index=self.pg1.sw_if_index)
# IPv6 fragments from pg0 towards pg1's remote IPv4 host, destination port 20.
1502 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
1503 self.tcp_port_in, 20, data)
1504 self.pg0.add_stream(pkts)
1505 self.pg_enable_capture(self.pg_interfaces)
1507 frags = self.pg1.get_capture(len(pkts))
# Reassemble what arrived on pg1; the source port must differ from the
# inside port and is stored as tcp_port_out.
1508 p = self.reass_frags_and_verify(frags,
1510 self.pg1.remote_ip4)
1511 self.assertEqual(p[TCP].dport, 20)
1512 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
1513 self.tcp_port_out = p[TCP].sport
1514 self.assertEqual(data, p[Raw].load)
# Reverse direction: fragments are sent on pg1 and captured on pg0.
1517 data = b"A" * 4 + b"b" * 16 + b"C" * 3
1518 pkts = self.create_stream_frag(self.pg1,
1523 self.pg1.add_stream(pkts)
1524 self.pg_enable_capture(self.pg_interfaces)
1526 frags = self.pg0.get_capture(len(pkts))
1527 self.logger.debug(ppc("Captured:", frags))
# Expected IPv6 source: pg1's remote IPv4 address under 64:ff9b::/96.
1528 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
1529 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
1530 self.assertEqual(p[TCP].sport, 20)
1531 self.assertEqual(p[TCP].dport, self.tcp_port_in)
1532 self.assertEqual(data, p[Raw].load)
1534 def test_reass_hairpinning(self):
1535 """ NAT64 fragments hairpinning """
# Fragments are sent on pg0 towards a server that is also a remote host on
# pg0, reached through a static BIB entry; the capture is taken on pg0.
1537 server = self.pg0.remote_hosts[1]
# Random ports for the server mapping and for the client.
1538 server_in_port = random.randint(1025, 65535)
1539 server_out_port = random.randint(1025, 65535)
1540 client_in_port = random.randint(1025, 65535)
# IPv6 form of the pool address: '64:ff9b::' concatenated with the IPv4
# NAT address, read back through scapy's IPv6 layer.
1541 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
1542 nat_addr_ip6 = ip.src
1544 self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1545 end_addr=self.nat_addr,
1548 flags = self.config_flags.NAT_IS_INSIDE
1549 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1550 sw_if_index=self.pg0.sw_if_index)
1551 self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1552 sw_if_index=self.pg1.sw_if_index)
1554 # add static BIB entry for server
1555 self.vapi.nat64_add_del_static_bib(i_addr=server.ip6n,
1556 o_addr=self.nat_addr,
1557 i_port=server_in_port,
1558 o_port=server_out_port,
1559 proto=IP_PROTOS.tcp, vrf_id=0,
1562 # send packet from host to server
1563 pkts = self.create_stream_frag_ip6(self.pg0,
1568 self.pg0.add_stream(pkts)
1569 self.pg_enable_capture(self.pg_interfaces)
1571 frags = self.pg0.get_capture(len(pkts))
1572 self.logger.debug(ppc("Captured:", frags))
# The reassembled packet must come from the NAT address, target the server's
# inside port and use a source port different from the client's.
1573 p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
1574 self.assertNotEqual(p[TCP].sport, client_in_port)
1575 self.assertEqual(p[TCP].dport, server_in_port)
1576 self.assertEqual(data, p[Raw].load)
1578 def test_frag_out_of_order(self):
1579 """ NAT64 translate fragments arriving out of order """
# Performs the same reassembly, port and payload checks as the in-order
# fragment test, in both directions (pg0 -> pg1, then pg1 -> pg0).
# Random inside TCP port for this run.
1580 self.tcp_port_in = random.randint(1025, 65535)
1582 self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1583 end_addr=self.nat_addr,
1586 flags = self.config_flags.NAT_IS_INSIDE
1587 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1588 sw_if_index=self.pg0.sw_if_index)
1589 self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1590 sw_if_index=self.pg1.sw_if_index)
# IPv6 fragments from pg0 towards pg1's remote IPv4 host, destination port 20.
1594 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
1595 self.tcp_port_in, 20, data)
1597 self.pg0.add_stream(pkts)
1598 self.pg_enable_capture(self.pg_interfaces)
1600 frags = self.pg1.get_capture(len(pkts))
# Reassemble what arrived on pg1; the translated source port is stored as
# tcp_port_out.
1601 p = self.reass_frags_and_verify(frags,
1603 self.pg1.remote_ip4)
1604 self.assertEqual(p[TCP].dport, 20)
1605 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
1606 self.tcp_port_out = p[TCP].sport
1607 self.assertEqual(data, p[Raw].load)
# Reverse direction: fragments are sent on pg1 and captured on pg0.
1610 data = b"A" * 4 + b"B" * 16 + b"C" * 3
1611 pkts = self.create_stream_frag(self.pg1,
1617 self.pg1.add_stream(pkts)
1618 self.pg_enable_capture(self.pg_interfaces)
1620 frags = self.pg0.get_capture(len(pkts))
# Expected IPv6 source: pg1's remote IPv4 address under 64:ff9b::/96.
1621 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
1622 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
1623 self.assertEqual(p[TCP].sport, 20)
1624 self.assertEqual(p[TCP].dport, self.tcp_port_in)
1625 self.assertEqual(data, p[Raw].load)
1627 def test_interface_addr(self):
1628 """ Acquire NAT64 pool addresses from interface """
# Registers pg4 through nat64_add_del_interface_addr, then checks the pool
# contents before pg4 has an IPv4 address, after one is configured, and
# after it is removed again.
1629 self.vapi.nat64_add_del_interface_addr(
1631 sw_if_index=self.pg4.sw_if_index)
1633 # no address in NAT64 pool
# NOTE(review): this reads the NAT44 address dump, while the comment above
# and the later checks refer to the NAT64 pool (nat64_pool_addr_dump) -
# confirm which dump is intended here.
1634 addresses = self.vapi.nat44_address_dump()
1635 self.assertEqual(0, len(addresses))
1637 # configure interface address and check NAT64 address pool
1638 self.pg4.config_ip4()
1639 addresses = self.vapi.nat64_pool_addr_dump()
1640 self.assertEqual(len(addresses), 1)
1642 self.assertEqual(str(addresses[0].address),
1645 # remove interface address and check NAT64 address pool
1646 self.pg4.unconfig_ip4()
1647 addresses = self.vapi.nat64_pool_addr_dump()
1648 self.assertEqual(0, len(addresses))
1650 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1651 def test_ipfix_max_bibs_sessions(self):
1652 """ IPFIX logging maximum session and BIB entries exceeded """
# Sends enough flows for max_sessions packets to reach pg1, then checks that
# further packets are not forwarded to pg1 and that IPFIX packets arrive on
# pg3 and pass the max-sessions / max-BIBs verifiers.
1655 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
1659 self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1660 end_addr=self.nat_addr,
1663 flags = self.config_flags.NAT_IS_INSIDE
1664 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1665 sw_if_index=self.pg0.sw_if_index)
1666 self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1667 sw_if_index=self.pg1.sw_if_index)
# Two TCP packets (destination ports 80 and 22) are built per generated
# source address fd01:aa::<i>.
1671 for i in range(0, max_bibs):
1672 src = "fd01:aa::%x" % (i)
1673 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1674 IPv6(src=src, dst=remote_host_ip6) /
1675 TCP(sport=12345, dport=80))
1677 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1678 IPv6(src=src, dst=remote_host_ip6) /
1679 TCP(sport=12345, dport=22))
1681 self.pg0.add_stream(pkts)
1682 self.pg_enable_capture(self.pg_interfaces)
1684 self.pg1.get_capture(max_sessions)
# Point the IPFIX exporter at pg3's remote host and configure NAT IPFIX
# logging with the test's domain id and source port.
1686 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
1687 src_address=self.pg3.local_ip4,
1689 template_interval=10)
1690 self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
1691 src_port=self.ipfix_src_port,
# One more packet (destination port 25) from the last generated source:
# nothing may be forwarded to pg1.
1694 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1695 IPv6(src=src, dst=remote_host_ip6) /
1696 TCP(sport=12345, dport=25))
1697 self.pg0.add_stream(p)
1698 self.pg_enable_capture(self.pg_interfaces)
1700 self.pg1.assert_nothing_captured()
# Seven IPFIX packets are expected on pg3 after the flush.
1702 self.vapi.ipfix_flush()
1703 capture = self.pg3.get_capture(7)
1704 ipfix = IPFIXDecoder()
1705 # first load template
1707 self.assertTrue(p.haslayer(IPFIX))
1708 self.assertEqual(p[IP].src, self.pg3.local_ip4)
1709 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1710 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1711 self.assertEqual(p[UDP].dport, 4739)
1712 self.assertEqual(p[IPFIX].observationDomainID,
1713 self.ipfix_domain_id)
1714 if p.haslayer(Template):
1715 ipfix.add_template(p.getlayer(Template))
1716 # verify events in data set
1718 if p.haslayer(Data):
1719 data = ipfix.decode_data_set(p.getlayer(Set))
1720 self.verify_ipfix_max_sessions(data, max_sessions)
# A packet from pg0's own remote address: again nothing may be forwarded to
# pg1, and one IPFIX packet is expected on pg3.
1722 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1723 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
1724 TCP(sport=12345, dport=80))
1725 self.pg0.add_stream(p)
1726 self.pg_enable_capture(self.pg_interfaces)
1728 self.pg1.assert_nothing_captured()
1730 self.vapi.ipfix_flush()
1731 capture = self.pg3.get_capture(1)
1732 # verify events in data set
1734 self.assertTrue(p.haslayer(IPFIX))
1735 self.assertEqual(p[IP].src, self.pg3.local_ip4)
1736 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1737 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1738 self.assertEqual(p[UDP].dport, 4739)
1739 self.assertEqual(p[IPFIX].observationDomainID,
1740 self.ipfix_domain_id)
1741 if p.haslayer(Data):
1742 data = ipfix.decode_data_set(p.getlayer(Set))
1743 self.verify_ipfix_max_bibs(data, max_bibs)
1745 def test_ipfix_bib_ses(self):
1746 """ IPFIX logging NAT64 BIB/session create and delete events """
# Sends one TCP packet pg0 -> pg1, checks the first batch of IPFIX packets
# on pg3, then checks a second batch collected after the later
# nat64_add_del_pool_addr_range call (presumably the delete events named in
# the docstring - TODO confirm).
# Random inside TCP port for this run.
1747 self.tcp_port_in = random.randint(1025, 65535)
1748 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
1752 self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1753 end_addr=self.nat_addr,
1756 flags = self.config_flags.NAT_IS_INSIDE
1757 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1758 sw_if_index=self.pg0.sw_if_index)
1759 self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1760 sw_if_index=self.pg1.sw_if_index)
# Point the IPFIX exporter at pg3's remote host and configure NAT IPFIX
# logging with the test's domain id and source port.
1761 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
1762 src_address=self.pg3.local_ip4,
1764 template_interval=10)
1765 self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
1766 src_port=self.ipfix_src_port,
1770 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1771 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
1772 TCP(sport=self.tcp_port_in, dport=25))
1773 self.pg0.add_stream(p)
1774 self.pg_enable_capture(self.pg_interfaces)
1776 p = self.pg1.get_capture(1)
# Remember the translated source port of the forwarded packet.
1777 self.tcp_port_out = p[0][TCP].sport
# Eight IPFIX packets are expected on pg3 for the first batch.
1778 self.vapi.ipfix_flush()
1779 capture = self.pg3.get_capture(8)
1780 ipfix = IPFIXDecoder()
1781 # first load template
1783 self.assertTrue(p.haslayer(IPFIX))
1784 self.assertEqual(p[IP].src, self.pg3.local_ip4)
1785 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1786 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1787 self.assertEqual(p[UDP].dport, 4739)
1788 self.assertEqual(p[IPFIX].observationDomainID,
1789 self.ipfix_domain_id)
1790 if p.haslayer(Template):
1791 ipfix.add_template(p.getlayer(Template))
1792 # verify events in data set
1794 if p.haslayer(Data):
1795 data = ipfix.decode_data_set(p.getlayer(Set))
# Field 230 of the first record selects the verifier: 10 -> BIB verifier
# (called with 1), 6 -> session verifier. Presumably 230 is the NAT event
# type - TODO confirm.
1796 if scapy.compat.orb(data[0][230]) == 10:
1797 self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6)
1798 elif scapy.compat.orb(data[0][230]) == 6:
1799 self.verify_ipfix_nat64_ses(data,
1801 self.pg0.remote_ip6,
1802 self.pg1.remote_ip4,
1805 self.logger.error(ppp("Unexpected or invalid packet: ", p))
1808 self.pg_enable_capture(self.pg_interfaces)
1809 self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1810 end_addr=self.nat_addr,
# Two IPFIX packets are expected on pg3 for the second batch.
1813 self.vapi.ipfix_flush()
1814 capture = self.pg3.get_capture(2)
1815 # verify events in data set
1817 self.assertTrue(p.haslayer(IPFIX))
1818 self.assertEqual(p[IP].src, self.pg3.local_ip4)
1819 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1820 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1821 self.assertEqual(p[UDP].dport, 4739)
1822 self.assertEqual(p[IPFIX].observationDomainID,
1823 self.ipfix_domain_id)
1824 if p.haslayer(Data):
1825 data = ipfix.decode_data_set(p.getlayer(Set))
# Here 11 selects the BIB verifier (called with 0) and 7 the session
# verifier.
1826 if scapy.compat.orb(data[0][230]) == 11:
1827 self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6)
1828 elif scapy.compat.orb(data[0][230]) == 7:
1829 self.verify_ipfix_nat64_ses(data,
1831 self.pg0.remote_ip6,
1832 self.pg1.remote_ip4,
1835 self.logger.error(ppp("Unexpected or invalid packet: ", p))
1837 def test_syslog_sess(self):
1838 """ Test syslog session creation and deletion """
# One TCP packet pg0 -> pg1 is sent, and the syslog packets captured on pg3
# are checked with verify_syslog_sess, once after the packet and once after
# the later nat64_add_del_pool_addr_range call.
# Random inside TCP port for this run.
1839 self.tcp_port_in = random.randint(1025, 65535)
1840 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
1844 self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1845 end_addr=self.nat_addr,
1848 flags = self.config_flags.NAT_IS_INSIDE
1849 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
1850 sw_if_index=self.pg0.sw_if_index)
1851 self.vapi.nat64_add_del_interface(is_add=1, flags=0,
1852 sw_if_index=self.pg1.sw_if_index)
# Set the syslog filter to INFO severity and configure the sender with
# pg3's local and remote IPv4 addresses.
1853 self.vapi.syslog_set_filter(
1854 self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_INFO)
1855 self.vapi.syslog_set_sender(self.pg3.local_ip4, self.pg3.remote_ip4)
1857 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1858 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
1859 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
1860 self.pg0.add_stream(p)
1861 self.pg_enable_capture(self.pg_interfaces)
1863 p = self.pg1.get_capture(1)
# Remember the translated source port of the forwarded packet.
1864 self.tcp_port_out = p[0][TCP].sport
# One syslog packet is expected on pg3; its payload is verified with
# is_ip6=True.
1865 capture = self.pg3.get_capture(1)
1866 self.verify_syslog_sess(capture[0][Raw].load, is_ip6=True)
1868 self.pg_enable_capture(self.pg_interfaces)
1870 self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
1871 end_addr=self.nat_addr,
# One more syslog packet is expected; it is verified with the positional
# arguments False, True.
1874 capture = self.pg3.get_capture(1)
1875 self.verify_syslog_sess(capture[0][Raw].load, False, True)
1877 def nat64_get_ses_num(self):
1879 Return number of active NAT64 sessions.
# Dump the session table with proto=255 (presumably "all protocols" -
# TODO confirm against the NAT64 API).
1881 st = self.vapi.nat64_st_dump(proto=255)
1884 def clear_nat64(self):
1886 Clear NAT64 configuration.
# NAT IPFIX logging call with the current domain id and source port; both
# attributes are then set back to 4739 and 1 on this test instance.
1888 self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
1889 src_port=self.ipfix_src_port,
1891 self.ipfix_src_port = 4739
1892 self.ipfix_domain_id = 1
# Set the syslog filter to EMERG severity.
1894 self.vapi.syslog_set_filter(
1895 self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_EMERG)
# Set the NAT64 timeouts to fixed values.
1897 self.vapi.nat64_set_timeouts(udp=300, tcp_established=7440,
1898 tcp_transitory=240, icmp=60)
# Remove every NAT64 interface reported by the dump, passing back the flags
# it was reported with.
1900 interfaces = self.vapi.nat64_interface_dump()
1901 for intf in interfaces:
1902 self.vapi.nat64_add_del_interface(is_add=0, flags=intf.flags,
1903 sw_if_index=intf.sw_if_index)
# Dump the BIB with proto=255 and call nat64_add_del_static_bib for the
# entries flagged NAT_IS_STATIC.
1905 bib = self.vapi.nat64_bib_dump(proto=255)
1907 if bibe.flags & self.config_flags.NAT_IS_STATIC:
1908 self.vapi.nat64_add_del_static_bib(i_addr=bibe.i_addr,
# Go through every pool address reported by the dump, one single-address
# range call per entry.
1916 adresses = self.vapi.nat64_pool_addr_dump()
1917 for addr in adresses:
1918 self.vapi.nat64_add_del_pool_addr_range(start_addr=addr.address,
1919 end_addr=addr.address,
# Delete every configured prefix.
1923 prefixes = self.vapi.nat64_prefix_dump()
1924 for prefix in prefixes:
1925 self.vapi.nat64_add_del_prefix(prefix=str(prefix.prefix),
1926 vrf_id=prefix.vrf_id, is_add=0)
# Both statistics counters must read zero at this point.
1928 bibs = self.statistics.get_counter('/nat64/total-bibs')
1929 self.assertEqual(bibs[0][0], 0)
1930 sessions = self.statistics.get_counter('/nat64/total-sessions')
1931 self.assertEqual(sessions[0][0], 0)
# Run the tests through unittest with VppTestRunner when executed as a script.
1934 if __name__ == '__main__':
1935 unittest.main(testRunner=VppTestRunner)