from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
from time import sleep
from util import ip4_range
-from vpp_mac import mactobinary
+from vpp_papi import mac_pton
from syslog_rfc5424_parser import SyslogMessage, ParseError
from syslog_rfc5424_parser.constants import SyslogFacility, SyslogSeverity
from vpp_papi_provider import SYSLOG_SEVERITY
from io import BytesIO
+from vpp_papi import VppEnum
class MethodHolder(VppTestCase):
is_add=0)
for intf in [self.pg7, self.pg8]:
- neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
- for n in neighbors:
- self.vapi.ip_neighbor_add_del(intf.sw_if_index,
- n.mac_address,
- n.ip_address,
- is_add=0)
+ self.vapi.ip_neighbor_add_del(
+ intf.sw_if_index,
+ intf.remote_mac,
+ intf.remote_ip4,
+ flags=(VppEnum.vl_api_ip_neighbor_flags_t.
+ IP_API_NEIGHBOR_FLAG_STATIC),
+ is_add=0)
if self.pg7.has_ip4_config:
self.pg7.unconfig_ip4()
""" Verify that there is no NAT44 user """
users = self.vapi.nat44_user_dump()
self.assertEqual(len(users), 0)
+ users = self.statistics.get_counter('/nat44/total-users')
+ self.assertEqual(users[0][0], 0)
+ sessions = self.statistics.get_counter('/nat44/total-sessions')
+ self.assertEqual(sessions[0][0], 0)
def verify_ipfix_max_entries_per_user(self, data, limit, src_addr):
"""
message = data.decode('utf-8')
try:
message = SyslogMessage.parse(message)
+ except ParseError as e:
+ self.logger.error(e)
+ raise
+ else:
self.assertEqual(message.severity, SyslogSeverity.info)
self.assertEqual(message.appname, 'NAT')
self.assertEqual(message.msgid, 'APMADD' if is_add else 'APMDEL')
self.assertEqual(sd_params.get('PROTO'), "%d" % IP_PROTOS.tcp)
self.assertTrue(sd_params.get('SSUBIX') is not None)
self.assertEqual(sd_params.get('SVLAN'), '0')
- except ParseError as e:
- self.logger.error(e)
def verify_syslog_sess(self, data, is_add=True, is_ip6=False):
message = data.decode('utf-8')
try:
message = SyslogMessage.parse(message)
+ except ParseError as e:
+ self.logger.error(e)
+ raise
+ else:
self.assertEqual(message.severity, SyslogSeverity.info)
self.assertEqual(message.appname, 'NAT')
self.assertEqual(message.msgid, 'SADD' if is_add else 'SDEL')
self.assertEqual(sd_params.get('XDADDR'), self.pg1.remote_ip4)
self.assertEqual(sd_params.get('XDPORT'),
"%d" % self.tcp_external_port)
- except ParseError as e:
- self.logger.error(e)
def verify_mss_value(self, pkt, mss):
"""
'/err/nat44-out2in/good out2in packets processed')
self.assertEqual(err - totaln, 3)
+ users = self.statistics.get_counter('/nat44/total-users')
+ self.assertEqual(users[0][0], 1)
+ sessions = self.statistics.get_counter('/nat44/total-sessions')
+ self.assertEqual(sessions[0][0], 3)
+
def test_dynamic_icmp_errors_in2out_ttl_1(self):
""" NAT44 handling of client packets with TTL=1 """
data = ipfix.decode_data_set(p.getlayer(Set))
self.verify_ipfix_addr_exhausted(data)
- @unittest.skipUnless(running_extended_tests(), "part of extended tests")
+ @unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_ipfix_max_sessions(self):
""" IPFIX logging maximum session entries exceeded """
self.nat44_add_address(self.nat_addr)
def test_dynamic_ipless_interfaces(self):
""" NAT44 interfaces without configured IP address """
- self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
- mactobinary(self.pg7.remote_mac),
- self.pg7.remote_ip4n,
- is_static=1)
- self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
- mactobinary(self.pg8.remote_mac),
- self.pg8.remote_ip4n,
- is_static=1)
+ self.vapi.ip_neighbor_add_del(
+ self.pg7.sw_if_index,
+ self.pg7.remote_mac,
+ self.pg7.remote_ip4,
+ flags=(VppEnum.vl_api_ip_neighbor_flags_t.
+ IP_API_NEIGHBOR_FLAG_STATIC))
+ self.vapi.ip_neighbor_add_del(
+ self.pg8.sw_if_index,
+ self.pg8.remote_mac,
+ self.pg8.remote_ip4,
+ flags=(VppEnum.vl_api_ip_neighbor_flags_t.
+ IP_API_NEIGHBOR_FLAG_STATIC))
self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
dst_address_length=32,
def test_static_ipless_interfaces(self):
""" NAT44 interfaces without configured IP address - 1:1 NAT """
- self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
- mactobinary(self.pg7.remote_mac),
- self.pg7.remote_ip4n,
- is_static=1)
- self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
- mactobinary(self.pg8.remote_mac),
- self.pg8.remote_ip4n,
- is_static=1)
+ self.vapi.ip_neighbor_add_del(
+ self.pg7.sw_if_index,
+ self.pg7.remote_mac,
+ self.pg7.remote_ip4,
+ flags=(VppEnum.vl_api_ip_neighbor_flags_t.
+ IP_API_NEIGHBOR_FLAG_STATIC))
+ self.vapi.ip_neighbor_add_del(
+ self.pg8.sw_if_index,
+ self.pg8.remote_mac,
+ self.pg8.remote_ip4,
+ flags=(VppEnum.vl_api_ip_neighbor_flags_t.
+ IP_API_NEIGHBOR_FLAG_STATIC))
self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
dst_address_length=32,
self.udp_port_out = 30607
self.icmp_id_out = 30608
- self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
- mactobinary(self.pg7.remote_mac),
- self.pg7.remote_ip4n,
- is_static=1)
- self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
- mactobinary(self.pg8.remote_mac),
- self.pg8.remote_ip4n,
- is_static=1)
+ self.vapi.ip_neighbor_add_del(
+ self.pg7.sw_if_index,
+ self.pg7.remote_mac,
+ self.pg7.remote_ip4,
+ flags=(VppEnum.vl_api_ip_neighbor_flags_t.
+ IP_API_NEIGHBOR_FLAG_STATIC))
+ self.vapi.ip_neighbor_add_del(
+ self.pg8.sw_if_index,
+ self.pg8.remote_mac,
+ self.pg8.remote_ip4,
+ flags=(VppEnum.vl_api_ip_neighbor_flags_t.
+ IP_API_NEIGHBOR_FLAG_STATIC))
self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
dst_address_length=32,
self.pg1.resolve_arp()
self.pg2.resolve_arp()
- @unittest.skipUnless(running_extended_tests(), "part of extended tests")
+ @unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_session_timeout(self):
""" NAT44 session timeouts """
self.nat44_add_address(self.nat_addr)
'/err/nat44-ed-out2in/good out2in packets processed')
self.assertEqual(err - totaln, 2)
+ users = self.statistics.get_counter('/nat44/total-users')
+ self.assertEqual(users[0][0], 1)
+ sessions = self.statistics.get_counter('/nat44/total-sessions')
+ self.assertEqual(sessions[0][0], 3)
+
def test_forwarding(self):
""" NAT44 forwarding test """
sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
self.assertEqual(len(sessions), 0)
- @unittest.skipUnless(running_extended_tests(), "part of extended tests")
+ @unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_static_lb_multi_clients(self):
""" NAT44 local service load balancing - multiple clients"""
local_port = 8080
server1 = self.pg0.remote_hosts[0]
server2 = self.pg0.remote_hosts[1]
+ server3 = self.pg0.remote_hosts[2]
locals = [{'addr': server1.ip4n,
'port': local_port,
server2_n += 1
self.assertGreater(server1_n, server2_n)
+ # add new back-end
+ self.vapi.nat44_lb_static_mapping_add_del_local(external_addr_n,
+ external_port,
+ server3.ip4n,
+ local_port,
+ IP_PROTOS.tcp,
+ 20)
+ server1_n = 0
+ server2_n = 0
+ server3_n = 0
+ clients = ip4_range(self.pg1.remote_ip4, 60, 110)
+ pkts = []
+ for client in clients:
+ p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=client, dst=self.nat_addr) /
+ TCP(sport=12346, dport=external_port))
+ pkts.append(p)
+ self.assertGreater(len(pkts), 0)
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(len(pkts))
+ for p in capture:
+ if p[IP].dst == server1.ip4:
+ server1_n += 1
+ elif p[IP].dst == server2.ip4:
+ server2_n += 1
+ else:
+ server3_n += 1
+ self.assertGreater(server1_n, 0)
+ self.assertGreater(server2_n, 0)
+ self.assertGreater(server3_n, 0)
+
+ # remove one back-end
+ self.vapi.nat44_lb_static_mapping_add_del_local(external_addr_n,
+ external_port,
+ server2.ip4n,
+ local_port,
+ IP_PROTOS.tcp,
+ 10,
+ is_add=0)
+ server1_n = 0
+ server2_n = 0
+ server3_n = 0
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(len(pkts))
+ for p in capture:
+ if p[IP].dst == server1.ip4:
+ server1_n += 1
+ elif p[IP].dst == server2.ip4:
+ server2_n += 1
+ else:
+ server3_n += 1
+ self.assertGreater(server1_n, 0)
+ self.assertEqual(server2_n, 0)
+ self.assertGreater(server3_n, 0)
+
def test_static_lb_2(self):
""" NAT44 local service load balancing (asymmetrical rule) """
external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
self.logger.error(ppp("Unexpected or invalid packet:", p))
raise
- @unittest.skipUnless(running_extended_tests(), "part of extended tests")
+ @unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_session_timeout(self):
""" NAT44 session timeouts """
self.nat44_add_address(self.nat_addr)
nsessions = nsessions + user.nsessions
self.assertLess(nsessions, 2 * max_sessions)
- @unittest.skipUnless(running_extended_tests(), "part of extended tests")
+ @unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_session_rst_timeout(self):
""" NAT44 session RST timeouts """
self.nat44_add_address(self.nat_addr)
self.assertEqual(users[0].ip_address, self.pg0.remote_ip4n)
self.assertEqual(users[0].nsessions, 1)
- @unittest.skipUnless(running_extended_tests(), "part of extended tests")
+ @unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_session_limit_per_user(self):
""" Maximum sessions per user limit """
self.nat44_add_address(self.nat_addr)
self.src_ip6_pfx_len = 96
self.vapi.map_add_domain(self.dst_ip6_pfx_n, self.dst_ip6_pfx_len,
self.src_ip6_pfx_n, self.src_ip6_pfx_len,
- '\x00\x00\x00\x00', 0, is_translation=1,
- is_rfc6052=1)
+ '\x00\x00\x00\x00', 0)
@unittest.skip('Temporary disabled')
def test_464xlat_ce(self):
self.logger.error("TCP session termination failed")
raise
- @unittest.skipUnless(running_extended_tests(), "part of extended tests")
+ @unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_session_timeout(self):
""" Deterministic NAT session timeouts """
self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
dms = self.vapi.nat_det_map_dump()
self.assertEqual(0, dms[0].ses_num)
- @unittest.skipUnless(running_extended_tests(), "part of extended tests")
+ @unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_session_limit_per_user(self):
""" Deterministic NAT maximum sessions per user limit """
self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
self.assertEqual(bibe.i_port, in_port)
self.assertEqual(bibe.o_port, out_port)
self.assertEqual(static_bib_num, 1)
+ bibs = self.statistics.get_counter('/nat64/total-bibs')
+ self.assertEqual(bibs[0][0], 1)
self.vapi.nat64_add_del_static_bib(in_addr,
out_addr,
if bibe.is_static:
static_bib_num += 1
self.assertEqual(static_bib_num, 0)
+ bibs = self.statistics.get_counter('/nat64/total-bibs')
+ self.assertEqual(bibs[0][0], 0)
def test_set_timeouts(self):
""" Set NAT64 timeouts """
'/err/nat64-out2in/good out2in packets processed')
self.assertEqual(err - totaln, 3)
+ bibs = self.statistics.get_counter('/nat64/total-bibs')
+ self.assertEqual(bibs[0][0], 3)
+ sessions = self.statistics.get_counter('/nat64/total-sessions')
+ self.assertEqual(sessions[0][0], 3)
+
# in2out
pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
self.pg0.add_stream(pkts)
self.assertEqual(ses_num_end - ses_num_start, 3)
- @unittest.skipUnless(running_extended_tests(), "part of extended tests")
+ @unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_session_timeout(self):
""" NAT64 session timeout """
self.icmp_id_in = 1234
addresses = self.vapi.nat64_pool_addr_dump()
self.assertEqual(0, len(adresses))
- @unittest.skipUnless(running_extended_tests(), "part of extended tests")
+ @unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_ipfix_max_bibs_sessions(self):
""" IPFIX logging maximum session and BIB entries exceeded """
max_bibs = 1280
vrf_id=prefix.vrf_id,
is_add=0)
+ bibs = self.statistics.get_counter('/nat64/total-bibs')
+ self.assertEqual(bibs[0][0], 0)
+ sessions = self.statistics.get_counter('/nat64/total-sessions')
+ self.assertEqual(sessions[0][0], 0)
+
def tearDown(self):
super(TestNAT64, self).tearDown()
if not self.vpp_dead:
message = data.decode('utf-8')
try:
message = SyslogMessage.parse(message)
+ except ParseError as e:
+ self.logger.error(e)
+ else:
self.assertEqual(message.severity, SyslogSeverity.info)
self.assertEqual(message.appname, 'NAT')
self.assertEqual(message.msgid, 'APMADD')
self.assertEqual(sd_params.get('PROTO'), "%d" % proto)
self.assertTrue(sd_params.get('SSUBIX') is not None)
self.assertEqual(sd_params.get('SV6ENC'), sv6enc)
- except ParseError as e:
- self.logger.error(e)
def test_dslite(self):
""" Test DS-Lite """
self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
self.assertTrue(capture.haslayer(ICMPv6EchoReply))
+ b4s = self.statistics.get_counter('/dslite/total-b4s')
+ self.assertEqual(b4s[0][0], 2)
+ sessions = self.statistics.get_counter('/dslite/total-sessions')
+ self.assertEqual(sessions[0][0], 3)
+
def tearDown(self):
super(TestDSlite, self).tearDown()
if not self.vpp_dead: