tests: Use errno value rather than a specific int
[vpp.git] / test / test_nat44_ed.py
index 7745987..eed89f1 100644 (file)
@@ -4,13 +4,16 @@ import unittest
 from io import BytesIO
 from random import randint, choice
 
+import re
 import scapy.compat
-from framework import VppTestCase, VppTestRunner
+from framework import VppTestCase, VppLoInterface
+from asfframework import VppTestRunner, tag_fixme_ubuntu2204, is_distro_ubuntu2204
 from scapy.data import IP_PROTOS
 from scapy.layers.inet import IP, TCP, UDP, ICMP, GRE
 from scapy.layers.inet import IPerror, TCPerror
 from scapy.layers.l2 import Ether
 from scapy.packet import Raw
+from statistics import variance
 from syslog_rfc5424_parser import SyslogMessage, ParseError
 from syslog_rfc5424_parser.constants import SyslogSeverity
 from util import ppp, pr, ip4_range
@@ -47,8 +50,9 @@ class TestNAT44ED(VppTestCase):
         if not self.vpp_dead:
             self.plugin_disable()
 
-    def plugin_enable(self):
-        self.vapi.nat44_ed_plugin_enable_disable(sessions=self.max_sessions, enable=1)
+    def plugin_enable(self, max_sessions=None):
+        max_sessions = max_sessions or self.max_sessions
+        self.vapi.nat44_ed_plugin_enable_disable(sessions=max_sessions, enable=1)
 
     def plugin_disable(self):
         self.vapi.nat44_ed_plugin_enable_disable(enable=0)
@@ -71,7 +75,7 @@ class TestNAT44ED(VppTestCase):
 
     @staticmethod
     def random_port():
-        return randint(1025, 65535)
+        return randint(1024, 65535)
 
     @staticmethod
     def proto2layer(proto):
@@ -139,7 +143,6 @@ class TestNAT44ED(VppTestCase):
         tag="",
         flags=0,
     ):
-
         if not (local_port and external_port):
             flags |= self.config_flags.NAT_IS_ADDR_ONLY
 
@@ -159,6 +162,8 @@ class TestNAT44ED(VppTestCase):
     @classmethod
     def setUpClass(cls):
         super().setUpClass()
+        if is_distro_ubuntu2204 == True and not hasattr(cls, "vpp"):
+            return
 
         cls.create_pg_interfaces(range(12))
         cls.interfaces = list(cls.pg_interfaces[:4])
@@ -613,6 +618,16 @@ class TestNAT44ED(VppTestCase):
 
         return pkts
 
+    def create_udp_stream(self, in_if, out_if, count, base_port=6303):
+        return [
+            (
+                Ether(dst=in_if.local_mac, src=in_if.remote_mac)
+                / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=64)
+                / UDP(sport=base_port + i, dport=20)
+            )
+            for i in range(count)
+        ]
+
     def create_stream_frag(
         self, src_if, dst, sport, dport, data, proto=IP_PROTOS.tcp, echo_reply=False
     ):
@@ -672,7 +687,6 @@ class TestNAT44ED(VppTestCase):
     def frag_in_order_in_plus_out(
         self, in_addr, out_addr, in_port, out_port, proto=IP_PROTOS.tcp
     ):
-
         layer = self.proto2layer(proto)
 
         if proto == IP_PROTOS.tcp:
@@ -728,7 +742,6 @@ class TestNAT44ED(VppTestCase):
     def frag_out_of_order_in_plus_out(
         self, in_addr, out_addr, in_port, out_port, proto=IP_PROTOS.tcp
     ):
-
         layer = self.proto2layer(proto)
 
         if proto == IP_PROTOS.tcp:
@@ -2301,14 +2314,17 @@ class TestNAT44ED(VppTestCase):
             raise
 
     def test_outside_address_distribution(self):
-        """Outside address distribution based on source address"""
+        """NAT44ED outside address distribution based on source address"""
 
+        addresses = 65
         x = 100
-        nat_addresses = []
 
-        for i in range(1, x):
+        nat_addresses = []
+        nat_distribution = {}
+        for i in range(1, addresses):
             a = "10.0.0.%d" % i
             nat_addresses.append(a)
+            nat_distribution[a] = set()
 
         self.nat_add_inside_interface(self.pg0)
         self.nat_add_outside_interface(self.pg1)
@@ -2347,18 +2363,250 @@ class TestNAT44ED(VppTestCase):
             self.assertTrue(info is not None)
             self.assertEqual(packet_index, info.index)
             p_sent = info.data
-            packed = socket.inet_aton(p_sent[IP].src)
-            numeric = struct.unpack("!L", packed)[0]
-            numeric = socket.htonl(numeric)
-            a = nat_addresses[(numeric - 1) % len(nat_addresses)]
+            self.assertIn(p_recvd[IP].src, nat_distribution)
+            nat_distribution[p_recvd[IP].src].add(p_sent[IP].src)
+
+        var = variance(map(len, nat_distribution.values()), x / addresses)
+        self.assertLess(var, 0.33, msg="Bad outside address distribution")
+
+    def test_dynamic_edge_ports(self):
+        """NAT44ED dynamic translation test: edge ports"""
+
+        worker_count = self.vpp_worker_count or 1
+        port_offset = 1024
+        port_per_thread = (65536 - port_offset) // worker_count
+        port_count = port_per_thread * worker_count
+
+        # worker thread edge ports
+        thread_edge_ports = {0, port_offset - 1, 65535}
+        for i in range(0, worker_count):
+            port_thread_offset = (port_per_thread * i) + port_offset
+            for port_range_offset in [0, port_per_thread - 1]:
+                port = port_thread_offset + port_range_offset
+                thread_edge_ports.add(port)
+        thread_drop_ports = set(
+            filter(
+                lambda x: x not in range(port_offset, port_offset + port_count),
+                thread_edge_ports,
+            )
+        )
+
+        in_if = self.pg7
+        out_if = self.pg8
+
+        self.nat_add_address(self.nat_addr)
+
+        try:
+            self.configure_ip4_interface(in_if, hosts=worker_count)
+            self.configure_ip4_interface(out_if)
+
+            self.nat_add_inside_interface(in_if)
+            self.nat_add_outside_interface(out_if)
+
+            # in2out
+            tc1 = self.statistics["/nat44-ed/in2out/slowpath/tcp"]
+            uc1 = self.statistics["/nat44-ed/in2out/slowpath/udp"]
+            ic1 = self.statistics["/nat44-ed/in2out/slowpath/icmp"]
+            dc1 = self.statistics["/nat44-ed/in2out/slowpath/drops"]
+
+            pkt_count = worker_count * len(thread_edge_ports)
+
+            i2o_pkts = [[] for x in range(0, worker_count)]
+            for i in range(0, worker_count):
+                remote_host = in_if.remote_hosts[i]
+                for port in thread_edge_ports:
+                    p = (
+                        Ether(dst=in_if.local_mac, src=in_if.remote_mac)
+                        / IP(src=remote_host.ip4, dst=out_if.remote_ip4)
+                        / TCP(sport=port, dport=port)
+                    )
+                    i2o_pkts[i].append(p)
+
+                    p = (
+                        Ether(dst=in_if.local_mac, src=in_if.remote_mac)
+                        / IP(src=remote_host.ip4, dst=out_if.remote_ip4)
+                        / UDP(sport=port, dport=port)
+                    )
+                    i2o_pkts[i].append(p)
+
+                    p = (
+                        Ether(dst=in_if.local_mac, src=in_if.remote_mac)
+                        / IP(src=remote_host.ip4, dst=out_if.remote_ip4)
+                        / ICMP(id=port, seq=port, type="echo-request")
+                    )
+                    i2o_pkts[i].append(p)
+
+            for i in range(0, worker_count):
+                if len(i2o_pkts[i]) > 0:
+                    in_if.add_stream(i2o_pkts[i], worker=i)
+
+            self.pg_enable_capture(self.pg_interfaces)
+            self.pg_start()
+            capture = out_if.get_capture(pkt_count * 3)
+            for packet in capture:
+                self.assert_packet_checksums_valid(packet)
+                if packet.haslayer(TCP):
+                    self.assert_in_range(
+                        packet[TCP].sport,
+                        port_offset,
+                        port_offset + port_count,
+                        "src TCP port",
+                    )
+                elif packet.haslayer(UDP):
+                    self.assert_in_range(
+                        packet[UDP].sport,
+                        port_offset,
+                        port_offset + port_count,
+                        "src UDP port",
+                    )
+                elif packet.haslayer(ICMP):
+                    self.assert_in_range(
+                        packet[ICMP].id,
+                        port_offset,
+                        port_offset + port_count,
+                        "ICMP id",
+                    )
+                else:
+                    self.fail(
+                        ppp("Unexpected or invalid packet (outside network):", packet)
+                    )
+
+            if_idx = in_if.sw_if_index
+            tc2 = self.statistics["/nat44-ed/in2out/slowpath/tcp"]
+            uc2 = self.statistics["/nat44-ed/in2out/slowpath/udp"]
+            ic2 = self.statistics["/nat44-ed/in2out/slowpath/icmp"]
+            dc2 = self.statistics["/nat44-ed/in2out/slowpath/drops"]
+
+            self.assertEqual(tc2[:, if_idx].sum() - tc1[:, if_idx].sum(), pkt_count)
+            self.assertEqual(uc2[:, if_idx].sum() - uc1[:, if_idx].sum(), pkt_count)
+            self.assertEqual(ic2[:, if_idx].sum() - ic1[:, if_idx].sum(), pkt_count)
+            self.assertEqual(dc2[:, if_idx].sum() - dc1[:, if_idx].sum(), 0)
+
+            # out2in
+            tc1 = self.statistics["/nat44-ed/out2in/fastpath/tcp"]
+            uc1 = self.statistics["/nat44-ed/out2in/fastpath/udp"]
+            ic1 = self.statistics["/nat44-ed/out2in/fastpath/icmp"]
+            dc1 = self.statistics["/nat44-ed/out2in/fastpath/drops"]
+            dc3 = self.statistics["/nat44-ed/out2in/slowpath/drops"]
+
+            # replies to unchanged thread ports should pass on each worker,
+            # excluding packets outside dynamic port range
+            drop_count = worker_count * len(thread_drop_ports)
+            pass_count = worker_count * len(thread_edge_ports) - drop_count
+
+            o2i_pkts = [[] for x in range(0, worker_count)]
+            for i in range(0, worker_count):
+                for port in thread_edge_ports:
+                    p = (
+                        Ether(dst=out_if.local_mac, src=out_if.remote_mac)
+                        / IP(src=out_if.remote_ip4, dst=self.nat_addr)
+                        / TCP(sport=port, dport=port)
+                    )
+                    o2i_pkts[i].append(p)
+
+                    p = (
+                        Ether(dst=out_if.local_mac, src=out_if.remote_mac)
+                        / IP(src=out_if.remote_ip4, dst=self.nat_addr)
+                        / UDP(sport=port, dport=port)
+                    )
+                    o2i_pkts[i].append(p)
+
+                    p = (
+                        Ether(dst=out_if.local_mac, src=out_if.remote_mac)
+                        / IP(src=out_if.remote_ip4, dst=self.nat_addr)
+                        / ICMP(id=port, seq=port, type="echo-reply")
+                    )
+                    o2i_pkts[i].append(p)
+
+            for i in range(0, worker_count):
+                if len(o2i_pkts[i]) > 0:
+                    out_if.add_stream(o2i_pkts[i], worker=i)
+
+            self.pg_enable_capture(self.pg_interfaces)
+            self.pg_start()
+            capture = in_if.get_capture(pass_count * 3)
+            for packet in capture:
+                self.assert_packet_checksums_valid(packet)
+                if packet.haslayer(TCP):
+                    self.assertIn(packet[TCP].dport, thread_edge_ports, "dst TCP port")
+                    self.assertEqual(packet[TCP].dport, packet[TCP].sport, "TCP ports")
+                elif packet.haslayer(UDP):
+                    self.assertIn(packet[UDP].dport, thread_edge_ports, "dst UDP port")
+                    self.assertEqual(packet[UDP].dport, packet[UDP].sport, "UDP ports")
+                elif packet.haslayer(ICMP):
+                    self.assertIn(packet[ICMP].id, thread_edge_ports, "ICMP id")
+                    self.assertEqual(packet[ICMP].id, packet[ICMP].seq, "ICMP id & seq")
+                else:
+                    self.fail(
+                        ppp("Unexpected or invalid packet (inside network):", packet)
+                    )
+
+            if_idx = out_if.sw_if_index
+            tc2 = self.statistics["/nat44-ed/out2in/fastpath/tcp"]
+            uc2 = self.statistics["/nat44-ed/out2in/fastpath/udp"]
+            ic2 = self.statistics["/nat44-ed/out2in/fastpath/icmp"]
+            dc2 = self.statistics["/nat44-ed/out2in/fastpath/drops"]
+            dc4 = self.statistics["/nat44-ed/out2in/slowpath/drops"]
+
+            self.assertEqual(tc2[:, if_idx].sum() - tc1[:, if_idx].sum(), pass_count)
+            self.assertEqual(uc2[:, if_idx].sum() - uc1[:, if_idx].sum(), pass_count)
+            self.assertEqual(ic2[:, if_idx].sum() - ic1[:, if_idx].sum(), pass_count)
+            self.assertEqual(dc2[:, if_idx].sum() - dc1[:, if_idx].sum(), 0)
             self.assertEqual(
-                a,
-                p_recvd[IP].src,
-                "Invalid packet (src IP %s translated to %s, but expected %s)"
-                % (p_sent[IP].src, p_recvd[IP].src, a),
+                dc4[:, if_idx].sum() - dc3[:, if_idx].sum(), drop_count * 3
             )
 
+        finally:
+            in_if.unconfig()
+            out_if.unconfig()
+
+    def test_delete_interface(self):
+        """NAT44ED delete nat interface"""
+
+        self.nat_add_address(self.nat_addr)
+
+        interfaces = self.create_loopback_interfaces(4)
+        self.nat_add_outside_interface(interfaces[0])
+        self.nat_add_inside_interface(interfaces[1])
+        self.nat_add_outside_interface(interfaces[2])
+        self.nat_add_inside_interface(interfaces[2])
+        self.vapi.nat44_ed_add_del_output_interface(
+            sw_if_index=interfaces[3].sw_if_index, is_add=1
+        )
+
+        nat_sw_if_indices = [
+            i.sw_if_index
+            for i in self.vapi.nat44_interface_dump()
+            + list(self.vapi.vpp.details_iter(self.vapi.nat44_ed_output_interface_get))
+        ]
+        self.assertEqual(len(nat_sw_if_indices), len(interfaces))
+
+        loopbacks = []
+        for i in interfaces:
+            # delete nat-enabled interface
+            self.assertIn(i.sw_if_index, nat_sw_if_indices)
+            i.remove_vpp_config()
+
+            # create interface with the same index
+            lo = VppLoInterface(self)
+            loopbacks.append(lo)
+            self.assertEqual(lo.sw_if_index, i.sw_if_index)
+
+            # check interface is not nat-enabled
+            nat_sw_if_indices = [
+                i.sw_if_index
+                for i in self.vapi.nat44_interface_dump()
+                + list(
+                    self.vapi.vpp.details_iter(self.vapi.nat44_ed_output_interface_get)
+                )
+            ]
+            self.assertNotIn(lo.sw_if_index, nat_sw_if_indices)
+
+        for i in loopbacks:
+            i.remove_vpp_config()
+
 
+@tag_fixme_ubuntu2204
 class TestNAT44EDMW(TestNAT44ED):
     """NAT44ED MW Test Case"""
 
@@ -2691,10 +2939,13 @@ class TestNAT44EDMW(TestNAT44ED):
 
         limit = 5
 
-        # 2 interfaces pg0, pg1 (vrf10, limit 1 tcp session)
-        # non existing vrf_id makes process core dump
+        # 2 interfaces pg0, pg1 (vrf10, limit 5 tcp sessions)
         self.vapi.nat44_set_session_limit(session_limit=limit, vrf_id=10)
 
+        # expect error when bad is specified
+        with self.vapi.assert_negative_api_retval():
+            self.vapi.nat44_set_session_limit(session_limit=limit, vrf_id=20)
+
         self.nat_add_inside_interface(inside)
         self.nat_add_inside_interface(inside_vrf10)
         self.nat_add_outside_interface(outside)
@@ -4581,6 +4832,65 @@ class TestNAT44EDMW(TestNAT44ED):
         )
         self.send_and_expect(self.pg0, p, self.pg1)
 
+    def test_dynamic_ports_exhausted(self):
+        """NAT44ED dynamic translation test: address ports exhaused"""
+
+        sessions_per_batch = 128
+        n_available_ports = 65536 - 1024
+        n_sessions = n_available_ports + 2 * sessions_per_batch
+
+        # set high enough session limit for ports to be exhausted
+        self.plugin_disable()
+        self.plugin_enable(max_sessions=n_sessions)
+
+        self.nat_add_inside_interface(self.pg0)
+        self.nat_add_outside_interface(self.pg1)
+
+        # set timeouts to high for sessions to reallistically expire
+        config = self.vapi.nat44_show_running_config()
+        old_timeouts = config.timeouts
+        self.vapi.nat_set_timeouts(
+            udp=21600,
+            tcp_established=old_timeouts.tcp_established,
+            tcp_transitory=old_timeouts.tcp_transitory,
+            icmp=old_timeouts.icmp,
+        )
+
+        # in2out after NAT addresses added
+        self.nat_add_address(self.nat_addr)
+
+        for i in range(n_sessions // sessions_per_batch):
+            pkts = self.create_udp_stream(
+                self.pg0,
+                self.pg1,
+                sessions_per_batch,
+                base_port=i * sessions_per_batch + 100,
+            )
+
+            self.pg0.add_stream(pkts)
+            self.pg_start()
+
+            err = self.statistics.get_err_counter(
+                "/err/nat44-ed-in2out-slowpath/out of ports"
+            )
+            if err > sessions_per_batch:
+                break
+
+        # Check for ports to be used no more than once
+        ports = set()
+        sessions = self.vapi.cli("show nat44 sessions")
+        rx = re.compile(
+            f" *o2i flow: match: saddr {self.pg1.remote_ip4} sport [0-9]+ daddr {self.nat_addr} dport ([0-9]+) proto UDP.*"
+        )
+        for line in sessions.splitlines():
+            m = rx.match(line)
+            if m:
+                port = int(m.groups()[0])
+                self.assertNotIn(port, ports)
+                ports.add(port)
+
+        self.assertGreaterEqual(err, sessions_per_batch)
+
 
 if __name__ == "__main__":
     unittest.main(testRunner=VppTestRunner)