import struct
import unittest
from io import BytesIO
-from time import sleep
import scapy.compat
from framework import VppTestCase, VppTestRunner
self.assertEqual(struct.pack("!H", self.udp_port_out),
record[227])
else:
- self.fail("Invalid protocol")
+ self.fail(f"Invalid protocol {scapy.compat.orb(record[4])}")
self.assertEqual(3, nat44_ses_create_num)
self.assertEqual(3, nat44_ses_delete_num)
# Client side - generate traffic
pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
- self.pg0.add_stream(pkts)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
+ capture = self.send_and_expect_some(self.pg0, pkts, self.pg0)
# Client side - verify ICMP type 11 packets
- capture = self.pg0.get_capture(len(pkts))
self.verify_capture_in_with_icmp_errors(capture, self.pg0)
def test_dynamic_icmp_errors_out2in_ttl_1(self):
capture = self.pg1.get_capture(len(pkts))
self.verify_capture_out(capture)
pkts = self.create_stream_out(self.pg1, ttl=1)
- self.pg1.add_stream(pkts)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
+ capture = self.send_and_expect_some(self.pg1, pkts, self.pg1)
# Server side - verify ICMP type 11 packets
- capture = self.pg1.get_capture(len(pkts))
self.verify_capture_out_with_icmp_errors(capture,
src_ip=self.pg1.local_ip4)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
self.pg1.assert_nothing_captured()
- sleep(1)
self.vapi.ipfix_flush()
capture = self.pg3.get_capture(7)
ipfix = IPFIXDecoder()
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
self.pg1.assert_nothing_captured()
- sleep(1)
self.vapi.ipfix_flush()
capture = self.pg3.get_capture(7)
ipfix = IPFIXDecoder()
def test_output_feature(self):
""" NAT44EI output feature (in2out postrouting) """
self.nat44_add_address(self.nat_addr)
- flags = self.config_flags.NAT44_EI_IF_INSIDE
- self.vapi.nat44_ei_interface_add_del_output_feature(
- is_add=1, flags=flags,
- sw_if_index=self.pg0.sw_if_index)
- self.vapi.nat44_ei_interface_add_del_output_feature(
- is_add=1, flags=flags,
- sw_if_index=self.pg1.sw_if_index)
- self.vapi.nat44_ei_interface_add_del_output_feature(
- is_add=1,
- sw_if_index=self.pg3.sw_if_index)
+ self.vapi.nat44_ei_add_del_output_interface(
+ sw_if_index=self.pg3.sw_if_index, is_add=1)
# in2out
pkts = self.create_stream_in(self.pg0, self.pg3)
self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
- flags = self.config_flags.NAT44_EI_IF_INSIDE
- self.vapi.nat44_ei_interface_add_del_output_feature(
- is_add=1, flags=flags,
- sw_if_index=self.pg4.sw_if_index)
- self.vapi.nat44_ei_interface_add_del_output_feature(
- is_add=1, flags=flags,
- sw_if_index=self.pg6.sw_if_index)
- self.vapi.nat44_ei_interface_add_del_output_feature(
- is_add=1,
- sw_if_index=self.pg3.sw_if_index)
+ self.vapi.nat44_ei_add_del_output_interface(
+ sw_if_index=self.pg3.sw_if_index, is_add=1)
# in2out VRF 10
pkts = self.create_stream_in(self.pg4, self.pg3)
server_out_port = 8765
self.nat44_add_address(self.nat_addr)
- flags = self.config_flags.NAT44_EI_IF_INSIDE
- self.vapi.nat44_ei_interface_add_del_output_feature(
- is_add=1, flags=flags,
- sw_if_index=self.pg0.sw_if_index)
- self.vapi.nat44_ei_interface_add_del_output_feature(
- is_add=1,
- sw_if_index=self.pg1.sw_if_index)
+ self.vapi.nat44_ei_add_del_output_interface(
+ sw_if_index=self.pg0.sw_if_index, is_add=1)
+ self.vapi.nat44_ei_add_del_output_interface(
+ sw_if_index=self.pg1.sw_if_index, is_add=1)
# add static mapping for server
self.nat44_add_static_mapping(server.ip4, self.nat_addr,
# do not send ACK, active retry send HA event again
self.pg_enable_capture(self.pg_interfaces)
- sleep(12)
+ self.virtual_sleep(12)
stats = self.statistics['/nat44-ei/ha/retry-count']
self.assertEqual(stats[:, 0].sum(), 3)
stats = self.statistics['/nat44-ei/ha/missed-count']
return self.vapi.nat44_ei_show_fq_options().frame_queue_nelts
def test_set_frame_queue_nelts(self):
- """ NAT44 EI API test - worker handoff frame queue elements """
+ """ NAT44EI API test - worker handoff frame queue elements """
self.assertEqual(self.reconfigure_frame_queue_nelts(512), 512)
def show_commands_at_teardown(self):
"Invalid packet (src IP %s translated to %s, but expected %s)"
% (p_sent[IP].src, p_recvd[IP].src, a))
+ def test_default_user_sessions(self):
+ """ NAT44EI default per-user session limit is used and reported """
+ nat44_ei_config = self.vapi.nat44_ei_show_running_config()
+ # a nonzero default should be reported for user_sessions
+ self.assertNotEqual(nat44_ei_config.user_sessions, 0)
+
class TestNAT44Out2InDPO(MethodHolder):
""" NAT44EI Test Cases using out2in DPO """