3 from ipaddress import ip_interface
4 from vpp_qemu_utils import create_namespace
5 from vpp_iperf import VppIperf
6 from framework import VppTestCase, VppTestRunner
7 from config import config
10 class TestTapQemu(VppTestCase):
11 """Test Tap interfaces inside a QEMU VM.
13 Start an iPerf connection stream between QEMU and VPP via
16 Linux_ns1 -- iperf_client -- tap1 -- VPP-BD -- tap2 --
17 -- iperfServer -- Linux_ns2
22 super(TestTapQemu, cls).setUpClass()
def tearDownClass(cls):
    """Tear down class-level state by delegating to the parent class.

    No teardown of its own is performed here; the call is forwarded
    unchanged to the next ``tearDownClass`` after TestTapQemu in the MRO.
    """
    # NOTE(review): takes ``cls``, so presumably decorated with @classmethod
    # on a line that is not visible here -- confirm.
    super(TestTapQemu, cls).tearDownClass()
29 """Perform test setup before running QEMU tests.
31 1. Create a namespace for the iPerf Server & Client.
32 2. Create 2 tap interfaces in VPP & add them to each namespace.
33 3. Add the tap interfaces to a bridge-domain.
35 super(TestTapQemu, self).setUp()
36 self.client_namespace = "iprf_client_ns"
37 self.server_namespace = "iprf_server_ns"
38 self.client_ip4_prefix = "10.0.0.101/24"
39 self.server_ip4_prefix = "10.0.0.102/24"
40 create_namespace(self.client_namespace)
41 create_namespace(self.server_namespace)
42 tap1_if_idx = self.create_tap(
43 101, self.client_namespace, self.client_ip4_prefix
45 tap2_if_idx = self.create_tap(
46 102, self.server_namespace, self.server_ip4_prefix
48 self.l2_connect_interfaces(tap1_if_idx, tap2_if_idx)
50 def create_tap(self, id, host_namespace, host_ip4_prefix):
51 result = self.vapi.api(
52 self.vapi.papi.tap_create_v2,
55 "use_random_mac": True,
56 "host_namespace_set": True,
57 "host_namespace": host_namespace,
58 "host_if_name_set": False,
59 "host_bridge_set": False,
60 "host_mac_addr_set": False,
61 "host_ip4_prefix": ip_interface(host_ip4_prefix),
62 "host_ip4_prefix_set": True,
65 sw_if_index = result.sw_if_index
67 self.vapi.papi.sw_interface_set_flags,
68 {"sw_if_index": sw_if_index, "flags": 1},
def dump_vpp_tap_interfaces(self):
    """Dump the tap v2 interfaces through the VPP API.

    Issues the ``sw_interface_tap_v2_dump`` request with an empty argument
    dict via ``self.vapi.api`` and returns that call's result unchanged.
    """
    send = self.vapi.api
    request = self.vapi.papi.sw_interface_tap_v2_dump
    no_arguments = {}
    return send(request, no_arguments)
def dump_bridge_domain_details(self):
    """Dump the details of bridge domain 1 through the VPP API.

    Issues the ``bridge_domain_dump`` request with ``bd_id`` set to 1 via
    ``self.vapi.api`` and returns that call's result unchanged.
    """
    send = self.vapi.api
    request = self.vapi.papi.bridge_domain_dump
    bridge_selector = {"bd_id": 1}
    return send(request, bridge_selector)
78 def l2_connect_interfaces(self, *sw_if_idxs):
79 for if_idx in sw_if_idxs:
81 self.vapi.papi.sw_interface_set_l2_bridge,
83 "rx_sw_if_index": if_idx,
91 @unittest.skipUnless(config.extended, "part of extended tests")
92 def test_tap_iperf(self):
93 """Start an iperf connection stream between QEMU & VPP via tap."""
95 iperf.client_ns = self.client_namespace
96 iperf.server_ns = self.server_namespace
97 iperf.server_ip = str(ip_interface(self.server_ip4_prefix).ip)
# When executed directly as a script, run this module's tests using the
# framework's VppTestRunner instead of unittest's default runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)