import sys
import os
from vm_test_config import test_config
+import random
+import string
+import subprocess
#
# Tests for:
# Test Config variables
af_packet_config = test_config["af_packet"]
+af_xdp_config = test_config["af_xdp"]
layer2 = test_config["L2"]
layer3 = test_config["L3"]
generated tests are set as attributes.
"""
+ def get_valid_mtus(test):
+ client_if_types = test["client_if_type"].split(",")
+ server_if_types = test["server_if_type"].split(",")
+ contains_af_xdp = any(
+ if_type == "af_xdp" for if_type in client_if_types + server_if_types
+ )
+
+ # MTU <= 2048 Bytes for af_xdp interfaces
+ if contains_af_xdp:
+ return [mtu for mtu in test_config["mtus"] if mtu <= 2048]
+ else:
+ return test_config["mtus"]
+
for test in tests:
for ip_version in test_config["ip_versions"]:
- for mtu in test_config["mtus"]:
+ for mtu in get_valid_mtus(test):
test_name = (
f"test_id_{test['id']}_"
+ f"client_{test['client_if_type']}"
self.memif_interfaces.append(self.ingress_if_idx)
self.ingress_if_idxes.append(self.ingress_if_idx)
self.vpp_interfaces.append(self.ingress_if_idx)
+ elif client_if_type == "af_xdp":
+ self.ingress_if_idx = self.create_af_xdp(
+ namespace=self.client_namespace,
+ host_side_name=af_xdp_config["iprf_client_interface_on_linux"],
+ vpp_side_name=af_xdp_config["iprf_client_interface_on_vpp"],
+ ip4_prefix=(
+ layer2["client_ip4_prefix"]
+ if x_connect_mode == "L2"
+ else layer3["client_ip4_prefix"]
+ ),
+ ip6_prefix=(
+ layer2["client_ip6_prefix"]
+ if x_connect_mode == "L2"
+ else layer3["client_ip6_prefix"]
+ ),
+ version=client_if_version,
+ )
else:
print(
f"Unsupported client interface type: {client_if_type} "
self.memif_interfaces.append(self.egress_if_idx)
self.egress_if_idxes.append(self.egress_if_idx)
self.vpp_interfaces.append(self.egress_if_idx)
+ elif server_if_type == "af_xdp":
+ self.egress_if_idx = self.create_af_xdp(
+ namespace=self.server_namespace,
+ host_side_name=af_xdp_config["iprf_server_interface_on_linux"],
+ vpp_side_name=af_xdp_config["iprf_server_interface_on_vpp"],
+ ip4_prefix=server_ip4_prefix,
+ ip6_prefix=server_ip6_prefix,
+ version=server_if_version,
+ )
else:
print(
f"Unsupported server interface type: {server_if_type} "
time.sleep(2)
def tearDown(self):
- try:
- for interface_if_idx in self.tap_interfaces:
+ # Delete tap interfaces
+ for interface_if_idx in self.tap_interfaces:
+ try:
self.vapi.tap_delete_v2(sw_if_index=interface_if_idx)
- except Exception:
- pass
- try:
- for interface_if_idx in self.memif_interfaces:
+ except Exception:
+ pass
+
+ # Delete memif interfaces
+ for interface_if_idx in self.memif_interfaces:
+ try:
self.vapi.memif_delete(sw_if_index=interface_if_idx)
- except Exception:
- pass
+ except Exception:
+ pass
+
+ # Delete af_packet interfaces
try:
for interface in self.vapi.af_packet_dump():
if interface.host_if_name == self.iprf_client_host_interface_on_vpp:
- self.vapi.af_packet_delete(self.iprf_client_host_interface_on_vpp)
+ try:
+ self.vapi.af_packet_delete(
+ self.iprf_client_host_interface_on_vpp
+ )
+ except Exception:
+ pass
elif interface.host_if_name == self.iprf_server_host_interface_on_vpp:
- self.vapi.af_packet_delete(self.iprf_server_host_interface_on_vpp)
+ try:
+ self.vapi.af_packet_delete(
+ self.iprf_server_host_interface_on_vpp
+ )
+ except Exception:
+ pass
except Exception:
pass
+
+ # Delete AF_XDP interfaces
+ if hasattr(self, "af_xdp_interfaces"):
+ for interface_if_idx in self.af_xdp_interfaces:
+ try:
+ self.vapi.af_xdp_delete(interface_if_idx)
+ except Exception:
+ pass
+
+ # Delete host interfaces
try:
delete_all_host_interfaces(self.if_history_name)
except Exception:
pass
+
+ # Delete VRF tables
try:
self.vapi.ip_table_add_del_v2(
is_add=0, table={"table_id": layer3["ip4_vrf"]}
)
except Exception:
pass
+
+ # Delete bridge domains
try:
- self.vapi.bridge_domain_add_del_v2(bd_id=1, is_add=0)
- self.vapi.bridge_domain_add_del_v2(bd_id=2, is_add=0)
+ # Check if bridge domains exist before trying to delete them
+ for bd_id in [1, 2]:
+ try:
+ bd_details = self.vapi.bridge_domain_dump(bd_id=bd_id)
+ if len(bd_details) > 0:
+ self.vapi.bridge_domain_add_del_v2(bd_id=bd_id, is_add=0)
+ except Exception:
+ pass
except Exception:
pass
+
+ # Clean up namespaces and processes
try:
delete_all_namespaces(self.ns_history_file)
except Exception:
except Exception:
pass
try:
- if self.memif_process:
+ if hasattr(self, "memif_process") and self.memif_process:
self.memif_process.terminate()
self.memif_process.join()
except Exception:
else:
return False
+ def create_af_xdp(
+ self, namespace, host_side_name, vpp_side_name, ip4_prefix, ip6_prefix, version
+ ):
+ """Create an AF_XDP interface and configure it in VPP and Linux."""
+ try:
+ # Generate unique random suffixes for interface names to prevent conflicts
+ random_suffix = "".join(
+ random.choices(string.ascii_lowercase + string.digits, k=8)
+ )
+ unique_host_side_name = f"{host_side_name}{random_suffix}"
+ unique_vpp_side_name = f"{vpp_side_name}{random_suffix}"
+
+ self.logger.debug(
+ f"Creating AF_XDP interfaces with names: {unique_host_side_name} and {unique_vpp_side_name}"
+ )
+
+ # Clean up any existing interfaces with the same name
+ os.system(
+ f"ip netns exec {namespace} ip link del {unique_host_side_name} 2>/dev/null || true"
+ )
+ os.system(f"ip link del {unique_vpp_side_name} 2>/dev/null || true")
+
+ # Create the host interface
+ create_host_interface(
+ self.if_history_name,
+ namespace,
+ ip4_prefix,
+ ip6_prefix,
+ vpp_if_name=unique_vpp_side_name,
+ host_if_name=unique_host_side_name,
+ )
+
+ # Verify that the host interfaces were created successfully
+ max_check_attempts = 5
+ check_interval = 0.5
+
+ # Check for VPP-side interface
+ for attempt in range(max_check_attempts):
+ vpp_if_check = subprocess.run(
+ ["ip", "link", "show", "dev", unique_vpp_side_name],
+ capture_output=True,
+ text=True,
+ )
+ if vpp_if_check.returncode == 0:
+ self.logger.debug(
+ f"VPP-side interface {unique_vpp_side_name} exists"
+ )
+ break
+ self.logger.warning(
+ f"VPP-side interface {unique_vpp_side_name} not ready, attempt {attempt+1}/{max_check_attempts}"
+ )
+ time.sleep(check_interval)
+ else:
+ raise Exception(
+ f"VPP-side interface {unique_vpp_side_name} does not exist after {max_check_attempts} checks"
+ )
+
+ # Check for namespace-side interface
+ for attempt in range(max_check_attempts):
+ host_if_check = subprocess.run(
+ [
+ "ip",
+ "netns",
+ "exec",
+ namespace,
+ "ip",
+ "link",
+ "show",
+ "dev",
+ unique_host_side_name,
+ ],
+ capture_output=True,
+ text=True,
+ )
+ if host_if_check.returncode == 0:
+ self.logger.debug(
+ f"Host-side interface {unique_host_side_name} exists in namespace {namespace}"
+ )
+ break
+ self.logger.warning(
+ f"Host-side interface {unique_host_side_name} not ready, attempt {attempt+1}/{max_check_attempts}"
+ )
+ time.sleep(check_interval)
+ else:
+ raise Exception(
+ f"Host-side interface {unique_host_side_name} does not exist in namespace {namespace} after {max_check_attempts} checks"
+ )
+
+ # Add delay to ensure host interface is fully initialized
+ time.sleep(1)
+
+ api_args = {
+ "host_if": unique_vpp_side_name,
+ "rxq_num": 1,
+ }
+
+ # Clean any stale XDP sockets
+ os.system(
+ f"rm -f /dev/shm/vpp_*{unique_vpp_side_name}* 2>/dev/null || true"
+ )
+
+ # Set retry mechanism to ensure correct AF_XDP creation
+ retries = 3
+ last_error = None
+
+ for attempt in range(retries):
+ try:
+ if version == 1:
+ result = self.vapi.af_xdp_create(**api_args)
+ elif version == 2:
+ result = self.vapi.af_xdp_create_v2(**api_args)
+ elif version == 3:
+ result = self.vapi.af_xdp_create_v3(**api_args)
+ else:
+ raise ValueError(f"Unsupported AF_XDP version: {version}")
+ break
+ except Exception as e:
+ last_error = e
+ self.logger.warning(
+ f"AF_XDP creation attempt {attempt+1} failed: {e}"
+ )
+ time.sleep(1) # Wait before retry
+ else:
+ # All retries failed
+ raise Exception(
+ f"Failed to create AF_XDP interface after {retries} attempts: {last_error}"
+ )
+
+ sw_if_index = result.sw_if_index
+
+ # Set interface up
+ self.vapi.sw_interface_set_flags(sw_if_index=sw_if_index, flags=1)
+
+ # Add the interface to the VPP interface list
+ self.vpp_interfaces.append(sw_if_index)
+ self.linux_interfaces.append(["", unique_vpp_side_name])
+ self.linux_interfaces.append([namespace, unique_host_side_name])
+
+ # Track AF_XDP interfaces for tearDown
+ if not hasattr(self, "af_xdp_interfaces"):
+ self.af_xdp_interfaces = []
+ self.af_xdp_interfaces.append(sw_if_index)
+
+ # Add to ingress/egress lists based on namespace
+ if namespace == self.client_namespace:
+ self.ingress_if_idxes.append(sw_if_index)
+ elif namespace == self.server_namespace:
+ self.egress_if_idxes.append(sw_if_index)
+
+ # AF_XDP doesn't support GSO/checksum offload, so disable them
+ disable_interface_gso("", unique_vpp_side_name)
+ disable_interface_gso(namespace, unique_host_side_name)
+
+ return sw_if_index
+
+ except Exception as e:
+ self.logger.error(f"Error creating AF_XDP interface: {e}")
+ # Cleanup on failure
+ try:
+ os.system(
+ f"ip netns exec {namespace} ip link del {unique_host_side_name} 2>/dev/null || true"
+ )
+ os.system(f"ip link del {unique_vpp_side_name} 2>/dev/null || true")
+ except:
+ pass
+ raise
+
if __name__ == "__main__":
unittest.main(testRunner=VppTestRunner)