d01381537e1ee6812684202b61a3dc8c47bbdfe3
[vpp.git] / test / asf / test_session.py
1 #!/usr/bin/env python3
2
3 import unittest
4
5 from asfframework import tag_fixme_vpp_workers
6 from asfframework import VppTestCase, VppTestRunner
7 from asfframework import tag_run_solo
8 from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath
9
10
11 @tag_fixme_vpp_workers
12 class TestSession(VppTestCase):
13     """Session Test Case"""
14
15     @classmethod
16     def setUpClass(cls):
17         super(TestSession, cls).setUpClass()
18
19     @classmethod
20     def tearDownClass(cls):
21         super(TestSession, cls).tearDownClass()
22
23     def setUp(self):
24         super(TestSession, self).setUp()
25
26         self.vapi.session_enable_disable(is_enable=1)
27         self.create_loopback_interfaces(2)
28
29         table_id = 0
30
31         for i in self.lo_interfaces:
32             i.admin_up()
33
34             if table_id != 0:
35                 tbl = VppIpTable(self, table_id)
36                 tbl.add_vpp_config()
37
38             i.set_table_ip4(table_id)
39             i.config_ip4()
40             table_id += 1
41
42         # Configure namespaces
43         self.vapi.app_namespace_add_del_v4(
44             namespace_id="0", sw_if_index=self.loop0.sw_if_index
45         )
46         self.vapi.app_namespace_add_del_v4(
47             namespace_id="1", sw_if_index=self.loop1.sw_if_index
48         )
49
50     def tearDown(self):
51         for i in self.lo_interfaces:
52             i.unconfig_ip4()
53             i.set_table_ip4(0)
54             i.admin_down()
55
56         super(TestSession, self).tearDown()
57         self.vapi.session_enable_disable(is_enable=1)
58
59     def test_segment_manager_alloc(self):
60         """Session Segment Manager Multiple Segment Allocation"""
61
62         # Add inter-table routes
63         ip_t01 = VppIpRoute(
64             self,
65             self.loop1.local_ip4,
66             32,
67             [VppRoutePath("0.0.0.0", 0xFFFFFFFF, nh_table_id=1)],
68         )
69         ip_t10 = VppIpRoute(
70             self,
71             self.loop0.local_ip4,
72             32,
73             [VppRoutePath("0.0.0.0", 0xFFFFFFFF, nh_table_id=0)],
74             table_id=1,
75         )
76         ip_t01.add_vpp_config()
77         ip_t10.add_vpp_config()
78
79         # Start builtin server and client with small private segments
80         uri = "tcp://" + self.loop0.local_ip4 + "/1234"
81         error = self.vapi.cli(
82             "test echo server appns 0 fifo-size 64 "
83             + "private-segment-size 1m uri "
84             + uri
85         )
86         if error:
87             self.logger.critical(error)
88             self.assertNotIn("failed", error)
89
90         error = self.vapi.cli(
91             "test echo client nclients 100 appns 1 "
92             + "no-output fifo-size 64 syn-timeout 2 "
93             + "private-segment-size 1m uri "
94             + uri
95         )
96         if error:
97             self.logger.critical(error)
98             self.assertNotIn("failed", error)
99
100         if self.vpp_dead:
101             self.assert_equal(0)
102
103         # Delete inter-table routes
104         ip_t01.remove_vpp_config()
105         ip_t10.remove_vpp_config()
106
107
108 @tag_fixme_vpp_workers
109 class TestSessionUnitTests(VppTestCase):
110     """Session Unit Tests Case"""
111
112     @classmethod
113     def setUpClass(cls):
114         super(TestSessionUnitTests, cls).setUpClass()
115
116     @classmethod
117     def tearDownClass(cls):
118         super(TestSessionUnitTests, cls).tearDownClass()
119
120     def setUp(self):
121         super(TestSessionUnitTests, self).setUp()
122         self.vapi.session_enable_disable(is_enable=1)
123
124     def test_session(self):
125         """Session Unit Tests"""
126         error = self.vapi.cli("test session all")
127
128         if error:
129             self.logger.critical(error)
130         self.assertNotIn("failed", error)
131
132     def tearDown(self):
133         super(TestSessionUnitTests, self).tearDown()
134         self.vapi.session_enable_disable(is_enable=0)
135
136
137 @tag_run_solo
138 class TestSegmentManagerTests(VppTestCase):
139     """SVM Fifo Unit Tests Case"""
140
141     @classmethod
142     def setUpClass(cls):
143         super(TestSegmentManagerTests, cls).setUpClass()
144
145     @classmethod
146     def tearDownClass(cls):
147         super(TestSegmentManagerTests, cls).tearDownClass()
148
149     def setUp(self):
150         super(TestSegmentManagerTests, self).setUp()
151
152     def test_segment_manager(self):
153         """Segment manager Tests"""
154         error = self.vapi.cli("test segment-manager all")
155
156         if error:
157             self.logger.critical(error)
158         self.assertNotIn("failed", error)
159
160     def tearDown(self):
161         super(TestSegmentManagerTests, self).tearDown()
162
163
164 @tag_run_solo
165 class TestSvmFifoUnitTests(VppTestCase):
166     """SVM Fifo Unit Tests Case"""
167
168     @classmethod
169     def setUpClass(cls):
170         super(TestSvmFifoUnitTests, cls).setUpClass()
171
172     @classmethod
173     def tearDownClass(cls):
174         super(TestSvmFifoUnitTests, cls).tearDownClass()
175
176     def setUp(self):
177         super(TestSvmFifoUnitTests, self).setUp()
178
179     def test_svm_fifo(self):
180         """SVM Fifo Unit Tests"""
181         error = self.vapi.cli("test svm fifo all")
182
183         if error:
184             self.logger.critical(error)
185         self.assertNotIn("failed", error)
186
187     def tearDown(self):
188         super(TestSvmFifoUnitTests, self).tearDown()
189
190
191 if __name__ == "__main__":
192     unittest.main(testRunner=VppTestRunner)