from syslog_rfc5424_parser.constants import SyslogFacility, SyslogSeverity
from io import BytesIO
from vpp_papi import VppEnum
+from vpp_ip_route import VppIpRoute, VppRoutePath, FibPathType
+from vpp_neighbor import VppNeighbor
from scapy.all import bind_layers, Packet, ByteEnumField, ShortField, \
IPField, IntField, LongField, XByteField, FlagsField, FieldLenField, \
PacketListField
Clear NAT44 configuration.
"""
if hasattr(self, 'pg7') and hasattr(self, 'pg8'):
- # I found no elegant way to do this
- self.vapi.ip_add_del_route(
- dst_address=self.pg7.remote_ip4n,
- dst_address_length=32,
- next_hop_address=self.pg7.remote_ip4n,
- next_hop_sw_if_index=self.pg7.sw_if_index,
- is_add=0)
- self.vapi.ip_add_del_route(
- dst_address=self.pg8.remote_ip4n,
- dst_address_length=32,
- next_hop_address=self.pg8.remote_ip4n,
- next_hop_sw_if_index=self.pg8.sw_if_index,
- is_add=0)
-
- for intf in [self.pg7, self.pg8]:
- self.vapi.ip_neighbor_add_del(
- intf.sw_if_index,
- intf.remote_mac,
- intf.remote_ip4,
- flags=(VppEnum.vl_api_ip_neighbor_flags_t.
- IP_API_NEIGHBOR_FLAG_STATIC),
- is_add=0)
-
if self.pg7.has_ip4_config:
self.pg7.unconfig_ip4()
capture = self.pg1.get_capture(len(pkts))
self.verify_capture_out(capture)
self.nat44_add_address(self.nat_addr, is_add=0)
- self.vapi.cli("ipfix flush") # FIXME this should be an API call
+ self.vapi.ipfix_flush()
capture = self.pg3.get_capture(9)
ipfix = IPFIXDecoder()
# first load template
self.pg_start()
self.pg1.assert_nothing_captured()
sleep(1)
- self.vapi.cli("ipfix flush") # FIXME this should be an API call
+ self.vapi.ipfix_flush()
capture = self.pg3.get_capture(9)
ipfix = IPFIXDecoder()
# first load template
self.pg_start()
self.pg1.assert_nothing_captured()
sleep(1)
- self.vapi.cli("ipfix flush") # FIXME this should be an API call
+ self.vapi.ipfix_flush()
capture = self.pg3.get_capture(9)
ipfix = IPFIXDecoder()
# first load template
capture = self.pg2.get_capture(len(pkts))
self.verify_capture_out(capture, nat_ip1)
+ def create_routes_and_neigbors(self):
+ r1 = VppIpRoute(self, self.pg7.remote_ip4, 32,
+ [VppRoutePath(self.pg7.remote_ip4,
+ self.pg7.sw_if_index)])
+ r2 = VppIpRoute(self, self.pg8.remote_ip4, 32,
+ [VppRoutePath(self.pg8.remote_ip4,
+ self.pg8.sw_if_index)])
+ r1.add_vpp_config()
+ r2.add_vpp_config()
+
+ n1 = VppNeighbor(self,
+ self.pg7.sw_if_index,
+ self.pg7.remote_mac,
+ self.pg7.remote_ip4,
+ is_static=1)
+ n2 = VppNeighbor(self,
+ self.pg8.sw_if_index,
+ self.pg8.remote_mac,
+ self.pg8.remote_ip4,
+ is_static=1)
+ n1.add_vpp_config()
+ n2.add_vpp_config()
+
def test_dynamic_ipless_interfaces(self):
""" NAT44 interfaces without configured IP address """
-
- self.vapi.ip_neighbor_add_del(
- self.pg7.sw_if_index,
- self.pg7.remote_mac,
- self.pg7.remote_ip4,
- flags=(VppEnum.vl_api_ip_neighbor_flags_t.
- IP_API_NEIGHBOR_FLAG_STATIC))
- self.vapi.ip_neighbor_add_del(
- self.pg8.sw_if_index,
- self.pg8.remote_mac,
- self.pg8.remote_ip4,
- flags=(VppEnum.vl_api_ip_neighbor_flags_t.
- IP_API_NEIGHBOR_FLAG_STATIC))
-
- self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
- dst_address_length=32,
- next_hop_address=self.pg7.remote_ip4n,
- next_hop_sw_if_index=self.pg7.sw_if_index)
- self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
- dst_address_length=32,
- next_hop_address=self.pg8.remote_ip4n,
- next_hop_sw_if_index=self.pg8.sw_if_index)
-
+ self.create_routes_and_neigbors()
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT_IS_INSIDE
self.vapi.nat44_interface_add_del_feature(
def test_static_ipless_interfaces(self):
""" NAT44 interfaces without configured IP address - 1:1 NAT """
- self.vapi.ip_neighbor_add_del(
- self.pg7.sw_if_index,
- self.pg7.remote_mac,
- self.pg7.remote_ip4,
- flags=(VppEnum.vl_api_ip_neighbor_flags_t.
- IP_API_NEIGHBOR_FLAG_STATIC))
- self.vapi.ip_neighbor_add_del(
- self.pg8.sw_if_index,
- self.pg8.remote_mac,
- self.pg8.remote_ip4,
- flags=(VppEnum.vl_api_ip_neighbor_flags_t.
- IP_API_NEIGHBOR_FLAG_STATIC))
-
- self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
- dst_address_length=32,
- next_hop_address=self.pg7.remote_ip4n,
- next_hop_sw_if_index=self.pg7.sw_if_index)
- self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
- dst_address_length=32,
- next_hop_address=self.pg8.remote_ip4n,
- next_hop_sw_if_index=self.pg8.sw_if_index)
-
+ self.create_routes_and_neigbors()
self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
flags = self.config_flags.NAT_IS_INSIDE
self.vapi.nat44_interface_add_del_feature(
self.udp_port_out = 30607
self.icmp_id_out = 30608
- self.vapi.ip_neighbor_add_del(
- self.pg7.sw_if_index,
- self.pg7.remote_mac,
- self.pg7.remote_ip4,
- flags=(VppEnum.vl_api_ip_neighbor_flags_t.
- IP_API_NEIGHBOR_FLAG_STATIC))
- self.vapi.ip_neighbor_add_del(
- self.pg8.sw_if_index,
- self.pg8.remote_mac,
- self.pg8.remote_ip4,
- flags=(VppEnum.vl_api_ip_neighbor_flags_t.
- IP_API_NEIGHBOR_FLAG_STATIC))
-
- self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
- dst_address_length=32,
- next_hop_address=self.pg7.remote_ip4n,
- next_hop_sw_if_index=self.pg7.sw_if_index)
- self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
- dst_address_length=32,
- next_hop_address=self.pg8.remote_ip4n,
- next_hop_sw_if_index=self.pg8.sw_if_index)
-
+ self.create_routes_and_neigbors()
self.nat44_add_address(self.nat_addr)
self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
self.tcp_port_in, self.tcp_port_out,
nat_ip_vrf10 = "10.0.0.10"
nat_ip_vrf20 = "10.0.0.20"
- self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
- dst_address_length=32,
- next_hop_address=self.pg3.remote_ip4n,
- next_hop_sw_if_index=self.pg3.sw_if_index,
- table_id=10)
- self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
- dst_address_length=32,
- next_hop_address=self.pg3.remote_ip4n,
- next_hop_sw_if_index=self.pg3.sw_if_index,
- table_id=20)
+ r1 = VppIpRoute(self, self.pg3.remote_ip4, 32,
+ [VppRoutePath(self.pg3.remote_ip4,
+ self.pg3.sw_if_index)],
+ table_id=10)
+ r2 = VppIpRoute(self, self.pg3.remote_ip4, 32,
+ [VppRoutePath(self.pg3.remote_ip4,
+ self.pg3.sw_if_index)],
+ table_id=20)
+ r1.add_vpp_config()
+ r2.add_vpp_config()
self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
sw_if_index=self.pg1.sw_if_index,
is_add=1)
+ reas_cfg1 = self.vapi.nat_get_reass()
+ # this test was intermittently failing in some cases
+ # until we temporarily bump the reassembly timeouts
+ self.vapi.nat_set_reass(timeout=20, max_reass=1024, max_frag=5,
+ drop_frag=0)
+
self.frag_in_order(proto=IP_PROTOS.tcp)
self.frag_in_order(proto=IP_PROTOS.udp)
self.frag_in_order(proto=IP_PROTOS.icmp)
+ # restore the reassembly timeouts
+ self.vapi.nat_set_reass(timeout=reas_cfg1.ip4_timeout,
+ max_reass=reas_cfg1.ip4_max_reass,
+ max_frag=reas_cfg1.ip4_max_frag,
+ drop_frag=reas_cfg1.ip4_drop_frag)
+
def test_frag_forwarding(self):
""" NAT44 forwarding fragment test """
self.vapi.nat44_add_del_interface_addr(
self.pg_start()
self.pg1.assert_nothing_captured()
sleep(1)
- self.vapi.cli("ipfix flush") # FIXME this should be an API call
+ self.vapi.ipfix_flush()
capture = self.pg3.get_capture(9)
ipfix = IPFIXDecoder()
# first load template
cls.pg5.set_table_ip4(1)
cls.pg5.config_ip4()
cls.pg5.admin_up()
- cls.vapi.ip_add_del_route(dst_address=cls.pg5.remote_ip4n,
- dst_address_length=32,
- next_hop_address=zero_ip4n,
- next_hop_sw_if_index=cls.pg5.sw_if_index,
- table_id=1)
+ r1 = VppIpRoute(cls, cls.pg5.remote_ip4, 32,
+ [VppRoutePath("0.0.0.0",
+ cls.pg5.sw_if_index)],
+ table_id=1,
+ register=False)
+ r1.add_vpp_config()
cls.pg6._local_ip4 = "10.1.2.1"
cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET,
cls.pg6.set_table_ip4(1)
cls.pg6.config_ip4()
cls.pg6.admin_up()
- cls.vapi.ip_add_del_route(dst_address=cls.pg6.remote_ip4n,
- dst_address_length=32,
- next_hop_address=zero_ip4n,
- next_hop_sw_if_index=cls.pg6.sw_if_index,
- table_id=1)
-
- cls.vapi.ip_add_del_route(dst_address=cls.pg6.remote_ip4n,
- dst_address_length=16,
- next_hop_address=zero_ip4n, table_id=0,
- next_hop_table_id=1)
- cls.vapi.ip_add_del_route(dst_address=zero_ip4n,
- dst_address_length=0,
- next_hop_address=zero_ip4n, table_id=1,
- next_hop_table_id=0)
- cls.vapi.ip_add_del_route(dst_address=zero_ip4n,
- dst_address_length=0,
- next_hop_address=cls.pg1.local_ip4n,
- next_hop_sw_if_index=cls.pg1.sw_if_index,
- table_id=0)
+
+ r2 = VppIpRoute(cls, cls.pg6.remote_ip4, 32,
+ [VppRoutePath("0.0.0.0",
+ cls.pg6.sw_if_index)],
+ table_id=1,
+ register=False)
+ r3 = VppIpRoute(cls, cls.pg6.remote_ip4, 16,
+ [VppRoutePath("0.0.0.0",
+ 0xffffffff,
+ nh_table_id=1)],
+ table_id=0,
+ register=False)
+ r4 = VppIpRoute(cls, "0.0.0.0", 0,
+ [VppRoutePath("0.0.0.0", 0xffffffff,
+ nh_table_id=0)],
+ table_id=1,
+ register=False)
+ r5 = VppIpRoute(cls, "0.0.0.0", 0,
+ [VppRoutePath(cls.pg1.local_ip4,
+ cls.pg1.sw_if_index)],
+ register=False)
+ r2.add_vpp_config()
+ r3.add_vpp_config()
+ r4.add_vpp_config()
+ r5.add_vpp_config()
cls.pg5.resolve_arp()
cls.pg6.resolve_arp()
sw_if_index=self.pg1.sw_if_index,
is_add=1)
self.vapi.nat44_forwarding_enable_disable(enable=True)
+ reas_cfg1 = self.vapi.nat_get_reass()
+ # this test was intermittently failing in some cases
+ # until we temporarily bump the reassembly timeouts
+ self.vapi.nat_set_reass(timeout=20, max_reass=1024, max_frag=5,
+ drop_frag=0)
self.frag_in_order(proto=IP_PROTOS.tcp, dont_translate=True)
+ # restore the reassembly timeouts
+ self.vapi.nat_set_reass(timeout=reas_cfg1.ip4_timeout,
+ max_reass=reas_cfg1.ip4_max_reass,
+ max_frag=reas_cfg1.ip4_max_frag,
+ drop_frag=reas_cfg1.ip4_drop_frag)
def test_frag_out_of_order(self):
""" NAT44 translate fragments arriving out of order """
self.config_flags.NAT_IS_EXT_HOST_VALID)
self.assertTrue(sessions[0].flags &
self.config_flags.NAT_IS_TWICE_NAT)
- self.logger.error(self.vapi.cli("show nat44 sessions detail"))
+ self.logger.info(self.vapi.cli("show nat44 sessions detail"))
self.vapi.nat44_del_session(
address=sessions[0].inside_ip_address,
port=sessions[0].inside_port,
capture = self.pg1.assert_nothing_captured()
# verify IPFIX logging
- self.vapi.cli("ipfix flush") # FIXME this should be an API call
+ self.vapi.ipfix_flush()
sleep(1)
capture = self.pg2.get_capture(10)
ipfix = IPFIXDecoder()
cls.pg1.config_ip6()
cls.pg1.resolve_ndp()
- cls.vapi.ip_add_del_route(dst_address=b'\x00' * 16,
- dst_address_length=0,
- next_hop_address=cls.pg1.remote_ip6n,
- next_hop_sw_if_index=cls.pg1.sw_if_index,
- is_ipv6=True)
+ r1 = VppIpRoute(cls, "::", 0,
+ [VppRoutePath(cls.pg1.remote_ip6,
+ cls.pg1.sw_if_index)],
+ register=False)
+ r1.add_vpp_config()
except Exception:
super(TestNAT44Out2InDPO, cls).tearDownClass()
self.assertEqual(1000, dms[0].ses_num)
# verify IPFIX logging
- self.vapi.cli("ipfix flush") # FIXME this should be an API call
+ self.vapi.ipfix_flush()
sleep(1)
capture = self.pg2.get_capture(2)
ipfix = IPFIXDecoder()
self.pg_start()
self.pg1.assert_nothing_captured()
sleep(1)
- self.vapi.cli("ipfix flush") # FIXME this should be an API call
+ self.vapi.ipfix_flush()
capture = self.pg3.get_capture(9)
ipfix = IPFIXDecoder()
# first load template
self.pg_start()
self.pg1.assert_nothing_captured()
sleep(1)
- self.vapi.cli("ipfix flush") # FIXME this should be an API call
+ self.vapi.ipfix_flush()
capture = self.pg3.get_capture(1)
# verify events in data set
for p in capture:
self.pg_start()
self.pg1.assert_nothing_captured()
sleep(1)
- self.vapi.cli("ipfix flush") # FIXME this should be an API call
+ self.vapi.ipfix_flush()
capture = self.pg3.get_capture(9)
ipfix = IPFIXDecoder()
# first load template
self.pg_start()
p = self.pg1.get_capture(1)
self.tcp_port_out = p[0][TCP].sport
- self.vapi.cli("ipfix flush") # FIXME this should be an API call
+ self.vapi.ipfix_flush()
capture = self.pg3.get_capture(10)
ipfix = IPFIXDecoder()
# first load template
end_addr=self.nat_addr,
vrf_id=0xFFFFFFFF,
is_add=0)
- self.vapi.cli("ipfix flush") # FIXME this should be an API call
+ self.vapi.ipfix_flush()
capture = self.pg3.get_capture(2)
# verify events in data set
for p in capture:
aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
self.vapi.dslite_set_aftr_addr(ip4_addr=aftr_ip4, ip6_addr=aftr_ip6)
- self.vapi.ip_add_del_route(dst_address=aftr_ip6_n,
- dst_address_length=128,
- next_hop_address=self.pg1.remote_ip6n,
- next_hop_sw_if_index=self.pg1.sw_if_index,
- is_ipv6=1)
+ r1 = VppIpRoute(self, aftr_ip6, 128,
+ [VppRoutePath(self.pg1.remote_ip6,
+ self.pg1.sw_if_index)])
+ r1.add_vpp_config()
# UDP encapsulation
p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /