from io import BytesIO
from random import randint, choice
+import re
import scapy.compat
from framework import tag_fixme_ubuntu2204, is_distro_ubuntu2204
-from framework import VppTestCase, VppTestRunner
+from framework import VppTestCase, VppTestRunner, VppLoInterface
from scapy.data import IP_PROTOS
from scapy.layers.inet import IP, TCP, UDP, ICMP, GRE
from scapy.layers.inet import IPerror, TCPerror
if not self.vpp_dead:
self.plugin_disable()
- def plugin_enable(self):
- self.vapi.nat44_ed_plugin_enable_disable(sessions=self.max_sessions, enable=1)
+ def plugin_enable(self, max_sessions=None):
+ max_sessions = max_sessions or self.max_sessions
+ self.vapi.nat44_ed_plugin_enable_disable(sessions=max_sessions, enable=1)
def plugin_disable(self):
self.vapi.nat44_ed_plugin_enable_disable(enable=0)
return pkts
+ def create_udp_stream(self, in_if, out_if, count, base_port=6303):
+ return [
+ (
+ Ether(dst=in_if.local_mac, src=in_if.remote_mac)
+ / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=64)
+ / UDP(sport=base_port + i, dport=20)
+ )
+ for i in range(count)
+ ]
+
def create_stream_frag(
self, src_if, dst, sport, dport, data, proto=IP_PROTOS.tcp, echo_reply=False
):
in_if.unconfig()
out_if.unconfig()
+ def test_delete_interface(self):
+ """NAT44ED delete nat interface"""
+
+ self.nat_add_address(self.nat_addr)
+
+ interfaces = self.create_loopback_interfaces(4)
+ self.nat_add_outside_interface(interfaces[0])
+ self.nat_add_inside_interface(interfaces[1])
+ self.nat_add_outside_interface(interfaces[2])
+ self.nat_add_inside_interface(interfaces[2])
+ self.vapi.nat44_ed_add_del_output_interface(
+ sw_if_index=interfaces[3].sw_if_index, is_add=1
+ )
+
+ nat_sw_if_indices = [
+ i.sw_if_index
+ for i in self.vapi.nat44_interface_dump()
+ + list(self.vapi.vpp.details_iter(self.vapi.nat44_ed_output_interface_get))
+ ]
+ self.assertEqual(len(nat_sw_if_indices), len(interfaces))
+
+ loopbacks = []
+ for i in interfaces:
+ # delete nat-enabled interface
+ self.assertIn(i.sw_if_index, nat_sw_if_indices)
+ i.remove_vpp_config()
+
+ # create interface with the same index
+ lo = VppLoInterface(self)
+ loopbacks.append(lo)
+ self.assertEqual(lo.sw_if_index, i.sw_if_index)
+
+ # check interface is not nat-enabled
+ nat_sw_if_indices = [
+ i.sw_if_index
+ for i in self.vapi.nat44_interface_dump()
+ + list(
+ self.vapi.vpp.details_iter(self.vapi.nat44_ed_output_interface_get)
+ )
+ ]
+ self.assertNotIn(lo.sw_if_index, nat_sw_if_indices)
+
+ for i in loopbacks:
+ i.remove_vpp_config()
+
@tag_fixme_ubuntu2204
class TestNAT44EDMW(TestNAT44ED):
)
self.send_and_expect(self.pg0, p, self.pg1)
+ def test_dynamic_ports_exhausted(self):
+ """NAT44ED dynamic translation test: address ports exhaused"""
+
+ sessions_per_batch = 128
+ n_available_ports = 65536 - 1024
+ n_sessions = n_available_ports + 2 * sessions_per_batch
+
+ # set high enough session limit for ports to be exhausted
+ self.plugin_disable()
+ self.plugin_enable(max_sessions=n_sessions)
+
+ self.nat_add_inside_interface(self.pg0)
+ self.nat_add_outside_interface(self.pg1)
+
+ # set timeouts to high for sessions to reallistically expire
+ config = self.vapi.nat44_show_running_config()
+ old_timeouts = config.timeouts
+ self.vapi.nat_set_timeouts(
+ udp=21600,
+ tcp_established=old_timeouts.tcp_established,
+ tcp_transitory=old_timeouts.tcp_transitory,
+ icmp=old_timeouts.icmp,
+ )
+
+ # in2out after NAT addresses added
+ self.nat_add_address(self.nat_addr)
+
+ for i in range(n_sessions // sessions_per_batch):
+ pkts = self.create_udp_stream(
+ self.pg0,
+ self.pg1,
+ sessions_per_batch,
+ base_port=i * sessions_per_batch + 100,
+ )
+
+ self.pg0.add_stream(pkts)
+ self.pg_start()
+
+ err = self.statistics.get_err_counter(
+ "/err/nat44-ed-in2out-slowpath/out of ports"
+ )
+ if err > sessions_per_batch:
+ break
+
+ # Check for ports to be used no more than once
+ ports = set()
+ sessions = self.vapi.cli("show nat44 sessions")
+ rx = re.compile(
+ f" *o2i flow: match: saddr {self.pg1.remote_ip4} sport [0-9]+ daddr {self.nat_addr} dport ([0-9]+) proto UDP.*"
+ )
+ for line in sessions.splitlines():
+ m = rx.match(line)
+ if m:
+ port = int(m.groups()[0])
+ self.assertNotIn(port, ports)
+ ports.add(port)
+
+ self.assertGreaterEqual(err, sessions_per_batch)
+
if __name__ == "__main__":
unittest.main(testRunner=VppTestRunner)