8e03968d19af4bcf7198faf66cb5ed8b97152d6d
[vpp.git] / test / test_session.py
1 #!/usr/bin/env python3
2
3 import unittest
4
5 from framework import tag_fixme_vpp_workers
6 from framework import VppTestCase, VppTestRunner
7 from framework import tag_run_solo
8 from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath
9
10
11 @tag_fixme_vpp_workers
12 class TestSession(VppTestCase):
13     """ Session Test Case """
14
15     @classmethod
16     def setUpClass(cls):
17         super(TestSession, cls).setUpClass()
18
19     @classmethod
20     def tearDownClass(cls):
21         super(TestSession, cls).tearDownClass()
22
23     def setUp(self):
24         super(TestSession, self).setUp()
25
26         self.vapi.session_enable_disable(is_enable=1)
27         self.create_loopback_interfaces(2)
28
29         table_id = 0
30
31         for i in self.lo_interfaces:
32             i.admin_up()
33
34             if table_id != 0:
35                 tbl = VppIpTable(self, table_id)
36                 tbl.add_vpp_config()
37
38             i.set_table_ip4(table_id)
39             i.config_ip4()
40             table_id += 1
41
42         # Configure namespaces
43         self.vapi.app_namespace_add_del(namespace_id="0",
44                                         sw_if_index=self.loop0.sw_if_index)
45         self.vapi.app_namespace_add_del(namespace_id="1",
46                                         sw_if_index=self.loop1.sw_if_index)
47
48     def tearDown(self):
49         for i in self.lo_interfaces:
50             i.unconfig_ip4()
51             i.set_table_ip4(0)
52             i.admin_down()
53
54         super(TestSession, self).tearDown()
55         self.vapi.session_enable_disable(is_enable=1)
56
57     def test_segment_manager_alloc(self):
58         """ Session Segment Manager Multiple Segment Allocation """
59
60         # Add inter-table routes
61         ip_t01 = VppIpRoute(self, self.loop1.local_ip4, 32,
62                             [VppRoutePath("0.0.0.0",
63                                           0xffffffff,
64                                           nh_table_id=1)])
65         ip_t10 = VppIpRoute(self, self.loop0.local_ip4, 32,
66                             [VppRoutePath("0.0.0.0",
67                                           0xffffffff,
68                                           nh_table_id=0)], table_id=1)
69         ip_t01.add_vpp_config()
70         ip_t10.add_vpp_config()
71
72         # Start builtin server and client with small private segments
73         uri = "tcp://" + self.loop0.local_ip4 + "/1234"
74         error = self.vapi.cli("test echo server appns 0 fifo-size 64 " +
75                               "private-segment-size 1m uri " + uri)
76         if error:
77             self.logger.critical(error)
78             self.assertNotIn("failed", error)
79
80         error = self.vapi.cli("test echo client nclients 100 appns 1 " +
81                               "no-output fifo-size 64 syn-timeout 2 " +
82                               "private-segment-size 1m uri " + uri)
83         if error:
84             self.logger.critical(error)
85             self.assertNotIn("failed", error)
86
87         if self.vpp_dead:
88             self.assert_equal(0)
89
90         # Delete inter-table routes
91         ip_t01.remove_vpp_config()
92         ip_t10.remove_vpp_config()
93
94
95 @tag_fixme_vpp_workers
96 class TestSessionUnitTests(VppTestCase):
97     """ Session Unit Tests Case """
98
99     @classmethod
100     def setUpClass(cls):
101         super(TestSessionUnitTests, cls).setUpClass()
102
103     @classmethod
104     def tearDownClass(cls):
105         super(TestSessionUnitTests, cls).tearDownClass()
106
107     def setUp(self):
108         super(TestSessionUnitTests, self).setUp()
109         self.vapi.session_enable_disable(is_enable=1)
110
111     def test_session(self):
112         """ Session Unit Tests """
113         error = self.vapi.cli("test session all")
114
115         if error:
116             self.logger.critical(error)
117         self.assertNotIn("failed", error)
118
119     def tearDown(self):
120         super(TestSessionUnitTests, self).tearDown()
121         self.vapi.session_enable_disable(is_enable=0)
122
123
124 @tag_run_solo
125 class TestSvmFifoUnitTests(VppTestCase):
126     """ SVM Fifo Unit Tests Case """
127
128     @classmethod
129     def setUpClass(cls):
130         super(TestSvmFifoUnitTests, cls).setUpClass()
131
132     @classmethod
133     def tearDownClass(cls):
134         super(TestSvmFifoUnitTests, cls).tearDownClass()
135
136     def setUp(self):
137         super(TestSvmFifoUnitTests, self).setUp()
138
139     def test_svm_fifo(self):
140         """ SVM Fifo Unit Tests """
141         error = self.vapi.cli("test svm fifo all")
142
143         if error:
144             self.logger.critical(error)
145         self.assertNotIn("failed", error)
146
147     def tearDown(self):
148         super(TestSvmFifoUnitTests, self).tearDown()
149
150 if __name__ == '__main__':
151     unittest.main(testRunner=VppTestRunner)