acl: API cleanup
[vpp.git] / test / test_pipe.py
1 #!/usr/bin/env python3
2 from socket import AF_INET, AF_INET6, inet_pton
3 import unittest
4 from ipaddress import IPv4Network
5
6 from scapy.packet import Raw
7 from scapy.layers.l2 import Ether
8 from scapy.layers.inet import IP, UDP
9
10 from framework import VppTestCase, VppTestRunner
11 from vpp_interface import VppInterface
12 from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath
13 from vpp_acl import AclRule, VppAcl, VppAclInterface
14
15 NUM_PKTS = 67
16
17
18 class VppPipe(VppInterface):
19     """
20     VPP Pipe
21     """
22
23     @property
24     def east(self):
25         return self.result.pipe_sw_if_index[1]
26
27     @property
28     def west(self):
29         return self.result.pipe_sw_if_index[0]
30
31     def __init__(self, test, instance=0xffffffff):
32         super(VppPipe, self).__init__(test)
33         self._test = test
34         self.instance = instance
35
36     def add_vpp_config(self):
37         self.result = self._test.vapi.pipe_create(
38             0 if self.instance == 0xffffffff else 1,
39             self.instance)
40         self.set_sw_if_index(self.result.sw_if_index)
41
42     def remove_vpp_config(self):
43         self._test.vapi.pipe_delete(
44             self.result.sw_if_index)
45
46     def object_id(self):
47         return "pipe-%d" % (self._sw_if_index)
48
49     def query_vpp_config(self):
50         pipes = self._test.vapi.pipe_dump()
51         for p in pipes:
52             if p.sw_if_index == self.result.sw_if_index:
53                 return True
54         return False
55
56     def set_unnumbered(self, ip_sw_if_index, is_add=True):
57         res = self._test.vapi.sw_interface_set_unnumbered(ip_sw_if_index,
58                                                           self.east, is_add)
59         res = self._test.vapi.sw_interface_set_unnumbered(ip_sw_if_index,
60                                                           self.west, is_add)
61
62
63 class TestPipe(VppTestCase):
64     """ Pipes """
65
66     @classmethod
67     def setUpClass(cls):
68         super(TestPipe, cls).setUpClass()
69
70     @classmethod
71     def tearDownClass(cls):
72         super(TestPipe, cls).tearDownClass()
73
74     def setUp(self):
75         super(TestPipe, self).setUp()
76
77         self.create_pg_interfaces(range(4))
78
79         for i in self.pg_interfaces:
80             i.admin_up()
81
82     def tearDown(self):
83         for i in self.pg_interfaces:
84             i.admin_down()
85
86         super(TestPipe, self).tearDown()
87
88     def test_pipe(self):
89         """ Pipes """
90
91         pipes = [VppPipe(self), VppPipe(self, 10)]
92
93         for p in pipes:
94             p.add_vpp_config()
95             p.admin_up()
96
97         #
98         # L2 cross-connect pipe0 east with pg0 and west with pg1
99         #
100         self.vapi.sw_interface_set_l2_xconnect(self.pg0.sw_if_index,
101                                                pipes[0].east,
102                                                enable=1)
103         self.vapi.sw_interface_set_l2_xconnect(pipes[0].east,
104                                                self.pg0.sw_if_index,
105                                                enable=1)
106         self.vapi.sw_interface_set_l2_xconnect(self.pg1.sw_if_index,
107                                                pipes[0].west,
108                                                enable=1)
109         self.vapi.sw_interface_set_l2_xconnect(pipes[0].west,
110                                                self.pg1.sw_if_index,
111                                                enable=1)
112
113         # test bi-directional L2 flow pg0<->pg1
114         p = (Ether(src=self.pg0.remote_mac,
115                    dst=self.pg1.remote_mac) /
116              IP(src="1.1.1.1",
117                 dst="1.1.1.2") /
118              UDP(sport=1234, dport=1234) /
119              Raw(b'\xa5' * 100))
120
121         self.send_and_expect(self.pg0, p * NUM_PKTS, self.pg1)
122         self.send_and_expect(self.pg1, p * NUM_PKTS, self.pg0)
123
124         #
125         # Attach ACL to ensure features are run on the pipe
126         #
127         rule_1 = AclRule(is_permit=0, proto=17,
128                          src_prefix=IPv4Network("1.1.1.1/32"),
129                          dst_prefix=IPv4Network("1.1.1.2/32"), ports=1234)
130         acl = VppAcl(self, rules=[rule_1])
131         acl.add_vpp_config()
132
133         # Apply the ACL on the pipe on output
134         acl_if_e = VppAclInterface(self, sw_if_index=pipes[0].east, n_input=0,
135                                    acls=[acl])
136         acl_if_e.add_vpp_config()
137
138         self.send_and_assert_no_replies(self.pg0, p * NUM_PKTS)
139         self.send_and_expect(self.pg1, p * NUM_PKTS, self.pg0)
140
141         # remove from output and apply on input
142         acl_if_e.remove_vpp_config()
143         acl_if_w = VppAclInterface(self, sw_if_index=pipes[0].west, n_input=1,
144                                    acls=[acl])
145         acl_if_w.add_vpp_config()
146
147         self.send_and_assert_no_replies(self.pg0, p * NUM_PKTS)
148         self.send_and_expect(self.pg1, p * NUM_PKTS, self.pg0)
149
150         acl_if_w.remove_vpp_config()
151         self.send_and_expect(self.pg0, p * NUM_PKTS, self.pg1)
152         self.send_and_expect(self.pg1, p * NUM_PKTS, self.pg0)
153
154         #
155         # L3 routes in two separate tables so a pipe can be used to L3
156         # x-connect
157         #
158         tables = []
159         tables.append(VppIpTable(self, 1))
160         tables.append(VppIpTable(self, 2))
161
162         for t in tables:
163             t.add_vpp_config()
164
165         self.pg2.set_table_ip4(1)
166         self.pg2.config_ip4()
167         self.pg2.resolve_arp()
168         self.pg3.set_table_ip4(2)
169         self.pg3.config_ip4()
170         self.pg3.resolve_arp()
171
172         routes = []
173         routes.append(VppIpRoute(self, "1.1.1.1", 32,
174                                  [VppRoutePath(self.pg3.remote_ip4,
175                                                self.pg3.sw_if_index)],
176                                  table_id=2))
177         routes.append(VppIpRoute(self, "1.1.1.1", 32,
178                                  [VppRoutePath("0.0.0.0", pipes[1].east)],
179                                  table_id=1))
180         routes.append(VppIpRoute(self, "1.1.1.2", 32,
181                                  [VppRoutePath("0.0.0.0", pipes[1].west)],
182                                  table_id=2))
183         routes.append(VppIpRoute(self, "1.1.1.2", 32,
184                                  [VppRoutePath(self.pg2.remote_ip4,
185                                                self.pg2.sw_if_index)],
186                                  table_id=1))
187
188         for r in routes:
189             r.add_vpp_config()
190
191         p_east = (Ether(src=self.pg2.remote_mac,
192                         dst=self.pg2.local_mac) /
193                   IP(src="1.1.1.2",
194                      dst="1.1.1.1") /
195                   UDP(sport=1234, dport=1234) /
196                   Raw(b'\xa5' * 100))
197
198         # bind the pipe ends to the correct tables
199         self.vapi.sw_interface_set_table(pipes[1].west, 0, 2)
200         self.vapi.sw_interface_set_table(pipes[1].east, 0, 1)
201
202         # IP is not enabled on the pipes at this point
203         self.send_and_assert_no_replies(self.pg2, p_east * NUM_PKTS)
204
205         # IP enable the Pipes by making them unnumbered
206         pipes[0].set_unnumbered(self.pg2.sw_if_index)
207         pipes[1].set_unnumbered(self.pg3.sw_if_index)
208
209         self.send_and_expect(self.pg2, p_east * NUM_PKTS, self.pg3)
210
211         # and the return path
212         p_west = (Ether(src=self.pg3.remote_mac,
213                         dst=self.pg3.local_mac) /
214                   IP(src="1.1.1.1",
215                      dst="1.1.1.2") /
216                   UDP(sport=1234, dport=1234) /
217                   Raw(b'\xa5' * 100))
218         self.send_and_expect(self.pg3, p_west * NUM_PKTS, self.pg2)
219
220         #
221         # Use ACLs to test features run on the Pipes
222         #
223         acl_if_e1 = VppAclInterface(self, sw_if_index=pipes[1].east, n_input=0,
224                                     acls=[acl])
225         acl_if_e1.add_vpp_config()
226         self.send_and_assert_no_replies(self.pg2, p_east * NUM_PKTS)
227         self.send_and_expect(self.pg3, p_west * NUM_PKTS, self.pg2)
228
229         # remove from output and apply on input
230         acl_if_e1.remove_vpp_config()
231         acl_if_w1 = VppAclInterface(self, sw_if_index=pipes[1].west, n_input=1,
232                                     acls=[acl])
233         acl_if_w1.add_vpp_config()
234         self.send_and_assert_no_replies(self.pg2, p_east * NUM_PKTS)
235         self.send_and_expect(self.pg3, p_west * NUM_PKTS, self.pg2)
236         acl_if_w1.remove_vpp_config()
237
238         self.send_and_expect(self.pg2, p_east * NUM_PKTS, self.pg3)
239         self.send_and_expect(self.pg3, p_west * NUM_PKTS, self.pg2)
240
241         # cleanup (so the tables delete)
242         self.pg2.unconfig_ip4()
243         self.pg2.set_table_ip4(0)
244         self.pg3.unconfig_ip4()
245         self.pg3.set_table_ip4(0)
246         self.vapi.sw_interface_set_table(pipes[1].west, 0, 0)
247         self.vapi.sw_interface_set_table(pipes[1].east, 0, 0)
248
249
250 if __name__ == '__main__':
251     unittest.main(testRunner=VppTestRunner)