3 # Utility functions for QEMU tests ##
def can_create_namespaces():
    """Check if the environment allows creating the namespaces.

    Probes by adding and then deleting a throw-away network namespace
    with "ip netns".

    returns:
    True when both commands exit with status 0, False when either one
    fails or the "ip" executable cannot be started.
    """
    namespace = "vpp_chk_4212"
    try:
        # "add" must succeed before "del" is attempted; stop at the first
        # command that reports failure.
        for action in ("add", "del"):
            result = subprocess.run(
                ["ip", "netns", action, namespace], capture_output=True
            )
            if result.returncode != 0:
                return False
    except OSError:
        # The "ip" binary is missing or not executable, so namespaces
        # cannot be created from this environment.
        return False
    return True
def create_namespace(ns):
    """Create one or more namespaces.

    arguments:
    ns -- a string value or an iterable of namespace names

    raises:
    Exception -- when "ip netns add" exits with a non-zero status
    """
    # A lone string is a single namespace name, not an iterable of
    # one-character names.
    namespaces = [ns] if isinstance(ns, str) else ns
    for namespace in namespaces:
        # run() is called without check=True, so a failure is reported only
        # through returncode; it never raises CalledProcessError.
        result = subprocess.run(["ip", "netns", "add", namespace])
        if result.returncode != 0:
            raise Exception(f"Error while creating namespace {namespace}")
def add_namespace_route(ns, prefix, gw_ip):
    """Add a route to a namespace.

    arguments:
    ns -- namespace string value
    prefix -- NETWORK/MASK or "default"
    gw_ip -- address passed after "via" to "ip route add"

    raises:
    Exception -- when the "ip" command exits with a non-zero status
    """
    try:
        subprocess.run(
            ["ip", "netns", "exec", ns, "ip", "route", "add", prefix, "via", gw_ip],
            capture_output=True,
            # Without check=True run() never raises CalledProcessError and a
            # failed route add would pass unnoticed.
            check=True,
        )
    except subprocess.CalledProcessError as e:
        # The diagnostic from "ip" is on stderr; stdout is normally empty.
        raise Exception("Error adding route to namespace:", e.stderr) from e
def delete_host_interfaces(*host_interface_names):
    """Delete host interfaces.

    arguments:
    host_interface_names - sequence of host interface names to be deleted

    Deletion is best effort: the exit status of "ip link del" is not
    inspected, so asking to delete an interface that does not exist is not
    an error.
    NOTE(review): confirm that callers rely on this (e.g. cleanup before
    setup); if not, check the return code here.
    """
    for host_interface_name in host_interface_names:
        # capture_output keeps the "Cannot find device" noise of a missing
        # interface off the console.
        subprocess.run(
            ["ip", "link", "del", host_interface_name], capture_output=True
        )
def _run_host_interface_step(args, error_message):
    """Run one command of the veth setup and raise Exception if it fails."""
    process = subprocess.run(args, capture_output=True)
    if process.returncode != 0:
        raise Exception(f"{error_message}{process.stderr}")


def create_host_interface(
    host_interface_name, vpp_interface_name, host_namespace, *host_ip_prefixes
):
    """Create a host interface of type veth.

    arguments:
    host_interface_name -- name of the veth interface on the host side
    vpp_interface_name -- name of the veth interface on the VPP side
    host_namespace -- host namespace into which the host_interface needs to be set
    host_ip_prefixes -- a sequence of ip/prefix-lengths to be set
                        on the host_interface

    raises:
    Exception -- as soon as one of the "ip" commands exits with a non-zero
                 status; the message names the failed step and carries the
                 command's stderr

    NOTE(review): the argument lists of the "link add", the in-namespace
    "link set ... up" and the "addr add" commands were reconstructed from
    this docstring and the error messages -- confirm against the full file.
    """
    in_namespace = ["ip", "netns", "exec", host_namespace]

    # Create the veth pair: one end for VPP, the peer for the host.
    _run_host_interface_step(
        [
            "ip",
            "link",
            "add",
            "name",
            vpp_interface_name,
            "type",
            "veth",
            "peer",
            "name",
            host_interface_name,
        ],
        "Error creating host interface: ",
    )

    # Move the host end into its namespace.
    _run_host_interface_step(
        ["ip", "link", "set", host_interface_name, "netns", host_namespace],
        "Error setting host interface namespace: ",
    )

    # Bring up the VPP end, which stays in the current namespace.
    _run_host_interface_step(
        ["ip", "link", "set", "dev", vpp_interface_name, "up"],
        "Error bringing up the host interface: ",
    )

    # Bring up the host end; it now lives inside host_namespace.
    _run_host_interface_step(
        in_namespace + ["ip", "link", "set", "dev", host_interface_name, "up"],
        "Error bringing up the host interface in namespace: ",
    )

    # Assign every requested address to the host end.
    for host_ip_prefix in host_ip_prefixes:
        _run_host_interface_step(
            in_namespace
            + ["ip", "addr", "add", host_ip_prefix, "dev", host_interface_name],
            "Error setting ip prefix on the host interface: ",
        )
def set_interface_mtu(namespace, interface, mtu, logger):
    """Set an mtu number on a linux device interface.

    arguments:
    namespace -- namespace holding the interface; a falsy value runs the
                 command in the current namespace
    interface -- name of the linux interface
    mtu -- mtu value, converted with str() for the command line
    logger -- logger that receives a message describing the change

    raises:
    Exception -- when the "ip" command exits with a non-zero status
    """
    args = ["ip", "link", "set", "mtu", str(mtu), "dev", interface]
    if namespace:
        args = ["ip", "netns", "exec", namespace] + args
    # NOTE(review): the log level was not visible in the reviewed extract;
    # debug is assumed -- confirm.
    logger.debug(
        f"Setting mtu:{mtu} on linux interface:{interface} "
        f"in namespace:{namespace}"
    )
    try:
        # check=True makes run() raise CalledProcessError on failure;
        # without it the handler below can never execute.
        subprocess.run(args, capture_output=True, check=True)
    except subprocess.CalledProcessError as e:
        raise Exception("Error updating mtu:", e.stderr) from e
def enable_interface_gso(namespace, interface):
    """Enable gso offload on a linux device interface.

    Runs "ethtool -K <interface> rx on tx on", inside the namespace when
    one is given.

    arguments:
    namespace -- namespace holding the interface; a falsy value runs the
                 command in the current namespace
    interface -- name of the linux interface

    raises:
    Exception -- when ethtool exits with a non-zero status; the message
                 carries the command's stderr
    """
    args = ["ethtool", "-K", interface, "rx", "on", "tx", "on"]
    if namespace:
        args = ["ip", "netns", "exec", namespace] + args
    # run() is called without check=True, so failure shows up only in
    # returncode and must be tested explicitly.
    process = subprocess.run(args, capture_output=True)
    if process.returncode != 0:
        raise Exception(
            f"Error enabling GSO offload on linux device interface: "
            f"{process.stderr}"
        )
def disable_interface_gso(namespace, interface):
    """Disable gso offload on a linux device interface.

    Runs "ethtool -K <interface> rx off tx off", inside the namespace when
    one is given.

    arguments:
    namespace -- namespace holding the interface; a falsy value runs the
                 command in the current namespace
    interface -- name of the linux interface

    raises:
    Exception -- when ethtool exits with a non-zero status; the message
                 carries the command's stderr
    """
    args = ["ethtool", "-K", interface, "rx", "off", "tx", "off"]
    if namespace:
        args = ["ip", "netns", "exec", namespace] + args
    # run() is called without check=True, so failure shows up only in
    # returncode and must be tested explicitly.
    process = subprocess.run(args, capture_output=True)
    if process.returncode != 0:
        raise Exception(
            f"Error disabling GSO offload on linux device interface: "
            f"{process.stderr}"
        )
def delete_namespace(namespaces):
    """Delete one or more namespaces.

    arguments:
    namespaces -- a list of namespace names; a single string is accepted
                  too and is treated as one name

    raises:
    Exception -- when "ip netns del" exits with a non-zero status
    """
    # Iterating a bare string would try to delete one namespace per
    # character, so wrap it the same way create_namespace does.
    names = [namespaces] if isinstance(namespaces, str) else namespaces
    for namespace in names:
        result = subprocess.run(
            ["ip", "netns", "del", namespace], capture_output=True
        )
        if result.returncode != 0:
            raise Exception(f"Error while deleting namespace {namespace}")
def list_namespace(ns):
    """List the IP address of a namespace.

    Runs "ip addr" inside the namespace; the command's output is not
    captured, so it goes to the caller's stdout/stderr.

    arguments:
    ns -- namespace string value

    raises:
    Exception -- when the "ip" command exits with a non-zero status
    """
    try:
        # check=True makes run() raise CalledProcessError on failure;
        # without it the handler below can never execute.
        subprocess.run(["ip", "netns", "exec", ns, "ip", "addr"], check=True)
    except subprocess.CalledProcessError as e:
        # Nothing was captured, so report the exit status instead of the
        # (empty) output attribute.
        raise Exception("Error listing namespace IP:", e.returncode) from e