3 # Utility functions for QEMU tests ##
def create_namespace(ns):
    """Create one or more network namespaces.

    arguments:
    ns -- a string value or an iterable of namespace names

    Raises Exception when "ip netns add" exits with a non-zero status.
    Creation stops at the first failure, so namespaces listed after the
    failing one are not created.
    """
    # A lone name is wrapped so the loop below handles both input shapes.
    namespaces = [ns] if isinstance(ns, str) else ns
    for namespace in namespaces:
        try:
            # check=True is what turns a non-zero exit status into
            # CalledProcessError; without it the handler below can never
            # run. Only stderr is captured so the diagnostic printed by
            # "ip" can be attached to the exception.
            subprocess.run(
                ["ip", "netns", "add", namespace],
                check=True,
                stderr=subprocess.PIPE,
                text=True,
            )
        except subprocess.CalledProcessError as e:
            raise Exception("Error creating namespace:", e.stderr) from e
def add_namespace_route(ns, prefix, gw_ip):
    """Add a route to a namespace.

    arguments:
    ns -- namespace string value
    prefix -- NETWORK/MASK or "default"
    gw_ip -- IP address of the gateway the route goes via

    Raises Exception when the "ip route add" command exits with a
    non-zero status.
    """
    try:
        # check=True makes a failing command raise CalledProcessError;
        # without it the handler below is unreachable and a rejected
        # route goes unnoticed.
        subprocess.run(
            ["ip", "netns", "exec", ns, "ip", "route", "add", prefix, "via", gw_ip],
            capture_output=True,
            check=True,
            text=True,
        )
    except subprocess.CalledProcessError as e:
        # "ip" reports errors on stderr; stdout (e.output) is normally empty.
        raise Exception("Error adding route to namespace:", e.stderr) from e
def delete_host_interfaces(*host_interface_names):
    """Delete host interfaces.

    arguments:
    host_interface_names - sequence of host interface names to be deleted

    Deletion is best-effort: the exit status of each "ip link del" is not
    checked, so a name that does not exist is skipped silently and the
    remaining interfaces are still processed.
    """
    for host_interface_name in host_interface_names:
        # subprocess.run() without check=True never raises
        # CalledProcessError, so no handler for it is needed here.
        # capture_output keeps the command's output off the console.
        subprocess.run(
            ["ip", "link", "del", host_interface_name], capture_output=True
        )
# NOTE(review): this function is only partially visible -- each line carries
# its original line number and several original lines (closing docstring
# quotes, the argument lists of three subprocess.run calls, and whatever
# follows each error print) are not shown. The notes below describe only
# what the visible lines establish.
#
# Visible flow: a sequence of subprocess.run() calls, each followed by a
# check of process.returncode and an error message built from process.stderr:
#   1. command not shown; failure message "Error creating host interface"
#   2. ip link set <host_interface_name> netns <host_namespace>
#   3. ip link set dev <vpp_interface_name> up
#   4. command not shown; failure message mentions bringing the host
#      interface up inside the namespace
#   5. once per entry of host_ip_prefixes: command not shown; failure
#      message mentions setting the ip prefix on the host interface
# What happens after an error message is printed is not visible here --
# TODO confirm against the full file.
58 def create_host_interface(
59 host_interface_name, vpp_interface_name, host_namespace, *host_ip_prefixes
61 """Create a host interface of type veth.
64 host_interface_name -- name of the veth interface on the host side
65 vpp_interface_name -- name of the veth interface on the VPP side
66 host_namespace -- host namespace into which the host_interface needs to be set
67 host_ip_prefixes -- a sequence of ip/prefix-lengths to be set
71 process = subprocess.run(
86 if process.returncode != 0:
87 print(f"Error creating host interface: {process.stderr}")
90 process = subprocess.run(
91 ["ip", "link", "set", host_interface_name, "netns", host_namespace],
94 if process.returncode != 0:
95 print(f"Error setting host interface namespace: {process.stderr}")
98 process = subprocess.run(
99 ["ip", "link", "set", "dev", vpp_interface_name, "up"], capture_output=True
101 if process.returncode != 0:
102 print(f"Error bringing up the host interface: {process.stderr}")
105 process = subprocess.run(
120 if process.returncode != 0:
122 f"Error bringing up the host interface in namespace: "
127 for host_ip_prefix in host_ip_prefixes:
128 process = subprocess.run(
143 if process.returncode != 0:
145 f"Error setting ip prefix on the host interface: "
# NOTE(review): the message below talks about adding a route, which this
# function does not do -- it looks copied from add_namespace_route; confirm
# and reword. The one fully visible call above (original lines 98-99) passes
# no check=True and so cannot raise CalledProcessError; since every call is
# followed by a manual returncode check, this handler is presumably
# unreachable -- verify against the elided calls.
149 except subprocess.CalledProcessError as e:
150 raise Exception("Error adding route to namespace:", e.output)
# NOTE(review): partially visible -- original lines 156, 158-159 and 162-163
# (including the line that actually runs the command) are not shown.
#
# Builds the command "ip link set mtu <mtu> dev <interface>"; mtu is passed
# through str(), so a non-string value such as an int is acceptable. The
# command is then prefixed with "ip netns exec <namespace>" -- presumably
# only when a namespace is given; the condition is on an elided line, TODO
# confirm. A "Setting mtu:..." message is formatted for the logger argument
# (the logger method used is not visible).
# The handler converts subprocess.CalledProcessError into a generic
# Exception carrying e.output; whether the elided run call can raise it
# (i.e. passes check=True) cannot be told from here -- verify.
153 def set_interface_mtu(namespace, interface, mtu, logger):
154 """set an mtu number on a linux device interface."""
155 args = ["ip", "link", "set", "mtu", str(mtu), "dev", interface]
157 args = ["ip", "netns", "exec", namespace] + args
160 f"Setting mtu:{mtu} on linux interface:{interface} "
161 f"in namespace:{namespace}"
164 except subprocess.CalledProcessError as e:
165 raise Exception("Error updating mtu:", e.output)
# NOTE(review): partially visible -- original lines 171, 173, 176 and 178-180
# are not shown.
#
# Runs "ethtool -K <interface> rx on tx on", prefixed with
# "ip netns exec <namespace>" (presumably only when a namespace is given;
# the condition is on an elided line -- TODO confirm). Output is captured,
# and a non-zero return code leads to an "Error enabling GSO offload ..."
# message; what is done with that message and what happens afterwards is
# not visible here.
# The visible subprocess.run() call passes no check=True, so it never raises
# CalledProcessError and the handler at the end cannot fire for it.
168 def enable_interface_gso(namespace, interface):
169 """enable gso offload on a linux device interface."""
170 args = ["ethtool", "-K", interface, "rx", "on", "tx", "on"]
172 args = ["ip", "netns", "exec", namespace] + args
174 process = subprocess.run(args, capture_output=True)
175 if process.returncode != 0:
177 f"Error enabling GSO offload on linux device interface: "
181 except subprocess.CalledProcessError as e:
182 raise Exception("Error enabling gso:", e.output)
# NOTE(review): partially visible -- original lines 188, 190, 193 and 195-197
# are not shown.
#
# Runs "ethtool -K <interface> rx off tx off", prefixed with
# "ip netns exec <namespace>" (presumably only when a namespace is given;
# the condition is on an elided line -- TODO confirm). Output is captured,
# and a non-zero return code leads to an "Error disabling GSO offload ..."
# message; what is done with that message and what happens afterwards is
# not visible here.
# The visible subprocess.run() call passes no check=True, so it never raises
# CalledProcessError and the handler at the end cannot fire for it.
185 def disable_interface_gso(namespace, interface):
186 """disable gso offload on a linux device interface."""
187 args = ["ethtool", "-K", interface, "rx", "off", "tx", "off"]
189 args = ["ip", "netns", "exec", namespace] + args
191 process = subprocess.run(args, capture_output=True)
192 if process.returncode != 0:
194 f"Error disabling GSO offload on linux device interface: "
198 except subprocess.CalledProcessError as e:
199 raise Exception("Error disabling gso:", e.output)
def delete_namespace(namespaces):
    """Delete one or more namespaces.

    arguments:
    namespaces -- a namespace name or an iterable of namespace names

    Deletion is best-effort: the exit status of each "ip netns del" is not
    checked, so a namespace that does not exist is skipped silently and the
    remaining ones are still processed.
    """
    if isinstance(namespaces, str):
        # A bare string would otherwise be iterated character by character,
        # issuing one delete per letter of the name.
        namespaces = [namespaces]
    for namespace in namespaces:
        # subprocess.run() without check=True never raises
        # CalledProcessError, so no handler for it is needed here.
        subprocess.run(["ip", "netns", "del", namespace], capture_output=True)
def list_namespace(ns):
    """List the IP address of a namespace.

    arguments:
    ns -- namespace string value

    The output of "ip addr" goes straight to this process's stdout.
    Raises Exception when the command exits with a non-zero status.
    """
    try:
        # stdout is deliberately left uncaptured so the listing is shown.
        # stderr is captured so it can be attached to the exception, and
        # check=True makes a failing command actually reach the handler.
        subprocess.run(
            ["ip", "netns", "exec", ns, "ip", "addr"],
            check=True,
            stderr=subprocess.PIPE,
            text=True,
        )
    except subprocess.CalledProcessError as e:
        raise Exception("Error listing namespace IP:", e.stderr) from e