tests: run a test inside a QEMU VM
[vpp.git] / test / test_vm_tap.py
1 #!/usr/bin/env python3
2 import unittest
3 from ipaddress import ip_interface
4 from vpp_qemu_utils import create_namespace
5 from vpp_iperf import VppIperf
6 from framework import VppTestCase, VppTestRunner
7 from config import config
8
9
10 class TestTapQemu(VppTestCase):
11     """Test Tap interfaces inside a QEMU VM.
12
13     Start an iPerf connection stream between QEMU and VPP via
14     tap v2 interfaces.
15
16     Linux_ns1 -- iperf_client -- tap1 -- VPP-BD -- tap2 --
17                               -- iperfServer -- Linux_ns2
18     """
19
20     @classmethod
21     def setUpClass(cls):
22         super(TestTapQemu, cls).setUpClass()
23
24     @classmethod
25     def tearDownClass(cls):
26         super(TestTapQemu, cls).tearDownClass()
27
28     def setUp(self):
29         """Perform test setup before running QEMU tests.
30
31         1. Create a namespace for the iPerf Server & Client.
32         2. Create 2 tap interfaces in VPP & add them to each namespace.
33         3. Add the tap interfaces to a bridge-domain.
34         """
35         super(TestTapQemu, self).setUp()
36         self.client_namespace = "iprf_client_ns"
37         self.server_namespace = "iprf_server_ns"
38         self.client_ip4_prefix = "10.0.0.101/24"
39         self.server_ip4_prefix = "10.0.0.102/24"
40         create_namespace(self.client_namespace)
41         create_namespace(self.server_namespace)
42         tap1_if_idx = self.create_tap(
43             101, self.client_namespace, self.client_ip4_prefix
44         )
45         tap2_if_idx = self.create_tap(
46             102, self.server_namespace, self.server_ip4_prefix
47         )
48         self.l2_connect_interfaces(tap1_if_idx, tap2_if_idx)
49
50     def create_tap(self, id, host_namespace, host_ip4_prefix):
51         result = self.vapi.api(
52             self.vapi.papi.tap_create_v2,
53             {
54                 "id": id,
55                 "use_random_mac": True,
56                 "host_namespace_set": True,
57                 "host_namespace": host_namespace,
58                 "host_if_name_set": False,
59                 "host_bridge_set": False,
60                 "host_mac_addr_set": False,
61                 "host_ip4_prefix": ip_interface(host_ip4_prefix),
62                 "host_ip4_prefix_set": True,
63             },
64         )
65         sw_if_index = result.sw_if_index
66         self.vapi.api(
67             self.vapi.papi.sw_interface_set_flags,
68             {"sw_if_index": sw_if_index, "flags": 1},
69         )
70         return sw_if_index
71
72     def dump_vpp_tap_interfaces(self):
73         return self.vapi.api(self.vapi.papi.sw_interface_tap_v2_dump, {})
74
75     def dump_bridge_domain_details(self):
76         return self.vapi.api(self.vapi.papi.bridge_domain_dump, {"bd_id": 1})
77
78     def l2_connect_interfaces(self, *sw_if_idxs):
79         for if_idx in sw_if_idxs:
80             self.vapi.api(
81                 self.vapi.papi.sw_interface_set_l2_bridge,
82                 {
83                     "rx_sw_if_index": if_idx,
84                     "bd_id": 1,
85                     "shg": 0,
86                     "port_type": 0,
87                     "enable": True,
88                 },
89             )
90
91     @unittest.skipUnless(config.extended, "part of extended tests")
92     def test_tap_iperf(self):
93         """Start an iperf connection stream between QEMU & VPP via tap."""
94         iperf = VppIperf()
95         iperf.client_ns = self.client_namespace
96         iperf.server_ns = self.server_namespace
97         iperf.server_ip = str(ip_interface(self.server_ip4_prefix).ip)
98         iperf.start()
99
100
101 if __name__ == "__main__":
102     unittest.main(testRunner=VppTestRunner)