X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=src%2Fplugins%2Fnat%2Ftest%2Ftest_nat.py;h=09bf8a2d41e13dcf6ccfdd4c78522be06e4ca9cc;hb=603e75465;hp=19e4c50e8f0a295fbf1f9900d36a36dafef4a034;hpb=2f1563129ad8d34d365f5ef8620ff76ff7b08e70;p=vpp.git diff --git a/src/plugins/nat/test/test_nat.py b/src/plugins/nat/test/test_nat.py index 19e4c50e8f0..09bf8a2d41e 100644 --- a/src/plugins/nat/test/test_nat.py +++ b/src/plugins/nat/test/test_nat.py @@ -1,40 +1,35 @@ #!/usr/bin/env python3 import ipaddress +import random import socket -import unittest import struct -import random - -from framework import VppTestCase, VppTestRunner, running_extended_tests +import unittest +from io import BytesIO +from time import sleep import scapy.compat +from framework import VppTestCase, VppTestRunner, running_extended_tests +from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder +from scapy.all import bind_layers, Packet, ByteEnumField, ShortField, \ + IPField, IntField, LongField, XByteField, FlagsField, FieldLenField, \ + PacketListField +from scapy.data import IP_PROTOS from scapy.layers.inet import IP, TCP, UDP, ICMP from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror +from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply, \ ICMPv6ND_NS, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, fragment6 -from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment from scapy.layers.l2 import Ether, ARP, GRE -from scapy.data import IP_PROTOS -from scapy.packet import bind_layers, Raw -from util import ppp -from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder -from time import sleep -from util import ip4_range -from vpp_papi import mac_pton +from scapy.packet import Raw from syslog_rfc5424_parser import SyslogMessage, ParseError -from syslog_rfc5424_parser.constants import SyslogFacility, SyslogSeverity -from io import BytesIO -from vpp_papi import VppEnum -from vpp_ip_route import VppIpRoute, VppRoutePath, FibPathType -from vpp_neighbor import VppNeighbor -from scapy.all import bind_layers, Packet, ByteEnumField, ShortField, \ - IPField, IntField, LongField, XByteField, FlagsField, FieldLenField, \ - PacketListField -from ipaddress import IPv6Network +from syslog_rfc5424_parser.constants import SyslogSeverity +from util import ip4_range from util import ppc, ppp -from socket import inet_pton, AF_INET from vpp_acl import AclRule, VppAcl, VppAclInterface +from vpp_ip_route import VppIpRoute, VppRoutePath +from vpp_neighbor import VppNeighbor +from vpp_papi import VppEnum # NAT HA protocol event data @@ -43,7 +38,7 @@ class Event(Packet): fields_desc = [ByteEnumField("event_type", None, {1: "add", 2: "del", 3: "refresh"}), ByteEnumField("protocol", None, - {0: "udp", 1: "tcp", 2: "icmp"}), + {0: "other", 1: "udp", 2: "tcp", 3: "icmp"}), ShortField("flags", 0), IPField("in_addr", None), IPField("out_addr", None), @@ -453,7 +448,7 @@ class MethodHolder(VppTestCase): return pkts def verify_capture_out(self, capture, nat_ip=None, same_port=False, - dst_ip=None, is_ip6=False): + dst_ip=None, is_ip6=False, ignore_port=False): """ Verify captured packets on outside network @@ -479,25 +474,32 @@ class MethodHolder(VppTestCase): if dst_ip is not None: self.assertEqual(packet[IP46].dst, dst_ip) if packet.haslayer(TCP): - if same_port: - self.assertEqual(packet[TCP].sport, self.tcp_port_in) - else: - self.assertNotEqual( - packet[TCP].sport, self.tcp_port_in) + if not ignore_port: + if same_port: + self.assertEqual( + packet[TCP].sport, self.tcp_port_in) + else: + self.assertNotEqual( + packet[TCP].sport, self.tcp_port_in) self.tcp_port_out = packet[TCP].sport self.assert_packet_checksums_valid(packet) elif packet.haslayer(UDP): - if same_port: - self.assertEqual(packet[UDP].sport, self.udp_port_in) - else: - self.assertNotEqual( - packet[UDP].sport, self.udp_port_in) + if not ignore_port: + if same_port: + self.assertEqual( + packet[UDP].sport, self.udp_port_in) + else: + self.assertNotEqual( + packet[UDP].sport, self.udp_port_in) self.udp_port_out = packet[UDP].sport else: - if same_port: - self.assertEqual(packet[ICMP46].id, self.icmp_id_in) - else: - self.assertNotEqual(packet[ICMP46].id, self.icmp_id_in) + if not ignore_port: + if same_port: + self.assertEqual( + packet[ICMP46].id, self.icmp_id_in) + else: + self.assertNotEqual( + packet[ICMP46].id, self.icmp_id_in) self.icmp_id_out = packet[ICMP46].id self.assert_packet_checksums_valid(packet) except: @@ -1110,7 +1112,8 @@ class MethodHolder(VppTestCase): else: raise Exception("Unsupported protocol") - def frag_in_order(self, proto=IP_PROTOS.tcp, dont_translate=False): + def frag_in_order(self, proto=IP_PROTOS.tcp, dont_translate=False, + ignore_port=False): layer = self.proto2layer(proto) if proto == IP_PROTOS.tcp: @@ -1137,14 +1140,16 @@ class MethodHolder(VppTestCase): if proto != IP_PROTOS.icmp: if not dont_translate: self.assertEqual(p[layer].dport, 20) - self.assertNotEqual(p[layer].sport, self.port_in) + if not ignore_port: + self.assertNotEqual(p[layer].sport, self.port_in) else: self.assertEqual(p[layer].sport, self.port_in) else: - if not dont_translate: - self.assertNotEqual(p[layer].id, self.port_in) - else: - self.assertEqual(p[layer].id, self.port_in) + if not ignore_port: + if not dont_translate: + self.assertNotEqual(p[layer].id, self.port_in) + else: + self.assertEqual(p[layer].id, self.port_in) self.assertEqual(data, p[Raw].load) # out2in @@ -1225,7 +1230,7 @@ class MethodHolder(VppTestCase): self.assertEqual(p[layer].id, self.port_in) self.assertEqual(data, p[Raw].load) - def reass_hairpinning(self, proto=IP_PROTOS.tcp): + def reass_hairpinning(self, proto=IP_PROTOS.tcp, ignore_port=False): layer = self.proto2layer(proto) if proto == IP_PROTOS.tcp: @@ -1248,13 +1253,16 @@ class MethodHolder(VppTestCase): self.nat_addr, self.server.ip4) if proto != IP_PROTOS.icmp: - self.assertNotEqual(p[layer].sport, self.host_in_port) + if not ignore_port: + self.assertNotEqual(p[layer].sport, self.host_in_port) self.assertEqual(p[layer].dport, self.server_in_port) else: - self.assertNotEqual(p[layer].id, self.host_in_port) + if not ignore_port: + self.assertNotEqual(p[layer].id, self.host_in_port) self.assertEqual(data, p[Raw].load) - def frag_out_of_order(self, proto=IP_PROTOS.tcp, dont_translate=False): + def frag_out_of_order(self, proto=IP_PROTOS.tcp, dont_translate=False, + ignore_port=False): layer = self.proto2layer(proto) if proto == IP_PROTOS.tcp: @@ -1283,14 +1291,16 @@ class MethodHolder(VppTestCase): if proto != IP_PROTOS.icmp: if not dont_translate: self.assertEqual(p[layer].dport, 20) - self.assertNotEqual(p[layer].sport, self.port_in) + if not ignore_port: + self.assertNotEqual(p[layer].sport, self.port_in) else: self.assertEqual(p[layer].sport, self.port_in) else: - if not dont_translate: - self.assertNotEqual(p[layer].id, self.port_in) - else: - self.assertEqual(p[layer].id, self.port_in) + if not ignore_port: + if not dont_translate: + self.assertNotEqual(p[layer].id, self.port_in) + else: + self.assertEqual(p[layer].id, self.port_in) self.assertEqual(data, p[Raw].load) # out2in @@ -1379,6 +1389,19 @@ class MethodHolder(VppTestCase): class TestNAT44(MethodHolder): """ NAT44 Test Cases """ + max_translations = 10240 + max_users = 10240 + + @classmethod + def setUpConstants(cls): + super(TestNAT44, cls).setUpConstants() + cls.vpp_cmdline.extend([ + "nat", "{", + "max translations per thread %d" % cls.max_translations, + "max users per thread %d" % cls.max_users, + "}" + ]) + @classmethod def setUpClass(cls): super(TestNAT44, cls).setUpClass() @@ -1447,6 +1470,38 @@ class TestNAT44(MethodHolder): def tearDownClass(cls): super(TestNAT44, cls).tearDownClass() + def test_clear_sessions(self): + """ NAT44 session clearing test """ + + self.nat44_add_address(self.nat_addr) + flags = self.config_flags.NAT_IS_INSIDE + self.vapi.nat44_interface_add_del_feature( + sw_if_index=self.pg0.sw_if_index, + flags=flags, is_add=1) + self.vapi.nat44_interface_add_del_feature( + sw_if_index=self.pg1.sw_if_index, + is_add=1) + + nat_config = self.vapi.nat_show_config() + self.assertEqual(0, nat_config.endpoint_dependent) + + pkts = self.create_stream_in(self.pg0, self.pg1) + self.pg0.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg1.get_capture(len(pkts)) + self.verify_capture_out(capture) + + sessions = self.statistics.get_counter('/nat44/total-sessions') + self.assertTrue(sessions[0][0] > 0) + self.logger.info("sessions before clearing: %s" % sessions[0][0]) + + self.vapi.cli("clear nat44 sessions") + + sessions = self.statistics.get_counter('/nat44/total-sessions') + self.assertEqual(sessions[0][0], 0) + self.logger.info("sessions after clearing: %s" % sessions[0][0]) + def test_dynamic(self): """ NAT44 dynamic translation test """ self.nat44_add_address(self.nat_addr) @@ -1459,14 +1514,10 @@ class TestNAT44(MethodHolder): is_add=1) # in2out - tcpn = self.statistics.get_err_counter( - '/err/nat44-in2out-slowpath/TCP packets') - udpn = self.statistics.get_err_counter( - '/err/nat44-in2out-slowpath/UDP packets') - icmpn = self.statistics.get_err_counter( - '/err/nat44-in2out-slowpath/ICMP packets') - totaln = self.statistics.get_err_counter( - '/err/nat44-in2out-slowpath/good in2out packets processed') + tcpn = self.statistics.get_counter('/nat44/in2out/slowpath/tcp')[0] + udpn = self.statistics.get_counter('/nat44/in2out/slowpath/udp')[0] + icmpn = self.statistics.get_counter('/nat44/in2out/slowpath/icmp')[0] + drops = self.statistics.get_counter('/nat44/in2out/slowpath/drops')[0] pkts = self.create_stream_in(self.pg0, self.pg1) self.pg0.add_stream(pkts) @@ -1475,26 +1526,21 @@ class TestNAT44(MethodHolder): capture = self.pg1.get_capture(len(pkts)) self.verify_capture_out(capture) - err = self.statistics.get_err_counter( - '/err/nat44-in2out-slowpath/TCP packets') - self.assertEqual(err - tcpn, 2) - err = self.statistics.get_err_counter( - '/err/nat44-in2out-slowpath/UDP packets') - self.assertEqual(err - udpn, 1) - err = self.statistics.get_err_counter( - '/err/nat44-in2out-slowpath/ICMP packets') - self.assertEqual(err - icmpn, 1) - err = self.statistics.get_err_counter( - '/err/nat44-in2out-slowpath/good in2out packets processed') - self.assertEqual(err - totaln, 4) + if_idx = self.pg0.sw_if_index + cnt = self.statistics.get_counter('/nat44/in2out/slowpath/tcp')[0] + self.assertEqual(cnt[if_idx] - tcpn[if_idx], 2) + cnt = self.statistics.get_counter('/nat44/in2out/slowpath/udp')[0] + self.assertEqual(cnt[if_idx] - udpn[if_idx], 1) + cnt = self.statistics.get_counter('/nat44/in2out/slowpath/icmp')[0] + self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1) + cnt = self.statistics.get_counter('/nat44/in2out/slowpath/drops')[0] + self.assertEqual(cnt[if_idx] - drops[if_idx], 0) # out2in - tcpn = self.statistics.get_err_counter('/err/nat44-out2in/TCP packets') - udpn = self.statistics.get_err_counter('/err/nat44-out2in/UDP packets') - icmpn = self.statistics.get_err_counter( - '/err/nat44-out2in/ICMP packets') - totaln = self.statistics.get_err_counter( - '/err/nat44-out2in/good out2in packets processed') + tcpn = self.statistics.get_counter('/nat44/out2in/slowpath/tcp')[0] + udpn = self.statistics.get_counter('/nat44/out2in/slowpath/udp')[0] + icmpn = self.statistics.get_counter('/nat44/out2in/slowpath/icmp')[0] + drops = self.statistics.get_counter('/nat44/out2in/slowpath/drops')[0] pkts = self.create_stream_out(self.pg1) self.pg1.add_stream(pkts) @@ -1503,15 +1549,15 @@ class TestNAT44(MethodHolder): capture = self.pg0.get_capture(len(pkts)) self.verify_capture_in(capture, self.pg0) - err = self.statistics.get_err_counter('/err/nat44-out2in/TCP packets') - self.assertEqual(err - tcpn, 2) - err = self.statistics.get_err_counter('/err/nat44-out2in/UDP packets') - self.assertEqual(err - udpn, 1) - err = self.statistics.get_err_counter('/err/nat44-out2in/ICMP packets') - self.assertEqual(err - icmpn, 1) - err = self.statistics.get_err_counter( - '/err/nat44-out2in/good out2in packets processed') - self.assertEqual(err - totaln, 4) + if_idx = self.pg1.sw_if_index + cnt = self.statistics.get_counter('/nat44/out2in/slowpath/tcp')[0] + self.assertEqual(cnt[if_idx] - tcpn[if_idx], 2) + cnt = self.statistics.get_counter('/nat44/out2in/slowpath/udp')[0] + self.assertEqual(cnt[if_idx] - udpn[if_idx], 1) + cnt = self.statistics.get_counter('/nat44/out2in/slowpath/icmp')[0] + self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1) + cnt = self.statistics.get_counter('/nat44/out2in/slowpath/drops')[0] + self.assertEqual(cnt[if_idx] - drops[if_idx], 0) users = self.statistics.get_counter('/nat44/total-users') self.assertEqual(users[0][0], 1) @@ -2299,6 +2345,7 @@ class TestNAT44(MethodHolder): server_in_port, server_out_port, proto=IP_PROTOS.tcp) + cnt = self.statistics.get_counter('/nat44/hairpinning')[0] # send packet from host to server p = (Ether(src=host.mac, dst=self.pg0.local_mac) / IP(src=host.ip4, dst=self.nat_addr) / @@ -2321,6 +2368,10 @@ class TestNAT44(MethodHolder): self.logger.error(ppp("Unexpected or invalid packet:", p)) raise + after = self.statistics.get_counter('/nat44/hairpinning')[0] + if_idx = self.pg0.sw_if_index + self.assertEqual(after[if_idx] - cnt[if_idx], 1) + # send reply from server to host p = (Ether(src=server.mac, dst=self.pg0.local_mac) / IP(src=server.ip4, dst=self.nat_addr) / @@ -2342,6 +2393,10 @@ class TestNAT44(MethodHolder): self.logger.error(ppp("Unexpected or invalid packet:", p)) raise + after = self.statistics.get_counter('/nat44/hairpinning')[0] + if_idx = self.pg0.sw_if_index + self.assertEqual(after[if_idx] - cnt[if_idx], 2) + def test_hairpinning2(self): """ NAT44 hairpinning - 1:1 NAT""" @@ -2743,8 +2798,7 @@ class TestNAT44(MethodHolder): sw_if_index=self.pg1.sw_if_index, is_add=1) - nat44_config = self.vapi.nat_show_config() - max_sessions = 10 * nat44_config.translation_buckets + max_sessions = self.max_translations pkts = [] for i in range(0, max_sessions): @@ -4151,6 +4205,139 @@ class TestNAT44(MethodHolder): self.logger.info(self.vapi.cli("show nat ha")) +class TestNAT44EndpointDependent2(MethodHolder): + """ Endpoint-Dependent mapping and filtering test cases """ + + @classmethod + def setUpConstants(cls): + super(TestNAT44EndpointDependent2, cls).setUpConstants() + cls.vpp_cmdline.extend(["nat", "{", "endpoint-dependent", "}"]) + + @classmethod + def tearDownClass(cls): + super(TestNAT44EndpointDependent2, cls).tearDownClass() + + def tearDown(self): + super(TestNAT44EndpointDependent2, self).tearDown() + + @classmethod + def create_and_add_ip4_table(cls, i, table_id): + cls.vapi.ip_table_add_del(is_add=1, table={'table_id': table_id}) + i.set_table_ip4(table_id) + + @classmethod + def setUpClass(cls): + super(TestNAT44EndpointDependent2, cls).setUpClass() + + cls.create_pg_interfaces(range(3)) + cls.interfaces = list(cls.pg_interfaces) + + cls.create_and_add_ip4_table(cls.pg1, 10) + + for i in cls.interfaces: + i.admin_up() + i.config_ip4() + i.resolve_arp() + + i.generate_remote_hosts(1) + i.configure_ipv4_neighbors() + + def setUp(self): + super(TestNAT44EndpointDependent2, self).setUp() + + nat_config = self.vapi.nat_show_config() + self.assertEqual(1, nat_config.endpoint_dependent) + + def nat_add_inside_interface(self, i): + self.vapi.nat44_interface_add_del_feature( + flags=self.config_flags.NAT_IS_INSIDE, + sw_if_index=i.sw_if_index, is_add=1) + + def nat_add_outside_interface(self, i): + self.vapi.nat44_interface_add_del_feature( + flags=self.config_flags.NAT_IS_OUTSIDE, + sw_if_index=i.sw_if_index, is_add=1) + + def nat_add_interface_address(self, i): + self.nat_addr = i.local_ip4 + self.vapi.nat44_add_del_interface_addr( + sw_if_index=i.sw_if_index, is_add=1) + + def nat_add_address(self, address, vrf_id=0xFFFFFFFF): + self.nat_addr = address + self.nat44_add_address(address, vrf_id=vrf_id) + + def cli(self, command): + result = self.vapi.cli(command) + self.logger.info(result) + # print(result) + + def show_configuration(self): + self.cli("show interface") + self.cli("show interface address") + self.cli("show nat44 addresses") + self.cli("show nat44 interfaces") + + def create_tcp_stream(self, in_if, out_if, count): + """ + Create tcp packet stream + + :param in_if: Inside interface + :param out_if: Outside interface + :param count: count of packets to generate + """ + pkts = [] + port = 6303 + + for i in range(count): + p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / + IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=64) / + TCP(sport=port + i, dport=20)) + pkts.append(p) + + return pkts + + def test_session_limit_per_vrf(self): + + inside = self.pg0 + inside_vrf10 = self.pg1 + outside = self.pg2 + + limit = 5 + + # 2 interfaces pg0, pg1 (vrf10, limit 1 tcp session) + # non existing vrf_id makes process core dump + self.vapi.nat44_set_session_limit(session_limit=limit, vrf_id=10) + + self.nat_add_inside_interface(inside) + self.nat_add_inside_interface(inside_vrf10) + self.nat_add_outside_interface(outside) + + # vrf independent + self.nat_add_interface_address(outside) + + # BUG: causing core dump - when bad vrf_id is specified + # self.nat44_add_address(outside.local_ip4, vrf_id=20) + + self.show_configuration() + + stream = self.create_tcp_stream(inside_vrf10, outside, limit * 2) + inside_vrf10.add_stream(stream) + + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + + capture = outside.get_capture(limit) + + stream = self.create_tcp_stream(inside, outside, limit * 2) + inside.add_stream(stream) + + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + + capture = outside.get_capture(len(stream)) + + class TestNAT44EndpointDependent(MethodHolder): """ Endpoint-Dependent mapping and filtering test cases """ @@ -4277,9 +4464,9 @@ class TestNAT44EndpointDependent(MethodHolder): self.vapi.nat44_interface_add_del_feature( sw_if_index=self.pg1.sw_if_index, is_add=1) - self.frag_in_order(proto=IP_PROTOS.tcp) - self.frag_in_order(proto=IP_PROTOS.udp) - self.frag_in_order(proto=IP_PROTOS.icmp) + self.frag_in_order(proto=IP_PROTOS.tcp, ignore_port=True) + self.frag_in_order(proto=IP_PROTOS.udp, ignore_port=True) + self.frag_in_order(proto=IP_PROTOS.icmp, ignore_port=True) def test_frag_in_order_dont_translate(self): """ NAT44 don't translate fragments arriving in order """ @@ -4303,9 +4490,9 @@ class TestNAT44EndpointDependent(MethodHolder): self.vapi.nat44_interface_add_del_feature( sw_if_index=self.pg1.sw_if_index, is_add=1) - self.frag_out_of_order(proto=IP_PROTOS.tcp) - self.frag_out_of_order(proto=IP_PROTOS.udp) - self.frag_out_of_order(proto=IP_PROTOS.icmp) + self.frag_out_of_order(proto=IP_PROTOS.tcp, ignore_port=True) + self.frag_out_of_order(proto=IP_PROTOS.udp, ignore_port=True) + self.frag_out_of_order(proto=IP_PROTOS.icmp, ignore_port=True) def test_frag_out_of_order_dont_translate(self): """ NAT44 don't translate fragments arriving out of order """ @@ -4433,9 +4620,46 @@ class TestNAT44EndpointDependent(MethodHolder): proto=IP_PROTOS.udp) self.nat44_add_static_mapping(self.server.ip4, self.nat_addr) - self.reass_hairpinning(proto=IP_PROTOS.tcp) - self.reass_hairpinning(proto=IP_PROTOS.udp) - self.reass_hairpinning(proto=IP_PROTOS.icmp) + self.reass_hairpinning(proto=IP_PROTOS.tcp, ignore_port=True) + self.reass_hairpinning(proto=IP_PROTOS.udp, ignore_port=True) + self.reass_hairpinning(proto=IP_PROTOS.icmp, ignore_port=True) + + def test_clear_sessions(self): + """ NAT44 ED session clearing test """ + + self.nat44_add_address(self.nat_addr) + flags = self.config_flags.NAT_IS_INSIDE + self.vapi.nat44_interface_add_del_feature( + sw_if_index=self.pg0.sw_if_index, + flags=flags, is_add=1) + self.vapi.nat44_interface_add_del_feature( + sw_if_index=self.pg1.sw_if_index, + is_add=1) + + nat_config = self.vapi.nat_show_config() + self.assertEqual(1, nat_config.endpoint_dependent) + + pkts = self.create_stream_in(self.pg0, self.pg1) + self.pg0.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg1.get_capture(len(pkts)) + self.verify_capture_out(capture, ignore_port=True) + + sessions = self.statistics.get_counter('/nat44/total-sessions') + self.assertTrue(sessions[0][0] > 0) + self.logger.info("sessions before clearing: %s" % sessions[0][0]) + + # just for testing purposes + self.logger.info(self.vapi.cli("show nat44 summary")) + + self.vapi.cli("clear nat44 sessions") + + self.logger.info(self.vapi.cli("show nat44 summary")) + + sessions = self.statistics.get_counter('/nat44/total-sessions') + self.assertEqual(sessions[0][0], 0) + self.logger.info("sessions after clearing: %s" % sessions[0][0]) def test_dynamic(self): """ NAT44 dynamic translation test """ @@ -4453,44 +4677,37 @@ class TestNAT44EndpointDependent(MethodHolder): self.assertEqual(1, nat_config.endpoint_dependent) # in2out - tcpn = self.statistics.get_err_counter( - '/err/nat44-ed-in2out-slowpath/TCP packets') - udpn = self.statistics.get_err_counter( - '/err/nat44-ed-in2out-slowpath/UDP packets') - icmpn = self.statistics.get_err_counter( - '/err/nat44-ed-in2out-slowpath/ICMP packets') - totaln = self.statistics.get_err_counter( - '/err/nat44-ed-in2out-slowpath/good in2out packets processed') + tcpn = self.statistics.get_counter('/nat44/ed/in2out/slowpath/tcp')[0] + udpn = self.statistics.get_counter('/nat44/ed/in2out/slowpath/udp')[0] + icmpn = self.statistics.get_counter( + '/nat44/ed/in2out/slowpath/icmp')[0] + drops = self.statistics.get_counter( + '/nat44/ed/in2out/slowpath/drops')[0] pkts = self.create_stream_in(self.pg0, self.pg1) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg1.get_capture(len(pkts)) - self.verify_capture_out(capture) - - err = self.statistics.get_err_counter( - '/err/nat44-ed-in2out-slowpath/TCP packets') - self.assertEqual(err - tcpn, 2) - err = self.statistics.get_err_counter( - '/err/nat44-ed-in2out-slowpath/UDP packets') - self.assertEqual(err - udpn, 1) - err = self.statistics.get_err_counter( - '/err/nat44-ed-in2out-slowpath/ICMP packets') - self.assertEqual(err - icmpn, 1) - err = self.statistics.get_err_counter( - '/err/nat44-ed-in2out-slowpath/good in2out packets processed') - self.assertEqual(err - totaln, 4) + self.verify_capture_out(capture, ignore_port=True) + + if_idx = self.pg0.sw_if_index + cnt = self.statistics.get_counter('/nat44/ed/in2out/slowpath/tcp')[0] + self.assertEqual(cnt[if_idx] - tcpn[if_idx], 2) + cnt = self.statistics.get_counter('/nat44/ed/in2out/slowpath/udp')[0] + self.assertEqual(cnt[if_idx] - udpn[if_idx], 1) + cnt = self.statistics.get_counter('/nat44/ed/in2out/slowpath/icmp')[0] + self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1) + cnt = self.statistics.get_counter('/nat44/ed/in2out/slowpath/drops')[0] + self.assertEqual(cnt[if_idx] - drops[if_idx], 0) # out2in - tcpn = self.statistics.get_err_counter( - '/err/nat44-ed-out2in/TCP packets') - udpn = self.statistics.get_err_counter( - '/err/nat44-ed-out2in/UDP packets') - icmpn = self.statistics.get_err_counter( - '/err/nat44-ed-out2in-slowpath/ICMP packets') - totaln = self.statistics.get_err_counter( - '/err/nat44-ed-out2in/good out2in packets processed') + tcpn = self.statistics.get_counter('/nat44/ed/out2in/fastpath/tcp')[0] + udpn = self.statistics.get_counter('/nat44/ed/out2in/fastpath/udp')[0] + icmpn = self.statistics.get_counter( + '/nat44/ed/out2in/slowpath/icmp')[0] + drops = self.statistics.get_counter( + '/nat44/ed/out2in/fastpath/drops')[0] pkts = self.create_stream_out(self.pg1) self.pg1.add_stream(pkts) @@ -4499,22 +4716,66 @@ class TestNAT44EndpointDependent(MethodHolder): capture = self.pg0.get_capture(len(pkts)) self.verify_capture_in(capture, self.pg0) - err = self.statistics.get_err_counter( - '/err/nat44-ed-out2in/TCP packets') - self.assertEqual(err - tcpn, 2) - err = self.statistics.get_err_counter( - '/err/nat44-ed-out2in/UDP packets') - self.assertEqual(err - udpn, 1) - err = self.statistics.get_err_counter( - '/err/nat44-ed-out2in-slowpath/ICMP packets') - self.assertEqual(err - icmpn, 1) - err = self.statistics.get_err_counter( - '/err/nat44-ed-out2in/good out2in packets processed') - self.assertEqual(err - totaln, 3) + if_idx = self.pg1.sw_if_index + cnt = self.statistics.get_counter('/nat44/ed/out2in/fastpath/tcp')[0] + self.assertEqual(cnt[if_idx] - tcpn[if_idx], 2) + cnt = self.statistics.get_counter('/nat44/ed/out2in/fastpath/udp')[0] + self.assertEqual(cnt[if_idx] - udpn[if_idx], 1) + cnt = self.statistics.get_counter('/nat44/ed/out2in/slowpath/icmp')[0] + self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1) + cnt = self.statistics.get_counter('/nat44/ed/out2in/fastpath/drops')[0] + self.assertEqual(cnt[if_idx] - drops[if_idx], 0) sessions = self.statistics.get_counter('/nat44/total-sessions') self.assertEqual(sessions[0][0], 3) + def test_dynamic_out_of_ports(self): + """ NAT44 dynamic translation test: out of ports """ + + flags = self.config_flags.NAT_IS_INSIDE + self.vapi.nat44_interface_add_del_feature( + sw_if_index=self.pg0.sw_if_index, + flags=flags, is_add=1) + self.vapi.nat44_interface_add_del_feature( + sw_if_index=self.pg1.sw_if_index, + is_add=1) + + nat_config = self.vapi.nat_show_config() + self.assertEqual(1, nat_config.endpoint_dependent) + + # in2out and no NAT addresses added + err_old = self.statistics.get_err_counter( + '/err/nat44-ed-in2out-slowpath/out of ports') + + pkts = self.create_stream_in(self.pg0, self.pg1) + self.pg0.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + self.pg1.get_capture(0, timeout=1) + + err_new = self.statistics.get_err_counter( + '/err/nat44-ed-in2out-slowpath/out of ports') + + self.assertEqual(err_new - err_old, len(pkts)) + + # in2out after NAT addresses added + self.nat44_add_address(self.nat_addr) + + err_old = self.statistics.get_err_counter( + '/err/nat44-ed-in2out-slowpath/out of ports') + + pkts = self.create_stream_in(self.pg0, self.pg1) + self.pg0.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg1.get_capture(len(pkts)) + self.verify_capture_out(capture, ignore_port=True) + + err_new = self.statistics.get_err_counter( + '/err/nat44-ed-in2out-slowpath/out of ports') + + self.assertEqual(err_new, err_old) + def test_dynamic_output_feature_vrf(self): """ NAT44 dynamic translation test: output-feature, VRF""" @@ -4548,44 +4809,45 @@ class TestNAT44EndpointDependent(MethodHolder): self.assertEqual(1, nat_config.endpoint_dependent) # in2out - tcpn = self.statistics.get_err_counter( - '/err/nat44-ed-in2out-slowpath/TCP packets') - udpn = self.statistics.get_err_counter( - '/err/nat44-ed-in2out-slowpath/UDP packets') - icmpn = self.statistics.get_err_counter( - '/err/nat44-ed-in2out-slowpath/ICMP packets') - totaln = self.statistics.get_err_counter( - '/err/nat44-ed-in2out-slowpath/good in2out packets processed') + tcpn = self.statistics.get_counter( + '/nat44/ed/in2out/slowpath/tcp')[0] + udpn = self.statistics.get_counter( + '/nat44/ed/in2out/slowpath/udp')[0] + icmpn = self.statistics.get_counter( + '/nat44/ed/in2out/slowpath/icmp')[0] + drops = self.statistics.get_counter( + '/nat44/ed/in2out/slowpath/drops')[0] pkts = self.create_stream_in(self.pg7, self.pg8) self.pg7.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg8.get_capture(len(pkts)) - self.verify_capture_out(capture) - - err = self.statistics.get_err_counter( - '/err/nat44-ed-in2out-slowpath/TCP packets') - self.assertEqual(err - tcpn, 2) - err = self.statistics.get_err_counter( - '/err/nat44-ed-in2out-slowpath/UDP packets') - self.assertEqual(err - udpn, 1) - err = self.statistics.get_err_counter( - '/err/nat44-ed-in2out-slowpath/ICMP packets') - self.assertEqual(err - icmpn, 1) - err = self.statistics.get_err_counter( - '/err/nat44-ed-in2out-slowpath/good in2out packets processed') - self.assertEqual(err - totaln, 4) + self.verify_capture_out(capture, ignore_port=True) + + if_idx = self.pg7.sw_if_index + cnt = self.statistics.get_counter( + '/nat44/ed/in2out/slowpath/tcp')[0] + self.assertEqual(cnt[if_idx] - tcpn[if_idx], 2) + cnt = self.statistics.get_counter( + '/nat44/ed/in2out/slowpath/udp')[0] + self.assertEqual(cnt[if_idx] - udpn[if_idx], 1) + cnt = self.statistics.get_counter( + '/nat44/ed/in2out/slowpath/icmp')[0] + self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1) + cnt = self.statistics.get_counter( + '/nat44/ed/in2out/slowpath/drops')[0] + self.assertEqual(cnt[if_idx] - drops[if_idx], 0) # out2in - tcpn = self.statistics.get_err_counter( - '/err/nat44-ed-out2in/TCP packets') - udpn = self.statistics.get_err_counter( - '/err/nat44-ed-out2in/UDP packets') - icmpn = self.statistics.get_err_counter( - '/err/nat44-ed-out2in-slowpath/ICMP packets') - totaln = self.statistics.get_err_counter( - '/err/nat44-ed-out2in/good out2in packets processed') + tcpn = self.statistics.get_counter( + '/nat44/ed/out2in/fastpath/tcp')[0] + udpn = self.statistics.get_counter( + '/nat44/ed/out2in/fastpath/udp')[0] + icmpn = self.statistics.get_counter( + '/nat44/ed/out2in/slowpath/icmp')[0] + drops = self.statistics.get_counter( + '/nat44/ed/out2in/fastpath/drops')[0] pkts = self.create_stream_out(self.pg8) self.pg8.add_stream(pkts) @@ -4594,18 +4856,19 @@ class TestNAT44EndpointDependent(MethodHolder): capture = self.pg7.get_capture(len(pkts)) self.verify_capture_in(capture, self.pg7) - err = self.statistics.get_err_counter( - '/err/nat44-ed-out2in/TCP packets') - self.assertEqual(err - tcpn, 2) - err = self.statistics.get_err_counter( - '/err/nat44-ed-out2in/UDP packets') - self.assertEqual(err - udpn, 1) - err = self.statistics.get_err_counter( - '/err/nat44-ed-out2in-slowpath/ICMP packets') - self.assertEqual(err - icmpn, 1) - err = self.statistics.get_err_counter( - '/err/nat44-ed-out2in/good out2in packets processed') - self.assertEqual(err - totaln, 3) + if_idx = self.pg8.sw_if_index + cnt = self.statistics.get_counter( + '/nat44/ed/out2in/fastpath/tcp')[0] + self.assertEqual(cnt[if_idx] - tcpn[if_idx], 2) + cnt = self.statistics.get_counter( + '/nat44/ed/out2in/fastpath/udp')[0] + self.assertEqual(cnt[if_idx] - udpn[if_idx], 1) + cnt = self.statistics.get_counter( + '/nat44/ed/out2in/slowpath/icmp')[0] + self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1) + cnt = self.statistics.get_counter( + '/nat44/ed/out2in/fastpath/drops')[0] + self.assertEqual(cnt[if_idx] - drops[if_idx], 0) sessions = self.statistics.get_counter('/nat44/total-sessions') self.assertEqual(sessions[0][0], 3) @@ -5311,13 +5574,13 @@ class TestNAT44EndpointDependent(MethodHolder): self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg1.get_capture(len(pkts)) - self.verify_capture_out(capture) + self.verify_capture_out(capture, ignore_port=True) pkts = self.create_stream_in(self.pg0, self.pg1) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg1.get_capture(len(pkts)) - self.verify_capture_out(capture) + self.verify_capture_out(capture, ignore_port=True) # from external network back to local network host pkts = self.create_stream_out(self.pg1) @@ -5341,7 +5604,7 @@ class TestNAT44EndpointDependent(MethodHolder): self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg1.get_capture(len(pkts)) - self.verify_capture_out(capture) + self.verify_capture_out(capture, ignore_port=True) pkts = self.create_stream_out(self.pg1) self.pg1.add_stream(pkts) @@ -6351,7 +6614,7 @@ class TestNAT44EndpointDependent(MethodHolder): self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg1.get_capture(len(pkts)) - self.verify_capture_out(capture) + self.verify_capture_out(capture, ignore_port=True) # out2in pkts = self.create_stream_out(self.pg1) @@ -6383,7 +6646,7 @@ class TestNAT44EndpointDependent(MethodHolder): pkts_in2out = self.create_stream_in(self.pg0, self.pg1) capture = self.send_and_expect(self.pg0, pkts_in2out, self.pg1, len(pkts_in2out)) - self.verify_capture_out(capture) + self.verify_capture_out(capture, ignore_port=True) # send out2in again, with sessions created it should work now pkts_out2in = self.create_stream_out(self.pg1) @@ -6413,7 +6676,7 @@ class TestNAT44EndpointDependent(MethodHolder): # send in2out to generate ACL state (NAT state was created earlier) capture = self.send_and_expect(self.pg0, pkts_in2out, self.pg1, len(pkts_in2out)) - self.verify_capture_out(capture) + self.verify_capture_out(capture, ignore_port=True) # send out2in again. ACL state exists so it should work now. # TCP packets with the syn flag set also need the ack flag @@ -6518,7 +6781,6 @@ class TestNAT44EndpointDependent(MethodHolder): ip = p[IP] tcp = p[TCP] self.assertEqual(ip.src, self.nat_addr) - self.assertNotEqual(tcp.sport, 2345) self.assert_packet_checksums_valid(p) port = tcp.sport except: @@ -6846,768 +7108,358 @@ class TestNAT44EndpointDependent(MethodHolder): capture = self.pg2.get_capture(1) self.verify_syslog_sess(capture[0][Raw].load, False) - def tearDown(self): - super(TestNAT44EndpointDependent, self).tearDown() - if not self.vpp_dead: - self.clear_nat44() - self.vapi.cli("clear logging") + def test_ed_users_dump(self): + """ API test - nat44_user_dump """ + flags = self.config_flags.NAT_IS_INSIDE + self.vapi.nat44_interface_add_del_feature( + sw_if_index=self.pg0.sw_if_index, + flags=flags, is_add=1) + self.vapi.nat44_interface_add_del_feature( + sw_if_index=self.pg1.sw_if_index, + is_add=1) + self.vapi.nat44_forwarding_enable_disable(enable=1) - def show_commands_at_teardown(self): - self.logger.info(self.vapi.cli("show nat44 addresses")) - self.logger.info(self.vapi.cli("show nat44 interfaces")) - self.logger.info(self.vapi.cli("show nat44 static mappings")) - self.logger.info(self.vapi.cli("show nat44 interface address")) - self.logger.info(self.vapi.cli("show nat44 sessions detail")) - self.logger.info(self.vapi.cli("show nat44 hash tables detail")) - self.logger.info(self.vapi.cli("show nat timeouts")) - - -class TestNAT44Out2InDPO(MethodHolder): - """ NAT44 Test Cases using out2in DPO """ - - @classmethod - def setUpConstants(cls): - super(TestNAT44Out2InDPO, cls).setUpConstants() - cls.vpp_cmdline.extend(["nat", "{", "out2in dpo", "}"]) - - @classmethod - def setUpClass(cls): - super(TestNAT44Out2InDPO, cls).setUpClass() - cls.vapi.cli("set log class nat level debug") - - cls.tcp_port_in = 6303 - cls.tcp_port_out = 6303 - cls.udp_port_in = 6304 - cls.udp_port_out = 6304 - cls.icmp_id_in = 6305 - cls.icmp_id_out = 6305 - cls.nat_addr = '10.0.0.3' - cls.dst_ip4 = '192.168.70.1' - - cls.create_pg_interfaces(range(2)) - - cls.pg0.admin_up() - cls.pg0.config_ip4() - cls.pg0.resolve_arp() - - cls.pg1.admin_up() - cls.pg1.config_ip6() - cls.pg1.resolve_ndp() - - r1 = VppIpRoute(cls, "::", 0, - [VppRoutePath(cls.pg1.remote_ip6, - cls.pg1.sw_if_index)], - register=False) - r1.add_vpp_config() + real_ip = self.pg0.remote_ip4 + alias_ip = self.nat_addr + flags = self.config_flags.NAT_IS_ADDR_ONLY + self.vapi.nat44_add_del_static_mapping(is_add=1, + local_ip_address=real_ip, + external_ip_address=alias_ip, + external_sw_if_index=0xFFFFFFFF, + flags=flags) - @classmethod - def tearDownClass(cls): - super(TestNAT44Out2InDPO, cls).tearDownClass() + users = self.vapi.nat44_user_dump() + self.assertEqual(len(users), 0) + try: + # in2out - static mapping match - def configure_xlat(self): - self.dst_ip6_pfx = '1:2:3::' - self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6, - self.dst_ip6_pfx) - self.dst_ip6_pfx_len = 96 - self.src_ip6_pfx = '4:5:6::' - self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6, - self.src_ip6_pfx) - self.src_ip6_pfx_len = 96 - self.vapi.map_add_domain(self.dst_ip6_pfx_n, self.dst_ip6_pfx_len, - self.src_ip6_pfx_n, self.src_ip6_pfx_len, - '\x00\x00\x00\x00', 0) + pkts = self.create_stream_out(self.pg1) + self.pg1.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg0.get_capture(len(pkts)) + self.verify_capture_in(capture, self.pg0) - @unittest.skip('Temporary disabled') - def test_464xlat_ce(self): - """ Test 464XLAT CE with NAT44 """ + pkts = self.create_stream_in(self.pg0, self.pg1) + self.pg0.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg1.get_capture(len(pkts)) + self.verify_capture_out(capture, same_port=True) - nat_config = self.vapi.nat_show_config() - self.assertEqual(1, nat_config.out2in_dpo) + users = self.vapi.nat44_user_dump() + self.assertEqual(len(users), 1) + static_user = users[0] + self.assertEqual(static_user.nstaticsessions, 3) + self.assertEqual(static_user.nsessions, 0) - self.configure_xlat() + # in2out - no static mapping match - flags = self.config_flags.NAT_IS_INSIDE - self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) - self.vapi.nat44_add_del_address_range(first_ip_address=self.nat_addr_n, - last_ip_address=self.nat_addr_n, - vrf_id=0xFFFFFFFF, is_add=1) + host0 = self.pg0.remote_hosts[0] + self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1] + try: + pkts = self.create_stream_out(self.pg1, + dst_ip=self.pg0.remote_ip4, + use_inside_ports=True) + self.pg1.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg0.get_capture(len(pkts)) + self.verify_capture_in(capture, self.pg0) - out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx, - self.dst_ip6_pfx_len) - out_dst_ip6 = self.compose_ip6(self.nat_addr, self.src_ip6_pfx, - self.src_ip6_pfx_len) + pkts = self.create_stream_in(self.pg0, self.pg1) + self.pg0.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg1.get_capture(len(pkts)) + self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4, + same_port=True) + finally: + self.pg0.remote_hosts[0] = host0 - try: - pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4) - self.pg0.add_stream(pkts) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - capture = self.pg1.get_capture(len(pkts)) - self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6, - dst_ip=out_src_ip6) + users = self.vapi.nat44_user_dump() + self.assertEqual(len(users), 2) + if str(users[0].ip_address) == self.pg0.remote_hosts[0].ip4: + non_static_user = users[1] + static_user = users[0] + else: + non_static_user = users[0] + static_user = users[1] + self.assertEqual(static_user.nstaticsessions, 3) + self.assertEqual(static_user.nsessions, 0) + self.assertEqual(non_static_user.nstaticsessions, 0) + self.assertEqual(non_static_user.nsessions, 3) + + users = self.vapi.nat44_user_dump() + self.assertEqual(len(users), 2) + if str(users[0].ip_address) == self.pg0.remote_hosts[0].ip4: + non_static_user = users[1] + static_user = users[0] + else: + non_static_user = users[0] + static_user = users[1] + self.assertEqual(static_user.nstaticsessions, 3) + self.assertEqual(static_user.nsessions, 0) + self.assertEqual(non_static_user.nstaticsessions, 0) + self.assertEqual(non_static_user.nsessions, 3) - pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, - out_dst_ip6) - self.pg1.add_stream(pkts) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - capture = self.pg0.get_capture(len(pkts)) - self.verify_capture_in(capture, self.pg0) finally: - self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, + self.vapi.nat44_forwarding_enable_disable(enable=0) + flags = self.config_flags.NAT_IS_ADDR_ONLY + self.vapi.nat44_add_del_static_mapping( + is_add=0, + local_ip_address=real_ip, + external_ip_address=alias_ip, + external_sw_if_index=0xFFFFFFFF, flags=flags) - self.vapi.nat44_add_del_address_range( - first_ip_address=self.nat_addr_n, - last_ip_address=self.nat_addr_n, - vrf_id=0xFFFFFFFF) - - @unittest.skip('Temporary disabled') - def test_464xlat_ce_no_nat(self): - """ Test 464XLAT CE without NAT44 """ - - self.configure_xlat() - out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx, - self.dst_ip6_pfx_len) - out_dst_ip6 = self.compose_ip6(self.pg0.remote_ip4, self.src_ip6_pfx, - self.src_ip6_pfx_len) + def tearDown(self): + super(TestNAT44EndpointDependent, self).tearDown() + if not self.vpp_dead: + self.clear_nat44() + self.vapi.cli("clear logging") - pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4) - self.pg0.add_stream(pkts) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - capture = self.pg1.get_capture(len(pkts)) - self.verify_capture_out_ip6(capture, dst_ip=out_src_ip6, - nat_ip=out_dst_ip6, same_port=True) + def show_commands_at_teardown(self): + self.logger.info(self.vapi.cli("show nat44 addresses")) + self.logger.info(self.vapi.cli("show nat44 interfaces")) + self.logger.info(self.vapi.cli("show nat44 static mappings")) + self.logger.info(self.vapi.cli("show nat44 interface address")) + self.logger.info(self.vapi.cli("show nat44 sessions detail")) + self.logger.info(self.vapi.cli("show nat44 hash tables detail")) + self.logger.info(self.vapi.cli("show nat timeouts")) - pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6) - self.pg1.add_stream(pkts) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - capture = self.pg0.get_capture(len(pkts)) - self.verify_capture_in(capture, self.pg0) +class TestNAT44EndpointDependent3(MethodHolder): + """ Endpoint-Dependent mapping and filtering extra test cases """ -class TestDeterministicNAT(MethodHolder): - """ Deterministic NAT Test Cases """ + max_translations = 50 @classmethod def setUpConstants(cls): - super(TestDeterministicNAT, cls).setUpConstants() - cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"]) + super(TestNAT44EndpointDependent3, cls).setUpConstants() + cls.vpp_cmdline.extend([ + "nat", "{", "endpoint-dependent", + "max translations per thread %d" % cls.max_translations, + "}" + ]) @classmethod def setUpClass(cls): - super(TestDeterministicNAT, cls).setUpClass() + super(TestNAT44EndpointDependent3, cls).setUpClass() cls.vapi.cli("set log class nat level debug") - cls.tcp_port_in = 6303 - cls.tcp_external_port = 6303 - cls.udp_port_in = 6304 - cls.udp_external_port = 6304 - cls.icmp_id_in = 6305 cls.nat_addr = '10.0.0.3' - cls.create_pg_interfaces(range(3)) - cls.interfaces = list(cls.pg_interfaces) + cls.create_pg_interfaces(range(2)) - for i in cls.interfaces: + for i in cls.pg_interfaces: i.admin_up() i.config_ip4() i.resolve_arp() - cls.pg0.generate_remote_hosts(2) - cls.pg0.configure_ipv4_neighbors() - - @classmethod - def tearDownClass(cls): - super(TestDeterministicNAT, cls).tearDownClass() - - def create_stream_in(self, in_if, out_if, ttl=64): - """ - Create packet stream for inside network - - :param in_if: Inside interface - :param out_if: Outside interface - :param ttl: TTL of generated packets - """ - pkts = [] - # TCP - p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / - IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) / - TCP(sport=self.tcp_port_in, dport=self.tcp_external_port)) - pkts.append(p) - - # UDP - p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / - IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) / - UDP(sport=self.udp_port_in, dport=self.udp_external_port)) - pkts.append(p) - - # ICMP - p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / - IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) / - ICMP(id=self.icmp_id_in, type='echo-request')) - pkts.append(p) - - return pkts - - def create_stream_out(self, out_if, dst_ip=None, ttl=64): - """ - Create packet stream for outside network - - :param out_if: Outside interface - :param dst_ip: Destination IP address (Default use global NAT address) - :param ttl: TTL of generated packets - """ - if dst_ip is None: - dst_ip = self.nat_addr - pkts = [] - # TCP - p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / - IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) / - TCP(dport=self.tcp_port_out, sport=self.tcp_external_port)) - pkts.append(p) - - # UDP - p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / - IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) / - UDP(dport=self.udp_port_out, sport=self.udp_external_port)) - pkts.append(p) - - # ICMP - p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / - IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) / - ICMP(id=self.icmp_external_id, type='echo-reply')) - pkts.append(p) - - return pkts - - def verify_capture_out(self, capture, nat_ip=None): - """ - Verify captured packets on outside network - - :param capture: Captured packets - :param nat_ip: Translated IP address (Default use global NAT address) - :param same_port: Source port number is not translated (Default False) - """ - if nat_ip is None: - nat_ip = self.nat_addr - for packet in capture: - try: - self.assertEqual(packet[IP].src, nat_ip) - if packet.haslayer(TCP): - self.tcp_port_out = packet[TCP].sport - elif packet.haslayer(UDP): - self.udp_port_out = packet[UDP].sport - else: - self.icmp_external_id = packet[ICMP].id - except: - self.logger.error(ppp("Unexpected or invalid packet " - "(outside network):", packet)) - raise - - def test_deterministic_mode(self): - """ NAT plugin run deterministic mode """ - in_addr = '172.16.255.0' - out_addr = '172.17.255.50' - in_addr_t = '172.16.255.20' - in_plen = 24 - out_plen = 32 - - nat_config = self.vapi.nat_show_config() - self.assertEqual(1, nat_config.deterministic) - - self.vapi.nat_det_add_del_map(is_add=1, in_addr=in_addr, - in_plen=in_plen, out_addr=out_addr, - out_plen=out_plen) - - rep1 = self.vapi.nat_det_forward(in_addr_t) - self.assertEqual(str(rep1.out_addr), out_addr) - rep2 = self.vapi.nat_det_reverse(rep1.out_port_hi, out_addr) - - self.assertEqual(str(rep2.in_addr), in_addr_t) - - deterministic_mappings = self.vapi.nat_det_map_dump() - self.assertEqual(len(deterministic_mappings), 1) - dsm = deterministic_mappings[0] - self.assertEqual(in_addr, str(dsm.in_addr)) - self.assertEqual(in_plen, dsm.in_plen) - self.assertEqual(out_addr, str(dsm.out_addr)) - self.assertEqual(out_plen, dsm.out_plen) - - self.clear_nat_det() - deterministic_mappings = self.vapi.nat_det_map_dump() - self.assertEqual(len(deterministic_mappings), 0) - - def test_set_timeouts(self): - """ Set deterministic NAT timeouts """ - timeouts_before = self.vapi.nat_get_timeouts() - + def setUp(self): + super(TestNAT44EndpointDependent3, self).setUp() self.vapi.nat_set_timeouts( - udp=timeouts_before.udp + 10, - tcp_established=timeouts_before.tcp_established + 10, - tcp_transitory=timeouts_before.tcp_transitory + 10, - icmp=timeouts_before.icmp + 10) - - timeouts_after = self.vapi.nat_get_timeouts() - - self.assertNotEqual(timeouts_before.udp, timeouts_after.udp) - self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp) - self.assertNotEqual(timeouts_before.tcp_established, - timeouts_after.tcp_established) - self.assertNotEqual(timeouts_before.tcp_transitory, - timeouts_after.tcp_transitory) - - def test_det_in(self): - """ Deterministic NAT translation test (TCP, UDP, ICMP) """ - - nat_ip = "10.0.0.10" - - self.vapi.nat_det_add_del_map(is_add=1, in_addr=self.pg0.remote_ip4, - in_plen=32, - out_addr=socket.inet_aton(nat_ip), - out_plen=32) - + udp=1, tcp_established=7440, tcp_transitory=30, icmp=1) + self.nat44_add_address(self.nat_addr) flags = self.config_flags.NAT_IS_INSIDE self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1) self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1) - # in2out - pkts = self.create_stream_in(self.pg0, self.pg1) - self.pg0.add_stream(pkts) + @classmethod + def tearDownClass(cls): + super(TestNAT44EndpointDependent3, cls).tearDownClass() + + def init_tcp_session(self, in_if, out_if, sport, ext_dport): + # SYN packet in->out + p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) / + IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) / + TCP(sport=sport, dport=ext_dport, flags="S")) + in_if.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() - capture = self.pg1.get_capture(len(pkts)) - self.verify_capture_out(capture, nat_ip) + capture = out_if.get_capture(1) + p = capture[0] + tcp_port_out = p[TCP].sport - # out2in - pkts = self.create_stream_out(self.pg1, nat_ip) - self.pg1.add_stream(pkts) + # SYN + ACK packet out->in + p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) / + IP(src=out_if.remote_ip4, dst=self.nat_addr) / + TCP(sport=ext_dport, dport=tcp_port_out, flags="SA")) + out_if.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() - capture = self.pg0.get_capture(len(pkts)) - self.verify_capture_in(capture, self.pg0) + in_if.get_capture(1) - # session dump test - sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4) - self.assertEqual(len(sessions), 3) - - # TCP session - s = sessions[0] - self.assertEqual(str(s.ext_addr), self.pg1.remote_ip4) - self.assertEqual(s.in_port, self.tcp_port_in) - self.assertEqual(s.out_port, self.tcp_port_out) - self.assertEqual(s.ext_port, self.tcp_external_port) - - # UDP session - s = sessions[1] - self.assertEqual(str(s.ext_addr), self.pg1.remote_ip4) - self.assertEqual(s.in_port, self.udp_port_in) - self.assertEqual(s.out_port, self.udp_port_out) - self.assertEqual(s.ext_port, self.udp_external_port) - - # ICMP session - s = sessions[2] - self.assertEqual(str(s.ext_addr), self.pg1.remote_ip4) - self.assertEqual(s.in_port, self.icmp_id_in) - self.assertEqual(s.out_port, self.icmp_external_id) - - def test_multiple_users(self): - """ Deterministic NAT multiple users """ - - nat_ip = "10.0.0.10" - port_in = 80 - external_port = 6303 + # ACK packet in->out + p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) / + IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) / + TCP(sport=sport, dport=ext_dport, flags="A")) + in_if.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + out_if.get_capture(1) - host0 = self.pg0.remote_hosts[0] - host1 = self.pg0.remote_hosts[1] + return tcp_port_out - self.vapi.nat_det_add_del_map(is_add=1, in_addr=host0.ip4, in_plen=24, - out_addr=socket.inet_aton(nat_ip), - out_plen=32) - flags = self.config_flags.NAT_IS_INSIDE - self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) - self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + def test_lru_cleanup(self): + """ LRU cleanup algorithm """ + tcp_port_out = self.init_tcp_session(self.pg0, self.pg1, 2000, 80) + pkts = [] + for i in range(0, self.max_translations - 1): + p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / + IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) / + UDP(sport=7000+i, dport=80)) + pkts.append(p) - # host0 to out - p = (Ether(src=host0.mac, dst=self.pg0.local_mac) / - IP(src=host0.ip4, dst=self.pg1.remote_ip4) / - TCP(sport=port_in, dport=external_port)) - self.pg0.add_stream(p) + self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() - capture = self.pg1.get_capture(1) - p = capture[0] - try: - ip = p[IP] - tcp = p[TCP] - self.assertEqual(ip.src, nat_ip) - self.assertEqual(ip.dst, self.pg1.remote_ip4) - self.assertEqual(tcp.dport, external_port) - port_out0 = tcp.sport - except: - self.logger.error(ppp("Unexpected or invalid packet:", p)) - raise + self.pg1.get_capture(len(pkts)) + self.sleep(1.5, "wait for timeouts") - # host1 to out - p = (Ether(src=host1.mac, dst=self.pg0.local_mac) / - IP(src=host1.ip4, dst=self.pg1.remote_ip4) / - TCP(sport=port_in, dport=external_port)) - self.pg0.add_stream(p) + pkts = [] + for i in range(0, self.max_translations - 1): + p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / + IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) / + ICMP(id=8000+i, type='echo-request')) + pkts.append(p) + + self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() - capture = self.pg1.get_capture(1) - p = capture[0] - try: - ip = p[IP] - tcp = p[TCP] - self.assertEqual(ip.src, nat_ip) - self.assertEqual(ip.dst, self.pg1.remote_ip4) - self.assertEqual(tcp.dport, external_port) - port_out1 = tcp.sport - except: - self.logger.error(ppp("Unexpected or invalid packet:", p)) - raise + self.pg1.get_capture(len(pkts)) - dms = self.vapi.nat_det_map_dump() - self.assertEqual(1, len(dms)) - self.assertEqual(2, dms[0].ses_num) - # out to host0 - p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / - IP(src=self.pg1.remote_ip4, dst=nat_ip) / - TCP(sport=external_port, dport=port_out0)) - self.pg1.add_stream(p) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - capture = self.pg0.get_capture(1) - p = capture[0] - try: - ip = p[IP] - tcp = p[TCP] - self.assertEqual(ip.src, self.pg1.remote_ip4) - self.assertEqual(ip.dst, host0.ip4) - self.assertEqual(tcp.dport, port_in) - self.assertEqual(tcp.sport, external_port) - except: - self.logger.error(ppp("Unexpected or invalid packet:", p)) - raise +class TestNAT44Out2InDPO(MethodHolder): + """ NAT44 Test Cases using out2in DPO """ - # out to host1 - p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / - IP(src=self.pg1.remote_ip4, dst=nat_ip) / - TCP(sport=external_port, dport=port_out1)) - self.pg1.add_stream(p) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - capture = self.pg0.get_capture(1) - p = capture[0] - try: - ip = p[IP] - tcp = p[TCP] - self.assertEqual(ip.src, self.pg1.remote_ip4) - self.assertEqual(ip.dst, host1.ip4) - self.assertEqual(tcp.dport, port_in) - self.assertEqual(tcp.sport, external_port) - except: - self.logger.error(ppp("Unexpected or invalid packet", p)) - raise + @classmethod + def setUpConstants(cls): + super(TestNAT44Out2InDPO, cls).setUpConstants() + cls.vpp_cmdline.extend(["nat", "{", "out2in dpo", "}"]) - # session close api test - self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip), - port_out1, - self.pg1.remote_ip4, - external_port) - dms = self.vapi.nat_det_map_dump() - self.assertEqual(dms[0].ses_num, 1) - - self.vapi.nat_det_close_session_in(host0.ip4, - port_in, - self.pg1.remote_ip4, - external_port) - dms = self.vapi.nat_det_map_dump() - self.assertEqual(dms[0].ses_num, 0) - - def test_tcp_session_close_detection_in(self): - """ Deterministic NAT TCP session close from inside network """ - self.vapi.nat_det_add_del_map(is_add=1, in_addr=self.pg0.remote_ip4, - in_plen=32, - out_addr=socket.inet_aton(self.nat_addr), - out_plen=32) - flags = self.config_flags.NAT_IS_INSIDE - self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) - self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + @classmethod + def setUpClass(cls): + super(TestNAT44Out2InDPO, cls).setUpClass() + cls.vapi.cli("set log class nat level debug") - self.initiate_tcp_session(self.pg0, self.pg1) + cls.tcp_port_in = 6303 + cls.tcp_port_out = 6303 + cls.udp_port_in = 6304 + cls.udp_port_out = 6304 + cls.icmp_id_in = 6305 + cls.icmp_id_out = 6305 + cls.nat_addr = '10.0.0.3' + cls.dst_ip4 = '192.168.70.1' - # close the session from inside - try: - # FIN packet in -> out - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, - flags="F")) - self.pg0.add_stream(p) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - self.pg1.get_capture(1) + cls.create_pg_interfaces(range(2)) - pkts = [] + cls.pg0.admin_up() + cls.pg0.config_ip4() + cls.pg0.resolve_arp() - # ACK packet out -> in - p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / - IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / - TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, - flags="A")) - pkts.append(p) + cls.pg1.admin_up() + cls.pg1.config_ip6() + cls.pg1.resolve_ndp() - # FIN packet out -> in - p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / - IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / - TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, - flags="F")) - pkts.append(p) + r1 = VppIpRoute(cls, "::", 0, + [VppRoutePath(cls.pg1.remote_ip6, + cls.pg1.sw_if_index)], + register=False) + r1.add_vpp_config() - self.pg1.add_stream(pkts) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - self.pg0.get_capture(2) + @classmethod + def tearDownClass(cls): + super(TestNAT44Out2InDPO, cls).tearDownClass() + + def configure_xlat(self): + self.dst_ip6_pfx = '1:2:3::' + self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6, + self.dst_ip6_pfx) + self.dst_ip6_pfx_len = 96 + self.src_ip6_pfx = '4:5:6::' + self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6, + self.src_ip6_pfx) + self.src_ip6_pfx_len = 96 + self.vapi.map_add_domain(self.dst_ip6_pfx_n, self.dst_ip6_pfx_len, + self.src_ip6_pfx_n, self.src_ip6_pfx_len, + '\x00\x00\x00\x00', 0) + + @unittest.skip('Temporary disabled') + def test_464xlat_ce(self): + """ Test 464XLAT CE with NAT44 """ - # ACK packet in -> out - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, - flags="A")) - self.pg0.add_stream(p) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - self.pg1.get_capture(1) + nat_config = self.vapi.nat_show_config() + self.assertEqual(1, nat_config.out2in_dpo) - # Check if deterministic NAT44 closed the session - dms = self.vapi.nat_det_map_dump() - self.assertEqual(0, dms[0].ses_num) - except: - self.logger.error("TCP session termination failed") - raise + self.configure_xlat() - def test_tcp_session_close_detection_out(self): - """ Deterministic NAT TCP session close from outside network """ - self.vapi.nat_det_add_del_map(is_add=1, in_addr=self.pg0.remote_ip4, - in_plen=32, - out_addr=socket.inet_aton(self.nat_addr), - out_plen=32) flags = self.config_flags.NAT_IS_INSIDE self.vapi.nat44_interface_add_del_feature( sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1) - self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + self.vapi.nat44_add_del_address_range(first_ip_address=self.nat_addr_n, + last_ip_address=self.nat_addr_n, + vrf_id=0xFFFFFFFF, is_add=1) - self.initiate_tcp_session(self.pg0, self.pg1) + out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx, + self.dst_ip6_pfx_len) + out_dst_ip6 = self.compose_ip6(self.nat_addr, self.src_ip6_pfx, + self.src_ip6_pfx_len) - # close the session from outside try: - # FIN packet out -> in - p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / - IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / - TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, - flags="F")) - self.pg1.add_stream(p) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - self.pg0.get_capture(1) - - pkts = [] - - # ACK packet in -> out - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, - flags="A")) - pkts.append(p) - - # ACK packet in -> out - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, - flags="F")) - pkts.append(p) - + pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() - self.pg1.get_capture(2) + capture = self.pg1.get_capture(len(pkts)) + self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6, + dst_ip=out_src_ip6) - # ACK packet out -> in - p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / - IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / - TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, - flags="A")) - self.pg1.add_stream(p) + pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, + out_dst_ip6) + self.pg1.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() - self.pg0.get_capture(1) - - # Check if deterministic NAT44 closed the session - dms = self.vapi.nat_det_map_dump() - self.assertEqual(0, dms[0].ses_num) - except: - self.logger.error("TCP session termination failed") - raise - - @unittest.skipUnless(running_extended_tests, "part of extended tests") - def test_session_timeout(self): - """ Deterministic NAT session timeouts """ - self.vapi.nat_det_add_del_map(is_add=1, in_addr=self.pg0.remote_ip4, - in_plen=32, - out_addr=socket.inet_aton(self.nat_addr), - out_plen=32) - flags = self.config_flags.NAT_IS_INSIDE - self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) - self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) - - self.initiate_tcp_session(self.pg0, self.pg1) - self.vapi.nat_set_timeouts(udp=5, tcp_established=5, tcp_transitory=5, - icmp=5) - pkts = self.create_stream_in(self.pg0, self.pg1) - self.pg0.add_stream(pkts) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - capture = self.pg1.get_capture(len(pkts)) - sleep(15) + capture = self.pg0.get_capture(len(pkts)) + self.verify_capture_in(capture, self.pg0) + finally: + self.vapi.nat44_interface_add_del_feature( + sw_if_index=self.pg0.sw_if_index, + flags=flags) + self.vapi.nat44_add_del_address_range( + first_ip_address=self.nat_addr_n, + last_ip_address=self.nat_addr_n, + vrf_id=0xFFFFFFFF) - dms = self.vapi.nat_det_map_dump() - self.assertEqual(0, dms[0].ses_num) + @unittest.skip('Temporary disabled') + def test_464xlat_ce_no_nat(self): + """ Test 464XLAT CE without NAT44 """ - @unittest.skipUnless(running_extended_tests, "part of extended tests") - def test_session_limit_per_user(self): - """ Deterministic NAT maximum sessions per user limit """ - self.vapi.nat_det_add_del_map(is_add=1, in_addr=self.pg0.remote_ip4, - in_plen=32, - out_addr=socket.inet_aton(self.nat_addr), - out_plen=32) - flags = self.config_flags.NAT_IS_INSIDE - self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) - self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) - self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4, - src_address=self.pg2.local_ip4, - path_mtu=512, - template_interval=10) - self.vapi.nat_ipfix_enable_disable(domain_id=1, src_port=4739, - enable=1) + self.configure_xlat() - pkts = [] - for port in range(1025, 2025): - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - UDP(sport=port, dport=port)) - pkts.append(p) + out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx, + self.dst_ip6_pfx_len) + out_dst_ip6 = self.compose_ip6(self.pg0.remote_ip4, self.src_ip6_pfx, + self.src_ip6_pfx_len) + pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg1.get_capture(len(pkts)) + self.verify_capture_out_ip6(capture, dst_ip=out_src_ip6, + nat_ip=out_dst_ip6, same_port=True) - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - UDP(sport=3001, dport=3002)) - self.pg0.add_stream(p) + pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6) + self.pg1.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() - capture = self.pg1.assert_nothing_captured() - - # verify ICMP error packet - capture = self.pg0.get_capture(1) - p = capture[0] - self.assertTrue(p.haslayer(ICMP)) - icmp = p[ICMP] - self.assertEqual(icmp.type, 3) - self.assertEqual(icmp.code, 1) - self.assertTrue(icmp.haslayer(IPerror)) - inner_ip = icmp[IPerror] - self.assertEqual(inner_ip[UDPerror].sport, 3001) - self.assertEqual(inner_ip[UDPerror].dport, 3002) - - dms = self.vapi.nat_det_map_dump() - - self.assertEqual(1000, dms[0].ses_num) - - # verify IPFIX logging - self.vapi.ipfix_flush() - sleep(1) - capture = self.pg2.get_capture(2) - ipfix = IPFIXDecoder() - # first load template - for p in capture: - self.assertTrue(p.haslayer(IPFIX)) - if p.haslayer(Template): - ipfix.add_template(p.getlayer(Template)) - # verify events in data set - for p in capture: - if p.haslayer(Data): - data = ipfix.decode_data_set(p.getlayer(Set)) - self.verify_ipfix_max_entries_per_user(data, - 1000, - self.pg0.remote_ip4) - - def clear_nat_det(self): - """ - Clear deterministic NAT configuration. - """ - self.vapi.nat_ipfix_enable_disable(domain_id=1, src_port=4739, - enable=0) - self.vapi.nat_set_timeouts(udp=300, tcp_established=7440, - tcp_transitory=240, icmp=60) - deterministic_mappings = self.vapi.nat_det_map_dump() - for dsm in deterministic_mappings: - self.vapi.nat_det_add_del_map(is_add=0, in_addr=dsm.in_addr, - in_plen=dsm.in_plen, - out_addr=dsm.out_addr, - out_plen=dsm.out_plen) - - interfaces = self.vapi.nat44_interface_dump() - for intf in interfaces: - self.vapi.nat44_interface_add_del_feature( - sw_if_index=intf.sw_if_index, - flags=intf.flags) - - def tearDown(self): - super(TestDeterministicNAT, self).tearDown() - if not self.vpp_dead: - self.clear_nat_det() - - def show_commands_at_teardown(self): - self.logger.info(self.vapi.cli("show nat44 interfaces")) - self.logger.info(self.vapi.cli("show nat timeouts")) - self.logger.info( - self.vapi.cli("show nat44 deterministic mappings")) - self.logger.info( - self.vapi.cli("show nat44 deterministic sessions")) + capture = self.pg0.get_capture(len(pkts)) + self.verify_capture_in(capture, self.pg0) class TestNAT64(MethodHolder): @@ -7854,12 +7706,10 @@ class TestNAT64(MethodHolder): sw_if_index=self.pg1.sw_if_index) # in2out - tcpn = self.statistics.get_err_counter('/err/nat64-in2out/TCP packets') - udpn = self.statistics.get_err_counter('/err/nat64-in2out/UDP packets') - icmpn = self.statistics.get_err_counter( - '/err/nat64-in2out/ICMP packets') - totaln = self.statistics.get_err_counter( - '/err/nat64-in2out/good in2out packets processed') + tcpn = self.statistics.get_counter('/nat64/in2out/tcp')[0] + udpn = self.statistics.get_counter('/nat64/in2out/udp')[0] + icmpn = self.statistics.get_counter('/nat64/in2out/icmp')[0] + drops = self.statistics.get_counter('/nat64/in2out/drops')[0] pkts = self.create_stream_in_ip6(self.pg0, self.pg1) self.pg0.add_stream(pkts) @@ -7869,23 +7719,21 @@ class TestNAT64(MethodHolder): self.verify_capture_out(capture, nat_ip=self.nat_addr, dst_ip=self.pg1.remote_ip4) - err = self.statistics.get_err_counter('/err/nat64-in2out/TCP packets') - self.assertEqual(err - tcpn, 1) - err = self.statistics.get_err_counter('/err/nat64-in2out/UDP packets') - self.assertEqual(err - udpn, 1) - err = self.statistics.get_err_counter('/err/nat64-in2out/ICMP packets') - self.assertEqual(err - icmpn, 1) - err = self.statistics.get_err_counter( - '/err/nat64-in2out/good in2out packets processed') - self.assertEqual(err - totaln, 3) + if_idx = self.pg0.sw_if_index + cnt = self.statistics.get_counter('/nat64/in2out/tcp')[0] + self.assertEqual(cnt[if_idx] - tcpn[if_idx], 1) + cnt = self.statistics.get_counter('/nat64/in2out/udp')[0] + self.assertEqual(cnt[if_idx] - udpn[if_idx], 1) + cnt = self.statistics.get_counter('/nat64/in2out/icmp')[0] + self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1) + cnt = self.statistics.get_counter('/nat64/in2out/drops')[0] + self.assertEqual(cnt[if_idx] - drops[if_idx], 0) # out2in - tcpn = self.statistics.get_err_counter('/err/nat64-out2in/TCP packets') - udpn = self.statistics.get_err_counter('/err/nat64-out2in/UDP packets') - icmpn = self.statistics.get_err_counter( - '/err/nat64-out2in/ICMP packets') - totaln = self.statistics.get_err_counter( - '/err/nat64-out2in/good out2in packets processed') + tcpn = self.statistics.get_counter('/nat64/out2in/tcp')[0] + udpn = self.statistics.get_counter('/nat64/out2in/udp')[0] + icmpn = self.statistics.get_counter('/nat64/out2in/icmp')[0] + drops = self.statistics.get_counter('/nat64/out2in/drops')[0] pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr) self.pg1.add_stream(pkts) @@ -7895,15 +7743,15 @@ class TestNAT64(MethodHolder): ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4])) self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6) - err = self.statistics.get_err_counter('/err/nat64-out2in/TCP packets') - self.assertEqual(err - tcpn, 2) - err = self.statistics.get_err_counter('/err/nat64-out2in/UDP packets') - self.assertEqual(err - udpn, 1) - err = self.statistics.get_err_counter('/err/nat64-out2in/ICMP packets') - self.assertEqual(err - icmpn, 1) - err = self.statistics.get_err_counter( - '/err/nat64-out2in/good out2in packets processed') - self.assertEqual(err - totaln, 4) + if_idx = self.pg1.sw_if_index + cnt = self.statistics.get_counter('/nat64/out2in/tcp')[0] + self.assertEqual(cnt[if_idx] - tcpn[if_idx], 2) + cnt = self.statistics.get_counter('/nat64/out2in/udp')[0] + self.assertEqual(cnt[if_idx] - udpn[if_idx], 1) + cnt = self.statistics.get_counter('/nat64/out2in/icmp')[0] + self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1) + cnt = self.statistics.get_counter('/nat64/out2in/drops')[0] + self.assertEqual(cnt[if_idx] - drops[if_idx], 0) bibs = self.statistics.get_counter('/nat64/total-bibs') self.assertEqual(bibs[0][0], 3) @@ -9027,158 +8875,5 @@ class TestNAT64(MethodHolder): self.logger.info(self.vapi.cli("show nat64 session table all")) -class TestNAT66(MethodHolder): - """ NAT66 Test Cases """ - - @classmethod - def setUpClass(cls): - super(TestNAT66, cls).setUpClass() - - cls.nat_addr = 'fd01:ff::2' - - cls.create_pg_interfaces(range(2)) - cls.interfaces = list(cls.pg_interfaces) - - for i in cls.interfaces: - i.admin_up() - i.config_ip6() - i.configure_ipv6_neighbors() - - @classmethod - def tearDownClass(cls): - super(TestNAT66, cls).tearDownClass() - - def test_static(self): - """ 1:1 NAT66 test """ - flags = self.config_flags.NAT_IS_INSIDE - self.vapi.nat66_add_del_interface(is_add=1, flags=flags, - sw_if_index=self.pg0.sw_if_index) - self.vapi.nat66_add_del_interface(is_add=1, - sw_if_index=self.pg1.sw_if_index) - self.vapi.nat66_add_del_static_mapping( - local_ip_address=self.pg0.remote_ip6, - external_ip_address=self.nat_addr, - is_add=1) - - # in2out - pkts = [] - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) / - TCP()) - pkts.append(p) - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) / - UDP()) - pkts.append(p) - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) / - ICMPv6EchoRequest()) - pkts.append(p) - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) / - GRE() / IP() / TCP()) - pkts.append(p) - self.pg0.add_stream(pkts) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - capture = self.pg1.get_capture(len(pkts)) - - for packet in capture: - try: - self.assertEqual(packet[IPv6].src, self.nat_addr) - self.assertEqual(packet[IPv6].dst, self.pg1.remote_ip6) - self.assert_packet_checksums_valid(packet) - except: - self.logger.error(ppp("Unexpected or invalid packet:", packet)) - raise - - # out2in - pkts = [] - p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / - IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) / - TCP()) - pkts.append(p) - p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / - IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) / - UDP()) - pkts.append(p) - p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / - IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) / - ICMPv6EchoReply()) - pkts.append(p) - p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / - IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) / - GRE() / IP() / TCP()) - pkts.append(p) - self.pg1.add_stream(pkts) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - capture = self.pg0.get_capture(len(pkts)) - for packet in capture: - try: - self.assertEqual(packet[IPv6].src, self.pg1.remote_ip6) - self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6) - self.assert_packet_checksums_valid(packet) - except: - self.logger.error(ppp("Unexpected or invalid packet:", packet)) - raise - - sm = self.vapi.nat66_static_mapping_dump() - self.assertEqual(len(sm), 1) - self.assertEqual(sm[0].total_pkts, 8) - - def test_check_no_translate(self): - """ NAT66 translate only when egress interface is outside interface """ - flags = self.config_flags.NAT_IS_INSIDE - self.vapi.nat66_add_del_interface(is_add=1, flags=flags, - sw_if_index=self.pg0.sw_if_index) - self.vapi.nat66_add_del_interface(is_add=1, flags=flags, - sw_if_index=self.pg1.sw_if_index) - self.vapi.nat66_add_del_static_mapping( - local_ip_address=self.pg0.remote_ip6, - external_ip_address=self.nat_addr, - is_add=1) - - # in2out - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) / - UDP()) - self.pg0.add_stream([p]) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - capture = self.pg1.get_capture(1) - packet = capture[0] - try: - self.assertEqual(packet[IPv6].src, self.pg0.remote_ip6) - self.assertEqual(packet[IPv6].dst, self.pg1.remote_ip6) - except: - self.logger.error(ppp("Unexpected or invalid packet:", packet)) - raise - - def clear_nat66(self): - """ - Clear NAT66 configuration. - """ - interfaces = self.vapi.nat66_interface_dump() - for intf in interfaces: - self.vapi.nat66_add_del_interface(is_add=0, flags=intf.flags, - sw_if_index=intf.sw_if_index) - - static_mappings = self.vapi.nat66_static_mapping_dump() - for sm in static_mappings: - self.vapi.nat66_add_del_static_mapping( - local_ip_address=sm.local_ip_address, - external_ip_address=sm.external_ip_address, vrf_id=sm.vrf_id, - is_add=0) - - def tearDown(self): - super(TestNAT66, self).tearDown() - self.clear_nat66() - - def show_commands_at_teardown(self): - self.logger.info(self.vapi.cli("show nat66 interfaces")) - self.logger.info(self.vapi.cli("show nat66 static mappings")) - - if __name__ == '__main__': unittest.main(testRunner=VppTestRunner)