X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftest_nat44_ed.py;h=eed89f1a399ffe5142a4e4fd1bd78b09ae904b1c;hb=853cc9f2ad3ee52cbdd891fb09d51c25678baed0;hp=7279b6a7d56c00008a2238b9c0f00472c882688c;hpb=6b97c43005f6458ce2e253f87af6f609eaebef60;p=vpp.git diff --git a/test/test_nat44_ed.py b/test/test_nat44_ed.py index 7279b6a7d56..eed89f1a399 100644 --- a/test/test_nat44_ed.py +++ b/test/test_nat44_ed.py @@ -6,13 +6,14 @@ from random import randint, choice import re import scapy.compat -from framework import tag_fixme_ubuntu2204, is_distro_ubuntu2204 -from framework import VppTestCase, VppTestRunner, VppLoInterface +from framework import VppTestCase, VppLoInterface +from asfframework import VppTestRunner, tag_fixme_ubuntu2204, is_distro_ubuntu2204 from scapy.data import IP_PROTOS from scapy.layers.inet import IP, TCP, UDP, ICMP, GRE from scapy.layers.inet import IPerror, TCPerror from scapy.layers.l2 import Ether from scapy.packet import Raw +from statistics import variance from syslog_rfc5424_parser import SyslogMessage, ParseError from syslog_rfc5424_parser.constants import SyslogSeverity from util import ppp, pr, ip4_range @@ -142,7 +143,6 @@ class TestNAT44ED(VppTestCase): tag="", flags=0, ): - if not (local_port and external_port): flags |= self.config_flags.NAT_IS_ADDR_ONLY @@ -687,7 +687,6 @@ class TestNAT44ED(VppTestCase): def frag_in_order_in_plus_out( self, in_addr, out_addr, in_port, out_port, proto=IP_PROTOS.tcp ): - layer = self.proto2layer(proto) if proto == IP_PROTOS.tcp: @@ -743,7 +742,6 @@ class TestNAT44ED(VppTestCase): def frag_out_of_order_in_plus_out( self, in_addr, out_addr, in_port, out_port, proto=IP_PROTOS.tcp ): - layer = self.proto2layer(proto) if proto == IP_PROTOS.tcp: @@ -2316,14 +2314,17 @@ class TestNAT44ED(VppTestCase): raise def test_outside_address_distribution(self): - """Outside address distribution based on source address""" + """NAT44ED outside address distribution based on source address""" + addresses = 65 x = 100 - nat_addresses = [] - for i in range(1, x): + nat_addresses = [] + nat_distribution = {} + for i in range(1, addresses): a = "10.0.0.%d" % i nat_addresses.append(a) + nat_distribution[a] = set() self.nat_add_inside_interface(self.pg0) self.nat_add_outside_interface(self.pg1) @@ -2362,16 +2363,11 @@ class TestNAT44ED(VppTestCase): self.assertTrue(info is not None) self.assertEqual(packet_index, info.index) p_sent = info.data - packed = socket.inet_aton(p_sent[IP].src) - numeric = struct.unpack("!L", packed)[0] - numeric = socket.htonl(numeric) - a = nat_addresses[(numeric - 1) % len(nat_addresses)] - self.assertEqual( - a, - p_recvd[IP].src, - "Invalid packet (src IP %s translated to %s, but expected %s)" - % (p_sent[IP].src, p_recvd[IP].src, a), - ) + self.assertIn(p_recvd[IP].src, nat_distribution) + nat_distribution[p_recvd[IP].src].add(p_sent[IP].src) + + var = variance(map(len, nat_distribution.values()), x / addresses) + self.assertLess(var, 0.33, msg="Bad outside address distribution") def test_dynamic_edge_ports(self): """NAT44ED dynamic translation test: edge ports""" @@ -2943,10 +2939,13 @@ class TestNAT44EDMW(TestNAT44ED): limit = 5 - # 2 interfaces pg0, pg1 (vrf10, limit 1 tcp session) - # non existing vrf_id makes process core dump + # 2 interfaces pg0, pg1 (vrf10, limit 5 tcp sessions) self.vapi.nat44_set_session_limit(session_limit=limit, vrf_id=10) + # expect error when bad is specified + with self.vapi.assert_negative_api_retval(): + self.vapi.nat44_set_session_limit(session_limit=limit, vrf_id=20) + self.nat_add_inside_interface(inside) self.nat_add_inside_interface(inside_vrf10) self.nat_add_outside_interface(outside)