from io import BytesIO
from random import randint, choice
+import re
import scapy.compat
-from framework import VppTestCase, VppTestRunner
+from framework import VppTestCase, VppLoInterface
+from asfframework import VppTestRunner, tag_fixme_ubuntu2204, is_distro_ubuntu2204
from scapy.data import IP_PROTOS
from scapy.layers.inet import IP, TCP, UDP, ICMP, GRE
from scapy.layers.inet import IPerror, TCPerror
from scapy.layers.l2 import Ether
from scapy.packet import Raw
+from statistics import variance
from syslog_rfc5424_parser import SyslogMessage, ParseError
from syslog_rfc5424_parser.constants import SyslogSeverity
from util import ppp, pr, ip4_range
if not self.vpp_dead:
self.plugin_disable()
- def plugin_enable(self):
- self.vapi.nat44_ed_plugin_enable_disable(sessions=self.max_sessions, enable=1)
+ def plugin_enable(self, max_sessions=None):
+ max_sessions = max_sessions or self.max_sessions
+ self.vapi.nat44_ed_plugin_enable_disable(sessions=max_sessions, enable=1)
def plugin_disable(self):
self.vapi.nat44_ed_plugin_enable_disable(enable=0)
@staticmethod
def random_port():
- return randint(1025, 65535)
+ return randint(1024, 65535)
@staticmethod
def proto2layer(proto):
tag="",
flags=0,
):
-
if not (local_port and external_port):
flags |= self.config_flags.NAT_IS_ADDR_ONLY
@classmethod
def setUpClass(cls):
super().setUpClass()
+ if is_distro_ubuntu2204 == True and not hasattr(cls, "vpp"):
+ return
cls.create_pg_interfaces(range(12))
cls.interfaces = list(cls.pg_interfaces[:4])
return pkts
+ def create_udp_stream(self, in_if, out_if, count, base_port=6303):
+ return [
+ (
+ Ether(dst=in_if.local_mac, src=in_if.remote_mac)
+ / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=64)
+ / UDP(sport=base_port + i, dport=20)
+ )
+ for i in range(count)
+ ]
+
def create_stream_frag(
self, src_if, dst, sport, dport, data, proto=IP_PROTOS.tcp, echo_reply=False
):
def frag_in_order_in_plus_out(
self, in_addr, out_addr, in_port, out_port, proto=IP_PROTOS.tcp
):
-
layer = self.proto2layer(proto)
if proto == IP_PROTOS.tcp:
def frag_out_of_order_in_plus_out(
self, in_addr, out_addr, in_port, out_port, proto=IP_PROTOS.tcp
):
-
layer = self.proto2layer(proto)
if proto == IP_PROTOS.tcp:
raise
def test_outside_address_distribution(self):
- """Outside address distribution based on source address"""
+ """NAT44ED outside address distribution based on source address"""
+ addresses = 65
x = 100
- nat_addresses = []
- for i in range(1, x):
+ nat_addresses = []
+ nat_distribution = {}
+ for i in range(1, addresses):
a = "10.0.0.%d" % i
nat_addresses.append(a)
+ nat_distribution[a] = set()
self.nat_add_inside_interface(self.pg0)
self.nat_add_outside_interface(self.pg1)
self.assertTrue(info is not None)
self.assertEqual(packet_index, info.index)
p_sent = info.data
- packed = socket.inet_aton(p_sent[IP].src)
- numeric = struct.unpack("!L", packed)[0]
- numeric = socket.htonl(numeric)
- a = nat_addresses[(numeric - 1) % len(nat_addresses)]
+ self.assertIn(p_recvd[IP].src, nat_distribution)
+ nat_distribution[p_recvd[IP].src].add(p_sent[IP].src)
+
+ var = variance(map(len, nat_distribution.values()), x / addresses)
+ self.assertLess(var, 0.33, msg="Bad outside address distribution")
+
+ def test_dynamic_edge_ports(self):
+ """NAT44ED dynamic translation test: edge ports"""
+
+ worker_count = self.vpp_worker_count or 1
+ port_offset = 1024
+ port_per_thread = (65536 - port_offset) // worker_count
+ port_count = port_per_thread * worker_count
+
+ # worker thread edge ports
+ thread_edge_ports = {0, port_offset - 1, 65535}
+ for i in range(0, worker_count):
+ port_thread_offset = (port_per_thread * i) + port_offset
+ for port_range_offset in [0, port_per_thread - 1]:
+ port = port_thread_offset + port_range_offset
+ thread_edge_ports.add(port)
+ thread_drop_ports = set(
+ filter(
+ lambda x: x not in range(port_offset, port_offset + port_count),
+ thread_edge_ports,
+ )
+ )
+
+ in_if = self.pg7
+ out_if = self.pg8
+
+ self.nat_add_address(self.nat_addr)
+
+ try:
+ self.configure_ip4_interface(in_if, hosts=worker_count)
+ self.configure_ip4_interface(out_if)
+
+ self.nat_add_inside_interface(in_if)
+ self.nat_add_outside_interface(out_if)
+
+ # in2out
+ tc1 = self.statistics["/nat44-ed/in2out/slowpath/tcp"]
+ uc1 = self.statistics["/nat44-ed/in2out/slowpath/udp"]
+ ic1 = self.statistics["/nat44-ed/in2out/slowpath/icmp"]
+ dc1 = self.statistics["/nat44-ed/in2out/slowpath/drops"]
+
+ pkt_count = worker_count * len(thread_edge_ports)
+
+ i2o_pkts = [[] for x in range(0, worker_count)]
+ for i in range(0, worker_count):
+ remote_host = in_if.remote_hosts[i]
+ for port in thread_edge_ports:
+ p = (
+ Ether(dst=in_if.local_mac, src=in_if.remote_mac)
+ / IP(src=remote_host.ip4, dst=out_if.remote_ip4)
+ / TCP(sport=port, dport=port)
+ )
+ i2o_pkts[i].append(p)
+
+ p = (
+ Ether(dst=in_if.local_mac, src=in_if.remote_mac)
+ / IP(src=remote_host.ip4, dst=out_if.remote_ip4)
+ / UDP(sport=port, dport=port)
+ )
+ i2o_pkts[i].append(p)
+
+ p = (
+ Ether(dst=in_if.local_mac, src=in_if.remote_mac)
+ / IP(src=remote_host.ip4, dst=out_if.remote_ip4)
+ / ICMP(id=port, seq=port, type="echo-request")
+ )
+ i2o_pkts[i].append(p)
+
+ for i in range(0, worker_count):
+ if len(i2o_pkts[i]) > 0:
+ in_if.add_stream(i2o_pkts[i], worker=i)
+
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = out_if.get_capture(pkt_count * 3)
+ for packet in capture:
+ self.assert_packet_checksums_valid(packet)
+ if packet.haslayer(TCP):
+ self.assert_in_range(
+ packet[TCP].sport,
+ port_offset,
+ port_offset + port_count,
+ "src TCP port",
+ )
+ elif packet.haslayer(UDP):
+ self.assert_in_range(
+ packet[UDP].sport,
+ port_offset,
+ port_offset + port_count,
+ "src UDP port",
+ )
+ elif packet.haslayer(ICMP):
+ self.assert_in_range(
+ packet[ICMP].id,
+ port_offset,
+ port_offset + port_count,
+ "ICMP id",
+ )
+ else:
+ self.fail(
+ ppp("Unexpected or invalid packet (outside network):", packet)
+ )
+
+ if_idx = in_if.sw_if_index
+ tc2 = self.statistics["/nat44-ed/in2out/slowpath/tcp"]
+ uc2 = self.statistics["/nat44-ed/in2out/slowpath/udp"]
+ ic2 = self.statistics["/nat44-ed/in2out/slowpath/icmp"]
+ dc2 = self.statistics["/nat44-ed/in2out/slowpath/drops"]
+
+ self.assertEqual(tc2[:, if_idx].sum() - tc1[:, if_idx].sum(), pkt_count)
+ self.assertEqual(uc2[:, if_idx].sum() - uc1[:, if_idx].sum(), pkt_count)
+ self.assertEqual(ic2[:, if_idx].sum() - ic1[:, if_idx].sum(), pkt_count)
+ self.assertEqual(dc2[:, if_idx].sum() - dc1[:, if_idx].sum(), 0)
+
+ # out2in
+ tc1 = self.statistics["/nat44-ed/out2in/fastpath/tcp"]
+ uc1 = self.statistics["/nat44-ed/out2in/fastpath/udp"]
+ ic1 = self.statistics["/nat44-ed/out2in/fastpath/icmp"]
+ dc1 = self.statistics["/nat44-ed/out2in/fastpath/drops"]
+ dc3 = self.statistics["/nat44-ed/out2in/slowpath/drops"]
+
+ # replies to unchanged thread ports should pass on each worker,
+ # excluding packets outside dynamic port range
+ drop_count = worker_count * len(thread_drop_ports)
+ pass_count = worker_count * len(thread_edge_ports) - drop_count
+
+ o2i_pkts = [[] for x in range(0, worker_count)]
+ for i in range(0, worker_count):
+ for port in thread_edge_ports:
+ p = (
+ Ether(dst=out_if.local_mac, src=out_if.remote_mac)
+ / IP(src=out_if.remote_ip4, dst=self.nat_addr)
+ / TCP(sport=port, dport=port)
+ )
+ o2i_pkts[i].append(p)
+
+ p = (
+ Ether(dst=out_if.local_mac, src=out_if.remote_mac)
+ / IP(src=out_if.remote_ip4, dst=self.nat_addr)
+ / UDP(sport=port, dport=port)
+ )
+ o2i_pkts[i].append(p)
+
+ p = (
+ Ether(dst=out_if.local_mac, src=out_if.remote_mac)
+ / IP(src=out_if.remote_ip4, dst=self.nat_addr)
+ / ICMP(id=port, seq=port, type="echo-reply")
+ )
+ o2i_pkts[i].append(p)
+
+ for i in range(0, worker_count):
+ if len(o2i_pkts[i]) > 0:
+ out_if.add_stream(o2i_pkts[i], worker=i)
+
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = in_if.get_capture(pass_count * 3)
+ for packet in capture:
+ self.assert_packet_checksums_valid(packet)
+ if packet.haslayer(TCP):
+ self.assertIn(packet[TCP].dport, thread_edge_ports, "dst TCP port")
+ self.assertEqual(packet[TCP].dport, packet[TCP].sport, "TCP ports")
+ elif packet.haslayer(UDP):
+ self.assertIn(packet[UDP].dport, thread_edge_ports, "dst UDP port")
+ self.assertEqual(packet[UDP].dport, packet[UDP].sport, "UDP ports")
+ elif packet.haslayer(ICMP):
+ self.assertIn(packet[ICMP].id, thread_edge_ports, "ICMP id")
+ self.assertEqual(packet[ICMP].id, packet[ICMP].seq, "ICMP id & seq")
+ else:
+ self.fail(
+ ppp("Unexpected or invalid packet (inside network):", packet)
+ )
+
+ if_idx = out_if.sw_if_index
+ tc2 = self.statistics["/nat44-ed/out2in/fastpath/tcp"]
+ uc2 = self.statistics["/nat44-ed/out2in/fastpath/udp"]
+ ic2 = self.statistics["/nat44-ed/out2in/fastpath/icmp"]
+ dc2 = self.statistics["/nat44-ed/out2in/fastpath/drops"]
+ dc4 = self.statistics["/nat44-ed/out2in/slowpath/drops"]
+
+ self.assertEqual(tc2[:, if_idx].sum() - tc1[:, if_idx].sum(), pass_count)
+ self.assertEqual(uc2[:, if_idx].sum() - uc1[:, if_idx].sum(), pass_count)
+ self.assertEqual(ic2[:, if_idx].sum() - ic1[:, if_idx].sum(), pass_count)
+ self.assertEqual(dc2[:, if_idx].sum() - dc1[:, if_idx].sum(), 0)
self.assertEqual(
- a,
- p_recvd[IP].src,
- "Invalid packet (src IP %s translated to %s, but expected %s)"
- % (p_sent[IP].src, p_recvd[IP].src, a),
+ dc4[:, if_idx].sum() - dc3[:, if_idx].sum(), drop_count * 3
)
+ finally:
+ in_if.unconfig()
+ out_if.unconfig()
+
+ def test_delete_interface(self):
+ """NAT44ED delete nat interface"""
+
+ self.nat_add_address(self.nat_addr)
+
+ interfaces = self.create_loopback_interfaces(4)
+ self.nat_add_outside_interface(interfaces[0])
+ self.nat_add_inside_interface(interfaces[1])
+ self.nat_add_outside_interface(interfaces[2])
+ self.nat_add_inside_interface(interfaces[2])
+ self.vapi.nat44_ed_add_del_output_interface(
+ sw_if_index=interfaces[3].sw_if_index, is_add=1
+ )
+
+ nat_sw_if_indices = [
+ i.sw_if_index
+ for i in self.vapi.nat44_interface_dump()
+ + list(self.vapi.vpp.details_iter(self.vapi.nat44_ed_output_interface_get))
+ ]
+ self.assertEqual(len(nat_sw_if_indices), len(interfaces))
+
+ loopbacks = []
+ for i in interfaces:
+ # delete nat-enabled interface
+ self.assertIn(i.sw_if_index, nat_sw_if_indices)
+ i.remove_vpp_config()
+
+ # create interface with the same index
+ lo = VppLoInterface(self)
+ loopbacks.append(lo)
+ self.assertEqual(lo.sw_if_index, i.sw_if_index)
+
+ # check interface is not nat-enabled
+ nat_sw_if_indices = [
+ i.sw_if_index
+ for i in self.vapi.nat44_interface_dump()
+ + list(
+ self.vapi.vpp.details_iter(self.vapi.nat44_ed_output_interface_get)
+ )
+ ]
+ self.assertNotIn(lo.sw_if_index, nat_sw_if_indices)
+
+ for i in loopbacks:
+ i.remove_vpp_config()
+
+@tag_fixme_ubuntu2204
class TestNAT44EDMW(TestNAT44ED):
"""NAT44ED MW Test Case"""
limit = 5
- # 2 interfaces pg0, pg1 (vrf10, limit 1 tcp session)
- # non existing vrf_id makes process core dump
+ # 2 interfaces pg0, pg1 (vrf10, limit 5 tcp sessions)
self.vapi.nat44_set_session_limit(session_limit=limit, vrf_id=10)
+ # expect error when bad is specified
+ with self.vapi.assert_negative_api_retval():
+ self.vapi.nat44_set_session_limit(session_limit=limit, vrf_id=20)
+
self.nat_add_inside_interface(inside)
self.nat_add_inside_interface(inside_vrf10)
self.nat_add_outside_interface(outside)
)
self.send_and_expect(self.pg0, p, self.pg1)
+ def test_dynamic_ports_exhausted(self):
+ """NAT44ED dynamic translation test: address ports exhaused"""
+
+ sessions_per_batch = 128
+ n_available_ports = 65536 - 1024
+ n_sessions = n_available_ports + 2 * sessions_per_batch
+
+ # set high enough session limit for ports to be exhausted
+ self.plugin_disable()
+ self.plugin_enable(max_sessions=n_sessions)
+
+ self.nat_add_inside_interface(self.pg0)
+ self.nat_add_outside_interface(self.pg1)
+
+ # set timeouts to high for sessions to reallistically expire
+ config = self.vapi.nat44_show_running_config()
+ old_timeouts = config.timeouts
+ self.vapi.nat_set_timeouts(
+ udp=21600,
+ tcp_established=old_timeouts.tcp_established,
+ tcp_transitory=old_timeouts.tcp_transitory,
+ icmp=old_timeouts.icmp,
+ )
+
+ # in2out after NAT addresses added
+ self.nat_add_address(self.nat_addr)
+
+ for i in range(n_sessions // sessions_per_batch):
+ pkts = self.create_udp_stream(
+ self.pg0,
+ self.pg1,
+ sessions_per_batch,
+ base_port=i * sessions_per_batch + 100,
+ )
+
+ self.pg0.add_stream(pkts)
+ self.pg_start()
+
+ err = self.statistics.get_err_counter(
+ "/err/nat44-ed-in2out-slowpath/out of ports"
+ )
+ if err > sessions_per_batch:
+ break
+
+ # Check for ports to be used no more than once
+ ports = set()
+ sessions = self.vapi.cli("show nat44 sessions")
+ rx = re.compile(
+ f" *o2i flow: match: saddr {self.pg1.remote_ip4} sport [0-9]+ daddr {self.nat_addr} dport ([0-9]+) proto UDP.*"
+ )
+ for line in sessions.splitlines():
+ m = rx.match(line)
+ if m:
+ port = int(m.groups()[0])
+ self.assertNotIn(port, ports)
+ ports.add(port)
+
+ self.assertGreaterEqual(err, sessions_per_batch)
+
if __name__ == "__main__":
unittest.main(testRunner=VppTestRunner)