class TestNAT44ED(NAT44EDTestCase):
""" NAT44ED Test Case """
+ def test_icmp_error(self):
+ """ NAT44ED test ICMP error message with inner header"""
+
+ payload = "H" * 10
+
+ self.nat_add_address(self.nat_addr)
+ self.nat_add_inside_interface(self.pg0)
+ self.nat_add_outside_interface(self.pg1)
+
+ # in2out (initiate connection)
+ p1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ UDP(sport=21, dport=20) / payload)
+
+ self.pg0.add_stream(p1)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(1)[0]
+
+ # out2in (send error message)
+ p2 = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
+ ICMP(type='dest-unreach', code='port-unreachable') /
+ capture[IP:])
+
+ self.pg1.add_stream(p2)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ capture = self.pg0.get_capture(1)[0]
+
+ self.logger.info(ppp("p1 packet:", p1))
+ self.logger.info(ppp("p2 packet:", p2))
+ self.logger.info(ppp("capture packet:", capture))
+
+ def test_icmp_echo_reply_trailer(self):
+ """ ICMP echo reply with ethernet trailer"""
+
+ self.nat_add_address(self.nat_addr)
+ self.nat_add_inside_interface(self.pg0)
+ self.nat_add_outside_interface(self.pg1)
+
+ # in2out
+ p1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ ICMP(type=8, id=0xabcd, seq=0))
+
+ self.pg0.add_stream(p1)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ c = self.pg1.get_capture(1)[0]
+
+ self.logger.debug(self.vapi.cli("show trace"))
+
+ # out2in
+ p2 = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.nat_addr, id=0xee59) /
+ ICMP(type=0, id=c[ICMP].id, seq=0))
+
+ # force checksum calculation
+ p2 = p2.__class__(bytes(p2))
+
+ self.logger.debug(ppp("Packet before modification:", p2))
+
+ # hex representation of vss monitoring ethernet trailer
+ # this seems to be just added to end of packet without modifying
+ # IP or ICMP lengths / checksums
+ p2 = p2 / Raw("\x00\x00\x52\x54\x00\x46\xab\x04\x84\x18")
+ # change it so that IP/ICMP is unaffected
+ p2[IP].len = 28
+
+ self.logger.debug(ppp("Packet with added trailer:", p2))
+
+ self.pg1.add_stream(p2)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ self.pg0.get_capture(1)
+
def test_users_dump(self):
""" NAT44ED API test - nat44_user_dump """
self.pg_start()
capture = self.pg1.get_capture(len(pkts))
self.verify_capture_out(capture, ignore_port=True)
+ self.logger.debug(self.vapi.cli("show trace"))
# out2in
pkts = self.create_stream_out(self.pg1)
self.pg_start()
capture = self.pg0.get_capture(len(pkts))
self.verify_capture_in(capture, self.pg0)
+ self.logger.debug(self.vapi.cli("show trace"))
+
+ # in2out
+ pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(len(pkts))
+ self.verify_capture_out(capture, ignore_port=True)
+ self.logger.debug(self.vapi.cli("show trace"))
+
+ # out2in
+ pkts = self.create_stream_out(self.pg1, ttl=2)
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(len(pkts))
+ self.verify_capture_in(capture, self.pg0)
+ self.logger.debug(self.vapi.cli("show trace"))
+
+ # in2out
+ pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(len(pkts))
+ for p in capture:
+ self.assertIn(ICMP, p)
+ self.assertEqual(p[ICMP].type, 11) # 11 == time-exceeded
def test_static_with_port_out2(self):
""" NAT44ED 1:1 NAPT asymmetrical rule """
for p in capture:
self.assertEqual(p[IP].dst, backend)
- def test_multiple_vrf(self):
- """ NAT44ED Multiple VRF setup """
+ def test_multiple_vrf_1(self):
+ """ Multiple VRF - both client & service in VRF1 """
external_addr = '1.2.3.4'
external_port = 80
local_port = 8080
port = 0
- self.vapi.nat44_forwarding_enable_disable(enable=1)
- self.nat_add_address(self.nat_addr)
flags = self.config_flags.NAT_IS_INSIDE
- self.vapi.nat44_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- is_add=1)
- self.vapi.nat44_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- is_add=1, flags=flags)
- self.vapi.nat44_interface_add_del_output_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
self.vapi.nat44_interface_add_del_feature(
sw_if_index=self.pg5.sw_if_index,
is_add=1)
self.nat_add_static_mapping(self.pg5.remote_ip4, external_addr,
local_port, external_port, vrf_id=1,
proto=IP_PROTOS.tcp, flags=flags)
- self.nat_add_static_mapping(
- self.pg0.remote_ip4,
- external_sw_if_index=self.pg0.sw_if_index,
- local_port=local_port,
- vrf_id=0,
- external_port=external_port,
- proto=IP_PROTOS.tcp,
- flags=flags
- )
- # from client to service (both VRF1)
p = (Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac) /
IP(src=self.pg6.remote_ip4, dst=external_addr) /
TCP(sport=12345, dport=external_port))
self.logger.error(ppp("Unexpected or invalid packet:", p))
raise
- # from service back to client (both VRF1)
p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) /
IP(src=self.pg5.remote_ip4, dst=self.pg6.remote_ip4) /
TCP(sport=local_port, dport=12345))
self.logger.error(ppp("Unexpected or invalid packet:", p))
raise
- # dynamic NAT from VRF1 to VRF0 (output-feature)
+ def test_multiple_vrf_2(self):
+ """ Multiple VRF - dynamic NAT from VRF1 to VRF0 (output-feature) """
+
+ external_addr = '1.2.3.4'
+ external_port = 80
+ local_port = 8080
+ port = 0
+
+ self.nat_add_address(self.nat_addr)
+ flags = self.config_flags.NAT_IS_INSIDE
+ self.vapi.nat44_interface_add_del_output_feature(
+ sw_if_index=self.pg1.sw_if_index,
+ is_add=1)
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg5.sw_if_index,
+ is_add=1)
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg5.sw_if_index,
+ is_add=1, flags=flags)
+ flags = self.config_flags.NAT_IS_OUT2IN_ONLY
+ self.nat_add_static_mapping(self.pg5.remote_ip4, external_addr,
+ local_port, external_port, vrf_id=1,
+ proto=IP_PROTOS.tcp, flags=flags)
+
p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) /
IP(src=self.pg5.remote_ip4, dst=self.pg1.remote_ip4) /
TCP(sport=2345, dport=22))
self.logger.error(ppp("Unexpected or invalid packet:", p))
raise
+ def test_multiple_vrf_3(self):
+ """ Multiple VRF - client in VRF1, service in VRF0 """
+
+ external_addr = '1.2.3.4'
+ external_port = 80
+ local_port = 8080
+ port = 0
+
+ flags = self.config_flags.NAT_IS_INSIDE
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg0.sw_if_index,
+ is_add=1)
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg0.sw_if_index,
+ is_add=1, flags=flags)
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg6.sw_if_index,
+ is_add=1)
+ flags = self.config_flags.NAT_IS_OUT2IN_ONLY
+ self.nat_add_static_mapping(
+ self.pg0.remote_ip4,
+ external_sw_if_index=self.pg0.sw_if_index,
+ local_port=local_port,
+ vrf_id=0,
+ external_port=external_port,
+ proto=IP_PROTOS.tcp,
+ flags=flags
+ )
+
# from client VRF1 to service VRF0
p = (Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac) /
IP(src=self.pg6.remote_ip4, dst=self.pg0.local_ip4) /
self.logger.error(ppp("Unexpected or invalid packet:", p))
raise
+ def test_multiple_vrf_4(self):
+ """ Multiple VRF - client in VRF0, service in VRF1 """
+
+ external_addr = '1.2.3.4'
+ external_port = 80
+ local_port = 8080
+ port = 0
+
+ flags = self.config_flags.NAT_IS_INSIDE
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg0.sw_if_index,
+ is_add=1)
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg0.sw_if_index,
+ is_add=1, flags=flags)
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg5.sw_if_index,
+ is_add=1)
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg5.sw_if_index,
+ is_add=1, flags=flags)
+ flags = self.config_flags.NAT_IS_OUT2IN_ONLY
+ self.nat_add_static_mapping(self.pg5.remote_ip4, external_addr,
+ local_port, external_port, vrf_id=1,
+ proto=IP_PROTOS.tcp, flags=flags)
+
# from client VRF0 to service VRF1
p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
IP(src=self.pg0.remote_ip4, dst=external_addr) /
self.logger.error(ppp("Unexpected or invalid packet:", p))
raise
+ def test_multiple_vrf_5(self):
+ """ Multiple VRF - forwarding - no translation """
+
+ external_addr = '1.2.3.4'
+ external_port = 80
+ local_port = 8080
+ port = 0
+
+ self.vapi.nat44_forwarding_enable_disable(enable=1)
+ flags = self.config_flags.NAT_IS_INSIDE
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg0.sw_if_index,
+ is_add=1)
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg0.sw_if_index,
+ is_add=1, flags=flags)
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg5.sw_if_index,
+ is_add=1)
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg5.sw_if_index,
+ is_add=1, flags=flags)
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg6.sw_if_index,
+ is_add=1)
+ flags = self.config_flags.NAT_IS_OUT2IN_ONLY
+ self.nat_add_static_mapping(self.pg5.remote_ip4, external_addr,
+ local_port, external_port, vrf_id=1,
+ proto=IP_PROTOS.tcp, flags=flags)
+ self.nat_add_static_mapping(
+ self.pg0.remote_ip4,
+ external_sw_if_index=self.pg0.sw_if_index,
+ local_port=local_port,
+ vrf_id=0,
+ external_port=external_port,
+ proto=IP_PROTOS.tcp,
+ flags=flags
+ )
+
# from client to server (both VRF1, no translation)
p = (Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac) /
IP(src=self.pg6.remote_ip4, dst=self.pg5.remote_ip4) /
self.nat_add_address(self.nat_addr)
self.vapi.nat44_interface_add_del_output_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=self.config_flags.NAT_IS_INSIDE, is_add=1)
- self.vapi.nat44_interface_add_del_output_feature(
- sw_if_index=self.pg1.sw_if_index,
- flags=self.config_flags.NAT_IS_OUTSIDE, is_add=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=1)
# First ensure that the NAT is working sans ACL
new_vrf_id = 22
self.nat_add_address(self.nat_addr)
- flags = self.config_flags.NAT_IS_INSIDE
self.vapi.nat44_interface_add_del_output_feature(
- sw_if_index=self.pg7.sw_if_index,
- flags=flags, is_add=1)
- self.vapi.nat44_interface_add_del_output_feature(
- sw_if_index=self.pg8.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg8.sw_if_index, is_add=1)
try:
self.configure_ip4_interface(self.pg7, table_id=new_vrf_id)