import re
import scapy.compat
-from framework import tag_fixme_ubuntu2204, is_distro_ubuntu2204
-from framework import VppTestCase, VppTestRunner, VppLoInterface
+from framework import VppTestCase, VppLoInterface
+from asfframework import VppTestRunner, tag_fixme_ubuntu2204, is_distro_ubuntu2204
from scapy.data import IP_PROTOS
from scapy.layers.inet import IP, TCP, UDP, ICMP, GRE
from scapy.layers.inet import IPerror, TCPerror
from scapy.layers.l2 import Ether
from scapy.packet import Raw
+from statistics import variance
from syslog_rfc5424_parser import SyslogMessage, ParseError
from syslog_rfc5424_parser.constants import SyslogSeverity
from util import ppp, pr, ip4_range
tag="",
flags=0,
):
-
if not (local_port and external_port):
flags |= self.config_flags.NAT_IS_ADDR_ONLY
def frag_in_order_in_plus_out(
self, in_addr, out_addr, in_port, out_port, proto=IP_PROTOS.tcp
):
-
layer = self.proto2layer(proto)
if proto == IP_PROTOS.tcp:
def frag_out_of_order_in_plus_out(
self, in_addr, out_addr, in_port, out_port, proto=IP_PROTOS.tcp
):
-
layer = self.proto2layer(proto)
if proto == IP_PROTOS.tcp:
raise
def test_outside_address_distribution(self):
- """Outside address distribution based on source address"""
+ """NAT44ED outside address distribution based on source address"""
+ addresses = 65
x = 100
- nat_addresses = []
- for i in range(1, x):
+ nat_addresses = []
+ nat_distribution = {}
+ for i in range(1, addresses):
a = "10.0.0.%d" % i
nat_addresses.append(a)
+ nat_distribution[a] = set()
self.nat_add_inside_interface(self.pg0)
self.nat_add_outside_interface(self.pg1)
self.assertTrue(info is not None)
self.assertEqual(packet_index, info.index)
p_sent = info.data
- packed = socket.inet_aton(p_sent[IP].src)
- numeric = struct.unpack("!L", packed)[0]
- numeric = socket.htonl(numeric)
- a = nat_addresses[(numeric - 1) % len(nat_addresses)]
- self.assertEqual(
- a,
- p_recvd[IP].src,
- "Invalid packet (src IP %s translated to %s, but expected %s)"
- % (p_sent[IP].src, p_recvd[IP].src, a),
- )
+ self.assertIn(p_recvd[IP].src, nat_distribution)
+ nat_distribution[p_recvd[IP].src].add(p_sent[IP].src)
+
+ var = variance(map(len, nat_distribution.values()), x / addresses)
+ self.assertLess(var, 0.33, msg="Bad outside address distribution")
def test_dynamic_edge_ports(self):
"""NAT44ED dynamic translation test: edge ports"""
limit = 5
- # 2 interfaces pg0, pg1 (vrf10, limit 1 tcp session)
- # non existing vrf_id makes process core dump
+ # 2 interfaces pg0, pg1 (vrf10, limit 5 tcp sessions)
self.vapi.nat44_set_session_limit(session_limit=limit, vrf_id=10)
+ # expect error when bad is specified
+ with self.vapi.assert_negative_api_retval():
+ self.vapi.nat44_set_session_limit(session_limit=limit, vrf_id=20)
+
self.nat_add_inside_interface(inside)
self.nat_add_inside_interface(inside_vrf10)
self.nat_add_outside_interface(outside)