from scapy.packet import Raw
from syslog_rfc5424_parser import SyslogMessage, ParseError
from syslog_rfc5424_parser.constants import SyslogSeverity
-from util import ppp, ip4_range
+from util import ppp, pr, ip4_range
from vpp_acl import AclRule, VppAcl, VppAclInterface
from vpp_ip_route import VppIpRoute, VppRoutePath
from vpp_papi import VppEnum
self.nat_add_outside_interface(self.pg1)
# in2out (initiate connection)
- p1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ p1 = [Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
- UDP(sport=21, dport=20) / payload)
+ UDP(sport=21, dport=20) / payload,
+ Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=21, dport=20, flags="S") / payload,
+ Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ ICMP(type='echo-request', id=7777) / payload,
+ ]
- self.pg0.add_stream(p1)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- capture = self.pg1.get_capture(1)[0]
+ capture = self.send_and_expect(self.pg0, p1, self.pg1)
# out2in (send error message)
- p2 = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ p2 = [Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
ICMP(type='dest-unreach', code='port-unreachable') /
- capture[IP:])
-
- self.pg1.add_stream(p2)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
+ c[IP:]
+ for c in capture]
- capture = self.pg0.get_capture(1)[0]
+ capture = self.send_and_expect(self.pg1, p2, self.pg0)
- self.logger.info(ppp("p1 packet:", p1))
- self.logger.info(ppp("p2 packet:", p2))
- self.logger.info(ppp("capture packet:", capture))
+ for c in capture:
+ try:
+ assert c[IP].dst == self.pg0.remote_ip4
+ assert c[IPerror].src == self.pg0.remote_ip4
+ except AssertionError as a:
+ raise AssertionError(
+ f"Packet {pr(c)} not translated properly") from a
def test_icmp_echo_reply_trailer(self):
""" ICMP echo reply with ethernet trailer"""
self.vapi.nat44_forwarding_enable_disable(enable=1)
self.nat_add_address(self.nat_addr)
- self.vapi.nat44_interface_add_del_output_feature(
- sw_if_index=self.pg1.sw_if_index, is_add=1,)
+ self.vapi.nat44_ed_add_del_output_interface(
+ sw_if_index=self.pg1.sw_if_index,
+ is_add=1)
# session initiated from service host - translate
pkts = self.create_stream_in(self.pg0, self.pg1)
self.nat_add_address(self.nat_addr)
self.nat_add_outside_interface(self.pg0)
- self.vapi.nat44_interface_add_del_output_feature(
- sw_if_index=self.pg1.sw_if_index, is_add=1)
+ self.vapi.nat44_ed_add_del_output_interface(
+ sw_if_index=self.pg1.sw_if_index,
+ is_add=1)
# in2out
pkts = self.create_stream_in(self.pg0, self.pg1)
self.nat_add_address(self.nat_addr)
flags = self.config_flags.NAT_IS_INSIDE
- self.vapi.nat44_interface_add_del_output_feature(
+ self.vapi.nat44_ed_add_del_output_interface(
sw_if_index=self.pg1.sw_if_index,
is_add=1)
self.vapi.nat44_interface_add_del_feature(
def test_show_max_translations(self):
""" NAT44ED API test - max translations per thread """
- nat_config = self.vapi.nat_show_config_2()
+ config = self.vapi.nat44_show_running_config()
self.assertEqual(self.max_sessions,
- nat_config.max_translations_per_thread)
+ config.sessions)
def test_lru_cleanup(self):
""" NAT44ED LRU cleanup algorithm """
self.tcp_external_port)
# Wait at least the transitory time, the session is in established
- # state anyway. RST followed by a data packet should keep it
- # established.
+ # state anyway. RST followed by a data packet should move it to
+ # transitory state.
self.virtual_sleep(6)
p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
flags="P"))
self.send_and_expect(self.pg0, p, self.pg1)
- # State is established, session should be still open after 6 seconds
- self.virtual_sleep(6)
-
- p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
- IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
- TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
- flags="R"))
- self.send_and_expect(self.pg0, p, self.pg1)
-
# State is transitory, session should be closed after 6 seconds
self.virtual_sleep(6)
# SYN out2in
p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
- TCP(sport=self.tcp_external_port, dport=self.tcp_port_out))
+ TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
+ flags='SA'))
self.send_and_expect(self.pg1, p, self.pg0)
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
+ flags="A"))
+ self.send_and_expect(self.pg0, p, self.pg1)
+
# FIN in2out
p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
flags="F"))
self.send_and_expect(self.pg1, p, self.pg0)
- # SYN in2out
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
- TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
- self.send_and_expect(self.pg0, p, self.pg1)
-
- # SYN out2in
- p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
- IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
- TCP(sport=self.tcp_external_port, dport=self.tcp_port_out))
- self.send_and_expect(self.pg1, p, self.pg0)
+ self.init_tcp_session(self.pg0, self.pg1, self.tcp_port_in,
+ self.tcp_external_port)
# 2 records should be produced - first one del & add
capture = self.pg3.get_capture(2)
def test_tcp_close(self):
""" NAT44ED Close TCP session from inside network - output feature """
- old_timeouts = self.vapi.nat_get_timeouts()
+ config = self.vapi.nat44_show_running_config()
+ old_timeouts = config.timeouts
new_transitory = 2
self.vapi.nat_set_timeouts(
udp=old_timeouts.udp,
p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
TCP(sport=in_port, dport=ext_port,
- flags="S", seq=101, ack=301))
+ flags="SA", seq=101, ack=301))
self.send_and_expect(self.pg0, p, self.pg1)
+ # send ACK packet out -> in
+ p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
+ TCP(sport=ext_port, dport=out_port,
+ flags="A", seq=300, ack=101))
+ self.send_and_expect(self.pg1, p, self.pg0)
+
self.virtual_sleep(3)
# send ACK packet in -> out - should be forwarded and session alive
p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
new_vrf_id = 22
self.nat_add_address(self.nat_addr)
- self.vapi.nat44_interface_add_del_output_feature(
- sw_if_index=self.pg8.sw_if_index, is_add=1)
-
+ self.vapi.nat44_ed_add_del_output_interface(
+ sw_if_index=self.pg8.sw_if_index,
+ is_add=1)
try:
self.configure_ip4_interface(self.pg7, table_id=new_vrf_id)
self.configure_ip4_interface(self.pg8, table_id=new_vrf_id)