from vpp_papi import VppEnum
from vpp_ip_route import VppIpRoute, VppRoutePath, FibPathType
from vpp_neighbor import VppNeighbor
+from vpp_ip import VppIpAddress, VppIpPrefix
from scapy.all import bind_layers, Packet, ByteEnumField, ShortField, \
IPField, IntField, LongField, XByteField, FlagsField, FieldLenField, \
PacketListField
cls.vapi.ip_table_add_del(is_add=1, table_id=10)
cls.vapi.ip_table_add_del(is_add=1, table_id=20)
- cls.pg4._local_ip4 = "172.16.255.1"
- cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
+ cls.pg4._local_ip4 = VppIpPrefix("172.16.255.1",
+ cls.pg4.local_ip4_prefix.len)
cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
cls.pg4.set_table_ip4(10)
- cls.pg5._local_ip4 = "172.17.255.3"
- cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
+ cls.pg5._local_ip4 = VppIpPrefix("172.17.255.3",
+ cls.pg5.local_ip4_prefix.len)
cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
cls.pg5.set_table_ip4(10)
- cls.pg6._local_ip4 = "172.16.255.1"
- cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
+ cls.pg6._local_ip4 = VppIpPrefix("172.16.255.1",
+ cls.pg6.local_ip4_prefix.len)
cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
cls.pg6.set_table_ip4(20)
for i in cls.overlapping_interfaces:
cls.pg9.generate_remote_hosts(2)
cls.pg9.config_ip4()
- ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
cls.vapi.sw_interface_add_del_address(
- sw_if_index=cls.pg9.sw_if_index, address=ip_addr_n,
- address_length=24)
+ sw_if_index=cls.pg9.sw_if_index,
+ prefix=VppIpPrefix("10.0.0.1", 24).encode())
+
cls.pg9.admin_up()
cls.pg9.resolve_arp()
cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
sw_if_index=self.pg1.sw_if_index,
is_add=1)
+ reas_cfg1 = self.vapi.nat_get_reass()
+ # this test was intermittently failing in some cases
+ # until we temporarily bump the reassembly timeouts
+ self.vapi.nat_set_reass(timeout=20, max_reass=1024, max_frag=5,
+ drop_frag=0)
+
self.frag_in_order(proto=IP_PROTOS.tcp)
self.frag_in_order(proto=IP_PROTOS.udp)
self.frag_in_order(proto=IP_PROTOS.icmp)
+ # restore the reassembly timeouts
+ self.vapi.nat_set_reass(timeout=reas_cfg1.ip4_timeout,
+ max_reass=reas_cfg1.ip4_max_reass,
+ max_frag=reas_cfg1.ip4_max_frag,
+ drop_frag=reas_cfg1.ip4_drop_frag)
+
def test_frag_forwarding(self):
""" NAT44 forwarding fragment test """
self.vapi.nat44_add_del_interface_addr(
cls.pg4.generate_remote_hosts(2)
cls.pg4.config_ip4()
- ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
cls.vapi.sw_interface_add_del_address(
- sw_if_index=cls.pg4.sw_if_index, address=ip_addr_n,
- address_length=24)
+ sw_if_index=cls.pg4.sw_if_index,
+ prefix=VppIpPrefix("10.0.0.1", 24).encode())
+
cls.pg4.admin_up()
cls.pg4.resolve_arp()
cls.pg4._remote_hosts[1]._ip4 = cls.pg4._remote_hosts[0]._ip4
zero_ip4n = socket.inet_pton(socket.AF_INET, "0.0.0.0")
cls.vapi.ip_table_add_del(is_add=1, table_id=1)
- cls.pg5._local_ip4 = "10.1.1.1"
- cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET,
- cls.pg5.local_ip4)
+ cls.pg5._local_ip4 = VppIpPrefix("10.1.1.1",
+ cls.pg5.local_ip4_prefix.len)
cls.pg5._remote_hosts[0]._ip4 = "10.1.1.2"
cls.pg5._remote_hosts[0]._ip4n = socket.inet_pton(
socket.AF_INET, cls.pg5.remote_ip4)
register=False)
r1.add_vpp_config()
- cls.pg6._local_ip4 = "10.1.2.1"
- cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET,
- cls.pg6.local_ip4)
+ cls.pg6._local_ip4 = VppIpPrefix("10.1.2.1",
+ cls.pg6.local_ip4_prefix.len)
cls.pg6._remote_hosts[0]._ip4 = "10.1.2.2"
cls.pg6._remote_hosts[0]._ip4n = socket.inet_pton(
socket.AF_INET, cls.pg6.remote_ip4)
sw_if_index=self.pg1.sw_if_index,
is_add=1)
self.vapi.nat44_forwarding_enable_disable(enable=True)
+ reas_cfg1 = self.vapi.nat_get_reass()
+ # this test was intermittently failing in some cases
+ # until we temporarily bump the reassembly timeouts
+ self.vapi.nat_set_reass(timeout=20, max_reass=1024, max_frag=5,
+ drop_frag=0)
self.frag_in_order(proto=IP_PROTOS.tcp, dont_translate=True)
+ # restore the reassembly timeouts
+ self.vapi.nat_set_reass(timeout=reas_cfg1.ip4_timeout,
+ max_reass=reas_cfg1.ip4_max_reass,
+ max_frag=reas_cfg1.ip4_max_frag,
+ drop_frag=reas_cfg1.ip4_drop_frag)
def test_frag_out_of_order(self):
""" NAT44 translate fragments arriving out of order """