# Utility functions for QEMU tests
def can_create_namespaces(namespace="vpp_chk_4212"):
    """Check if the environment allows creating network namespaces.

    The probe adds a throw-away namespace with ``ip netns add`` and then
    removes it again with ``ip netns del``.

    Arguments:
    namespace -- name of the temporary namespace used for the probe

    Returns True only when both commands exit with status 0. Returns False
    when either command fails or cannot be launched at all.
    """
    try:
        for action in ("add", "del"):
            result = subprocess.run(
                ["ip", "netns", action, namespace], capture_output=True
            )
            if result.returncode != 0:
                return False
    except (OSError, subprocess.SubprocessError):
        # The ``ip`` binary is missing or not executable: namespaces
        # cannot be created in this environment.
        return False
    return True
def create_namespace(ns):
    """Create one or more namespaces.

    Arguments:
    ns -- a string value or an iterable of namespace names

    Raises Exception (chained to the underlying CalledProcessError) as soon
    as ``ip netns add`` fails for one of the names; namespaces created
    before the failure are left in place.
    """
    namespaces = [ns] if isinstance(ns, str) else ns
    for namespace in namespaces:
        try:
            # check=True is what makes a non-zero exit status raise
            # CalledProcessError; without it the handler below is dead code.
            subprocess.run(
                ["ip", "netns", "add", namespace],
                check=True,
                capture_output=True,
                text=True,
            )
        except subprocess.CalledProcessError as e:
            raise Exception(
                f"Error while creating namespace {namespace}: {e.stderr.strip()}"
            ) from e
def add_namespace_route(ns, prefix, gw_ip):
    """Add a route to a namespace.

    Arguments:
    ns -- namespace string value
    prefix -- NETWORK/MASK or "default"
    gw_ip -- address passed to ``via`` (the next hop for the route)

    Raises Exception (chained to the underlying CalledProcessError) when
    the ``ip route add`` command exits with a non-zero status.
    """
    try:
        # check=True is required for a failed command to reach the handler.
        subprocess.run(
            ["ip", "netns", "exec", ns, "ip", "route", "add", prefix, "via", gw_ip],
            check=True,
            capture_output=True,
            text=True,
        )
    except subprocess.CalledProcessError as e:
        raise Exception(
            f"Error adding route to namespace: {e.stderr.strip()}"
        ) from e
def delete_host_interfaces(*host_interface_names):
    """Delete host interfaces.

    Arguments:
    host_interface_names -- sequence of host interface names to be deleted

    Every name is handed to ``ip link del``. The command output is captured
    and the exit status is not inspected, so a name that no longer exists
    does not stop the remaining deletions.
    """
    for if_name in host_interface_names:
        del_cmd = ["ip", "link", "del", if_name]
        try:
            subprocess.run(del_cmd, capture_output=True)
        except subprocess.CalledProcessError as e:
            raise Exception("Error deleting host interface:", e.output)
def create_host_interface(
    host_interface_name, vpp_interface_name, host_namespace, *host_ip_prefixes
):
    """Create a host interface of type veth.

    Arguments:
    host_interface_name -- name of the veth interface on the host side
    vpp_interface_name -- name of the veth interface on the VPP side
    host_namespace -- host namespace into which the host_interface needs to be set
    host_ip_prefixes -- a sequence of ip/prefix-lengths to be set
                        on the host_interface

    Steps, in order: create the veth pair, move the host side into
    host_namespace, bring the VPP side up, bring the host side up inside
    the namespace, then add every prefix to the host side.

    Raises Exception (chained to the underlying CalledProcessError) naming
    the step that failed together with the command's stderr. Steps that
    already completed are not rolled back.
    """

    def run_step(cmd, failure):
        """Run one command; raise Exception(failure + stderr) if it fails."""
        try:
            subprocess.run(cmd, check=True, capture_output=True, text=True)
        except subprocess.CalledProcessError as e:
            raise Exception(f"{failure}: {e.stderr.strip()}") from e

    # Prefix for commands that must run inside the host namespace.
    in_namespace = ["ip", "netns", "exec", host_namespace]

    run_step(
        [
            "ip",
            "link",
            "add",
            "name",
            vpp_interface_name,
            "type",
            "veth",
            "peer",
            "name",
            host_interface_name,
        ],
        "Error creating host interface",
    )
    run_step(
        ["ip", "link", "set", host_interface_name, "netns", host_namespace],
        "Error setting host interface namespace",
    )
    run_step(
        ["ip", "link", "set", "dev", vpp_interface_name, "up"],
        "Error bringing up the host interface",
    )
    run_step(
        in_namespace + ["ip", "link", "set", "dev", host_interface_name, "up"],
        "Error bringing up the host interface in namespace",
    )
    for host_ip_prefix in host_ip_prefixes:
        run_step(
            in_namespace
            + ["ip", "addr", "add", host_ip_prefix, "dev", host_interface_name],
            "Error setting ip prefix on the host interface",
        )
def set_interface_mtu(namespace, interface, mtu, logger):
    """Set an MTU on a linux device interface.

    Arguments:
    namespace -- namespace holding the interface; when falsy the command is
                 run without ``ip netns exec``
    interface -- name of the linux interface
    mtu -- MTU value (converted with str() for the command line)
    logger -- logger that receives a message describing the change

    Raises Exception (chained to the underlying CalledProcessError) when
    the ``ip link set mtu`` command exits with a non-zero status.
    """
    args = ["ip", "link", "set", "mtu", str(mtu), "dev", interface]
    if namespace:
        args = ["ip", "netns", "exec", namespace] + args
    # NOTE(review): the log level is assumed to be debug -- confirm.
    logger.debug(
        f"Setting mtu:{mtu} on linux interface:{interface} "
        f"in namespace:{namespace}"
    )
    try:
        # check=True is required for a failed command to reach the handler.
        subprocess.run(args, check=True, capture_output=True, text=True)
    except subprocess.CalledProcessError as e:
        raise Exception(f"Error updating mtu: {e.stderr.strip()}") from e
def enable_interface_gso(namespace, interface):
    """Enable gso offload on a linux device interface.

    Runs ``ethtool -K <interface> rx on tx on``.

    Arguments:
    namespace -- namespace holding the interface; when falsy the command is
                 run without ``ip netns exec``
    interface -- name of the linux interface

    Raises Exception (chained to the underlying CalledProcessError) with
    the command's stderr when ethtool exits with a non-zero status.
    """
    args = ["ethtool", "-K", interface, "rx", "on", "tx", "on"]
    if namespace:
        args = ["ip", "netns", "exec", namespace] + args
    try:
        subprocess.run(args, check=True, capture_output=True, text=True)
    except subprocess.CalledProcessError as e:
        raise Exception(
            f"Error enabling GSO offload on linux device interface: "
            f"{e.stderr.strip()}"
        ) from e
def disable_interface_gso(namespace, interface):
    """Disable gso offload on a linux device interface.

    Runs ``ethtool -K <interface> rx off tx off``.

    Arguments:
    namespace -- namespace holding the interface; when falsy the command is
                 run without ``ip netns exec``
    interface -- name of the linux interface

    Raises Exception (chained to the underlying CalledProcessError) with
    the command's stderr when ethtool exits with a non-zero status.
    """
    args = ["ethtool", "-K", interface, "rx", "off", "tx", "off"]
    if namespace:
        args = ["ip", "netns", "exec", namespace] + args
    try:
        subprocess.run(args, check=True, capture_output=True, text=True)
    except subprocess.CalledProcessError as e:
        raise Exception(
            f"Error disabling GSO offload on linux device interface: "
            f"{e.stderr.strip()}"
        ) from e
def delete_namespace(namespaces):
    """Delete one or more namespaces.

    Arguments:
    namespaces -- a list of namespace names; a single name given as a
                  string is also accepted

    Raises Exception (chained to the underlying CalledProcessError) as soon
    as ``ip netns del`` fails for one of the names; the remaining names are
    not processed.
    """
    # A bare string would otherwise be iterated character by character.
    names = [namespaces] if isinstance(namespaces, str) else namespaces
    for namespace in names:
        try:
            subprocess.run(
                ["ip", "netns", "del", namespace],
                check=True,
                capture_output=True,
                text=True,
            )
        except subprocess.CalledProcessError as e:
            raise Exception(
                f"Error while deleting namespace {namespace}: {e.stderr.strip()}"
            ) from e
def list_namespace(ns):
    """List the IP addresses of a namespace.

    Runs ``ip addr`` inside the namespace. The command's output is not
    captured, so it goes to this process's stdout/stderr.

    Arguments:
    ns -- namespace string value

    Raises Exception (chained to the underlying CalledProcessError) when
    the command exits with a non-zero status.
    """
    try:
        # check=True is required for a failed command to reach the handler.
        subprocess.run(["ip", "netns", "exec", ns, "ip", "addr"], check=True)
    except subprocess.CalledProcessError as e:
        # Output is not captured, so only the exit status is available.
        raise Exception(
            f"Error listing namespace IP: exit status {e.returncode}"
        ) from e