tests: Use errno value rather than a specific int
[vpp.git] / test / test_nat44_ed.py
index 7279b6a..eed89f1 100644 (file)
@@ -6,13 +6,14 @@ from random import randint, choice
 
 import re
 import scapy.compat
-from framework import tag_fixme_ubuntu2204, is_distro_ubuntu2204
-from framework import VppTestCase, VppTestRunner, VppLoInterface
+from framework import VppTestCase, VppLoInterface
+from asfframework import VppTestRunner, tag_fixme_ubuntu2204, is_distro_ubuntu2204
 from scapy.data import IP_PROTOS
 from scapy.layers.inet import IP, TCP, UDP, ICMP, GRE
 from scapy.layers.inet import IPerror, TCPerror
 from scapy.layers.l2 import Ether
 from scapy.packet import Raw
+from statistics import variance
 from syslog_rfc5424_parser import SyslogMessage, ParseError
 from syslog_rfc5424_parser.constants import SyslogSeverity
 from util import ppp, pr, ip4_range
@@ -142,7 +143,6 @@ class TestNAT44ED(VppTestCase):
         tag="",
         flags=0,
     ):
-
         if not (local_port and external_port):
             flags |= self.config_flags.NAT_IS_ADDR_ONLY
 
@@ -687,7 +687,6 @@ class TestNAT44ED(VppTestCase):
     def frag_in_order_in_plus_out(
         self, in_addr, out_addr, in_port, out_port, proto=IP_PROTOS.tcp
     ):
-
         layer = self.proto2layer(proto)
 
         if proto == IP_PROTOS.tcp:
@@ -743,7 +742,6 @@ class TestNAT44ED(VppTestCase):
     def frag_out_of_order_in_plus_out(
         self, in_addr, out_addr, in_port, out_port, proto=IP_PROTOS.tcp
     ):
-
         layer = self.proto2layer(proto)
 
         if proto == IP_PROTOS.tcp:
@@ -2316,14 +2314,17 @@ class TestNAT44ED(VppTestCase):
             raise
 
     def test_outside_address_distribution(self):
-        """Outside address distribution based on source address"""
+        """NAT44ED outside address distribution based on source address"""
 
+        addresses = 65
         x = 100
-        nat_addresses = []
 
-        for i in range(1, x):
+        nat_addresses = []
+        nat_distribution = {}
+        for i in range(1, addresses):
             a = "10.0.0.%d" % i
             nat_addresses.append(a)
+            nat_distribution[a] = set()
 
         self.nat_add_inside_interface(self.pg0)
         self.nat_add_outside_interface(self.pg1)
@@ -2362,16 +2363,11 @@ class TestNAT44ED(VppTestCase):
             self.assertTrue(info is not None)
             self.assertEqual(packet_index, info.index)
             p_sent = info.data
-            packed = socket.inet_aton(p_sent[IP].src)
-            numeric = struct.unpack("!L", packed)[0]
-            numeric = socket.htonl(numeric)
-            a = nat_addresses[(numeric - 1) % len(nat_addresses)]
-            self.assertEqual(
-                a,
-                p_recvd[IP].src,
-                "Invalid packet (src IP %s translated to %s, but expected %s)"
-                % (p_sent[IP].src, p_recvd[IP].src, a),
-            )
+            self.assertIn(p_recvd[IP].src, nat_distribution)
+            nat_distribution[p_recvd[IP].src].add(p_sent[IP].src)
+
+        var = variance(map(len, nat_distribution.values()), x / addresses)
+        self.assertLess(var, 0.33, msg="Bad outside address distribution")
 
     def test_dynamic_edge_ports(self):
         """NAT44ED dynamic translation test: edge ports"""
@@ -2943,10 +2939,13 @@ class TestNAT44EDMW(TestNAT44ED):
 
         limit = 5
 
-        # 2 interfaces pg0, pg1 (vrf10, limit 1 tcp session)
-        # non existing vrf_id makes process core dump
+        # 2 interfaces pg0, pg1 (vrf10, limit 5 tcp sessions)
         self.vapi.nat44_set_session_limit(session_limit=limit, vrf_id=10)
 
+        # expect error when bad is specified
+        with self.vapi.assert_negative_api_retval():
+            self.vapi.nat44_set_session_limit(session_limit=limit, vrf_id=20)
+
         self.nat_add_inside_interface(inside)
         self.nat_add_inside_interface(inside_vrf10)
         self.nat_add_outside_interface(outside)