-#!/usr/bin/env python
+#!/usr/bin/env python3
import socket
import unittest
from vpp_papi import VppEnum
from vpp_ip_route import VppIpRoute, VppRoutePath, FibPathType
from vpp_neighbor import VppNeighbor
-from vpp_ip import VppIpAddress, VppIpPrefix
from scapy.all import bind_layers, Packet, ByteEnumField, ShortField, \
IPField, IntField, LongField, XByteField, FlagsField, FieldLenField, \
PacketListField
from ipaddress import IPv6Network
+from util import ppc, ppp
# NAT HA protocol event data
last_ip_address=addr.ip_address,
vrf_id=0xFFFFFFFF, flags=addr.flags)
- self.vapi.nat_set_reass(timeout=2, max_reass=1024, max_frag=5,
- drop_frag=0)
- self.vapi.nat_set_reass(timeout=2, max_reass=1024, max_frag=5,
- drop_frag=0, is_ip6=1)
self.verify_no_nat44_user()
self.vapi.nat_set_timeouts(udp=300, tcp_established=7440,
tcp_transitory=240, icmp=60)
p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
TCP(sport=self.tcp_port_in, dport=20))
- pkts.append(p)
+ pkts.extend([p, p])
# UDP
p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
TCP(dport=tcp_port, sport=20))
- pkts.append(p)
+ pkts.extend([p, p])
# UDP
p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
proto=frags[0][IP].proto)
if ip.proto == IP_PROTOS.tcp:
p = (ip / TCP(buffer.getvalue()))
+ self.logger.debug(ppp("Reassembled:", p))
self.assert_tcp_checksum_valid(p)
elif ip.proto == IP_PROTOS.udp:
p = (ip / UDP(buffer.getvalue()[:8]) /
p = (ip / TCP(buffer.getvalue()))
elif ip.nh == IP_PROTOS.udp:
p = (ip / UDP(buffer.getvalue()))
+ self.logger.debug(ppp("Reassembled:", p))
self.assert_packet_checksums_valid(p)
return p
data = b"A" * 16 + b"B" * 16 + b"C" * 3
self.port_in = random.randint(1025, 65535)
- reass = self.vapi.nat_reass_dump()
- reass_n_start = len(reass)
-
# in2out
pkts = self.create_stream_frag(self.pg0,
self.pg1.remote_ip4,
self.assertEqual(p[layer].id, self.port_in)
self.assertEqual(data, p[Raw].load)
- reass = self.vapi.nat_reass_dump()
- reass_n_end = len(reass)
-
- self.assertEqual(reass_n_end - reass_n_start, 2)
-
def frag_in_order_in_plus_out(self, proto=IP_PROTOS.tcp):
layer = self.proto2layer(proto)
self.port_in = random.randint(1025, 65535)
for i in range(2):
- reass = self.vapi.nat_reass_dump()
- reass_n_start = len(reass)
-
# out2in
pkts = self.create_stream_frag(self.pg0,
self.server_out_addr,
self.assertEqual(p[layer].id, self.port_in)
self.assertEqual(data, p[Raw].load)
- reass = self.vapi.nat_reass_dump()
- reass_n_end = len(reass)
-
- self.assertEqual(reass_n_end - reass_n_start, 2)
-
def reass_hairpinning(self, proto=IP_PROTOS.tcp):
layer = self.proto2layer(proto)
cls.pg1.configure_ipv4_neighbors()
cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
- cls.vapi.ip_table_add_del(is_add=1, table_id=10)
- cls.vapi.ip_table_add_del(is_add=1, table_id=20)
+ cls.vapi.ip_table_add_del(is_add=1, table={'table_id': 10})
+ cls.vapi.ip_table_add_del(is_add=1, table={'table_id': 20})
- cls.pg4._local_ip4 = VppIpPrefix("172.16.255.1",
- cls.pg4.local_ip4_prefix.len)
+ cls.pg4._local_ip4 = "172.16.255.1"
cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
cls.pg4.set_table_ip4(10)
- cls.pg5._local_ip4 = VppIpPrefix("172.17.255.3",
- cls.pg5.local_ip4_prefix.len)
+ cls.pg5._local_ip4 = "172.17.255.3"
cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
cls.pg5.set_table_ip4(10)
- cls.pg6._local_ip4 = VppIpPrefix("172.16.255.1",
- cls.pg6.local_ip4_prefix.len)
+ cls.pg6._local_ip4 = "172.16.255.1"
cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
cls.pg6.set_table_ip4(20)
for i in cls.overlapping_interfaces:
cls.pg9.config_ip4()
cls.vapi.sw_interface_add_del_address(
sw_if_index=cls.pg9.sw_if_index,
- prefix=VppIpPrefix("10.0.0.1", 24).encode())
+ prefix="10.0.0.1/24")
cls.pg9.admin_up()
cls.pg9.resolve_arp()
err = self.statistics.get_err_counter(
'/err/nat44-in2out-slowpath/TCP packets')
- self.assertEqual(err - tcpn, 1)
+ self.assertEqual(err - tcpn, 2)
err = self.statistics.get_err_counter(
'/err/nat44-in2out-slowpath/UDP packets')
self.assertEqual(err - udpn, 1)
self.assertEqual(err - icmpn, 1)
err = self.statistics.get_err_counter(
'/err/nat44-in2out-slowpath/good in2out packets processed')
- self.assertEqual(err - totaln, 3)
+ self.assertEqual(err - totaln, 4)
# out2in
tcpn = self.statistics.get_err_counter('/err/nat44-out2in/TCP packets')
self.verify_capture_in(capture, self.pg0)
err = self.statistics.get_err_counter('/err/nat44-out2in/TCP packets')
- self.assertEqual(err - tcpn, 1)
+ self.assertEqual(err - tcpn, 2)
err = self.statistics.get_err_counter('/err/nat44-out2in/UDP packets')
self.assertEqual(err - udpn, 1)
err = self.statistics.get_err_counter('/err/nat44-out2in/ICMP packets')
self.assertEqual(err - icmpn, 1)
err = self.statistics.get_err_counter(
'/err/nat44-out2in/good out2in packets processed')
- self.assertEqual(err - totaln, 3)
+ self.assertEqual(err - totaln, 4)
users = self.statistics.get_counter('/nat44/total-users')
self.assertEqual(users[0][0], 1)
self.logger.error(ppp("Unexpected or invalid packet:", packet))
raise
- def test_max_translations_per_user(self):
- """ MAX translations per user - recycle the least recently used """
-
- self.nat44_add_address(self.nat_addr)
- flags = self.config_flags.NAT_IS_INSIDE
- self.vapi.nat44_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
- self.vapi.nat44_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
-
- # get maximum number of translations per user
- nat44_config = self.vapi.nat_show_config()
-
- # send more than maximum number of translations per user packets
- pkts_num = nat44_config.max_translations_per_user + 5
- pkts = []
- for port in range(0, pkts_num):
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
- TCP(sport=1025 + port))
- pkts.append(p)
- self.pg0.add_stream(pkts)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
-
- # verify number of translated packet
- self.pg1.get_capture(pkts_num)
-
- users = self.vapi.nat44_user_dump()
- for user in users:
- if user.ip_address == self.pg0.remote_ip4:
- self.assertEqual(user.nsessions,
- nat44_config.max_translations_per_user)
- self.assertEqual(user.nstaticsessions, 0)
-
- tcp_port = 22
- self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
- tcp_port, tcp_port,
- proto=IP_PROTOS.tcp)
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
- TCP(sport=tcp_port))
- self.pg0.add_stream(p)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- self.pg1.get_capture(1)
- users = self.vapi.nat44_user_dump()
- for user in users:
- if user.ip_address == self.pg0.remote_ip4:
- self.assertEqual(user.nsessions,
- nat44_config.max_translations_per_user - 1)
- self.assertEqual(user.nstaticsessions, 1)
-
def test_interface_addr(self):
""" Acquire NAT44 addresses from interface """
self.vapi.nat44_add_del_interface_addr(
self.pg0.unconfig_ip4()
self.pg1.unconfig_ip4()
- self.vapi.ip_table_add_del(is_add=1, table_id=vrf_id1)
- self.vapi.ip_table_add_del(is_add=1, table_id=vrf_id2)
+ self.vapi.ip_table_add_del(is_add=1, table={'table_id': vrf_id1})
+ self.vapi.ip_table_add_del(is_add=1, table={'table_id': vrf_id2})
self.pg0.set_table_ip4(vrf_id1)
self.pg1.set_table_ip4(vrf_id2)
self.pg0.config_ip4()
self.pg1.config_ip4()
self.pg0.resolve_arp()
self.pg1.resolve_arp()
- self.vapi.ip_table_add_del(is_add=0, table_id=vrf_id1)
- self.vapi.ip_table_add_del(is_add=0, table_id=vrf_id2)
+ self.vapi.ip_table_add_del(is_add=0, table={'table_id': vrf_id1})
+ self.vapi.ip_table_add_del(is_add=0, table={'table_id': vrf_id2})
def test_vrf_feature_independent(self):
""" NAT44 tenant VRF independent address pool mode """
self.verify_no_nat44_user()
- def test_set_get_reass(self):
- """ NAT44 set/get virtual fragmentation reassembly """
- reas_cfg1 = self.vapi.nat_get_reass()
-
- self.vapi.nat_set_reass(timeout=reas_cfg1.ip4_timeout + 5,
- max_reass=reas_cfg1.ip4_max_reass * 2,
- max_frag=reas_cfg1.ip4_max_frag * 2,
- drop_frag=0)
-
- reas_cfg2 = self.vapi.nat_get_reass()
-
- self.assertEqual(reas_cfg1.ip4_timeout + 5, reas_cfg2.ip4_timeout)
- self.assertEqual(reas_cfg1.ip4_max_reass * 2, reas_cfg2.ip4_max_reass)
- self.assertEqual(reas_cfg1.ip4_max_frag * 2, reas_cfg2.ip4_max_frag)
-
- self.vapi.nat_set_reass(timeout=2, max_reass=1024, max_frag=5,
- drop_frag=1)
- self.assertTrue(self.vapi.nat_get_reass().ip4_drop_frag)
-
def test_frag_in_order(self):
""" NAT44 translate fragments arriving in order """
sw_if_index=self.pg1.sw_if_index,
is_add=1)
- reas_cfg1 = self.vapi.nat_get_reass()
- # this test was intermittently failing in some cases
- # until we temporarily bump the reassembly timeouts
- self.vapi.nat_set_reass(timeout=20, max_reass=1024, max_frag=5,
- drop_frag=0)
-
self.frag_in_order(proto=IP_PROTOS.tcp)
self.frag_in_order(proto=IP_PROTOS.udp)
self.frag_in_order(proto=IP_PROTOS.icmp)
- # restore the reassembly timeouts
- self.vapi.nat_set_reass(timeout=reas_cfg1.ip4_timeout,
- max_reass=reas_cfg1.ip4_max_reass,
- max_frag=reas_cfg1.ip4_max_frag,
- drop_frag=reas_cfg1.ip4_drop_frag)
-
def test_frag_forwarding(self):
""" NAT44 forwarding fragment test """
self.vapi.nat44_add_del_interface_addr(
self.assertGreaterEqual(tcp.sport, 1025)
self.assertLessEqual(tcp.sport, 1027)
- def test_ipfix_max_frags(self):
- """ IPFIX logging maximum fragments pending reassembly exceeded """
- self.nat44_add_address(self.nat_addr)
- flags = self.config_flags.NAT_IS_INSIDE
- self.vapi.nat44_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
- self.vapi.nat44_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
- self.vapi.nat_set_reass(timeout=2, max_reass=1024, max_frag=1,
- drop_frag=0)
- self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
- src_address=self.pg3.local_ip4,
- path_mtu=512,
- template_interval=10)
- self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
- src_port=self.ipfix_src_port,
- enable=1)
-
- data = b"A" * 4 + b"B" * 16 + b"C" * 3
- self.tcp_port_in = random.randint(1025, 65535)
- pkts = self.create_stream_frag(self.pg0,
- self.pg1.remote_ip4,
- self.tcp_port_in,
- 20,
- data)
- pkts.reverse()
- self.pg0.add_stream(pkts)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- self.pg1.assert_nothing_captured()
- sleep(1)
- self.vapi.ipfix_flush()
- capture = self.pg3.get_capture(9)
- ipfix = IPFIXDecoder()
- # first load template
- for p in capture:
- self.assertTrue(p.haslayer(IPFIX))
- self.assertEqual(p[IP].src, self.pg3.local_ip4)
- self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
- self.assertEqual(p[UDP].sport, self.ipfix_src_port)
- self.assertEqual(p[UDP].dport, 4739)
- self.assertEqual(p[IPFIX].observationDomainID,
- self.ipfix_domain_id)
- if p.haslayer(Template):
- ipfix.add_template(p.getlayer(Template))
- # verify events in data set
- for p in capture:
- if p.haslayer(Data):
- data = ipfix.decode_data_set(p.getlayer(Set))
- self.verify_ipfix_max_fragments_ip4(data, 1,
- self.pg0.remote_ip4n)
-
def test_multiple_outside_vrf(self):
""" Multiple outside VRF """
vrf_id1 = 1
self.pg1.unconfig_ip4()
self.pg2.unconfig_ip4()
- self.vapi.ip_table_add_del(is_add=1, table_id=vrf_id1)
- self.vapi.ip_table_add_del(is_add=1, table_id=vrf_id2)
+ self.vapi.ip_table_add_del(is_add=1, table={'table_id': vrf_id1})
+ self.vapi.ip_table_add_del(is_add=1, table={'table_id': vrf_id2})
self.pg1.set_table_ip4(vrf_id1)
self.pg2.set_table_ip4(vrf_id2)
self.pg1.config_ip4()
self.logger.info(self.vapi.cli("show nat44 static mappings"))
self.logger.info(self.vapi.cli("show nat44 interface address"))
self.logger.info(self.vapi.cli("show nat44 sessions detail"))
- self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
self.logger.info(self.vapi.cli("show nat44 hash tables detail"))
self.logger.info(self.vapi.cli("show nat timeouts"))
self.logger.info(
self.logger.info(self.vapi.cli("show nat ha"))
+class TestNAT44EndpointDependent2(MethodHolder):
+ """ Endpoint-Dependent session test cases """
+
+ icmp_timeout = 2
+
+ @classmethod
+ def setUpConstants(cls):
+ super(TestNAT44EndpointDependent2, cls).setUpConstants()
+ cls.vpp_cmdline.extend(["nat", "{", "endpoint-dependent",
+ "translation", "hash", "buckets", "1",
+ "icmp", "timeout", str(cls.icmp_timeout), "}"])
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestNAT44EndpointDependent2, cls).setUpClass()
+ try:
+ translation_buckets = 1
+ cls.max_translations = 10 * translation_buckets
+
+ cls.create_pg_interfaces(range(2))
+ cls.interfaces = list(cls.pg_interfaces[0:2])
+
+ for i in cls.interfaces:
+ i.admin_up()
+ i.config_ip4()
+ i.resolve_arp()
+
+ cls.pg0.generate_remote_hosts(1)
+ cls.pg0.configure_ipv4_neighbors()
+
+ cls.pg1.generate_remote_hosts(1)
+ cls.pg1.configure_ipv4_neighbors()
+
+ except Exception:
+ super(TestNAT44EndpointDependent2, cls).tearDownClass()
+ raise
+
+ def create_icmp_stream(self, in_if, out_if, count):
+ """
+ Create ICMP packet stream for inside network
+
+ :param in_if: Inside interface
+ :param out_if: Outside interface
+ :param count: Number of packets
+ """
+
+ self.assertTrue(count > 0)
+ icmp_id = random.randint(0, 65535 - (count - 1))
+
+ pkts = list()
+ for i in range(count):
+ p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
+ IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=64) /
+ ICMP(id=icmp_id + i, type='echo-request'))
+ pkts.append(p)
+ return pkts
+
+ def send_pkts(self, pkts, expected=None):
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ return self.pg1.get_capture(
+ len(pkts) if expected is None else expected)
+
+ def test_session_cleanup(self):
+ """ NAT44 session cleanup test """
+
+ self.nat44_add_address(self.pg1.local_ip4)
+ flags = self.config_flags.NAT_IS_INSIDE
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg0.sw_if_index,
+ flags=flags, is_add=1)
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg1.sw_if_index,
+ is_add=1)
+
+ nat_config = self.vapi.nat_show_config()
+ self.assertEqual(1, nat_config.endpoint_dependent)
+
+ pkts = self.create_icmp_stream(self.pg0, self.pg1,
+ self.max_translations + 2)
+ sz = len(pkts)
+
+ # positive test
+ self.send_pkts(pkts[0:self.max_translations])
+
+ # false positive test
+ self.send_pkts(pkts[self.max_translations:sz - 1], 0)
+
+ sleep(self.icmp_timeout)
+
+ # positive test
+ self.send_pkts(pkts[self.max_translations + 1:sz])
+
+
class TestNAT44EndpointDependent(MethodHolder):
""" Endpoint-Dependent mapping and filtering test cases """
cls.pg4.config_ip4()
cls.vapi.sw_interface_add_del_address(
sw_if_index=cls.pg4.sw_if_index,
- prefix=VppIpPrefix("10.0.0.1", 24).encode())
+ prefix="10.0.0.1/24")
cls.pg4.admin_up()
cls.pg4.resolve_arp()
cls.pg4.resolve_arp()
zero_ip4 = socket.inet_pton(socket.AF_INET, "0.0.0.0")
- cls.vapi.ip_table_add_del(is_add=1, table_id=1)
+ cls.vapi.ip_table_add_del(is_add=1, table={'table_id': 1})
- cls.pg5._local_ip4 = VppIpPrefix("10.1.1.1",
- cls.pg5.local_ip4_prefix.len)
+ cls.pg5._local_ip4 = "10.1.1.1"
cls.pg5._remote_hosts[0]._ip4 = "10.1.1.2"
cls.pg5.set_table_ip4(1)
cls.pg5.config_ip4()
register=False)
r1.add_vpp_config()
- cls.pg6._local_ip4 = VppIpPrefix("10.1.2.1",
- cls.pg6.local_ip4_prefix.len)
+ cls.pg6._local_ip4 = "10.1.2.1"
cls.pg6._remote_hosts[0]._ip4 = "10.1.2.2"
cls.pg6.set_table_ip4(1)
cls.pg6.config_ip4()
sw_if_index=self.pg1.sw_if_index,
is_add=1)
self.vapi.nat44_forwarding_enable_disable(enable=True)
- reas_cfg1 = self.vapi.nat_get_reass()
- # this test was intermittently failing in some cases
- # until we temporarily bump the reassembly timeouts
- self.vapi.nat_set_reass(timeout=20, max_reass=1024, max_frag=5,
- drop_frag=0)
self.frag_in_order(proto=IP_PROTOS.tcp, dont_translate=True)
- # restore the reassembly timeouts
- self.vapi.nat_set_reass(timeout=reas_cfg1.ip4_timeout,
- max_reass=reas_cfg1.ip4_max_reass,
- max_frag=reas_cfg1.ip4_max_frag,
- drop_frag=reas_cfg1.ip4_drop_frag)
def test_frag_out_of_order(self):
""" NAT44 translate fragments arriving out of order """
self.server_out_addr,
proto=IP_PROTOS.icmp)
- self.vapi.nat_set_reass(timeout=10, max_reass=1024, max_frag=5,
- drop_frag=0)
-
self.frag_in_order_in_plus_out(proto=IP_PROTOS.tcp)
self.frag_in_order_in_plus_out(proto=IP_PROTOS.udp)
self.frag_in_order_in_plus_out(proto=IP_PROTOS.icmp)
self.server_out_addr,
proto=IP_PROTOS.icmp)
- self.vapi.nat_set_reass(timeout=10, max_reass=1024, max_frag=5,
- drop_frag=0)
-
self.frag_out_of_order_in_plus_out(proto=IP_PROTOS.tcp)
self.frag_out_of_order_in_plus_out(proto=IP_PROTOS.udp)
self.frag_out_of_order_in_plus_out(proto=IP_PROTOS.icmp)
err = self.statistics.get_err_counter(
'/err/nat44-ed-in2out-slowpath/TCP packets')
- self.assertEqual(err - tcpn, 1)
+ self.assertEqual(err - tcpn, 2)
err = self.statistics.get_err_counter(
'/err/nat44-ed-in2out-slowpath/UDP packets')
self.assertEqual(err - udpn, 1)
self.assertEqual(err - icmpn, 1)
err = self.statistics.get_err_counter(
'/err/nat44-ed-in2out-slowpath/good in2out packets processed')
- self.assertEqual(err - totaln, 3)
+ self.assertEqual(err - totaln, 4)
# out2in
tcpn = self.statistics.get_err_counter(
err = self.statistics.get_err_counter(
'/err/nat44-ed-out2in/TCP packets')
- self.assertEqual(err - tcpn, 1)
+ self.assertEqual(err - tcpn, 2)
err = self.statistics.get_err_counter(
'/err/nat44-ed-out2in/UDP packets')
self.assertEqual(err - udpn, 1)
self.assertEqual(err - icmpn, 1)
err = self.statistics.get_err_counter(
'/err/nat44-ed-out2in/good out2in packets processed')
- self.assertEqual(err - totaln, 2)
+ self.assertEqual(err - totaln, 3)
users = self.statistics.get_counter('/nat44/total-users')
self.assertEqual(users[0][0], 1)
is_add=1)
try:
- self.vapi.ip_table_add_del(is_add=1, table_id=new_vrf_id)
+ self.vapi.ip_table_add_del(is_add=1,
+ table={'table_id': new_vrf_id})
self.pg7.unconfig_ip4()
self.pg7.set_table_ip4(new_vrf_id)
err = self.statistics.get_err_counter(
'/err/nat44-ed-in2out-slowpath/TCP packets')
- self.assertEqual(err - tcpn, 1)
+ self.assertEqual(err - tcpn, 2)
err = self.statistics.get_err_counter(
'/err/nat44-ed-in2out-slowpath/UDP packets')
self.assertEqual(err - udpn, 1)
self.assertEqual(err - icmpn, 1)
err = self.statistics.get_err_counter(
'/err/nat44-ed-in2out-slowpath/good in2out packets processed')
- self.assertEqual(err - totaln, 3)
+ self.assertEqual(err - totaln, 4)
# out2in
tcpn = self.statistics.get_err_counter(
err = self.statistics.get_err_counter(
'/err/nat44-ed-out2in/TCP packets')
- self.assertEqual(err - tcpn, 1)
+ self.assertEqual(err - tcpn, 2)
err = self.statistics.get_err_counter(
'/err/nat44-ed-out2in/UDP packets')
self.assertEqual(err - udpn, 1)
self.assertEqual(err - icmpn, 1)
err = self.statistics.get_err_counter(
'/err/nat44-ed-out2in/good out2in packets processed')
- self.assertEqual(err - totaln, 2)
+ self.assertEqual(err - totaln, 3)
users = self.statistics.get_counter('/nat44/total-users')
self.assertEqual(users[0][0], 1)
self.pg8.config_ip4()
self.pg8.resolve_arp()
- self.vapi.ip_table_add_del(is_add=0, table_id=new_vrf_id)
+ self.vapi.ip_table_add_del(is_add=0,
+ table={'table_id': new_vrf_id})
def test_forwarding(self):
""" NAT44 forwarding test """
cls.ip6_interfaces.append(cls.pg_interfaces[2])
cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
- cls.vapi.ip_table_add_del(is_ipv6=1, is_add=1,
- table_id=cls.vrf1_id)
+ cls.vapi.ip_table_add_del(is_add=1,
+ table={'table_id': cls.vrf1_id,
+ 'is_ip6': 1})
cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
err = self.statistics.get_err_counter('/err/nat64-out2in/TCP packets')
- self.assertEqual(err - tcpn, 1)
+ self.assertEqual(err - tcpn, 2)
err = self.statistics.get_err_counter('/err/nat64-out2in/UDP packets')
self.assertEqual(err - udpn, 1)
err = self.statistics.get_err_counter('/err/nat64-out2in/ICMP packets')
self.assertEqual(err - icmpn, 1)
err = self.statistics.get_err_counter(
'/err/nat64-out2in/good out2in packets processed')
- self.assertEqual(err - totaln, 3)
+ self.assertEqual(err - totaln, 4)
bibs = self.statistics.get_counter('/nat64/total-bibs')
self.assertEqual(bibs[0][0], 3)
self.vapi.nat64_add_del_interface(is_add=1, flags=0,
sw_if_index=self.pg1.sw_if_index)
- reass = self.vapi.nat_reass_dump()
- reass_n_start = len(reass)
-
# in2out
data = b'a' * 200
pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
frags = self.pg0.get_capture(len(pkts))
+ self.logger.debug(ppc("Captured:", frags))
src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
self.assertEqual(p[TCP].sport, 20)
self.assertEqual(p[TCP].dport, self.tcp_port_in)
self.assertEqual(data, p[Raw].load)
- reass = self.vapi.nat_reass_dump()
- reass_n_end = len(reass)
-
- self.assertEqual(reass_n_end - reass_n_start, 2)
-
def test_reass_hairpinning(self):
""" NAT64 fragments hairpinning """
data = b'a' * 200
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
frags = self.pg0.get_capture(len(pkts))
+ self.logger.debug(ppc("Captured:", frags))
p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
self.assertNotEqual(p[TCP].sport, client_in_port)
self.assertEqual(p[TCP].dport, server_in_port)
data = ipfix.decode_data_set(p.getlayer(Set))
self.verify_ipfix_max_bibs(data, max_bibs)
- def test_ipfix_max_frags(self):
- """ IPFIX logging maximum fragments pending reassembly exceeded """
- self.vapi.nat64_add_del_pool_addr_range(start_addr=self.nat_addr,
- end_addr=self.nat_addr,
- vrf_id=0xFFFFFFFF,
- is_add=1)
- flags = self.config_flags.NAT_IS_INSIDE
- self.vapi.nat64_add_del_interface(is_add=1, flags=flags,
- sw_if_index=self.pg0.sw_if_index)
- self.vapi.nat64_add_del_interface(is_add=1, flags=0,
- sw_if_index=self.pg1.sw_if_index)
- self.vapi.nat_set_reass(timeout=2, max_reass=1024, max_frag=1,
- drop_frag=0, is_ip6=1)
- self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
- src_address=self.pg3.local_ip4,
- path_mtu=512,
- template_interval=10)
- self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
- src_port=self.ipfix_src_port,
- enable=1)
-
- data = b'a' * 200
- pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
- self.tcp_port_in, 20, data)
- pkts.reverse()
- self.pg0.add_stream(pkts)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- self.pg1.assert_nothing_captured()
- sleep(1)
- self.vapi.ipfix_flush()
- capture = self.pg3.get_capture(9)
- ipfix = IPFIXDecoder()
- # first load template
- for p in capture:
- self.assertTrue(p.haslayer(IPFIX))
- self.assertEqual(p[IP].src, self.pg3.local_ip4)
- self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
- self.assertEqual(p[UDP].sport, self.ipfix_src_port)
- self.assertEqual(p[UDP].dport, 4739)
- self.assertEqual(p[IPFIX].observationDomainID,
- self.ipfix_domain_id)
- if p.haslayer(Template):
- ipfix.add_template(p.getlayer(Template))
- # verify events in data set
- for p in capture:
- if p.haslayer(Data):
- data = ipfix.decode_data_set(p.getlayer(Set))
- self.verify_ipfix_max_fragments_ip6(data, 1,
- self.pg0.remote_ip6n)
-
def test_ipfix_bib_ses(self):
""" IPFIX logging NAT64 BIB/session create and delete events """
self.tcp_port_in = random.randint(1025, 65535)
self.logger.info(self.vapi.cli("show nat64 prefix"))
self.logger.info(self.vapi.cli("show nat64 bib all"))
self.logger.info(self.vapi.cli("show nat64 session table all"))
- self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
class TestDSlite(MethodHolder):
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
capture = self.pg1.get_capture(len(pkts))
+
for packet in capture:
try:
self.assertEqual(packet[IPv6].src, self.nat_addr)