from vpp_papi import VppEnum
from vpp_ip_route import VppIpRoute, VppRoutePath, FibPathType
from vpp_neighbor import VppNeighbor
from vpp_papi import VppEnum
from vpp_ip_route import VppIpRoute, VppRoutePath, FibPathType
from vpp_neighbor import VppNeighbor
from scapy.all import bind_layers, Packet, ByteEnumField, ShortField, \
IPField, IntField, LongField, XByteField, FlagsField, FieldLenField, \
PacketListField
from scapy.all import bind_layers, Packet, ByteEnumField, ShortField, \
IPField, IntField, LongField, XByteField, FlagsField, FieldLenField, \
PacketListField
p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
TCP(sport=self.tcp_port_in, dport=20))
p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
TCP(sport=self.tcp_port_in, dport=20))
p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
TCP(dport=tcp_port, sport=20))
p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
TCP(dport=tcp_port, sport=20))
def verify_syslog_apmap(self, data, is_add=True):
message = data.decode('utf-8')
def verify_syslog_apmap(self, data, is_add=True):
message = data.decode('utf-8')
- cls.vapi.ip_table_add_del(is_add=1, table_id=10)
- cls.vapi.ip_table_add_del(is_add=1, table_id=20)
+ cls.vapi.ip_table_add_del(is_add=1, table={'table_id': 10})
+ cls.vapi.ip_table_add_del(is_add=1, table={'table_id': 20})
cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
cls.pg6.set_table_ip4(20)
for i in cls.overlapping_interfaces:
cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
cls.pg6.set_table_ip4(20)
for i in cls.overlapping_interfaces:
cls.pg9.config_ip4()
cls.vapi.sw_interface_add_del_address(
sw_if_index=cls.pg9.sw_if_index,
cls.pg9.config_ip4()
cls.vapi.sw_interface_add_del_address(
sw_if_index=cls.pg9.sw_if_index,
err = self.statistics.get_err_counter(
'/err/nat44-in2out-slowpath/UDP packets')
self.assertEqual(err - udpn, 1)
err = self.statistics.get_err_counter(
'/err/nat44-in2out-slowpath/UDP packets')
self.assertEqual(err - udpn, 1)
self.assertEqual(err - icmpn, 1)
err = self.statistics.get_err_counter(
'/err/nat44-in2out-slowpath/good in2out packets processed')
self.assertEqual(err - icmpn, 1)
err = self.statistics.get_err_counter(
'/err/nat44-in2out-slowpath/good in2out packets processed')
self.verify_capture_in(capture, self.pg0)
err = self.statistics.get_err_counter('/err/nat44-out2in/TCP packets')
self.verify_capture_in(capture, self.pg0)
err = self.statistics.get_err_counter('/err/nat44-out2in/TCP packets')
err = self.statistics.get_err_counter('/err/nat44-out2in/UDP packets')
self.assertEqual(err - udpn, 1)
err = self.statistics.get_err_counter('/err/nat44-out2in/ICMP packets')
self.assertEqual(err - icmpn, 1)
err = self.statistics.get_err_counter(
'/err/nat44-out2in/good out2in packets processed')
err = self.statistics.get_err_counter('/err/nat44-out2in/UDP packets')
self.assertEqual(err - udpn, 1)
err = self.statistics.get_err_counter('/err/nat44-out2in/ICMP packets')
self.assertEqual(err - icmpn, 1)
err = self.statistics.get_err_counter(
'/err/nat44-out2in/good out2in packets processed')
users = self.statistics.get_counter('/nat44/total-users')
self.assertEqual(users[0][0], 1)
users = self.statistics.get_counter('/nat44/total-users')
self.assertEqual(users[0][0], 1)
alias_ip = self.nat_addr
flags = self.config_flags.NAT_IS_ADDR_ONLY
self.vapi.nat44_add_del_static_mapping(is_add=1,
alias_ip = self.nat_addr
flags = self.config_flags.NAT_IS_ADDR_ONLY
self.vapi.nat44_add_del_static_mapping(is_add=1,
self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip, tag=tag)
flags = self.config_flags.NAT_IS_INSIDE
self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip, tag=tag)
flags = self.config_flags.NAT_IS_INSIDE
is_add=1)
sm = self.vapi.nat44_static_mapping_dump()
self.assertEqual(len(sm), 1)
is_add=1)
sm = self.vapi.nat44_static_mapping_dump()
self.assertEqual(len(sm), 1)
self.assertEqual(len(sessions), 0)
pkts = self.create_stream_in(self.pg0, self.pg1)
self.pg0.add_stream(pkts)
self.assertEqual(len(sessions), 0)
pkts = self.create_stream_in(self.pg0, self.pg1)
self.pg0.add_stream(pkts)
self.assertEqual(len(sessions), 0)
flags = self.config_flags.NAT_IS_ADDR_ONLY
self.vapi.nat44_add_del_identity_mapping(
self.assertEqual(len(sessions), 0)
flags = self.config_flags.NAT_IS_ADDR_ONLY
self.vapi.nat44_add_del_identity_mapping(
flags=flags, vrf_id=1, is_add=1)
identity_mappings = self.vapi.nat44_identity_mapping_dump()
self.assertEqual(len(identity_mappings), 2)
flags=flags, vrf_id=1, is_add=1)
identity_mappings = self.vapi.nat44_identity_mapping_dump()
self.assertEqual(len(identity_mappings), 2)
# pg5 session dump
addresses = self.vapi.nat44_address_dump()
self.assertEqual(len(addresses), 1)
# pg5 session dump
addresses = self.vapi.nat44_address_dump()
self.assertEqual(len(addresses), 1)
self.assertEqual(len(sessions), 3)
for session in sessions:
self.assertFalse(session.flags & self.config_flags.NAT_IS_STATIC)
self.assertEqual(len(sessions), 3)
for session in sessions:
self.assertFalse(session.flags & self.config_flags.NAT_IS_STATIC)
self.assertGreaterEqual(len(sessions), 4)
for session in sessions:
self.assertFalse(session.flags & self.config_flags.NAT_IS_STATIC)
self.assertGreaterEqual(len(sessions), 4)
for session in sessions:
self.assertFalse(session.flags & self.config_flags.NAT_IS_STATIC)
self.assertGreaterEqual(len(sessions), 3)
for session in sessions:
self.assertTrue(session.flags & self.config_flags.NAT_IS_STATIC)
self.assertGreaterEqual(len(sessions), 3)
for session in sessions:
self.assertTrue(session.flags & self.config_flags.NAT_IS_STATIC)
self.assertEqual(user.nsessions,
nat44_config.max_translations_per_user)
self.assertEqual(user.nstaticsessions, 0)
self.assertEqual(user.nsessions,
nat44_config.max_translations_per_user)
self.assertEqual(user.nstaticsessions, 0)
self.assertEqual(user.nsessions,
nat44_config.max_translations_per_user - 1)
self.assertEqual(user.nstaticsessions, 1)
self.assertEqual(user.nsessions,
nat44_config.max_translations_per_user - 1)
self.assertEqual(user.nstaticsessions, 1)
self.assertEqual(1, len(static_mappings))
self.assertEqual(self.pg7.sw_if_index,
static_mappings[0].external_sw_if_index)
self.assertEqual(1, len(static_mappings))
self.assertEqual(self.pg7.sw_if_index,
static_mappings[0].external_sw_if_index)
if sm.external_sw_if_index == 0xFFFFFFFF:
self.assertEqual(str(sm.external_ip_address),
self.pg7.local_ip4)
if sm.external_sw_if_index == 0xFFFFFFFF:
self.assertEqual(str(sm.external_ip_address),
self.pg7.local_ip4)
self.assertEqual(1, len(static_mappings))
self.assertEqual(self.pg7.sw_if_index,
static_mappings[0].external_sw_if_index)
self.assertEqual(1, len(static_mappings))
self.assertEqual(self.pg7.sw_if_index,
static_mappings[0].external_sw_if_index)
if sm.external_sw_if_index == 0xFFFFFFFF:
self.assertEqual(str(sm.external_ip_address),
self.pg7.local_ip4)
if sm.external_sw_if_index == 0xFFFFFFFF:
self.assertEqual(str(sm.external_ip_address),
self.pg7.local_ip4)
self.vapi.nat44_interface_add_del_feature(
sw_if_index=self.pg1.sw_if_index,
is_add=1)
self.vapi.nat44_interface_add_del_feature(
sw_if_index=self.pg1.sw_if_index,
is_add=1)
- self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
- src_address=self.pg3.local_ip4n,
+ self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
+ src_address=self.pg3.local_ip4,
self.vapi.nat44_interface_add_del_feature(
sw_if_index=self.pg1.sw_if_index,
is_add=1)
self.vapi.nat44_interface_add_del_feature(
sw_if_index=self.pg1.sw_if_index,
is_add=1)
- self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
- src_address=self.pg3.local_ip4n,
+ self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
+ src_address=self.pg3.local_ip4,
- self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
- src_address=self.pg3.local_ip4n,
+ self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
+ src_address=self.pg3.local_ip4,
""" Test syslog address and port mapping creation and deletion """
self.vapi.syslog_set_filter(
self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_INFO)
""" Test syslog address and port mapping creation and deletion """
self.vapi.syslog_set_filter(
self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_INFO)
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT_IS_INSIDE
self.vapi.nat44_interface_add_del_feature(
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT_IS_INSIDE
self.vapi.nat44_interface_add_del_feature(
- self.vapi.ip_table_add_del(is_add=1, table_id=vrf_id1)
- self.vapi.ip_table_add_del(is_add=1, table_id=vrf_id2)
+ self.vapi.ip_table_add_del(is_add=1, table={'table_id': vrf_id1})
+ self.vapi.ip_table_add_del(is_add=1, table={'table_id': vrf_id2})
- self.vapi.ip_table_add_del(is_add=0, table_id=vrf_id1)
- self.vapi.ip_table_add_del(is_add=0, table_id=vrf_id2)
+ self.vapi.ip_table_add_del(is_add=0, table={'table_id': vrf_id1})
+ self.vapi.ip_table_add_del(is_add=0, table={'table_id': vrf_id2})
nsessions = len(sessions)
self.vapi.nat44_del_session(address=sessions[0].inside_ip_address,
nsessions = len(sessions)
self.vapi.nat44_del_session(address=sessions[0].inside_ip_address,
port=sessions[1].outside_port,
protocol=sessions[1].protocol)
port=sessions[1].outside_port,
protocol=sessions[1].protocol)
self.assertEqual(nsessions - len(sessions), 2)
self.vapi.nat44_del_session(address=sessions[0].inside_ip_address,
self.assertEqual(nsessions - len(sessions), 2)
self.vapi.nat44_del_session(address=sessions[0].inside_ip_address,
is_add=1)
self.vapi.nat_set_reass(timeout=2, max_reass=1024, max_frag=1,
drop_frag=0)
is_add=1)
self.vapi.nat_set_reass(timeout=2, max_reass=1024, max_frag=1,
drop_frag=0)
- self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
- src_address=self.pg3.local_ip4n,
+ self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
+ src_address=self.pg3.local_ip4,
- self.vapi.ip_table_add_del(is_add=1, table_id=vrf_id1)
- self.vapi.ip_table_add_del(is_add=1, table_id=vrf_id2)
+ self.vapi.ip_table_add_del(is_add=1, table={'table_id': vrf_id1})
+ self.vapi.ip_table_add_del(is_add=1, table={'table_id': vrf_id2})
cls.pg4.config_ip4()
cls.vapi.sw_interface_add_del_address(
sw_if_index=cls.pg4.sw_if_index,
cls.pg4.config_ip4()
cls.vapi.sw_interface_add_del_address(
sw_if_index=cls.pg4.sw_if_index,
- zero_ip4n = socket.inet_pton(socket.AF_INET, "0.0.0.0")
- cls.vapi.ip_table_add_del(is_add=1, table_id=1)
+ zero_ip4 = socket.inet_pton(socket.AF_INET, "0.0.0.0")
+ cls.vapi.ip_table_add_del(is_add=1, table={'table_id': 1})
err = self.statistics.get_err_counter(
'/err/nat44-ed-in2out-slowpath/UDP packets')
self.assertEqual(err - udpn, 1)
err = self.statistics.get_err_counter(
'/err/nat44-ed-in2out-slowpath/UDP packets')
self.assertEqual(err - udpn, 1)
self.assertEqual(err - icmpn, 1)
err = self.statistics.get_err_counter(
'/err/nat44-ed-in2out-slowpath/good in2out packets processed')
self.assertEqual(err - icmpn, 1)
err = self.statistics.get_err_counter(
'/err/nat44-ed-in2out-slowpath/good in2out packets processed')
err = self.statistics.get_err_counter(
'/err/nat44-ed-out2in/UDP packets')
self.assertEqual(err - udpn, 1)
err = self.statistics.get_err_counter(
'/err/nat44-ed-out2in/UDP packets')
self.assertEqual(err - udpn, 1)
self.assertEqual(err - icmpn, 1)
err = self.statistics.get_err_counter(
'/err/nat44-ed-out2in/good out2in packets processed')
self.assertEqual(err - icmpn, 1)
err = self.statistics.get_err_counter(
'/err/nat44-ed-out2in/good out2in packets processed')
users = self.statistics.get_counter('/nat44/total-users')
self.assertEqual(users[0][0], 1)
users = self.statistics.get_counter('/nat44/total-users')
self.assertEqual(users[0][0], 1)
err = self.statistics.get_err_counter(
'/err/nat44-ed-in2out-slowpath/UDP packets')
self.assertEqual(err - udpn, 1)
err = self.statistics.get_err_counter(
'/err/nat44-ed-in2out-slowpath/UDP packets')
self.assertEqual(err - udpn, 1)
self.assertEqual(err - icmpn, 1)
err = self.statistics.get_err_counter(
'/err/nat44-ed-in2out-slowpath/good in2out packets processed')
self.assertEqual(err - icmpn, 1)
err = self.statistics.get_err_counter(
'/err/nat44-ed-in2out-slowpath/good in2out packets processed')
err = self.statistics.get_err_counter(
'/err/nat44-ed-out2in/UDP packets')
self.assertEqual(err - udpn, 1)
err = self.statistics.get_err_counter(
'/err/nat44-ed-out2in/UDP packets')
self.assertEqual(err - udpn, 1)
self.assertEqual(err - icmpn, 1)
err = self.statistics.get_err_counter(
'/err/nat44-ed-out2in/good out2in packets processed')
self.assertEqual(err - icmpn, 1)
err = self.statistics.get_err_counter(
'/err/nat44-ed-out2in/good out2in packets processed')
users = self.statistics.get_counter('/nat44/total-users')
self.assertEqual(users[0][0], 1)
users = self.statistics.get_counter('/nat44/total-users')
self.assertEqual(users[0][0], 1)
self.assertEqual(len(sessions), 3)
self.assertTrue(sessions[0].flags &
self.config_flags.NAT_IS_EXT_HOST_VALID)
self.assertEqual(len(sessions), 3)
self.assertTrue(sessions[0].flags &
self.config_flags.NAT_IS_EXT_HOST_VALID)
self.config_flags.NAT_IS_EXT_HOST_VALID),
ext_host_address=sessions[0].ext_host_address,
ext_host_port=sessions[0].ext_host_port)
self.config_flags.NAT_IS_EXT_HOST_VALID),
ext_host_address=sessions[0].ext_host_address,
ext_host_port=sessions[0].ext_host_port)
self.assertEqual(len(sessions), 1)
self.assertTrue(sessions[0].flags &
self.config_flags.NAT_IS_EXT_HOST_VALID)
self.assertEqual(len(sessions), 1)
self.assertTrue(sessions[0].flags &
self.config_flags.NAT_IS_EXT_HOST_VALID)
self.config_flags.NAT_IS_EXT_HOST_VALID),
ext_host_address=sessions[0].ext_host_address,
ext_host_port=sessions[0].ext_host_port)
self.config_flags.NAT_IS_EXT_HOST_VALID),
ext_host_address=sessions[0].ext_host_address,
ext_host_port=sessions[0].ext_host_port)
self.assertEqual(len(sessions), 0)
@unittest.skipUnless(running_extended_tests, "part of extended tests")
self.assertEqual(len(sessions), 0)
@unittest.skipUnless(running_extended_tests, "part of extended tests")
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT_IS_ADDR_ONLY
self.vapi.nat44_add_del_identity_mapping(
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT_IS_ADDR_ONLY
self.vapi.nat44_add_del_identity_mapping(
flags=flags, is_add=1)
flags = self.config_flags.NAT_IS_OUT2IN_ONLY
self.nat44_add_static_mapping(self.pg0.remote_ip4, external_addr,
flags=flags, is_add=1)
flags = self.config_flags.NAT_IS_OUT2IN_ONLY
self.nat44_add_static_mapping(self.pg0.remote_ip4, external_addr,
self.assertEqual(len(sessions), 1)
self.assertTrue(sessions[0].flags &
self.config_flags.NAT_IS_EXT_HOST_VALID)
self.assertEqual(len(sessions), 1)
self.assertTrue(sessions[0].flags &
self.config_flags.NAT_IS_EXT_HOST_VALID)
self.config_flags.NAT_IS_EXT_HOST_VALID),
ext_host_address=sessions[0].ext_host_nat_address,
ext_host_port=sessions[0].ext_host_nat_port)
self.config_flags.NAT_IS_EXT_HOST_VALID),
ext_host_address=sessions[0].ext_host_nat_address,
ext_host_port=sessions[0].ext_host_nat_port)
self.assertEqual(len(sessions), 0)
def test_twice_nat(self):
self.assertEqual(len(sessions), 0)
def test_twice_nat(self):
start_sessnum = len(sessions)
self.initiate_tcp_session(self.pg0, self.pg1)
start_sessnum = len(sessions)
self.initiate_tcp_session(self.pg0, self.pg1)
start_sessnum = len(sessions)
self.initiate_tcp_session(self.pg0, self.pg1)
start_sessnum = len(sessions)
self.initiate_tcp_session(self.pg0, self.pg1)
start_sessnum = len(sessions)
self.initiate_tcp_session(self.pg0, self.pg1)
start_sessnum = len(sessions)
self.initiate_tcp_session(self.pg0, self.pg1)
self.vapi.nat44_interface_add_del_feature(
sw_if_index=self.pg1.sw_if_index,
is_add=1)
self.vapi.nat44_interface_add_del_feature(
sw_if_index=self.pg1.sw_if_index,
is_add=1)
- self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
- src_address=self.pg2.local_ip4n,
+ self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4,
+ src_address=self.pg2.local_ip4,
self.verify_ipfix_max_entries_per_user(
data,
nat44_config.max_translations_per_user,
self.verify_ipfix_max_entries_per_user(
data,
nat44_config.max_translations_per_user,
""" Test syslog session creation and deletion """
self.vapi.syslog_set_filter(
self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_INFO)
""" Test syslog session creation and deletion """
self.vapi.syslog_set_filter(
self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_INFO)
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT_IS_INSIDE
self.vapi.nat44_interface_add_del_feature(
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT_IS_INSIDE
self.vapi.nat44_interface_add_del_feature(
- self.vapi.nat_det_add_del_map(is_add=1, in_addr=host0.ip4n, in_plen=24,
+ self.vapi.nat_det_add_del_map(is_add=1, in_addr=host0.ip4, in_plen=24,
out_addr=socket.inet_aton(nat_ip),
out_plen=32)
flags = self.config_flags.NAT_IS_INSIDE
out_addr=socket.inet_aton(nat_ip),
out_plen=32)
flags = self.config_flags.NAT_IS_INSIDE
external_port)
dms = self.vapi.nat_det_map_dump()
self.assertEqual(dms[0].ses_num, 1)
external_port)
dms = self.vapi.nat_det_map_dump()
self.assertEqual(dms[0].ses_num, 1)
external_port)
dms = self.vapi.nat_det_map_dump()
self.assertEqual(dms[0].ses_num, 0)
def test_tcp_session_close_detection_in(self):
""" Deterministic NAT TCP session close from inside network """
external_port)
dms = self.vapi.nat_det_map_dump()
self.assertEqual(dms[0].ses_num, 0)
def test_tcp_session_close_detection_in(self):
""" Deterministic NAT TCP session close from inside network """
def test_tcp_session_close_detection_out(self):
""" Deterministic NAT TCP session close from outside network """
def test_tcp_session_close_detection_out(self):
""" Deterministic NAT TCP session close from outside network """
@unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_session_timeout(self):
""" Deterministic NAT session timeouts """
@unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_session_timeout(self):
""" Deterministic NAT session timeouts """
@unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_session_limit_per_user(self):
""" Deterministic NAT maximum sessions per user limit """
@unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_session_limit_per_user(self):
""" Deterministic NAT maximum sessions per user limit """
self.vapi.nat44_interface_add_del_feature(
sw_if_index=self.pg1.sw_if_index,
is_add=1)
self.vapi.nat44_interface_add_del_feature(
sw_if_index=self.pg1.sw_if_index,
is_add=1)
- self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
- src_address=self.pg2.local_ip4n,
+ self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4,
+ src_address=self.pg2.local_ip4,
data = ipfix.decode_data_set(p.getlayer(Set))
self.verify_ipfix_max_entries_per_user(data,
1000,
data = ipfix.decode_data_set(p.getlayer(Set))
self.verify_ipfix_max_entries_per_user(data,
1000,
- cls.vapi.ip_table_add_del(is_ipv6=1, is_add=1,
- table_id=cls.vrf1_id)
+ cls.vapi.ip_table_add_del(is_add=1,
+ table={'table_id': cls.vrf1_id,
+ 'is_ip6': 1})
self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
err = self.statistics.get_err_counter('/err/nat64-out2in/TCP packets')
self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
err = self.statistics.get_err_counter('/err/nat64-out2in/TCP packets')
err = self.statistics.get_err_counter('/err/nat64-out2in/UDP packets')
self.assertEqual(err - udpn, 1)
err = self.statistics.get_err_counter('/err/nat64-out2in/ICMP packets')
self.assertEqual(err - icmpn, 1)
err = self.statistics.get_err_counter(
'/err/nat64-out2in/good out2in packets processed')
err = self.statistics.get_err_counter('/err/nat64-out2in/UDP packets')
self.assertEqual(err - udpn, 1)
err = self.statistics.get_err_counter('/err/nat64-out2in/ICMP packets')
self.assertEqual(err - icmpn, 1)
err = self.statistics.get_err_counter(
'/err/nat64-out2in/good out2in packets processed')
bibs = self.statistics.get_counter('/nat64/total-bibs')
self.assertEqual(bibs[0][0], 3)
bibs = self.statistics.get_counter('/nat64/total-bibs')
self.assertEqual(bibs[0][0], 3)
prefix = self.vapi.nat64_prefix_dump()
self.assertEqual(len(prefix), 1)
prefix = self.vapi.nat64_prefix_dump()
self.assertEqual(len(prefix), 1)
self.assertEqual(prefix[0].vrf_id, 0)
# Add tenant specific prefix
self.assertEqual(prefix[0].vrf_id, 0)
# Add tenant specific prefix
server = self.pg0.remote_hosts[1]
server_in_port = random.randint(1025, 65535)
server_out_port = random.randint(1025, 65535)
server = self.pg0.remote_hosts[1]
server_in_port = random.randint(1025, 65535)
server_out_port = random.randint(1025, 65535)
- self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
- src_address=self.pg3.local_ip4n,
+ self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
+ src_address=self.pg3.local_ip4,
sw_if_index=self.pg1.sw_if_index)
self.vapi.nat_set_reass(timeout=2, max_reass=1024, max_frag=1,
drop_frag=0, is_ip6=1)
sw_if_index=self.pg1.sw_if_index)
self.vapi.nat_set_reass(timeout=2, max_reass=1024, max_frag=1,
drop_frag=0, is_ip6=1)
- self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
- src_address=self.pg3.local_ip4n,
+ self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
+ src_address=self.pg3.local_ip4,
sw_if_index=self.pg0.sw_if_index)
self.vapi.nat64_add_del_interface(is_add=1, flags=0,
sw_if_index=self.pg1.sw_if_index)
sw_if_index=self.pg0.sw_if_index)
self.vapi.nat64_add_del_interface(is_add=1, flags=0,
sw_if_index=self.pg1.sw_if_index)
- self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
- src_address=self.pg3.local_ip4n,
+ self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
+ src_address=self.pg3.local_ip4,
sw_if_index=self.pg1.sw_if_index)
self.vapi.syslog_set_filter(
self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_INFO)
sw_if_index=self.pg1.sw_if_index)
self.vapi.syslog_set_filter(
self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_INFO)
p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
aftr_ip4 = '192.0.0.1'
aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
self.vapi.dslite_set_aftr_addr(ip4_addr=aftr_ip4, ip6_addr=aftr_ip6)
aftr_ip4 = '192.0.0.1'
aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
self.vapi.dslite_set_aftr_addr(ip4_addr=aftr_ip4, ip6_addr=aftr_ip6)