12 from framework import VppTestCase, VppTestRunner, running_extended_tests
13 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
14 from scapy.data import IP_PROTOS
15 from scapy.layers.inet import IP, TCP, UDP, ICMP
16 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
17 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
18 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply, \
19 ICMPv6ND_NS, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, fragment6
20 from scapy.layers.l2 import Ether, GRE
21 from scapy.packet import Raw
22 from syslog_rfc5424_parser import SyslogMessage, ParseError
23 from syslog_rfc5424_parser.constants import SyslogSeverity
24 from util import ppc, ppp
25 from vpp_papi import VppEnum
28 class TestNAT64(VppTestCase):
29 """ NAT64 Test Cases """
def SYSLOG_SEVERITY(self):
    """Expose the ``vl_api_syslog_severity_t`` enum from the VPP API."""
    severity_enum = VppEnum.vl_api_syslog_severity_t
    return severity_enum
def config_flags(self):
    """Expose the ``vl_api_nat_config_flags_t`` enum from the VPP API.

    The tests in this class read members such as ``NAT_IS_INSIDE`` from
    the returned enum.
    """
    nat_flags_enum = VppEnum.vl_api_nat_config_flags_t
    return nat_flags_enum
41 super(TestNAT64, cls).setUpClass()
43 cls.tcp_port_in = 6303
44 cls.tcp_port_out = 6303
45 cls.udp_port_in = 6304
46 cls.udp_port_out = 6304
48 cls.icmp_id_out = 6305
49 cls.tcp_external_port = 80
50 cls.nat_addr = '10.0.0.3'
51 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
53 cls.vrf1_nat_addr = '10.0.10.3'
54 cls.ipfix_src_port = 4739
55 cls.ipfix_domain_id = 1
57 cls.create_pg_interfaces(range(6))
58 cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
59 cls.ip6_interfaces.append(cls.pg_interfaces[2])
60 cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
62 cls.vapi.ip_table_add_del(is_add=1,
63 table={'table_id': cls.vrf1_id,
66 cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
68 cls.pg0.generate_remote_hosts(2)
70 for i in cls.ip6_interfaces:
73 i.configure_ipv6_neighbors()
75 for i in cls.ip4_interfaces:
84 cls.pg3.configure_ipv6_neighbors()
def tearDownClass(cls):
    """Class-level teardown; all work is delegated to the parent class."""
    parent = super(TestNAT64, cls)
    parent.tearDownClass()
94 super(TestNAT64, self).setUp()
95 self.vapi.nat64_plugin_enable_disable(enable=1,
96 bib_buckets=128, st_buckets=256)
99 super(TestNAT64, self).tearDown()
100 if not self.vpp_dead:
101 self.vapi.nat64_plugin_enable_disable(enable=0)
def show_commands_at_teardown(self):
    """Log the output of the NAT64 ``show`` CLI commands.

    Each command is run through ``self.vapi.cli`` and its output is
    written to the test logger at info level, in the order listed.
    """
    cli_commands = (
        "show nat64 pool",
        "show nat64 interfaces",
        "show nat64 prefix",
        "show nat64 bib all",
        "show nat64 session table all",
    )
    for command in cli_commands:
        output = self.vapi.cli(command)
        self.logger.info(output)
110 def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
112 Create IPv6 packet stream for inside network
114 :param in_if: Inside interface
115 :param out_if: Outside interface
116 :param hlim: Hop Limit of generated packets
117 :param pref: NAT64 prefix
118 :param plen: NAT64 prefix length
122 dst = ''.join(['64:ff9b::', out_if.remote_ip4])
124 dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
127 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
128 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
129 TCP(sport=self.tcp_port_in, dport=20))
133 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
134 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
135 UDP(sport=self.udp_port_in, dport=20))
139 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
140 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
141 ICMPv6EchoRequest(id=self.icmp_id_in))
146 def create_stream_out(self, out_if, dst_ip=None, ttl=64,
147 use_inside_ports=False):
149 Create packet stream for outside network
151 :param out_if: Outside interface
152 :param dst_ip: Destination IP address (Default use global NAT address)
153 :param ttl: TTL of generated packets
154 :param use_inside_ports: Use inside NAT ports as destination ports
155 instead of outside ports
158 dst_ip = self.nat_addr
159 if not use_inside_ports:
160 tcp_port = self.tcp_port_out
161 udp_port = self.udp_port_out
162 icmp_id = self.icmp_id_out
164 tcp_port = self.tcp_port_in
165 udp_port = self.udp_port_in
166 icmp_id = self.icmp_id_in
169 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
170 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
171 TCP(dport=tcp_port, sport=20))
175 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
176 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
177 UDP(dport=udp_port, sport=20))
181 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
182 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
183 ICMP(id=icmp_id, type='echo-reply'))
188 def verify_capture_out(self, capture, nat_ip=None, same_port=False,
189 dst_ip=None, is_ip6=False, ignore_port=False):
191 Verify captured packets on outside network
193 :param capture: Captured packets
194 :param nat_ip: Translated IP address (Default use global NAT address)
195 :param same_port: Source port number is not translated (Default False)
196 :param dst_ip: Destination IP address (Default do not verify)
197 :param is_ip6: If L3 protocol is IPv6 (Default False)
201 ICMP46 = ICMPv6EchoRequest
206 nat_ip = self.nat_addr
207 for packet in capture:
210 self.assert_packet_checksums_valid(packet)
211 self.assertEqual(packet[IP46].src, nat_ip)
212 if dst_ip is not None:
213 self.assertEqual(packet[IP46].dst, dst_ip)
214 if packet.haslayer(TCP):
218 packet[TCP].sport, self.tcp_port_in)
221 packet[TCP].sport, self.tcp_port_in)
222 self.tcp_port_out = packet[TCP].sport
223 self.assert_packet_checksums_valid(packet)
224 elif packet.haslayer(UDP):
228 packet[UDP].sport, self.udp_port_in)
231 packet[UDP].sport, self.udp_port_in)
232 self.udp_port_out = packet[UDP].sport
237 packet[ICMP46].id, self.icmp_id_in)
240 packet[ICMP46].id, self.icmp_id_in)
241 self.icmp_id_out = packet[ICMP46].id
242 self.assert_packet_checksums_valid(packet)
244 self.logger.error(ppp("Unexpected or invalid packet "
245 "(outside network):", packet))
248 def verify_capture_in_ip6(self, capture, src_ip, dst_ip):
250 Verify captured IPv6 packets on inside network
252 :param capture: Captured packets
253 :param src_ip: Source IP
254 :param dst_ip: Destination IP address
256 for packet in capture:
258 self.assertEqual(packet[IPv6].src, src_ip)
259 self.assertEqual(packet[IPv6].dst, dst_ip)
260 self.assert_packet_checksums_valid(packet)
261 if packet.haslayer(TCP):
262 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
263 elif packet.haslayer(UDP):
264 self.assertEqual(packet[UDP].dport, self.udp_port_in)
266 self.assertEqual(packet[ICMPv6EchoReply].id,
269 self.logger.error(ppp("Unexpected or invalid packet "
270 "(inside network):", packet))
273 def create_stream_frag(self, src_if, dst, sport, dport, data,
274 proto=IP_PROTOS.tcp, echo_reply=False):
276 Create fragmented packet stream
278 :param src_if: Source interface
279 :param dst: Destination IPv4 address
280 :param sport: Source port
281 :param dport: Destination port
282 :param data: Payload data
283 :param proto: protocol (TCP, UDP, ICMP)
284 :param echo_reply: use echo_reply if protocol is ICMP
287 if proto == IP_PROTOS.tcp:
288 p = (IP(src=src_if.remote_ip4, dst=dst) /
289 TCP(sport=sport, dport=dport) /
291 p = p.__class__(scapy.compat.raw(p))
292 chksum = p[TCP].chksum
293 proto_header = TCP(sport=sport, dport=dport, chksum=chksum)
294 elif proto == IP_PROTOS.udp:
295 proto_header = UDP(sport=sport, dport=dport)
296 elif proto == IP_PROTOS.icmp:
298 proto_header = ICMP(id=sport, type='echo-request')
300 proto_header = ICMP(id=sport, type='echo-reply')
302 raise Exception("Unsupported protocol")
303 id = random.randint(0, 65535)
305 if proto == IP_PROTOS.tcp:
308 raw = Raw(data[0:16])
309 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
310 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
314 if proto == IP_PROTOS.tcp:
315 raw = Raw(data[4:20])
317 raw = Raw(data[16:32])
318 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
319 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
323 if proto == IP_PROTOS.tcp:
327 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
328 IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=proto,
334 def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
335 pref=None, plen=0, frag_size=128):
337 Create fragmented packet stream
339 :param src_if: Source interface
340 :param dst: Destination IPv4 address
341 :param sport: Source TCP port
342 :param dport: Destination TCP port
343 :param data: Payload data
344 :param pref: NAT64 prefix
345 :param plen: NAT64 prefix length
346 :param frag_size: size of fragments
350 dst_ip6 = ''.join(['64:ff9b::', dst])
352 dst_ip6 = self.compose_ip6(dst, pref, plen)
354 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
355 IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
356 IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
357 TCP(sport=sport, dport=dport) /
360 return fragment6(p, frag_size)
362 def reass_frags_and_verify(self, frags, src, dst):
364 Reassemble and verify fragmented packet
366 :param frags: Captured fragments
367 :param src: Source IPv4 address to verify
368 :param dst: Destination IPv4 address to verify
370 :returns: Reassembled IPv4 packet
374 self.assertEqual(p[IP].src, src)
375 self.assertEqual(p[IP].dst, dst)
376 self.assert_ip_checksum_valid(p)
377 buffer.seek(p[IP].frag * 8)
378 buffer.write(bytes(p[IP].payload))
379 ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
380 proto=frags[0][IP].proto)
381 if ip.proto == IP_PROTOS.tcp:
382 p = (ip / TCP(buffer.getvalue()))
383 self.logger.debug(ppp("Reassembled:", p))
384 self.assert_tcp_checksum_valid(p)
385 elif ip.proto == IP_PROTOS.udp:
386 p = (ip / UDP(buffer.getvalue()[:8]) /
387 Raw(buffer.getvalue()[8:]))
388 elif ip.proto == IP_PROTOS.icmp:
389 p = (ip / ICMP(buffer.getvalue()))
392 def reass_frags_and_verify_ip6(self, frags, src, dst):
394 Reassemble and verify fragmented packet
396 :param frags: Captured fragments
397 :param src: Source IPv6 address to verify
398 :param dst: Destination IPv6 address to verify
400 :returns: Reassembled IPv6 packet
404 self.assertEqual(p[IPv6].src, src)
405 self.assertEqual(p[IPv6].dst, dst)
406 buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
407 buffer.write(bytes(p[IPv6ExtHdrFragment].payload))
408 ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
409 nh=frags[0][IPv6ExtHdrFragment].nh)
410 if ip.nh == IP_PROTOS.tcp:
411 p = (ip / TCP(buffer.getvalue()))
412 elif ip.nh == IP_PROTOS.udp:
413 p = (ip / UDP(buffer.getvalue()))
414 self.logger.debug(ppp("Reassembled:", p))
415 self.assert_packet_checksums_valid(p)
418 # TODO: ipfix needs to be separated from NAT base plugin
419 @unittest.skipUnless(running_extended_tests, "part of extended tests")
420 def verify_ipfix_max_bibs(self, data, limit):
422 Verify IPFIX maximum BIB entries exceeded event
424 :param data: Decoded IPFIX data records
425 :param limit: Number of maximum BIB entries that can be created.
427 self.assertEqual(1, len(data))
430 self.assertEqual(scapy.compat.orb(record[230]), 13)
431 # natQuotaExceededEvent
432 self.assertEqual(struct.pack("I", 2), record[466])
434 self.assertEqual(struct.pack("I", limit), record[472])
436 # TODO: ipfix needs to be separated from NAT base plugin
437 @unittest.skipUnless(running_extended_tests, "part of extended tests")
438 def verify_ipfix_bib(self, data, is_create, src_addr):
440 Verify IPFIX NAT64 BIB create and delete events
442 :param data: Decoded IPFIX data records
443 :param is_create: Create event if nonzero value otherwise delete event
444 :param src_addr: IPv6 source address
446 self.assertEqual(1, len(data))
450 self.assertEqual(scapy.compat.orb(record[230]), 10)
452 self.assertEqual(scapy.compat.orb(record[230]), 11)
454 self.assertEqual(src_addr, str(ipaddress.IPv6Address(record[27])))
455 # postNATSourceIPv4Address
456 self.assertEqual(self.nat_addr_n, record[225])
458 self.assertEqual(IP_PROTOS.tcp, scapy.compat.orb(record[4]))
460 self.assertEqual(struct.pack("!I", 0), record[234])
461 # sourceTransportPort
462 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
463 # postNAPTSourceTransportPort
464 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
466 # TODO: ipfix needs to be separated from NAT base plugin
467 @unittest.skipUnless(running_extended_tests, "part of extended tests")
468 def verify_ipfix_nat64_ses(self, data, is_create, src_addr, dst_addr,
471 Verify IPFIX NAT64 session create and delete events
473 :param data: Decoded IPFIX data records
474 :param is_create: Create event if nonzero value otherwise delete event
475 :param src_addr: IPv6 source address
476 :param dst_addr: IPv4 destination address
477 :param dst_port: destination TCP port
479 self.assertEqual(1, len(data))
483 self.assertEqual(scapy.compat.orb(record[230]), 6)
485 self.assertEqual(scapy.compat.orb(record[230]), 7)
487 self.assertEqual(src_addr, str(ipaddress.IPv6Address(record[27])))
488 # destinationIPv6Address
489 self.assertEqual(socket.inet_pton(socket.AF_INET6,
490 self.compose_ip6(dst_addr,
494 # postNATSourceIPv4Address
495 self.assertEqual(self.nat_addr_n, record[225])
496 # postNATDestinationIPv4Address
497 self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr),
500 self.assertEqual(IP_PROTOS.tcp, scapy.compat.orb(record[4]))
502 self.assertEqual(struct.pack("!I", 0), record[234])
503 # sourceTransportPort
504 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
505 # postNAPTSourceTransportPort
506 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
507 # destinationTransportPort
508 self.assertEqual(struct.pack("!H", dst_port), record[11])
509 # postNAPTDestinationTransportPort
510 self.assertEqual(struct.pack("!H", dst_port), record[228])
512 def verify_syslog_sess(self, data, is_add=True, is_ip6=False):
513 message = data.decode('utf-8')
515 message = SyslogMessage.parse(message)
516 except ParseError as e:
520 self.assertEqual(message.severity, SyslogSeverity.info)
521 self.assertEqual(message.appname, 'NAT')
522 self.assertEqual(message.msgid, 'SADD' if is_add else 'SDEL')
523 sd_params = message.sd.get('nsess')
524 self.assertTrue(sd_params is not None)
526 self.assertEqual(sd_params.get('IATYP'), 'IPv6')
527 self.assertEqual(sd_params.get('ISADDR'), self.pg0.remote_ip6)
529 self.assertEqual(sd_params.get('IATYP'), 'IPv4')
530 self.assertEqual(sd_params.get('ISADDR'), self.pg0.remote_ip4)
531 self.assertTrue(sd_params.get('SSUBIX') is not None)
532 self.assertEqual(sd_params.get('ISPORT'), "%d" % self.tcp_port_in)
533 self.assertEqual(sd_params.get('XATYP'), 'IPv4')
534 self.assertEqual(sd_params.get('XSADDR'), self.nat_addr)
535 self.assertEqual(sd_params.get('XSPORT'), "%d" % self.tcp_port_out)
536 self.assertEqual(sd_params.get('PROTO'), "%d" % IP_PROTOS.tcp)
537 self.assertEqual(sd_params.get('SVLAN'), '0')
538 self.assertEqual(sd_params.get('XDADDR'), self.pg1.remote_ip4)
539 self.assertEqual(sd_params.get('XDPORT'),
540 "%d" % self.tcp_external_port)
542 def compose_ip6(self, ip4, pref, plen):
544 Compose IPv4-embedded IPv6 addresses
546 :param ip4: IPv4 address
547 :param pref: IPv6 prefix
548 :param plen: IPv6 prefix length
549 :returns: IPv4-embedded IPv6 addresses
551 pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
552 ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
567 pref_n[10] = ip4_n[3]
571 pref_n[10] = ip4_n[2]
572 pref_n[11] = ip4_n[3]
575 pref_n[10] = ip4_n[1]
576 pref_n[11] = ip4_n[2]
577 pref_n[12] = ip4_n[3]
579 pref_n[12] = ip4_n[0]
580 pref_n[13] = ip4_n[1]
581 pref_n[14] = ip4_n[2]
582 pref_n[15] = ip4_n[3]
583 packed_pref_n = b''.join([scapy.compat.chb(x) for x in pref_n])
584 return socket.inet_ntop(socket.AF_INET6, packed_pref_n)
586 # TODO: ipfix needs to be separated from NAT base plugin
587 @unittest.skipUnless(running_extended_tests, "part of extended tests")
588 def verify_ipfix_max_sessions(self, data, limit):
590 Verify IPFIX maximum session entries exceeded event
592 :param data: Decoded IPFIX data records
593 :param limit: Number of maximum session entries that can be created.
595 self.assertEqual(1, len(data))
598 self.assertEqual(scapy.compat.orb(record[230]), 13)
599 # natQuotaExceededEvent
600 self.assertEqual(struct.pack("I", 1), record[466])
602 self.assertEqual(struct.pack("I", limit), record[471])
604 def test_nat64_inside_interface_handles_neighbor_advertisement(self):
605 """ NAT64 inside interface handles Neighbor Advertisement """
607 flags = self.config_flags.NAT_IS_INSIDE
608 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
609 sw_if_index=self.pg5.sw_if_index)
612 ping = (Ether(dst=self.pg5.local_mac, src=self.pg5.remote_mac) /
613 IPv6(src=self.pg5.remote_ip6, dst=self.pg5.local_ip6) /
616 self.pg5.add_stream(pkts)
617 self.pg_enable_capture(self.pg_interfaces)
620 # Wait for Neighbor Solicitation
621 capture = self.pg5.get_capture(len(pkts))
624 self.assertEqual(packet[IPv6].src, self.pg5.local_ip6)
625 self.assertEqual(packet.haslayer(ICMPv6ND_NS), 1)
626 tgt = packet[ICMPv6ND_NS].tgt
628 self.logger.error(ppp("Unexpected or invalid packet:", packet))
631 # Send Neighbor Advertisement
632 p = (Ether(dst=self.pg5.local_mac, src=self.pg5.remote_mac) /
633 IPv6(src=self.pg5.remote_ip6, dst=self.pg5.local_ip6) /
634 ICMPv6ND_NA(tgt=tgt) /
635 ICMPv6NDOptDstLLAddr(lladdr=self.pg5.remote_mac))
637 self.pg5.add_stream(pkts)
638 self.pg_enable_capture(self.pg_interfaces)
641 # Try to send ping again
643 self.pg5.add_stream(pkts)
644 self.pg_enable_capture(self.pg_interfaces)
647 # Wait for ping reply
648 capture = self.pg5.get_capture(len(pkts))
651 self.assertEqual(packet[IPv6].src, self.pg5.local_ip6)
652 self.assertEqual(packet[IPv6].dst, self.pg5.remote_ip6)
653 self.assertEqual(packet.haslayer(ICMPv6EchoReply), 1)
655 self.logger.error(ppp("Unexpected or invalid packet:", packet))
659 """ Add/delete address to NAT64 pool """
662 self.vapi.nat64_add_del_pool_addr_range(start_addr=nat_addr,
664 vrf_id=0xFFFFFFFF, is_add=1)
666 addresses = self.vapi.nat64_pool_addr_dump()
667 self.assertEqual(len(addresses), 1)
668 self.assertEqual(str(addresses[0].address), nat_addr)
670 self.vapi.nat64_add_del_pool_addr_range(start_addr=nat_addr,
672 vrf_id=0xFFFFFFFF, is_add=0)
674 addresses = self.vapi.nat64_pool_addr_dump()
675 self.assertEqual(len(addresses), 0)
677 def test_interface(self):
678 """ Enable/disable NAT64 feature on the interface """
679 flags = self.config_flags.NAT_IS_INSIDE
680 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
681 sw_if_index=self.pg0.sw_if_index)
682 self.vapi.nat64_add_del_interface(is_add=1, flags=0,
683 sw_if_index=self.pg1.sw_if_index)
685 interfaces = self.vapi.nat64_interface_dump()
686 self.assertEqual(len(interfaces), 2)
689 for intf in interfaces:
690 if intf.sw_if_index == self.pg0.sw_if_index:
691 self.assertEqual(intf.flags, self.config_flags.NAT_IS_INSIDE)
693 elif intf.sw_if_index == self.pg1.sw_if_index:
694 self.assertEqual(intf.flags, self.config_flags.NAT_IS_OUTSIDE)
696 self.assertTrue(pg0_found)
697 self.assertTrue(pg1_found)
699 features = self.vapi.cli("show interface features pg0")
700 self.assertIn('nat64-in2out', features)
701 features = self.vapi.cli("show interface features pg1")
702 self.assertIn('nat64-out2in', features)
704 self.vapi.nat64_add_del_interface(is_add=0, flags=flags,
705 sw_if_index=self.pg0.sw_if_index)
706 self.vapi.nat64_add_del_interface(is_add=0, flags=flags,
707 sw_if_index=self.pg1.sw_if_index)
709 interfaces = self.vapi.nat64_interface_dump()
710 self.assertEqual(len(interfaces), 0)
712 def test_static_bib(self):
713 """ Add/delete static BIB entry """
714 in_addr = '2001:db8:85a3::8a2e:370:7334'
715 out_addr = '10.1.1.3'
718 proto = IP_PROTOS.tcp
720 self.vapi.nat64_add_del_static_bib(i_addr=in_addr, o_addr=out_addr,
721 i_port=in_port, o_port=out_port,
722 proto=proto, vrf_id=0, is_add=1)
723 bib = self.vapi.nat64_bib_dump(proto=IP_PROTOS.tcp)
726 if bibe.flags & self.config_flags.NAT_IS_STATIC:
728 self.assertEqual(str(bibe.i_addr), in_addr)
729 self.assertEqual(str(bibe.o_addr), out_addr)
730 self.assertEqual(bibe.i_port, in_port)
731 self.assertEqual(bibe.o_port, out_port)
732 self.assertEqual(static_bib_num, 1)
733 bibs = self.statistics.get_counter('/nat64/total-bibs')
734 self.assertEqual(bibs[0][0], 1)
736 self.vapi.nat64_add_del_static_bib(i_addr=in_addr, o_addr=out_addr,
737 i_port=in_port, o_port=out_port,
738 proto=proto, vrf_id=0, is_add=0)
739 bib = self.vapi.nat64_bib_dump(proto=IP_PROTOS.tcp)
742 if bibe.flags & self.config_flags.NAT_IS_STATIC:
744 self.assertEqual(static_bib_num, 0)
745 bibs = self.statistics.get_counter('/nat64/total-bibs')
746 self.assertEqual(bibs[0][0], 0)
def test_set_timeouts(self):
    """ Set NAT64 timeouts """
    # Fields are compared in a fixed order so the first failing
    # assertion is always the same one.
    field_order = ('udp', 'icmp', 'tcp_transitory', 'tcp_established')

    def check_timeouts(expected):
        # Fetch the current timeouts from VPP and compare field by field.
        current = self.vapi.nat64_get_timeouts()
        for field in field_order:
            self.assertEqual(getattr(current, field), expected[field])

    # the default values must be reported before anything is changed
    check_timeouts({
        'udp': 300,
        'icmp': 60,
        'tcp_transitory': 240,
        'tcp_established': 7440,
    })

    # override every timeout, then read the custom values back
    custom = {
        'udp': 200,
        'tcp_established': 7450,
        'tcp_transitory': 250,
        'icmp': 30,
    }
    self.vapi.nat64_set_timeouts(**custom)
    check_timeouts(custom)
766 def test_dynamic(self):
767 """ NAT64 dynamic translation test """
768 self.tcp_port_in = 6303
769 self.udp_port_in = 6304
770 self.icmp_id_in = 6305
772 ses_num_start = self.nat64_get_ses_num()
774 self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
775 end_addr=self.nat_addr,
778 flags = self.config_flags.NAT_IS_INSIDE
779 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
780 sw_if_index=self.pg0.sw_if_index)
781 self.vapi.nat64_add_del_interface(is_add=1, flags=0,
782 sw_if_index=self.pg1.sw_if_index)
785 tcpn = self.statistics.get_counter('/nat64/in2out/tcp')[0]
786 udpn = self.statistics.get_counter('/nat64/in2out/udp')[0]
787 icmpn = self.statistics.get_counter('/nat64/in2out/icmp')[0]
788 drops = self.statistics.get_counter('/nat64/in2out/drops')[0]
790 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
791 self.pg0.add_stream(pkts)
792 self.pg_enable_capture(self.pg_interfaces)
794 capture = self.pg1.get_capture(len(pkts))
795 self.verify_capture_out(capture, nat_ip=self.nat_addr,
796 dst_ip=self.pg1.remote_ip4)
798 if_idx = self.pg0.sw_if_index
799 cnt = self.statistics.get_counter('/nat64/in2out/tcp')[0]
800 self.assertEqual(cnt[if_idx] - tcpn[if_idx], 1)
801 cnt = self.statistics.get_counter('/nat64/in2out/udp')[0]
802 self.assertEqual(cnt[if_idx] - udpn[if_idx], 1)
803 cnt = self.statistics.get_counter('/nat64/in2out/icmp')[0]
804 self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1)
805 cnt = self.statistics.get_counter('/nat64/in2out/drops')[0]
806 self.assertEqual(cnt[if_idx] - drops[if_idx], 0)
809 tcpn = self.statistics.get_counter('/nat64/out2in/tcp')[0]
810 udpn = self.statistics.get_counter('/nat64/out2in/udp')[0]
811 icmpn = self.statistics.get_counter('/nat64/out2in/icmp')[0]
812 drops = self.statistics.get_counter('/nat64/out2in/drops')[0]
814 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
815 self.pg1.add_stream(pkts)
816 self.pg_enable_capture(self.pg_interfaces)
818 capture = self.pg0.get_capture(len(pkts))
819 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
820 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
822 if_idx = self.pg1.sw_if_index
823 cnt = self.statistics.get_counter('/nat64/out2in/tcp')[0]
824 self.assertEqual(cnt[if_idx] - tcpn[if_idx], 2)
825 cnt = self.statistics.get_counter('/nat64/out2in/udp')[0]
826 self.assertEqual(cnt[if_idx] - udpn[if_idx], 1)
827 cnt = self.statistics.get_counter('/nat64/out2in/icmp')[0]
828 self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1)
829 cnt = self.statistics.get_counter('/nat64/out2in/drops')[0]
830 self.assertEqual(cnt[if_idx] - drops[if_idx], 0)
832 bibs = self.statistics.get_counter('/nat64/total-bibs')
833 self.assertEqual(bibs[0][0], 3)
834 sessions = self.statistics.get_counter('/nat64/total-sessions')
835 self.assertEqual(sessions[0][0], 3)
838 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
839 self.pg0.add_stream(pkts)
840 self.pg_enable_capture(self.pg_interfaces)
842 capture = self.pg1.get_capture(len(pkts))
843 self.verify_capture_out(capture, nat_ip=self.nat_addr,
844 dst_ip=self.pg1.remote_ip4)
847 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
848 self.pg1.add_stream(pkts)
849 self.pg_enable_capture(self.pg_interfaces)
851 capture = self.pg0.get_capture(len(pkts))
852 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
854 ses_num_end = self.nat64_get_ses_num()
856 self.assertEqual(ses_num_end - ses_num_start, 3)
858 # tenant with specific VRF
859 self.vapi.nat64_add_del_pool_addr_range(start_addr=self.vrf1_nat_addr,
860 end_addr=self.vrf1_nat_addr,
861 vrf_id=self.vrf1_id, is_add=1)
862 flags = self.config_flags.NAT_IS_INSIDE
863 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
864 sw_if_index=self.pg2.sw_if_index)
866 pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
867 self.pg2.add_stream(pkts)
868 self.pg_enable_capture(self.pg_interfaces)
870 capture = self.pg1.get_capture(len(pkts))
871 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
872 dst_ip=self.pg1.remote_ip4)
874 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
875 self.pg1.add_stream(pkts)
876 self.pg_enable_capture(self.pg_interfaces)
878 capture = self.pg2.get_capture(len(pkts))
879 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
881 def test_static(self):
882 """ NAT64 static translation test """
883 self.tcp_port_in = 60303
884 self.udp_port_in = 60304
885 self.icmp_id_in = 60305
886 self.tcp_port_out = 60303
887 self.udp_port_out = 60304
888 self.icmp_id_out = 60305
890 ses_num_start = self.nat64_get_ses_num()
892 self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
893 end_addr=self.nat_addr,
896 flags = self.config_flags.NAT_IS_INSIDE
897 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
898 sw_if_index=self.pg0.sw_if_index)
899 self.vapi.nat64_add_del_interface(is_add=1, flags=0,
900 sw_if_index=self.pg1.sw_if_index)
902 self.vapi.nat64_add_del_static_bib(i_addr=self.pg0.remote_ip6,
903 o_addr=self.nat_addr,
904 i_port=self.tcp_port_in,
905 o_port=self.tcp_port_out,
906 proto=IP_PROTOS.tcp, vrf_id=0,
908 self.vapi.nat64_add_del_static_bib(i_addr=self.pg0.remote_ip6,
909 o_addr=self.nat_addr,
910 i_port=self.udp_port_in,
911 o_port=self.udp_port_out,
912 proto=IP_PROTOS.udp, vrf_id=0,
914 self.vapi.nat64_add_del_static_bib(i_addr=self.pg0.remote_ip6,
915 o_addr=self.nat_addr,
916 i_port=self.icmp_id_in,
917 o_port=self.icmp_id_out,
918 proto=IP_PROTOS.icmp, vrf_id=0,
922 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
923 self.pg0.add_stream(pkts)
924 self.pg_enable_capture(self.pg_interfaces)
926 capture = self.pg1.get_capture(len(pkts))
927 self.verify_capture_out(capture, nat_ip=self.nat_addr,
928 dst_ip=self.pg1.remote_ip4, same_port=True)
931 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
932 self.pg1.add_stream(pkts)
933 self.pg_enable_capture(self.pg_interfaces)
935 capture = self.pg0.get_capture(len(pkts))
936 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
937 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
939 ses_num_end = self.nat64_get_ses_num()
941 self.assertEqual(ses_num_end - ses_num_start, 3)
943 @unittest.skipUnless(running_extended_tests, "part of extended tests")
944 def test_session_timeout(self):
945 """ NAT64 session timeout """
946 self.icmp_id_in = 1234
947 self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
948 end_addr=self.nat_addr,
951 flags = self.config_flags.NAT_IS_INSIDE
952 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
953 sw_if_index=self.pg0.sw_if_index)
954 self.vapi.nat64_add_del_interface(is_add=1, flags=0,
955 sw_if_index=self.pg1.sw_if_index)
956 self.vapi.nat64_set_timeouts(udp=300, tcp_established=5,
960 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
961 self.pg0.add_stream(pkts)
962 self.pg_enable_capture(self.pg_interfaces)
964 capture = self.pg1.get_capture(len(pkts))
966 ses_num_before_timeout = self.nat64_get_ses_num()
970 # ICMP and TCP session after timeout
971 ses_num_after_timeout = self.nat64_get_ses_num()
972 self.assertEqual(ses_num_before_timeout - ses_num_after_timeout, 2)
974 def test_icmp_error(self):
975 """ NAT64 ICMP Error message translation """
976 self.tcp_port_in = 6303
977 self.udp_port_in = 6304
978 self.icmp_id_in = 6305
980 self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
981 end_addr=self.nat_addr,
984 flags = self.config_flags.NAT_IS_INSIDE
985 self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
986 sw_if_index=self.pg0.sw_if_index)
987 self.vapi.nat64_add_del_interface(is_add=1, flags=0,
988 sw_if_index=self.pg1.sw_if_index)
990 # send some packets to create sessions
991 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
992 self.pg0.add_stream(pkts)
993 self.pg_enable_capture(self.pg_interfaces)
995 capture_ip4 = self.pg1.get_capture(len(pkts))
996 self.verify_capture_out(capture_ip4,
997 nat_ip=self.nat_addr,
998 dst_ip=self.pg1.remote_ip4)
1000 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
1001 self.pg1.add_stream(pkts)
1002 self.pg_enable_capture(self.pg_interfaces)
1004 capture_ip6 = self.pg0.get_capture(len(pkts))
1005 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
1006 self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
1007 self.pg0.remote_ip6)
1010 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1011 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
1012 ICMPv6DestUnreach(code=1) /
1013 packet[IPv6] for packet in capture_ip6]
1014 self.pg0.add_stream(pkts)
1015 self.pg_enable_capture(self.pg_interfaces)
1017 capture = self.pg1.get_capture(len(pkts))
1018 for packet in capture:
1020 self.assertEqual(packet[IP].src, self.nat_addr)
1021 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1022 self.assertEqual(packet[ICMP].type, 3)
1023 self.assertEqual(packet[ICMP].code, 13)
1024 inner = packet[IPerror]
1025 self.assertEqual(inner.src, self.pg1.remote_ip4)
1026 self.assertEqual(inner.dst, self.nat_addr)
1027 self.assert_packet_checksums_valid(packet)
1028 if inner.haslayer(TCPerror):
1029 self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
1030 elif inner.haslayer(UDPerror):
1031 self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
1033 self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
1035 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1039 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1040 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1041 ICMP(type=3, code=13) /
1042 packet[IP] for packet in capture_ip4]
1043 self.pg1.add_stream(pkts)
1044 self.pg_enable_capture(self.pg_interfaces)
1046 capture = self.pg0.get_capture(len(pkts))
1047 for packet in capture:
1049 self.assertEqual(packet[IPv6].src, ip.src)
1050 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
1051 icmp = packet[ICMPv6DestUnreach]
1052 self.assertEqual(icmp.code, 1)
1053 inner = icmp[IPerror6]
1054 self.assertEqual(inner.src, self.pg0.remote_ip6)
1055 self.assertEqual(inner.dst, ip.src)
1056 self.assert_icmpv6_checksum_valid(packet)
1057 if inner.haslayer(TCPerror):
1058 self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
1059 elif inner.haslayer(UDPerror):
1060 self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
1062 self.assertEqual(inner[ICMPv6EchoRequest].id,
1065 self.logger.error(ppp("Unexpected or invalid packet:", packet))
def test_hairpinning(self):
    """ NAT64 hairpinning

    Client and server are both hosts behind the inside interface pg0.
    The client addresses the server through the server's static BIB
    entries on the NAT pool address, so every translated packet is
    expected to come back out of pg0.
    """
    client = self.pg0.remote_hosts[0]
    server = self.pg0.remote_hosts[1]
    server_tcp_in_port = 22
    server_tcp_out_port = 4022
    server_udp_in_port = 23
    server_udp_out_port = 4023
    client_tcp_in_port = 1234
    client_udp_in_port = 1235
    # Learned from the first capture, once the NAT has allocated ports.
    client_tcp_out_port = 0
    client_udp_out_port = 0
    # Round-trip the address through scapy; presumably this yields the
    # same textual form scapy reports for captured packets.
    ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
    nat_addr_ip6 = ip.src

    self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
                                            end_addr=self.nat_addr,
                                            vrf_id=0xFFFFFFFF,
                                            is_add=1)
    flags = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
                                      sw_if_index=self.pg0.sw_if_index)
    self.vapi.nat64_add_del_interface(is_add=1, flags=0,
                                      sw_if_index=self.pg1.sw_if_index)

    # Static BIB entries publishing the server's TCP and UDP ports.
    self.vapi.nat64_add_del_static_bib(i_addr=server.ip6n,
                                       o_addr=self.nat_addr,
                                       i_port=server_tcp_in_port,
                                       o_port=server_tcp_out_port,
                                       proto=IP_PROTOS.tcp, vrf_id=0,
                                       is_add=1)
    self.vapi.nat64_add_del_static_bib(i_addr=server.ip6n,
                                       o_addr=self.nat_addr,
                                       i_port=server_udp_in_port,
                                       o_port=server_udp_out_port,
                                       proto=IP_PROTOS.udp, vrf_id=0,
                                       is_add=1)

    # client to server
    pkts = []
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IPv6(src=client.ip6, dst=nat_addr_ip6) /
         TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
    pkts.append(p)
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IPv6(src=client.ip6, dst=nat_addr_ip6) /
         UDP(sport=client_udp_in_port, dport=server_udp_out_port))
    pkts.append(p)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    for packet in capture:
        try:
            self.assertEqual(packet[IPv6].src, nat_addr_ip6)
            self.assertEqual(packet[IPv6].dst, server.ip6)
            self.assert_packet_checksums_valid(packet)
            if packet.haslayer(TCP):
                self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
                self.assertEqual(packet[TCP].dport, server_tcp_in_port)
                client_tcp_out_port = packet[TCP].sport
            else:
                self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
                self.assertEqual(packet[UDP].dport, server_udp_in_port)
                client_udp_out_port = packet[UDP].sport
        except Exception:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise

    # server to client
    pkts = []
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IPv6(src=server.ip6, dst=nat_addr_ip6) /
         TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
    pkts.append(p)
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IPv6(src=server.ip6, dst=nat_addr_ip6) /
         UDP(sport=server_udp_in_port, dport=client_udp_out_port))
    pkts.append(p)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    for packet in capture:
        try:
            self.assertEqual(packet[IPv6].src, nat_addr_ip6)
            self.assertEqual(packet[IPv6].dst, client.ip6)
            self.assert_packet_checksums_valid(packet)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].sport, server_tcp_out_port)
                self.assertEqual(packet[TCP].dport, client_tcp_in_port)
            else:
                self.assertEqual(packet[UDP].sport, server_udp_out_port)
                self.assertEqual(packet[UDP].dport, client_udp_in_port)
        except Exception:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise

    # ICMP error: the client answers each packet it just received with a
    # destination-unreachable that embeds the offending packet.
    pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
            IPv6(src=client.ip6, dst=nat_addr_ip6) /
            ICMPv6DestUnreach(code=1) /
            packet[IPv6] for packet in capture]
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    for packet in capture:
        try:
            self.assertEqual(packet[IPv6].src, nat_addr_ip6)
            self.assertEqual(packet[IPv6].dst, server.ip6)
            icmp = packet[ICMPv6DestUnreach]
            self.assertEqual(icmp.code, 1)
            inner = icmp[IPerror6]
            self.assertEqual(inner.src, server.ip6)
            self.assertEqual(inner.dst, nat_addr_ip6)
            self.assert_packet_checksums_valid(packet)
            if inner.haslayer(TCPerror):
                self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
                self.assertEqual(inner[TCPerror].dport,
                                 client_tcp_out_port)
            else:
                self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
                self.assertEqual(inner[UDPerror].dport,
                                 client_udp_out_port)
        except Exception:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise
def test_prefix(self):
    """ NAT64 Network-Specific Prefix

    Configures one global NAT64 prefix (VRF 0) and one tenant prefix
    (VRF ``self.vrf1_id``) and checks translation in both directions
    for each of them.
    """
    self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
                                            end_addr=self.nat_addr,
                                            vrf_id=0xFFFFFFFF,
                                            is_add=1)
    flags = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
                                      sw_if_index=self.pg0.sw_if_index)
    self.vapi.nat64_add_del_interface(is_add=1, flags=0,
                                      sw_if_index=self.pg1.sw_if_index)
    self.vapi.nat64_add_del_pool_addr_range(start_addr=self.vrf1_nat_addr,
                                            end_addr=self.vrf1_nat_addr,
                                            vrf_id=self.vrf1_id, is_add=1)
    self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
                                      sw_if_index=self.pg2.sw_if_index)

    # Add global prefix
    global_pref64 = "2001:db8::"
    global_pref64_len = 32
    global_pref64_str = "{}/{}".format(global_pref64, global_pref64_len)
    self.vapi.nat64_add_del_prefix(prefix=global_pref64_str, vrf_id=0,
                                   is_add=1)

    prefix = self.vapi.nat64_prefix_dump()
    self.assertEqual(len(prefix), 1)
    self.assertEqual(str(prefix[0].prefix), global_pref64_str)
    self.assertEqual(prefix[0].vrf_id, 0)

    # Add tenant specific prefix
    vrf1_pref64 = "2001:db8:122:300::"
    vrf1_pref64_len = 56
    vrf1_pref64_str = "{}/{}".format(vrf1_pref64, vrf1_pref64_len)
    self.vapi.nat64_add_del_prefix(prefix=vrf1_pref64_str,
                                   vrf_id=self.vrf1_id, is_add=1)

    prefix = self.vapi.nat64_prefix_dump()
    self.assertEqual(len(prefix), 2)

    # Global prefix
    pkts = self.create_stream_in_ip6(self.pg0,
                                     self.pg1,
                                     pref=global_pref64,
                                     plen=global_pref64_len)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip=self.nat_addr,
                            dst_ip=self.pg1.remote_ip4)

    pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    # Replies must appear to come from the IPv4 peer embedded in the
    # global prefix.
    expected_src = self.compose_ip6(self.pg1.remote_ip4,
                                    global_pref64,
                                    global_pref64_len)
    self.verify_capture_in_ip6(capture, expected_src, self.pg0.remote_ip6)

    # Tenant specific prefix
    pkts = self.create_stream_in_ip6(self.pg2,
                                     self.pg1,
                                     pref=vrf1_pref64,
                                     plen=vrf1_pref64_len)
    self.pg2.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
                            dst_ip=self.pg1.remote_ip4)

    pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg2.get_capture(len(pkts))
    expected_src = self.compose_ip6(self.pg1.remote_ip4,
                                    vrf1_pref64,
                                    vrf1_pref64_len)
    self.verify_capture_in_ip6(capture, expected_src, self.pg2.remote_ip6)
def test_unknown_proto(self):
    """ NAT64 translate packet with unknown protocol

    Sends a TCP packet first, then a GRE packet (IP protocol 47) in each
    direction and checks that only the outer addresses are translated.
    """
    self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
                                            end_addr=self.nat_addr,
                                            vrf_id=0xFFFFFFFF,
                                            is_add=1)
    flags = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
                                      sw_if_index=self.pg0.sw_if_index)
    self.vapi.nat64_add_del_interface(is_add=1, flags=0,
                                      sw_if_index=self.pg1.sw_if_index)
    remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)

    # in2out: plain TCP first; only its arrival on pg1 is checked.
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
         TCP(sport=self.tcp_port_in, dport=20))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg1.get_capture(1)

    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
         GRE() /
         IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
         TCP(sport=1234, dport=1234))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(1)
    packet = capture[0]
    try:
        self.assertEqual(packet[IP].src, self.nat_addr)
        self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
        self.assertEqual(packet.haslayer(GRE), 1)
        self.assert_packet_checksums_valid(packet)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise

    # out2in
    p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
         IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
         GRE() /
         IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
         TCP(sport=1234, dport=1234))
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    packet = capture[0]
    try:
        self.assertEqual(packet[IPv6].src, remote_ip6)
        self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
        self.assertEqual(packet[IPv6].nh, 47)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise
def test_hairpinning_unknown_proto(self):
    """ NAT64 translate packet with unknown protocol - hairpinning

    Client and server are both hosts on pg0.  GRE packets are sent
    between them via their NAT addresses and are expected back on pg0
    with translated outer IPv6 addresses.
    """
    client = self.pg0.remote_hosts[0]
    server = self.pg0.remote_hosts[1]
    server_tcp_in_port = 22
    server_tcp_out_port = 4022
    client_tcp_in_port = 1234
    client_tcp_out_port = 1235
    server_nat_ip = "10.0.0.100"
    client_nat_ip = "10.0.0.110"
    server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
    client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)

    self.vapi.nat64_add_del_pool_addr_range(start_addr=server_nat_ip,
                                            end_addr=client_nat_ip,
                                            vrf_id=0xFFFFFFFF,
                                            is_add=1)
    flags = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
                                      sw_if_index=self.pg0.sw_if_index)
    self.vapi.nat64_add_del_interface(is_add=1, flags=0,
                                      sw_if_index=self.pg1.sw_if_index)

    self.vapi.nat64_add_del_static_bib(i_addr=server.ip6n,
                                       o_addr=server_nat_ip,
                                       i_port=server_tcp_in_port,
                                       o_port=server_tcp_out_port,
                                       proto=IP_PROTOS.tcp, vrf_id=0,
                                       is_add=1)

    # Port-less entry covering the server's GRE traffic.
    self.vapi.nat64_add_del_static_bib(i_addr=server.ip6n,
                                       o_addr=server_nat_ip, i_port=0,
                                       o_port=0,
                                       proto=IP_PROTOS.gre, vrf_id=0,
                                       is_add=1)

    self.vapi.nat64_add_del_static_bib(i_addr=client.ip6n,
                                       o_addr=client_nat_ip,
                                       i_port=client_tcp_in_port,
                                       o_port=client_tcp_out_port,
                                       proto=IP_PROTOS.tcp, vrf_id=0,
                                       is_add=1)

    # client to server: plain TCP first; only its arrival is checked.
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IPv6(src=client.ip6, dst=server_nat_ip6) /
         TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg0.get_capture(1)

    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
         GRE() /
         IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
         TCP(sport=1234, dport=1234))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    packet = capture[0]
    try:
        self.assertEqual(packet[IPv6].src, client_nat_ip6)
        self.assertEqual(packet[IPv6].dst, server.ip6)
        self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise

    # server to client
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
         GRE() /
         IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
         TCP(sport=1234, dport=1234))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    packet = capture[0]
    try:
        self.assertEqual(packet[IPv6].src, server_nat_ip6)
        self.assertEqual(packet[IPv6].dst, client.ip6)
        self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise
def test_one_armed_nat64(self):
    """ One armed NAT64

    pg3 is configured as both the inside and the outside NAT64
    interface, so traffic enters and leaves on the same port.
    """
    # Learned from the in2out capture and reused for the reply.
    external_port = 0
    remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
                                       '64:ff9b::',
                                       96)

    self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
                                            end_addr=self.nat_addr,
                                            vrf_id=0xFFFFFFFF,
                                            is_add=1)
    flags = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
                                      sw_if_index=self.pg3.sw_if_index)
    self.vapi.nat64_add_del_interface(is_add=1, flags=0,
                                      sw_if_index=self.pg3.sw_if_index)

    # in2out
    p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
         IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
         TCP(sport=12345, dport=80))
    self.pg3.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.nat_addr)
        self.assertEqual(ip.dst, self.pg3.remote_ip4)
        self.assertNotEqual(tcp.sport, 12345)
        external_port = tcp.sport
        self.assertEqual(tcp.dport, 80)
        self.assert_packet_checksums_valid(p)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # out2in
    p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
         IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
         TCP(sport=80, dport=external_port))
    self.pg3.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(1)
    p = capture[0]
    try:
        ip = p[IPv6]
        tcp = p[TCP]
        self.assertEqual(ip.src, remote_host_ip6)
        self.assertEqual(ip.dst, self.pg3.remote_ip6)
        self.assertEqual(tcp.sport, 80)
        self.assertEqual(tcp.dport, 12345)
        self.assert_packet_checksums_valid(p)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise
def test_frag_in_order(self):
    """ NAT64 translate fragments arriving in order

    Sends a fragmented TCP payload inside-to-outside, reassembles what
    arrives on pg1, then repeats in the opposite direction using the
    port the NAT allocated.
    """
    self.tcp_port_in = random.randint(1025, 65535)

    self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
                                            end_addr=self.nat_addr,
                                            vrf_id=0xFFFFFFFF,
                                            is_add=1)
    flags = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
                                      sw_if_index=self.pg0.sw_if_index)
    self.vapi.nat64_add_del_interface(is_add=1, flags=0,
                                      sw_if_index=self.pg1.sw_if_index)

    # in2out
    data = b'a' * 200
    pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
                                       self.tcp_port_in, 20, data)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    frags = self.pg1.get_capture(len(pkts))
    p = self.reass_frags_and_verify(frags,
                                    self.nat_addr,
                                    self.pg1.remote_ip4)
    self.assertEqual(p[TCP].dport, 20)
    self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
    # Remember the translated port so the reply can target it.
    self.tcp_port_out = p[TCP].sport
    self.assertEqual(data, p[Raw].load)

    # out2in
    data = b"A" * 4 + b"b" * 16 + b"C" * 3
    pkts = self.create_stream_frag(self.pg1,
                                   self.nat_addr,
                                   20,
                                   self.tcp_port_out,
                                   data)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    frags = self.pg0.get_capture(len(pkts))
    self.logger.debug(ppc("Captured:", frags))
    src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
    p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
    self.assertEqual(p[TCP].sport, 20)
    self.assertEqual(p[TCP].dport, self.tcp_port_in)
    self.assertEqual(data, p[Raw].load)
def test_reass_hairpinning(self):
    """ NAT64 fragments hairpinning

    Sends a fragmented TCP payload from pg0 to a server on pg0 through
    the server's static BIB entry and reassembles the fragments that
    come back out of pg0.
    """
    data = b'a' * 200
    server = self.pg0.remote_hosts[1]
    server_in_port = random.randint(1025, 65535)
    server_out_port = random.randint(1025, 65535)
    client_in_port = random.randint(1025, 65535)
    ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
    nat_addr_ip6 = ip.src

    self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
                                            end_addr=self.nat_addr,
                                            vrf_id=0xFFFFFFFF,
                                            is_add=1)
    flags = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
                                      sw_if_index=self.pg0.sw_if_index)
    self.vapi.nat64_add_del_interface(is_add=1, flags=0,
                                      sw_if_index=self.pg1.sw_if_index)

    # add static BIB entry for server
    self.vapi.nat64_add_del_static_bib(i_addr=server.ip6n,
                                       o_addr=self.nat_addr,
                                       i_port=server_in_port,
                                       o_port=server_out_port,
                                       proto=IP_PROTOS.tcp, vrf_id=0,
                                       is_add=1)

    # send packet from host to server
    pkts = self.create_stream_frag_ip6(self.pg0,
                                       self.nat_addr,
                                       client_in_port,
                                       server_out_port,
                                       data)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    frags = self.pg0.get_capture(len(pkts))
    self.logger.debug(ppc("Captured:", frags))
    p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
    self.assertNotEqual(p[TCP].sport, client_in_port)
    self.assertEqual(p[TCP].dport, server_in_port)
    self.assertEqual(data, p[Raw].load)
def test_frag_out_of_order(self):
    """ NAT64 translate fragments arriving out of order

    Same flow as the in-order fragment test, except that each fragment
    list is reversed before it is sent.
    """
    self.tcp_port_in = random.randint(1025, 65535)

    self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
                                            end_addr=self.nat_addr,
                                            vrf_id=0xFFFFFFFF,
                                            is_add=1)
    flags = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
                                      sw_if_index=self.pg0.sw_if_index)
    self.vapi.nat64_add_del_interface(is_add=1, flags=0,
                                      sw_if_index=self.pg1.sw_if_index)

    # in2out
    data = b'a' * 200
    pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
                                       self.tcp_port_in, 20, data)
    # Reversing the list is what makes the fragments arrive out of order.
    pkts.reverse()
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    frags = self.pg1.get_capture(len(pkts))
    p = self.reass_frags_and_verify(frags,
                                    self.nat_addr,
                                    self.pg1.remote_ip4)
    self.assertEqual(p[TCP].dport, 20)
    self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
    # Remember the translated port so the reply can target it.
    self.tcp_port_out = p[TCP].sport
    self.assertEqual(data, p[Raw].load)

    # out2in
    data = b"A" * 4 + b"B" * 16 + b"C" * 3
    pkts = self.create_stream_frag(self.pg1,
                                   self.nat_addr,
                                   20,
                                   self.tcp_port_out,
                                   data)
    pkts.reverse()
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    frags = self.pg0.get_capture(len(pkts))
    src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
    p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
    self.assertEqual(p[TCP].sport, 20)
    self.assertEqual(p[TCP].dport, self.tcp_port_in)
    self.assertEqual(data, p[Raw].load)
def test_interface_addr(self):
    """ Acquire NAT64 pool addresses from interface

    Binds the NAT64 pool to pg4 and checks that the pool follows the
    interface's IPv4 address as it is configured and removed.
    """
    self.vapi.nat64_add_del_interface_addr(
        is_add=1,
        sw_if_index=self.pg4.sw_if_index)

    # no address in NAT64 pool
    # (query the NAT64 pool, not the NAT44 address table, so this check
    # looks at the same table as the ones below)
    addresses = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(0, len(addresses))

    # configure interface address and check NAT64 address pool
    self.pg4.config_ip4()
    addresses = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(len(addresses), 1)

    self.assertEqual(str(addresses[0].address),
                     self.pg4.local_ip4)

    # remove interface address and check NAT64 address pool
    self.pg4.unconfig_ip4()
    addresses = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(0, len(addresses))
# TODO: ipfix needs to be separated from NAT base plugin
@unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_ipfix_max_bibs_sessions(self):
    """ IPFIX logging maximum session and BIB entries exceeded

    Fills the session table, then sends one packet that needs a further
    session and one that needs a further BIB entry.  Both must be
    dropped and reported through IPFIX on pg3.
    """
    from time import sleep

    # NOTE(review): confirm these match the BIB/session limits the
    # NAT64 plugin is configured with for this test class.
    max_bibs = 1280
    max_sessions = 2560
    remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
                                       '64:ff9b::',
                                       96)

    self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
                                            end_addr=self.nat_addr,
                                            vrf_id=0xFFFFFFFF,
                                            is_add=1)
    flags = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
                                      sw_if_index=self.pg0.sw_if_index)
    self.vapi.nat64_add_del_interface(is_add=1, flags=0,
                                      sw_if_index=self.pg1.sw_if_index)

    # Two packets (different destination ports) per source address.
    pkts = []
    src = ""
    for i in range(0, max_bibs):
        src = "fd01:aa::%x" % (i)
        p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
             IPv6(src=src, dst=remote_host_ip6) /
             TCP(sport=12345, dport=80))
        pkts.append(p)
        p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
             IPv6(src=src, dst=remote_host_ip6) /
             TCP(sport=12345, dport=22))
        pkts.append(p)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg1.get_capture(max_sessions)

    self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
                                 src_address=self.pg3.local_ip4,
                                 path_mtu=512,
                                 template_interval=10)
    self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
                                       src_port=self.ipfix_src_port,
                                       enable=1)

    # Same source as the last loop iteration, new destination port.
    p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
         IPv6(src=src, dst=remote_host_ip6) /
         TCP(sport=12345, dport=25))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg1.assert_nothing_captured()
    sleep(1)
    self.vapi.ipfix_flush()
    capture = self.pg3.get_capture(7)
    ipfix = IPFIXDecoder()
    # first load template
    for p in capture:
        self.assertTrue(p.haslayer(IPFIX))
        self.assertEqual(p[IP].src, self.pg3.local_ip4)
        self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
        self.assertEqual(p[UDP].sport, self.ipfix_src_port)
        self.assertEqual(p[UDP].dport, 4739)
        self.assertEqual(p[IPFIX].observationDomainID,
                         self.ipfix_domain_id)
        if p.haslayer(Template):
            ipfix.add_template(p.getlayer(Template))
    # verify events in data set
    for p in capture:
        if p.haslayer(Data):
            data = ipfix.decode_data_set(p.getlayer(Set))
            self.verify_ipfix_max_sessions(data, max_sessions)

    # A source address not used by the loop above.
    p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
         IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
         TCP(sport=12345, dport=80))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg1.assert_nothing_captured()
    sleep(1)
    self.vapi.ipfix_flush()
    capture = self.pg3.get_capture(1)
    # verify events in data set
    for p in capture:
        self.assertTrue(p.haslayer(IPFIX))
        self.assertEqual(p[IP].src, self.pg3.local_ip4)
        self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
        self.assertEqual(p[UDP].sport, self.ipfix_src_port)
        self.assertEqual(p[UDP].dport, 4739)
        self.assertEqual(p[IPFIX].observationDomainID,
                         self.ipfix_domain_id)
        if p.haslayer(Data):
            data = ipfix.decode_data_set(p.getlayer(Set))
            self.verify_ipfix_max_bibs(data, max_bibs)
# TODO: ipfix needs to be separated from NAT base plugin
@unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_ipfix_bib_ses(self):
    """ IPFIX logging NAT64 BIB/session create and delete events

    Creates one session with a single TCP packet, then removes the pool
    address, checking the IPFIX records exported on pg3 after each step.
    """
    self.tcp_port_in = random.randint(1025, 65535)
    remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
                                       '64:ff9b::',
                                       96)

    self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
                                            end_addr=self.nat_addr,
                                            vrf_id=0xFFFFFFFF,
                                            is_add=1)
    flags = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
                                      sw_if_index=self.pg0.sw_if_index)
    self.vapi.nat64_add_del_interface(is_add=1, flags=0,
                                      sw_if_index=self.pg1.sw_if_index)
    self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
                                 src_address=self.pg3.local_ip4,
                                 path_mtu=512,
                                 template_interval=10)
    self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
                                       src_port=self.ipfix_src_port,
                                       enable=1)

    # Create
    p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
         IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
         TCP(sport=self.tcp_port_in, dport=25))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    p = self.pg1.get_capture(1)
    self.tcp_port_out = p[0][TCP].sport
    self.vapi.ipfix_flush()
    capture = self.pg3.get_capture(8)
    ipfix = IPFIXDecoder()
    # first load template
    for p in capture:
        self.assertTrue(p.haslayer(IPFIX))
        self.assertEqual(p[IP].src, self.pg3.local_ip4)
        self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
        self.assertEqual(p[UDP].sport, self.ipfix_src_port)
        self.assertEqual(p[UDP].dport, 4739)
        self.assertEqual(p[IPFIX].observationDomainID,
                         self.ipfix_domain_id)
        if p.haslayer(Template):
            ipfix.add_template(p.getlayer(Template))
    # verify events in data set
    for p in capture:
        if p.haslayer(Data):
            data = ipfix.decode_data_set(p.getlayer(Set))
            # Field 230 selects which verifier applies to the record.
            if scapy.compat.orb(data[0][230]) == 10:
                self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6)
            elif scapy.compat.orb(data[0][230]) == 6:
                self.verify_ipfix_nat64_ses(data,
                                            1,
                                            self.pg0.remote_ip6,
                                            self.pg1.remote_ip4,
                                            25)
            else:
                self.logger.error(ppp("Unexpected or invalid packet: ", p))

    # Delete
    self.pg_enable_capture(self.pg_interfaces)
    self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
                                            end_addr=self.nat_addr,
                                            vrf_id=0xFFFFFFFF,
                                            is_add=0)
    self.vapi.ipfix_flush()
    capture = self.pg3.get_capture(2)
    # verify events in data set
    for p in capture:
        self.assertTrue(p.haslayer(IPFIX))
        self.assertEqual(p[IP].src, self.pg3.local_ip4)
        self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
        self.assertEqual(p[UDP].sport, self.ipfix_src_port)
        self.assertEqual(p[UDP].dport, 4739)
        self.assertEqual(p[IPFIX].observationDomainID,
                         self.ipfix_domain_id)
        if p.haslayer(Data):
            data = ipfix.decode_data_set(p.getlayer(Set))
            if scapy.compat.orb(data[0][230]) == 11:
                self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6)
            elif scapy.compat.orb(data[0][230]) == 7:
                self.verify_ipfix_nat64_ses(data,
                                            0,
                                            self.pg0.remote_ip6,
                                            self.pg1.remote_ip4,
                                            25)
            else:
                self.logger.error(ppp("Unexpected or invalid packet: ", p))
def test_syslog_sess(self):
    """ Test syslog session creation and deletion

    Creates a session with one TCP packet and then removes the pool
    address, verifying the syslog message captured on pg3 each time.
    """
    self.tcp_port_in = random.randint(1025, 65535)
    remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
                                       '64:ff9b::',
                                       96)

    self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
                                            end_addr=self.nat_addr,
                                            vrf_id=0xFFFFFFFF,
                                            is_add=1)
    flags = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
                                      sw_if_index=self.pg0.sw_if_index)
    self.vapi.nat64_add_del_interface(is_add=1, flags=0,
                                      sw_if_index=self.pg1.sw_if_index)
    self.vapi.syslog_set_filter(
        self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_INFO)
    self.vapi.syslog_set_sender(self.pg3.local_ip4, self.pg3.remote_ip4)

    p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
         IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
         TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    p = self.pg1.get_capture(1)
    self.tcp_port_out = p[0][TCP].sport
    capture = self.pg3.get_capture(1)
    self.verify_syslog_sess(capture[0][Raw].load, is_ip6=True)

    # Re-arm capture before removing the pool address.
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
                                            end_addr=self.nat_addr,
                                            vrf_id=0xFFFFFFFF,
                                            is_add=0)
    capture = self.pg3.get_capture(1)
    self.verify_syslog_sess(capture[0][Raw].load, False, True)
def nat64_get_ses_num(self):
    """
    Return number of active NAT64 sessions.

    The count is the length of the session-table dump requested with
    ``proto=255``.
    """
    st = self.vapi.nat64_st_dump(proto=255)
    return len(st)
def clear_nat64(self):
    """
    Clear NAT64 configuration.

    Disables IPFIX logging, resets the syslog filter and NAT64 timeouts,
    removes NAT64 interfaces, static BIB entries, pool addresses and
    prefixes, and finally asserts that the BIB and session counters
    read zero.
    """
    self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
                                       src_port=self.ipfix_src_port,
                                       enable=0)
    self.ipfix_src_port = 4739
    self.ipfix_domain_id = 1

    self.vapi.syslog_set_filter(
        self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_EMERG)

    self.vapi.nat64_set_timeouts(udp=300, tcp_established=7440,
                                 tcp_transitory=240, icmp=60)

    interfaces = self.vapi.nat64_interface_dump()
    for intf in interfaces:
        self.vapi.nat64_add_del_interface(is_add=0, flags=intf.flags,
                                          sw_if_index=intf.sw_if_index)

    bib = self.vapi.nat64_bib_dump(proto=255)
    for bibe in bib:
        # Only entries flagged static are deleted explicitly here.
        if bibe.flags & self.config_flags.NAT_IS_STATIC:
            self.vapi.nat64_add_del_static_bib(i_addr=bibe.i_addr,
                                               o_addr=bibe.o_addr,
                                               i_port=bibe.i_port,
                                               o_port=bibe.o_port,
                                               proto=bibe.proto,
                                               vrf_id=bibe.vrf_id,
                                               is_add=0)

    addresses = self.vapi.nat64_pool_addr_dump()
    for addr in addresses:
        self.vapi.nat64_add_del_pool_addr_range(start_addr=addr.address,
                                                end_addr=addr.address,
                                                vrf_id=addr.vrf_id,
                                                is_add=0)

    prefixes = self.vapi.nat64_prefix_dump()
    for prefix in prefixes:
        self.vapi.nat64_add_del_prefix(prefix=str(prefix.prefix),
                                       vrf_id=prefix.vrf_id, is_add=0)

    bibs = self.statistics.get_counter('/nat64/total-bibs')
    self.assertEqual(bibs[0][0], 0)
    sessions = self.statistics.get_counter('/nat64/total-sessions')
    self.assertEqual(sessions[0][0], 0)
# Run this module's tests with the VPP test runner when executed directly.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)