-#!/usr/bin/env python
+#!/usr/bin/env python3
import socket
import unittest
from vpp_papi import VppEnum
from vpp_ip_route import VppIpRoute, VppRoutePath, FibPathType
from vpp_neighbor import VppNeighbor
-from vpp_ip import VppIpAddress, VppIpPrefix
from scapy.all import bind_layers, Packet, ByteEnumField, ShortField, \
IPField, IntField, LongField, XByteField, FlagsField, FieldLenField, \
PacketListField
p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
TCP(sport=self.tcp_port_in, dport=20))
- pkts.append(p)
+ pkts.extend([p, p])
# UDP
p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
TCP(dport=tcp_port, sport=20))
- pkts.append(p)
+ pkts.extend([p, p])
# UDP
p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
# maxEntriesPerUser
self.assertEqual(struct.pack("I", limit), record[473])
# sourceIPv4Address
- self.assertEqual(src_addr, record[8])
+ self.assertEqual(socket.inet_pton(socket.AF_INET, src_addr), record[8])
def verify_syslog_apmap(self, data, is_add=True):
message = data.decode('utf-8')
cls.pg1.configure_ipv4_neighbors()
cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
- cls.vapi.ip_table_add_del(is_add=1, table_id=10)
- cls.vapi.ip_table_add_del(is_add=1, table_id=20)
+ cls.vapi.ip_table_add_del(is_add=1, table={'table_id': 10})
+ cls.vapi.ip_table_add_del(is_add=1, table={'table_id': 20})
- cls.pg4._local_ip4 = VppIpPrefix("172.16.255.1",
- cls.pg4.local_ip4_prefix.len)
+ cls.pg4._local_ip4 = "172.16.255.1"
cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
cls.pg4.set_table_ip4(10)
- cls.pg5._local_ip4 = VppIpPrefix("172.17.255.3",
- cls.pg5.local_ip4_prefix.len)
+ cls.pg5._local_ip4 = "172.17.255.3"
cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
cls.pg5.set_table_ip4(10)
- cls.pg6._local_ip4 = VppIpPrefix("172.16.255.1",
- cls.pg6.local_ip4_prefix.len)
+ cls.pg6._local_ip4 = "172.16.255.1"
cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
cls.pg6.set_table_ip4(20)
for i in cls.overlapping_interfaces:
cls.pg9.config_ip4()
cls.vapi.sw_interface_add_del_address(
sw_if_index=cls.pg9.sw_if_index,
- prefix=VppIpPrefix("10.0.0.1", 24).encode())
+ prefix="10.0.0.1/24")
cls.pg9.admin_up()
cls.pg9.resolve_arp()
err = self.statistics.get_err_counter(
'/err/nat44-in2out-slowpath/TCP packets')
- self.assertEqual(err - tcpn, 1)
+ self.assertEqual(err - tcpn, 2)
err = self.statistics.get_err_counter(
'/err/nat44-in2out-slowpath/UDP packets')
self.assertEqual(err - udpn, 1)
self.assertEqual(err - icmpn, 1)
err = self.statistics.get_err_counter(
'/err/nat44-in2out-slowpath/good in2out packets processed')
- self.assertEqual(err - totaln, 3)
+ self.assertEqual(err - totaln, 4)
# out2in
tcpn = self.statistics.get_err_counter('/err/nat44-out2in/TCP packets')
self.verify_capture_in(capture, self.pg0)
err = self.statistics.get_err_counter('/err/nat44-out2in/TCP packets')
- self.assertEqual(err - tcpn, 1)
+ self.assertEqual(err - tcpn, 2)
err = self.statistics.get_err_counter('/err/nat44-out2in/UDP packets')
self.assertEqual(err - udpn, 1)
err = self.statistics.get_err_counter('/err/nat44-out2in/ICMP packets')
self.assertEqual(err - icmpn, 1)
err = self.statistics.get_err_counter(
'/err/nat44-out2in/good out2in packets processed')
- self.assertEqual(err - totaln, 3)
+ self.assertEqual(err - totaln, 4)
users = self.statistics.get_counter('/nat44/total-users')
self.assertEqual(users[0][0], 1)
is_add=1)
self.vapi.nat44_forwarding_enable_disable(enable=1)
- real_ip = self.pg0.remote_ip4n
+ real_ip = self.pg0.remote_ip4
alias_ip = self.nat_addr
flags = self.config_flags.NAT_IS_ADDR_ONLY
self.vapi.nat44_add_del_static_mapping(is_add=1,
self.tcp_port_out = 6303
self.udp_port_out = 6304
self.icmp_id_out = 6305
- tag = b"testTAG"
+ tag = "testTAG"
self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip, tag=tag)
flags = self.config_flags.NAT_IS_INSIDE
is_add=1)
sm = self.vapi.nat44_static_mapping_dump()
self.assertEqual(len(sm), 1)
- self.assertEqual((sm[0].tag).split(b'\0', 1)[0], tag)
+ self.assertEqual(sm[0].tag, tag)
# out2in
pkts = self.create_stream_out(self.pg1, nat_ip)
# 1:1NAT
self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
- sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
+ sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
self.assertEqual(len(sessions), 0)
pkts = self.create_stream_in(self.pg0, self.pg1)
self.pg0.add_stream(pkts)
""" Identity NAT """
flags = self.config_flags.NAT_IS_ADDR_ONLY
self.vapi.nat44_add_del_identity_mapping(
- ip_address=self.pg0.remote_ip4n, sw_if_index=0xFFFFFFFF,
+ ip_address=self.pg0.remote_ip4, sw_if_index=0xFFFFFFFF,
flags=flags, is_add=1)
flags = self.config_flags.NAT_IS_INSIDE
self.vapi.nat44_interface_add_del_feature(
self.logger.error(ppp("Unexpected or invalid packet:", p))
raise
- sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
+ sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
self.assertEqual(len(sessions), 0)
flags = self.config_flags.NAT_IS_ADDR_ONLY
self.vapi.nat44_add_del_identity_mapping(
- ip_address=self.pg0.remote_ip4n, sw_if_index=0xFFFFFFFF,
+ ip_address=self.pg0.remote_ip4, sw_if_index=0xFFFFFFFF,
flags=flags, vrf_id=1, is_add=1)
identity_mappings = self.vapi.nat44_identity_mapping_dump()
self.assertEqual(len(identity_mappings), 2)
# pg5 session dump
addresses = self.vapi.nat44_address_dump()
self.assertEqual(len(addresses), 1)
- sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
+ sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4, 10)
self.assertEqual(len(sessions), 3)
for session in sessions:
self.assertFalse(session.flags & self.config_flags.NAT_IS_STATIC)
self.config_flags.NAT_IS_EXT_HOST_VALID)
# pg4 session dump
- sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
+ sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4, 10)
self.assertGreaterEqual(len(sessions), 4)
for session in sessions:
self.assertFalse(session.flags & self.config_flags.NAT_IS_STATIC)
addresses[0].ip_address)
# pg6 session dump
- sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
+ sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4, 20)
self.assertGreaterEqual(len(sessions), 3)
for session in sessions:
self.assertTrue(session.flags & self.config_flags.NAT_IS_STATIC)
self.logger.error(ppp("Unexpected or invalid packet:", packet))
raise
- def test_max_translations_per_user(self):
- """ MAX translations per user - recycle the least recently used """
-
- self.nat44_add_address(self.nat_addr)
- flags = self.config_flags.NAT_IS_INSIDE
- self.vapi.nat44_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
- self.vapi.nat44_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
-
- # get maximum number of translations per user
- nat44_config = self.vapi.nat_show_config()
-
- # send more than maximum number of translations per user packets
- pkts_num = nat44_config.max_translations_per_user + 5
- pkts = []
- for port in range(0, pkts_num):
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
- TCP(sport=1025 + port))
- pkts.append(p)
- self.pg0.add_stream(pkts)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
-
- # verify number of translated packet
- self.pg1.get_capture(pkts_num)
-
- users = self.vapi.nat44_user_dump()
- for user in users:
- if user.ip_address == self.pg0.remote_ip4n:
- self.assertEqual(user.nsessions,
- nat44_config.max_translations_per_user)
- self.assertEqual(user.nstaticsessions, 0)
-
- tcp_port = 22
- self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
- tcp_port, tcp_port,
- proto=IP_PROTOS.tcp)
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
- TCP(sport=tcp_port))
- self.pg0.add_stream(p)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- self.pg1.get_capture(1)
- users = self.vapi.nat44_user_dump()
- for user in users:
- if user.ip_address == self.pg0.remote_ip4n:
- self.assertEqual(user.nsessions,
- nat44_config.max_translations_per_user - 1)
- self.assertEqual(user.nstaticsessions, 1)
-
def test_interface_addr(self):
""" Acquire NAT44 addresses from interface """
self.vapi.nat44_add_del_interface_addr(
def test_interface_addr_static_mapping(self):
""" Static mapping with addresses from interface """
- tag = b"testTAG"
+ tag = "testTAG"
self.vapi.nat44_add_del_interface_addr(
is_add=1,
self.assertEqual(1, len(static_mappings))
self.assertEqual(self.pg7.sw_if_index,
static_mappings[0].external_sw_if_index)
- self.assertEqual((static_mappings[0].tag).split(b'\0', 1)[0], tag)
+ self.assertEqual(static_mappings[0].tag, tag)
# configure interface address and check static mappings
self.pg7.config_ip4()
if sm.external_sw_if_index == 0xFFFFFFFF:
self.assertEqual(str(sm.external_ip_address),
self.pg7.local_ip4)
- self.assertEqual((sm.tag).split(b'\0', 1)[0], tag)
+ self.assertEqual(sm.tag, tag)
resolved = True
self.assertTrue(resolved)
self.assertEqual(1, len(static_mappings))
self.assertEqual(self.pg7.sw_if_index,
static_mappings[0].external_sw_if_index)
- self.assertEqual((static_mappings[0].tag).split(b'\0', 1)[0], tag)
+ self.assertEqual(static_mappings[0].tag, tag)
# configure interface address again and check static mappings
self.pg7.config_ip4()
if sm.external_sw_if_index == 0xFFFFFFFF:
self.assertEqual(str(sm.external_ip_address),
self.pg7.local_ip4)
- self.assertEqual((sm.tag).split(b'\0', 1)[0], tag)
+ self.assertEqual(sm.tag, tag)
resolved = True
self.assertTrue(resolved)
self.vapi.nat44_interface_add_del_feature(
sw_if_index=self.pg1.sw_if_index,
is_add=1)
- self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
- src_address=self.pg3.local_ip4n,
+ self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
+ src_address=self.pg3.local_ip4,
path_mtu=512,
template_interval=10,
collector_port=collector_port)
self.vapi.nat44_interface_add_del_feature(
sw_if_index=self.pg1.sw_if_index,
is_add=1)
- self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
- src_address=self.pg3.local_ip4n,
+ self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
+ src_address=self.pg3.local_ip4,
path_mtu=512,
template_interval=10)
self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
self.pg_start()
self.pg1.get_capture(max_sessions)
- self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
- src_address=self.pg3.local_ip4n,
+ self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
+ src_address=self.pg3.local_ip4,
path_mtu=512,
template_interval=10)
self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
""" Test syslog address and port mapping creation and deletion """
self.vapi.syslog_set_filter(
self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_INFO)
- self.vapi.syslog_set_sender(self.pg3.local_ip4n, self.pg3.remote_ip4n)
+ self.vapi.syslog_set_sender(self.pg3.local_ip4, self.pg3.remote_ip4)
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT_IS_INSIDE
self.vapi.nat44_interface_add_del_feature(
self.pg0.unconfig_ip4()
self.pg1.unconfig_ip4()
- self.vapi.ip_table_add_del(is_add=1, table_id=vrf_id1)
- self.vapi.ip_table_add_del(is_add=1, table_id=vrf_id2)
+ self.vapi.ip_table_add_del(is_add=1, table={'table_id': vrf_id1})
+ self.vapi.ip_table_add_del(is_add=1, table={'table_id': vrf_id2})
self.pg0.set_table_ip4(vrf_id1)
self.pg1.set_table_ip4(vrf_id2)
self.pg0.config_ip4()
self.pg1.config_ip4()
self.pg0.resolve_arp()
self.pg1.resolve_arp()
- self.vapi.ip_table_add_del(is_add=0, table_id=vrf_id1)
- self.vapi.ip_table_add_del(is_add=0, table_id=vrf_id2)
+ self.vapi.ip_table_add_del(is_add=0, table={'table_id': vrf_id1})
+ self.vapi.ip_table_add_del(is_add=0, table={'table_id': vrf_id2})
def test_vrf_feature_independent(self):
""" NAT44 tenant VRF independent address pool mode """
self.pg_start()
self.pg1.get_capture(len(pkts))
- sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
+ sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
nsessions = len(sessions)
self.vapi.nat44_del_session(address=sessions[0].inside_ip_address,
port=sessions[1].outside_port,
protocol=sessions[1].protocol)
- sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
+ sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
self.assertEqual(nsessions - len(sessions), 2)
self.vapi.nat44_del_session(address=sessions[0].inside_ip_address,
is_add=1)
self.vapi.nat_set_reass(timeout=2, max_reass=1024, max_frag=1,
drop_frag=0)
- self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
- src_address=self.pg3.local_ip4n,
+ self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
+ src_address=self.pg3.local_ip4,
path_mtu=512,
template_interval=10)
self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
self.pg1.unconfig_ip4()
self.pg2.unconfig_ip4()
- self.vapi.ip_table_add_del(is_add=1, table_id=vrf_id1)
- self.vapi.ip_table_add_del(is_add=1, table_id=vrf_id2)
+ self.vapi.ip_table_add_del(is_add=1, table={'table_id': vrf_id1})
+ self.vapi.ip_table_add_del(is_add=1, table={'table_id': vrf_id2})
self.pg1.set_table_ip4(vrf_id1)
self.pg2.set_table_ip4(vrf_id2)
self.pg1.config_ip4()
# delete one session
self.pg_enable_capture(self.pg_interfaces)
- self.vapi.nat44_del_session(address=self.pg0.remote_ip4n,
+ self.vapi.nat44_del_session(address=self.pg0.remote_ip4,
port=self.tcp_port_in,
protocol=IP_PROTOS.tcp,
flags=self.config_flags.NAT_IS_INSIDE)
self.logger.info(self.vapi.cli("show nat ha"))
+class TestNAT44EndpointDependent2(MethodHolder):
+ """ Endpoint-Dependent session test cases """
+
+ icmp_timeout = 2
+
+ @classmethod
+ def setUpConstants(cls):
+ super(TestNAT44EndpointDependent2, cls).setUpConstants()
+ cls.vpp_cmdline.extend(["nat", "{", "endpoint-dependent",
+ "translation", "hash", "buckets", "1",
+ "icmp", "timeout", str(cls.icmp_timeout), "}"])
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestNAT44EndpointDependent2, cls).setUpClass()
+ try:
+ translation_buckets = 1
+ cls.max_translations = 10 * translation_buckets
+
+ cls.create_pg_interfaces(range(2))
+ cls.interfaces = list(cls.pg_interfaces[0:2])
+
+ for i in cls.interfaces:
+ i.admin_up()
+ i.config_ip4()
+ i.resolve_arp()
+
+ cls.pg0.generate_remote_hosts(1)
+ cls.pg0.configure_ipv4_neighbors()
+
+ cls.pg1.generate_remote_hosts(1)
+ cls.pg1.configure_ipv4_neighbors()
+
+ except Exception:
+ super(TestNAT44EndpointDependent2, cls).tearDownClass()
+ raise
+
+ def create_icmp_stream(self, in_if, out_if, count):
+ """
+ Create ICMP packet stream for inside network
+
+ :param in_if: Inside interface
+ :param out_if: Outside interface
+ :param count: Number of packets
+ """
+
+ self.assertTrue(count > 0)
+ icmp_id = random.randint(0, 65535 - (count - 1))
+
+ pkts = list()
+ for i in range(count):
+ p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
+ IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=64) /
+ ICMP(id=icmp_id + i, type='echo-request'))
+ pkts.append(p)
+ return pkts
+
+ def send_pkts(self, pkts, expected=None):
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ return self.pg1.get_capture(
+ len(pkts) if expected is None else expected)
+
+ def test_session_cleanup(self):
+ """ NAT44 session cleanup test """
+
+ self.nat44_add_address(self.pg1.local_ip4)
+ flags = self.config_flags.NAT_IS_INSIDE
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg0.sw_if_index,
+ flags=flags, is_add=1)
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg1.sw_if_index,
+ is_add=1)
+
+ nat_config = self.vapi.nat_show_config()
+ self.assertEqual(1, nat_config.endpoint_dependent)
+
+ pkts = self.create_icmp_stream(self.pg0, self.pg1,
+ self.max_translations + 2)
+ sz = len(pkts)
+
+ # positive test
+ self.send_pkts(pkts[0:self.max_translations])
+
+ # false positive test
+ self.send_pkts(pkts[self.max_translations:sz - 1], 0)
+
+ sleep(self.icmp_timeout)
+
+ # positive test
+ self.send_pkts(pkts[self.max_translations + 1:sz])
+
+
class TestNAT44EndpointDependent(MethodHolder):
""" Endpoint-Dependent mapping and filtering test cases """
cls.ipfix_domain_id = 1
cls.tcp_external_port = 80
- cls.create_pg_interfaces(range(7))
+ cls.create_pg_interfaces(range(9))
cls.interfaces = list(cls.pg_interfaces[0:3])
for i in cls.interfaces:
cls.pg4.config_ip4()
cls.vapi.sw_interface_add_del_address(
sw_if_index=cls.pg4.sw_if_index,
- prefix=VppIpPrefix("10.0.0.1", 24).encode())
+ prefix="10.0.0.1/24")
cls.pg4.admin_up()
cls.pg4.resolve_arp()
cls.pg4._remote_hosts[1]._ip4 = cls.pg4._remote_hosts[0]._ip4
cls.pg4.resolve_arp()
- zero_ip4n = socket.inet_pton(socket.AF_INET, "0.0.0.0")
- cls.vapi.ip_table_add_del(is_add=1, table_id=1)
+ zero_ip4 = socket.inet_pton(socket.AF_INET, "0.0.0.0")
+ cls.vapi.ip_table_add_del(is_add=1, table={'table_id': 1})
- cls.pg5._local_ip4 = VppIpPrefix("10.1.1.1",
- cls.pg5.local_ip4_prefix.len)
+ cls.pg5._local_ip4 = "10.1.1.1"
cls.pg5._remote_hosts[0]._ip4 = "10.1.1.2"
- cls.pg5._remote_hosts[0]._ip4n = socket.inet_pton(
- socket.AF_INET, cls.pg5.remote_ip4)
cls.pg5.set_table_ip4(1)
cls.pg5.config_ip4()
cls.pg5.admin_up()
register=False)
r1.add_vpp_config()
- cls.pg6._local_ip4 = VppIpPrefix("10.1.2.1",
- cls.pg6.local_ip4_prefix.len)
+ cls.pg6._local_ip4 = "10.1.2.1"
cls.pg6._remote_hosts[0]._ip4 = "10.1.2.2"
- cls.pg6._remote_hosts[0]._ip4n = socket.inet_pton(
- socket.AF_INET, cls.pg6.remote_ip4)
cls.pg6.set_table_ip4(1)
cls.pg6.config_ip4()
cls.pg6.admin_up()
cls.pg5.resolve_arp()
cls.pg6.resolve_arp()
+ cls.pg7.admin_up()
+ cls.pg7.config_ip4()
+ cls.pg7.resolve_arp()
+ cls.pg7.generate_remote_hosts(3)
+ cls.pg7.configure_ipv4_neighbors()
+
+ cls.pg8.admin_up()
+ cls.pg8.config_ip4()
+ cls.pg8.resolve_arp()
+
except Exception:
super(TestNAT44EndpointDependent, cls).tearDownClass()
raise
err = self.statistics.get_err_counter(
'/err/nat44-ed-in2out-slowpath/TCP packets')
- self.assertEqual(err - tcpn, 1)
+ self.assertEqual(err - tcpn, 2)
err = self.statistics.get_err_counter(
'/err/nat44-ed-in2out-slowpath/UDP packets')
self.assertEqual(err - udpn, 1)
self.assertEqual(err - icmpn, 1)
err = self.statistics.get_err_counter(
'/err/nat44-ed-in2out-slowpath/good in2out packets processed')
- self.assertEqual(err - totaln, 3)
+ self.assertEqual(err - totaln, 4)
# out2in
tcpn = self.statistics.get_err_counter(
err = self.statistics.get_err_counter(
'/err/nat44-ed-out2in/TCP packets')
- self.assertEqual(err - tcpn, 1)
+ self.assertEqual(err - tcpn, 2)
err = self.statistics.get_err_counter(
'/err/nat44-ed-out2in/UDP packets')
self.assertEqual(err - udpn, 1)
self.assertEqual(err - icmpn, 1)
err = self.statistics.get_err_counter(
'/err/nat44-ed-out2in/good out2in packets processed')
- self.assertEqual(err - totaln, 2)
+ self.assertEqual(err - totaln, 3)
users = self.statistics.get_counter('/nat44/total-users')
self.assertEqual(users[0][0], 1)
sessions = self.statistics.get_counter('/nat44/total-sessions')
self.assertEqual(sessions[0][0], 3)
+ def test_dynamic_output_feature_vrf(self):
+ """ NAT44 dynamic translation test: output-feature, VRF"""
+
+ # other then default (0)
+ new_vrf_id = 22
+
+ self.nat44_add_address(self.nat_addr)
+ flags = self.config_flags.NAT_IS_INSIDE
+ self.vapi.nat44_interface_add_del_output_feature(
+ sw_if_index=self.pg7.sw_if_index,
+ flags=flags, is_add=1)
+ self.vapi.nat44_interface_add_del_output_feature(
+ sw_if_index=self.pg8.sw_if_index,
+ is_add=1)
+
+ try:
+ self.vapi.ip_table_add_del(is_add=1,
+ table={'table_id': new_vrf_id})
+
+ self.pg7.unconfig_ip4()
+ self.pg7.set_table_ip4(new_vrf_id)
+ self.pg7.config_ip4()
+ self.pg7.resolve_arp()
+
+ self.pg8.unconfig_ip4()
+ self.pg8.set_table_ip4(new_vrf_id)
+ self.pg8.config_ip4()
+ self.pg8.resolve_arp()
+
+ nat_config = self.vapi.nat_show_config()
+ self.assertEqual(1, nat_config.endpoint_dependent)
+
+ # in2out
+ tcpn = self.statistics.get_err_counter(
+ '/err/nat44-ed-in2out-slowpath/TCP packets')
+ udpn = self.statistics.get_err_counter(
+ '/err/nat44-ed-in2out-slowpath/UDP packets')
+ icmpn = self.statistics.get_err_counter(
+ '/err/nat44-ed-in2out-slowpath/ICMP packets')
+ totaln = self.statistics.get_err_counter(
+ '/err/nat44-ed-in2out-slowpath/good in2out packets processed')
+
+ pkts = self.create_stream_in(self.pg7, self.pg8)
+ self.pg7.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg8.get_capture(len(pkts))
+ self.verify_capture_out(capture)
+
+ err = self.statistics.get_err_counter(
+ '/err/nat44-ed-in2out-slowpath/TCP packets')
+ self.assertEqual(err - tcpn, 2)
+ err = self.statistics.get_err_counter(
+ '/err/nat44-ed-in2out-slowpath/UDP packets')
+ self.assertEqual(err - udpn, 1)
+ err = self.statistics.get_err_counter(
+ '/err/nat44-ed-in2out-slowpath/ICMP packets')
+ self.assertEqual(err - icmpn, 1)
+ err = self.statistics.get_err_counter(
+ '/err/nat44-ed-in2out-slowpath/good in2out packets processed')
+ self.assertEqual(err - totaln, 4)
+
+ # out2in
+ tcpn = self.statistics.get_err_counter(
+ '/err/nat44-ed-out2in/TCP packets')
+ udpn = self.statistics.get_err_counter(
+ '/err/nat44-ed-out2in/UDP packets')
+ icmpn = self.statistics.get_err_counter(
+ '/err/nat44-ed-out2in-slowpath/ICMP packets')
+ totaln = self.statistics.get_err_counter(
+ '/err/nat44-ed-out2in/good out2in packets processed')
+
+ pkts = self.create_stream_out(self.pg8)
+ self.pg8.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg7.get_capture(len(pkts))
+ self.verify_capture_in(capture, self.pg7)
+
+ err = self.statistics.get_err_counter(
+ '/err/nat44-ed-out2in/TCP packets')
+ self.assertEqual(err - tcpn, 2)
+ err = self.statistics.get_err_counter(
+ '/err/nat44-ed-out2in/UDP packets')
+ self.assertEqual(err - udpn, 1)
+ err = self.statistics.get_err_counter(
+ '/err/nat44-ed-out2in-slowpath/ICMP packets')
+ self.assertEqual(err - icmpn, 1)
+ err = self.statistics.get_err_counter(
+ '/err/nat44-ed-out2in/good out2in packets processed')
+ self.assertEqual(err - totaln, 3)
+
+ users = self.statistics.get_counter('/nat44/total-users')
+ self.assertEqual(users[0][0], 1)
+ sessions = self.statistics.get_counter('/nat44/total-sessions')
+ self.assertEqual(sessions[0][0], 3)
+
+ finally:
+ self.pg7.unconfig_ip4()
+ self.pg7.set_table_ip4(1)
+ self.pg7.config_ip4()
+ self.pg7.resolve_arp()
+
+ self.pg8.unconfig_ip4()
+ self.pg8.set_table_ip4(1)
+ self.pg8.config_ip4()
+ self.pg8.resolve_arp()
+
+ self.vapi.ip_table_add_del(is_add=0,
+ table={'table_id': new_vrf_id})
+
def test_forwarding(self):
""" NAT44 forwarding test """
self.pg0.remote_hosts[0] = host0
user = self.pg0.remote_hosts[1]
- sessions = self.vapi.nat44_user_session_dump(user.ip4n, 0)
+ sessions = self.vapi.nat44_user_session_dump(user.ip4, 0)
self.assertEqual(len(sessions), 3)
self.assertTrue(sessions[0].flags &
self.config_flags.NAT_IS_EXT_HOST_VALID)
self.config_flags.NAT_IS_EXT_HOST_VALID),
ext_host_address=sessions[0].ext_host_address,
ext_host_port=sessions[0].ext_host_port)
- sessions = self.vapi.nat44_user_session_dump(user.ip4n, 0)
+ sessions = self.vapi.nat44_user_session_dump(user.ip4, 0)
self.assertEqual(len(sessions), 2)
finally:
server1 = self.pg0.remote_hosts[0]
server2 = self.pg0.remote_hosts[1]
- locals = [{'addr': server1.ip4n,
+ locals = [{'addr': server1.ip4,
'port': local_port,
'probability': 70,
'vrf_id': 0},
- {'addr': server2.ip4n,
+ {'addr': server2.ip4,
'port': local_port,
'probability': 30,
'vrf_id': 0}]
self.logger.error(ppp("Unexpected or invalid packet:", p))
raise
- sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
+ sessions = self.vapi.nat44_user_session_dump(server.ip4, 0)
self.assertEqual(len(sessions), 1)
self.assertTrue(sessions[0].flags &
self.config_flags.NAT_IS_EXT_HOST_VALID)
self.config_flags.NAT_IS_EXT_HOST_VALID),
ext_host_address=sessions[0].ext_host_address,
ext_host_port=sessions[0].ext_host_port)
- sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
+ sessions = self.vapi.nat44_user_session_dump(server.ip4, 0)
self.assertEqual(len(sessions), 0)
@unittest.skipUnless(running_extended_tests, "part of extended tests")
server2 = self.pg0.remote_hosts[1]
server3 = self.pg0.remote_hosts[2]
- locals = [{'addr': server1.ip4n,
+ locals = [{'addr': server1.ip4,
'port': local_port,
'probability': 90,
'vrf_id': 0},
- {'addr': server2.ip4n,
+ {'addr': server2.ip4,
'port': local_port,
'probability': 10,
'vrf_id': 0}]
self.assertGreater(server1_n, server2_n)
local = {
- 'addr': server3.ip4n,
+ 'addr': server3.ip4,
'port': local_port,
'probability': 20,
'vrf_id': 0
self.assertGreater(server3_n, 0)
local = {
- 'addr': server2.ip4n,
+ 'addr': server2.ip4,
'port': local_port,
'probability': 10,
'vrf_id': 0
server1 = self.pg0.remote_hosts[0]
server2 = self.pg0.remote_hosts[1]
- locals = [{'addr': server1.ip4n,
+ locals = [{'addr': server1.ip4,
'port': local_port,
'probability': 70,
'vrf_id': 0},
- {'addr': server2.ip4n,
+ {'addr': server2.ip4,
'port': local_port,
'probability': 30,
'vrf_id': 0}]
server1 = self.pg0.remote_hosts[0]
server2 = self.pg0.remote_hosts[1]
- locals = [{'addr': server1.ip4n,
+ locals = [{'addr': server1.ip4,
'port': local_port,
'probability': 50,
'vrf_id': 0},
- {'addr': server2.ip4n,
+ {'addr': server2.ip4,
'port': local_port,
'probability': 50,
'vrf_id': 0}]
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT_IS_ADDR_ONLY
self.vapi.nat44_add_del_identity_mapping(
- ip_address=self.pg1.remote_ip4n, sw_if_index=0xFFFFFFFF,
+ ip_address=self.pg1.remote_ip4, sw_if_index=0xFFFFFFFF,
flags=flags, is_add=1)
flags = self.config_flags.NAT_IS_OUT2IN_ONLY
self.nat44_add_static_mapping(self.pg0.remote_ip4, external_addr,
proto=IP_PROTOS.tcp,
flags=flags)
else:
- locals = [{'addr': server1.ip4n,
+ locals = [{'addr': server1.ip4,
'port': port_in1,
'probability': 50,
'vrf_id': 0},
- {'addr': server2.ip4n,
+ {'addr': server2.ip4,
'port': port_in2,
'probability': 50,
'vrf_id': 0}]
raise
if eh_translate:
- sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
+ sessions = self.vapi.nat44_user_session_dump(server.ip4, 0)
self.assertEqual(len(sessions), 1)
self.assertTrue(sessions[0].flags &
self.config_flags.NAT_IS_EXT_HOST_VALID)
self.config_flags.NAT_IS_EXT_HOST_VALID),
ext_host_address=sessions[0].ext_host_nat_address,
ext_host_port=sessions[0].ext_host_nat_port)
- sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
+ sessions = self.vapi.nat44_user_session_dump(server.ip4, 0)
self.assertEqual(len(sessions), 0)
def test_twice_nat(self):
80,
proto=IP_PROTOS.tcp,
flags=flags)
- sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
+ sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
start_sessnum = len(sessions)
# SYN packet out->in
self.pg_start()
self.pg1.get_capture(1)
- sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
+ sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4,
0)
self.assertEqual(len(sessions) - start_sessnum, 0)
sw_if_index=self.pg1.sw_if_index,
is_add=1)
- sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
+ sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
start_sessnum = len(sessions)
self.initiate_tcp_session(self.pg0, self.pg1)
self.pg_start()
self.pg1.get_capture(1)
- sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
+ sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4,
0)
self.assertEqual(len(sessions) - start_sessnum, 0)
sw_if_index=self.pg1.sw_if_index,
is_add=1)
- sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
+ sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
start_sessnum = len(sessions)
self.initiate_tcp_session(self.pg0, self.pg1)
self.pg_start()
self.pg0.get_capture(1)
- sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
+ sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4,
0)
self.assertEqual(len(sessions) - start_sessnum, 0)
sw_if_index=self.pg1.sw_if_index,
is_add=1)
- sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
+ sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
start_sessnum = len(sessions)
self.initiate_tcp_session(self.pg0, self.pg1)
self.pg_start()
self.pg0.get_capture(1)
- sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
+ sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4,
0)
self.assertEqual(len(sessions) - start_sessnum, 0)
self.vapi.nat44_interface_add_del_feature(
sw_if_index=self.pg1.sw_if_index,
is_add=1)
- self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
- src_address=self.pg2.local_ip4n,
+ self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4,
+ src_address=self.pg2.local_ip4,
path_mtu=512,
template_interval=10)
self.vapi.nat_set_timeouts(udp=5, tcp_established=7440,
self.verify_ipfix_max_entries_per_user(
data,
nat44_config.max_translations_per_user,
- self.pg0.remote_ip4n)
+ self.pg0.remote_ip4)
sleep(6)
p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
""" Test syslog session creation and deletion """
self.vapi.syslog_set_filter(
self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_INFO)
- self.vapi.syslog_set_sender(self.pg2.local_ip4n, self.pg2.remote_ip4n)
+ self.vapi.syslog_set_sender(self.pg2.local_ip4, self.pg2.remote_ip4)
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT_IS_INSIDE
self.vapi.nat44_interface_add_del_feature(
nat_ip = "10.0.0.10"
- self.vapi.nat_det_add_del_map(is_add=1, in_addr=self.pg0.remote_ip4n,
+ self.vapi.nat_det_add_del_map(is_add=1, in_addr=self.pg0.remote_ip4,
in_plen=32,
out_addr=socket.inet_aton(nat_ip),
out_plen=32)
self.verify_capture_in(capture, self.pg0)
# session dump test
- sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
+ sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4)
self.assertEqual(len(sessions), 3)
# TCP session
host0 = self.pg0.remote_hosts[0]
host1 = self.pg0.remote_hosts[1]
- self.vapi.nat_det_add_del_map(is_add=1, in_addr=host0.ip4n, in_plen=24,
+ self.vapi.nat_det_add_del_map(is_add=1, in_addr=host0.ip4, in_plen=24,
out_addr=socket.inet_aton(nat_ip),
out_plen=32)
flags = self.config_flags.NAT_IS_INSIDE
# session close api test
self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
port_out1,
- self.pg1.remote_ip4n,
+ self.pg1.remote_ip4,
external_port)
dms = self.vapi.nat_det_map_dump()
self.assertEqual(dms[0].ses_num, 1)
- self.vapi.nat_det_close_session_in(host0.ip4n,
+ self.vapi.nat_det_close_session_in(host0.ip4,
port_in,
- self.pg1.remote_ip4n,
+ self.pg1.remote_ip4,
external_port)
dms = self.vapi.nat_det_map_dump()
self.assertEqual(dms[0].ses_num, 0)
def test_tcp_session_close_detection_in(self):
""" Deterministic NAT TCP session close from inside network """
- self.vapi.nat_det_add_del_map(is_add=1, in_addr=self.pg0.remote_ip4n,
+ self.vapi.nat_det_add_del_map(is_add=1, in_addr=self.pg0.remote_ip4,
in_plen=32,
out_addr=socket.inet_aton(self.nat_addr),
out_plen=32)
def test_tcp_session_close_detection_out(self):
""" Deterministic NAT TCP session close from outside network """
- self.vapi.nat_det_add_del_map(is_add=1, in_addr=self.pg0.remote_ip4n,
+ self.vapi.nat_det_add_del_map(is_add=1, in_addr=self.pg0.remote_ip4,
in_plen=32,
out_addr=socket.inet_aton(self.nat_addr),
out_plen=32)
@unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_session_timeout(self):
""" Deterministic NAT session timeouts """
- self.vapi.nat_det_add_del_map(is_add=1, in_addr=self.pg0.remote_ip4n,
+ self.vapi.nat_det_add_del_map(is_add=1, in_addr=self.pg0.remote_ip4,
in_plen=32,
out_addr=socket.inet_aton(self.nat_addr),
out_plen=32)
@unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_session_limit_per_user(self):
""" Deterministic NAT maximum sessions per user limit """
- self.vapi.nat_det_add_del_map(is_add=1, in_addr=self.pg0.remote_ip4n,
+ self.vapi.nat_det_add_del_map(is_add=1, in_addr=self.pg0.remote_ip4,
in_plen=32,
out_addr=socket.inet_aton(self.nat_addr),
out_plen=32)
self.vapi.nat44_interface_add_del_feature(
sw_if_index=self.pg1.sw_if_index,
is_add=1)
- self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
- src_address=self.pg2.local_ip4n,
+ self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4,
+ src_address=self.pg2.local_ip4,
path_mtu=512,
template_interval=10)
self.vapi.nat_ipfix_enable_disable(domain_id=1, src_port=4739,
data = ipfix.decode_data_set(p.getlayer(Set))
self.verify_ipfix_max_entries_per_user(data,
1000,
- self.pg0.remote_ip4n)
+ self.pg0.remote_ip4)
def clear_nat_det(self):
"""
cls.ip6_interfaces.append(cls.pg_interfaces[2])
cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
- cls.vapi.ip_table_add_del(is_ipv6=1, is_add=1,
- table_id=cls.vrf1_id)
+ cls.vapi.ip_table_add_del(is_add=1,
+ table={'table_id': cls.vrf1_id,
+ 'is_ip6': 1})
cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
err = self.statistics.get_err_counter('/err/nat64-out2in/TCP packets')
- self.assertEqual(err - tcpn, 1)
+ self.assertEqual(err - tcpn, 2)
err = self.statistics.get_err_counter('/err/nat64-out2in/UDP packets')
self.assertEqual(err - udpn, 1)
err = self.statistics.get_err_counter('/err/nat64-out2in/ICMP packets')
self.assertEqual(err - icmpn, 1)
err = self.statistics.get_err_counter(
'/err/nat64-out2in/good out2in packets processed')
- self.assertEqual(err - totaln, 3)
+ self.assertEqual(err - totaln, 4)
bibs = self.statistics.get_counter('/nat64/total-bibs')
self.assertEqual(bibs[0][0], 3)
prefix = self.vapi.nat64_prefix_dump()
self.assertEqual(len(prefix), 1)
- self.assertEqual(prefix[0].prefix,
- IPv6Network(unicode(global_pref64_str)))
+ self.assertEqual(str(prefix[0].prefix), global_pref64_str)
self.assertEqual(prefix[0].vrf_id, 0)
# Add tenant specific prefix
def test_reass_hairpinning(self):
""" NAT64 fragments hairpinning """
- data = 'a' * 200
+ data = b'a' * 200
server = self.pg0.remote_hosts[1]
server_in_port = random.randint(1025, 65535)
server_out_port = random.randint(1025, 65535)
self.pg_start()
self.pg1.get_capture(max_sessions)
- self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
- src_address=self.pg3.local_ip4n,
+ self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
+ src_address=self.pg3.local_ip4,
path_mtu=512,
template_interval=10)
self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
sw_if_index=self.pg1.sw_if_index)
self.vapi.nat_set_reass(timeout=2, max_reass=1024, max_frag=1,
drop_frag=0, is_ip6=1)
- self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
- src_address=self.pg3.local_ip4n,
+ self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
+ src_address=self.pg3.local_ip4,
path_mtu=512,
template_interval=10)
self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
sw_if_index=self.pg0.sw_if_index)
self.vapi.nat64_add_del_interface(is_add=1, flags=0,
sw_if_index=self.pg1.sw_if_index)
- self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
- src_address=self.pg3.local_ip4n,
+ self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
+ src_address=self.pg3.local_ip4,
path_mtu=512,
template_interval=10)
self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
sw_if_index=self.pg1.sw_if_index)
self.vapi.syslog_set_filter(
self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_INFO)
- self.vapi.syslog_set_sender(self.pg3.local_ip4n, self.pg3.remote_ip4n)
+ self.vapi.syslog_set_sender(self.pg3.local_ip4, self.pg3.remote_ip4)
p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
aftr_ip4 = '192.0.0.1'
aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
self.vapi.dslite_set_aftr_addr(ip4_addr=aftr_ip4, ip6_addr=aftr_ip6)
- self.vapi.syslog_set_sender(self.pg2.local_ip4n, self.pg2.remote_ip4n)
+ self.vapi.syslog_set_sender(self.pg2.local_ip4, self.pg2.remote_ip4)
# UDP
p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /