tests: replace pycodestyle with black
[vpp.git] / test / test_vtr.py
1 #!/usr/bin/env python3
2
3 import unittest
4 import random
5
6 from scapy.packet import Raw
7 from scapy.layers.l2 import Ether, Dot1Q
8 from scapy.layers.inet import IP, UDP
9
10 from util import Host
11 from framework import VppTestCase, VppTestRunner
12 from vpp_sub_interface import L2_VTR_OP, VppDot1QSubint, VppDot1ADSubint
13 from collections import namedtuple
14
# One expected VLAN tag on the wire: `dot1` is the ethertype that announces
# the tag (DOT1AD or DOT1Q below) and `vlan` is the VLAN id it carries.
Tag = namedtuple("Tag", ["dot1", "vlan"])
DOT1AD = 0x88A8  # ethertype used for 802.1ad tags
DOT1Q = 0x8100  # ethertype used for 802.1Q tags
18
19
class TestVtr(VppTestCase):
    """VTR Test Case"""

    @classmethod
    def setUpClass(cls):
        """Build the shared topology used by every test in this class.

        Creates three packet-generator interfaces, a Dot1AD sub-interface on
        pg1 and a Dot1Q sub-interface on pg2, puts them into bridge domain
        ``cls.bd_id``, brings everything up and sends broadcast frames so the
        bridge learns the test hosts' MAC addresses.

        If anything fails, the parent's ``tearDownClass`` is invoked before
        the exception is re-raised.
        """
        super(TestVtr, cls).setUpClass()

        # Test variables
        cls.bd_id = 1
        cls.mac_entries_count = 5
        cls.Atag = 100
        cls.Btag = 200
        cls.dot1ad_sub_id = 20

        try:
            cls.create_pg_interfaces(range(3))

            cls.sub_interfaces = [
                VppDot1ADSubint(cls, cls.pg1, cls.dot1ad_sub_id, cls.Btag, cls.Atag),
                VppDot1QSubint(cls, cls.pg2, cls.Btag),
            ]

            interfaces = list(cls.pg_interfaces) + cls.sub_interfaces

            # Create BD with MAC learning enabled and put interfaces and
            # sub-interfaces to this BD. A pg interface that exposes a
            # `sub_if` attribute is bridged through that sub-interface,
            # any other one is bridged directly.
            for pg_if in cls.pg_interfaces:
                if hasattr(pg_if, "sub_if"):
                    sw_if_index = pg_if.sub_if.sw_if_index
                else:
                    sw_if_index = pg_if.sw_if_index
                cls.vapi.sw_interface_set_l2_bridge(
                    rx_sw_if_index=sw_if_index, bd_id=cls.bd_id
                )

            # setup all interfaces
            for i in interfaces:
                i.admin_up()

            # mapping between packet-generator index and lists of test hosts
            cls.hosts_by_pg_idx = dict()

            # create test host entries and inject packets to learn MAC entries
            # in the bridge-domain
            cls.create_hosts_and_learn(cls.mac_entries_count)
            cls.logger.info(cls.vapi.ppcli("show l2fib"))

        except Exception:
            # Undo whatever the parent set up before propagating the error.
            super(TestVtr, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level cleanup to the parent test case."""
        super(TestVtr, cls).tearDownClass()

    def setUp(self):
        """
        Clear trace and packet infos before running each test.
        """
        super(TestVtr, self).setUp()
        self.reset_packet_infos()

    def tearDown(self):
        """
        Show various debug prints after each test.
        """
        super(TestVtr, self).tearDown()

    def show_commands_at_teardown(self):
        """Log the L2 FIB and the bridge-domain details for debugging."""
        self.logger.info(self.vapi.ppcli("show l2fib verbose"))
        self.logger.info(self.vapi.ppcli("show bridge-domain %s detail" % self.bd_id))

    @classmethod
    def create_hosts_and_learn(cls, count):
        """Create `count` hosts per pg interface and send learning frames.

        For each pg interface the hosts are stored in
        ``cls.hosts_by_pg_idx[sw_if_index]`` and one broadcast Ethernet frame
        per host is queued on that interface; when the interface has a
        ``sub_if`` the frame is first passed through its ``add_dot1_layer``.
        All queued streams are then started.

        :param count: number of hosts to create behind each pg interface.
        """
        for pg_if in cls.pg_interfaces:
            hosts = []
            cls.hosts_by_pg_idx[pg_if.sw_if_index] = hosts
            packets = []
            for j in range(1, count + 1):
                host = Host(
                    "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
                    "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
                )
                packet = Ether(dst="ff:ff:ff:ff:ff:ff", src=host.mac)
                hosts.append(host)
                if hasattr(pg_if, "sub_if"):
                    packet = pg_if.sub_if.add_dot1_layer(packet)
                packets.append(packet)
            pg_if.add_stream(packets)
        cls.logger.info("Sending broadcast eth frames for MAC learning")
        cls.pg_enable_capture(cls.pg_interfaces)
        cls.pg_start()

    def create_packet(self, src_if, dst_if, do_dot1=True):
        """Build one UDP test packet between random hosts of two interfaces.

        :param src_if: interface whose host list supplies the source host.
        :param dst_if: interface whose host list supplies the destination.
        :param do_dot1: when true and `src_if` has a ``sub_if``, the packet is
            passed through that sub-interface's ``add_dot1_layer``.
        :returns: the packet, extended to a randomly chosen size.
        """
        packet_sizes = [64, 512, 1518, 9018]
        dst_host = random.choice(self.hosts_by_pg_idx[dst_if.sw_if_index])
        src_host = random.choice(self.hosts_by_pg_idx[src_if.sw_if_index])
        pkt_info = self.create_packet_info(src_if, dst_if)
        payload = self.info_to_payload(pkt_info)
        p = (
            Ether(dst=dst_host.mac, src=src_host.mac)
            / IP(src=src_host.ip4, dst=dst_host.ip4)
            / UDP(sport=1234, dport=1234)
            / Raw(payload)
        )
        # Remember the packet as built here, before any dot1 layer is added.
        pkt_info.data = p.copy()
        if do_dot1 and hasattr(src_if, "sub_if"):
            p = src_if.sub_if.add_dot1_layer(p)
        size = random.choice(packet_sizes)
        self.extend_packet(p, size)
        return p

    def _add_tag(self, packet, vlan, tag_type):
        """Insert one VLAN header directly after the outermost header.

        The packet is modified in place: its previous ``type`` moves into the
        new Dot1Q header and `tag_type` becomes the packet's own ``type``.

        :param packet: packet to modify (its first layer must have ``type``).
        :param vlan: VLAN id for the new header.
        :param tag_type: ethertype that announces the new header.
        :returns: the same packet object.
        """
        payload = packet.payload
        inner_type = packet.type
        packet.remove_payload()
        packet.add_payload(Dot1Q(vlan=vlan) / payload)
        packet.payload.type = inner_type
        packet.type = tag_type
        return packet

    def _remove_tag(self, packet, vlan=None, tag_type=None):
        """Strip the outermost VLAN header from `packet` in place.

        :param packet: packet whose first payload is the header to remove.
        :param vlan: when not None, assert the removed header has this VLAN id.
        :param tag_type: when not None, assert the packet's ``type`` (the
            ethertype announcing the header) equals this value.
        """
        # Compare against None rather than truthiness so that a value of 0
        # is still verified instead of being silently skipped.
        if tag_type is not None:
            self.assertEqual(packet.type, tag_type)

        tag = packet.payload
        if vlan is not None:
            self.assertEqual(tag.vlan, vlan)
        inner_type = tag.type
        inner_payload = tag.payload
        packet.remove_payload()
        packet.add_payload(inner_payload)
        packet.type = inner_type

    def add_tags(self, packet, tags):
        """Push `tags` onto `packet` so that ``tags[0]`` ends up outermost.

        Each tag is inserted right behind the first header, hence the list is
        walked in reverse.
        """
        for t in reversed(tags):
            self._add_tag(packet, t.vlan, t.dot1)

    def remove_tags(self, packet, tags):
        """Pop `tags` from `packet`, outermost first, asserting each matches."""
        for t in tags:
            self._remove_tag(packet, t.vlan, t.dot1)

    def vtr_test(self, swif, tags):
        """Check the configured tag rewrite between `swif` and pg0.

        First one packet is sent in on `swif` and captured on pg0; it must
        carry exactly `tags` (outermost first) and no further Dot1Q header.

        When `tags` is non-empty the reverse direction is checked as well: a
        temporary Dot1Q sub-interface for ``tags[0].vlan`` is created on pg0
        and bridged, a packet carrying `tags` is sent from pg0, and the packet
        captured on `swif` must contain no Dot1Q header once
        ``swif.sub_if.remove_dot1_layer`` has been applied to it. The
        temporary sub-interface is always unbridged and removed again, even
        when a check fails, so it cannot disturb later tests.

        :param swif: pg interface carrying the sub-interface under test.
        :param tags: list of ``Tag`` expected on pg0, outermost first.
        """
        p = self.create_packet(swif, self.pg0)
        swif.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = self.pg0.get_capture(1)

        if tags:
            self.remove_tags(rx[0], tags)
        self.assertNotIn(Dot1Q, rx[0])

        if not tags:
            return

        i = VppDot1QSubint(self, self.pg0, tags[0].vlan)
        try:
            self.vapi.sw_interface_set_l2_bridge(
                rx_sw_if_index=i.sw_if_index, bd_id=self.bd_id, enable=1
            )
            try:
                i.admin_up()

                # Tags are pushed explicitly below, so create_packet must not
                # add a dot1 layer on behalf of pg0 itself.
                p = self.create_packet(self.pg0, swif, do_dot1=False)
                self.add_tags(p, tags)
                self.pg0.add_stream(p)
                self.pg_enable_capture(self.pg_interfaces)
                self.pg_start()
                rx = swif.get_capture(1)
                swif.sub_if.remove_dot1_layer(rx[0])
                self.assertNotIn(Dot1Q, rx[0])
            finally:
                self.vapi.sw_interface_set_l2_bridge(
                    rx_sw_if_index=i.sw_if_index, bd_id=self.bd_id, enable=0
                )
        finally:
            i.remove_vpp_config()

    def test_1ad_vtr_pop_1(self):
        """1AD VTR pop 1 test"""
        self.pg1.sub_if.set_vtr(L2_VTR_OP.L2_POP_1)
        self.vtr_test(self.pg1, [Tag(dot1=DOT1Q, vlan=100)])

    def test_1ad_vtr_pop_2(self):
        """1AD VTR pop 2 test"""
        self.pg1.sub_if.set_vtr(L2_VTR_OP.L2_POP_2)
        self.vtr_test(self.pg1, [])

    def test_1ad_vtr_push_1ad(self):
        """1AD VTR push 1 1AD test"""
        self.pg1.sub_if.set_vtr(L2_VTR_OP.L2_PUSH_1, tag=300)
        self.vtr_test(
            self.pg1,
            [
                Tag(dot1=DOT1AD, vlan=300),
                Tag(dot1=DOT1AD, vlan=200),
                Tag(dot1=DOT1Q, vlan=100),
            ],
        )

    def test_1ad_vtr_push_2ad(self):
        """1AD VTR push 2 1AD test"""
        self.pg1.sub_if.set_vtr(L2_VTR_OP.L2_PUSH_2, outer=400, inner=300)
        self.vtr_test(
            self.pg1,
            [
                Tag(dot1=DOT1AD, vlan=400),
                Tag(dot1=DOT1Q, vlan=300),
                Tag(dot1=DOT1AD, vlan=200),
                Tag(dot1=DOT1Q, vlan=100),
            ],
        )

    def test_1ad_vtr_push_1q(self):
        """1AD VTR push 1 1Q test"""
        self.pg1.sub_if.set_vtr(L2_VTR_OP.L2_PUSH_1, tag=300, push1q=1)
        self.vtr_test(
            self.pg1,
            [
                Tag(dot1=DOT1Q, vlan=300),
                Tag(dot1=DOT1AD, vlan=200),
                Tag(dot1=DOT1Q, vlan=100),
            ],
        )

    def test_1ad_vtr_push_2q(self):
        """1AD VTR push 2 1Q test"""
        self.pg1.sub_if.set_vtr(L2_VTR_OP.L2_PUSH_2, outer=400, inner=300, push1q=1)
        self.vtr_test(
            self.pg1,
            [
                Tag(dot1=DOT1Q, vlan=400),
                Tag(dot1=DOT1Q, vlan=300),
                Tag(dot1=DOT1AD, vlan=200),
                Tag(dot1=DOT1Q, vlan=100),
            ],
        )

    def test_1ad_vtr_translate_1_1ad(self):
        """1AD VTR translate 1 -> 1 1AD test"""
        self.pg1.sub_if.set_vtr(L2_VTR_OP.L2_TRANSLATE_1_1, tag=300)
        self.vtr_test(self.pg1, [Tag(dot1=DOT1AD, vlan=300), Tag(dot1=DOT1Q, vlan=100)])

    def test_1ad_vtr_translate_1_2ad(self):
        """1AD VTR translate 1 -> 2 1AD test"""
        self.pg1.sub_if.set_vtr(L2_VTR_OP.L2_TRANSLATE_1_2, inner=300, outer=400)
        self.vtr_test(
            self.pg1,
            [
                Tag(dot1=DOT1AD, vlan=400),
                Tag(dot1=DOT1Q, vlan=300),
                Tag(dot1=DOT1Q, vlan=100),
            ],
        )

    def test_1ad_vtr_translate_2_1ad(self):
        """1AD VTR translate 2 -> 1 1AD test"""
        self.pg1.sub_if.set_vtr(L2_VTR_OP.L2_TRANSLATE_2_1, tag=300)
        self.vtr_test(self.pg1, [Tag(dot1=DOT1AD, vlan=300)])

    def test_1ad_vtr_translate_2_2ad(self):
        """1AD VTR translate 2 -> 2 1AD test"""
        self.pg1.sub_if.set_vtr(L2_VTR_OP.L2_TRANSLATE_2_2, inner=300, outer=400)
        self.vtr_test(self.pg1, [Tag(dot1=DOT1AD, vlan=400), Tag(dot1=DOT1Q, vlan=300)])

    def test_1ad_vtr_translate_1_1q(self):
        """1AD VTR translate 1 -> 1 1Q test"""
        self.pg1.sub_if.set_vtr(L2_VTR_OP.L2_TRANSLATE_1_1, tag=300, push1q=1)
        self.vtr_test(self.pg1, [Tag(dot1=DOT1Q, vlan=300), Tag(dot1=DOT1Q, vlan=100)])

    def test_1ad_vtr_translate_1_2q(self):
        """1AD VTR translate 1 -> 2 1Q test"""
        self.pg1.sub_if.set_vtr(
            L2_VTR_OP.L2_TRANSLATE_1_2, inner=300, outer=400, push1q=1
        )
        self.vtr_test(
            self.pg1,
            [
                Tag(dot1=DOT1Q, vlan=400),
                Tag(dot1=DOT1Q, vlan=300),
                Tag(dot1=DOT1Q, vlan=100),
            ],
        )

    def test_1ad_vtr_translate_2_1q(self):
        """1AD VTR translate 2 -> 1 1Q test"""
        self.pg1.sub_if.set_vtr(L2_VTR_OP.L2_TRANSLATE_2_1, tag=300, push1q=1)
        self.vtr_test(self.pg1, [Tag(dot1=DOT1Q, vlan=300)])

    def test_1ad_vtr_translate_2_2q(self):
        """1AD VTR translate 2 -> 2 1Q test"""
        self.pg1.sub_if.set_vtr(
            L2_VTR_OP.L2_TRANSLATE_2_2, inner=300, outer=400, push1q=1
        )
        self.vtr_test(self.pg1, [Tag(dot1=DOT1Q, vlan=400), Tag(dot1=DOT1Q, vlan=300)])

    def test_1q_vtr_pop_1(self):
        """1Q VTR pop 1 test"""
        self.pg2.sub_if.set_vtr(L2_VTR_OP.L2_POP_1)
        self.vtr_test(self.pg2, [])

    def test_1q_vtr_push_1(self):
        """1Q VTR push 1 test"""
        self.pg2.sub_if.set_vtr(L2_VTR_OP.L2_PUSH_1, tag=300)
        self.vtr_test(self.pg2, [Tag(dot1=DOT1AD, vlan=300), Tag(dot1=DOT1Q, vlan=200)])

    def test_1q_vtr_push_2(self):
        """1Q VTR push 2 test"""
        self.pg2.sub_if.set_vtr(L2_VTR_OP.L2_PUSH_2, outer=400, inner=300)
        self.vtr_test(
            self.pg2,
            [
                Tag(dot1=DOT1AD, vlan=400),
                Tag(dot1=DOT1Q, vlan=300),
                Tag(dot1=DOT1Q, vlan=200),
            ],
        )

    def test_1q_vtr_translate_1_1(self):
        """1Q VTR translate 1 -> 1 test"""
        self.pg2.sub_if.set_vtr(L2_VTR_OP.L2_TRANSLATE_1_1, tag=300)
        self.vtr_test(self.pg2, [Tag(dot1=DOT1AD, vlan=300)])

    def test_1q_vtr_translate_1_2(self):
        """1Q VTR translate 1 -> 2 test"""
        self.pg2.sub_if.set_vtr(L2_VTR_OP.L2_TRANSLATE_1_2, inner=300, outer=400)
        self.vtr_test(self.pg2, [Tag(dot1=DOT1AD, vlan=400), Tag(dot1=DOT1Q, vlan=300)])

    def test_if_vtr_disable(self):
        """Disable VTR on non-sub-interfaces"""
        # First set the VTR fields to junk
        self.vapi.l2_interface_vlan_tag_rewrite(
            sw_if_index=self.pg0.sw_if_index,
            vtr_op=L2_VTR_OP.L2_PUSH_2,
            push_dot1q=1,
            tag1=19,
            tag2=630,
        )

        if_state = self.vapi.sw_interface_dump(sw_if_index=self.pg0.sw_if_index)
        self.assertEqual(if_state[0].sw_if_index, self.pg0.sw_if_index)
        self.assertNotEqual(if_state[0].vtr_op, L2_VTR_OP.L2_DISABLED)

        # Then ensure that a request to disable VTR is honored.
        self.vapi.l2_interface_vlan_tag_rewrite(
            sw_if_index=self.pg0.sw_if_index, vtr_op=L2_VTR_OP.L2_DISABLED
        )

        if_state = self.vapi.sw_interface_dump(sw_if_index=self.pg0.sw_if_index)
        self.assertEqual(if_state[0].sw_if_index, self.pg0.sw_if_index)
        self.assertEqual(if_state[0].vtr_op, L2_VTR_OP.L2_DISABLED)

    def test_if_vtr_push_1q(self):
        """1Q VTR push 1 on non-sub-interfaces"""
        self.vapi.l2_interface_vlan_tag_rewrite(
            sw_if_index=self.pg0.sw_if_index,
            vtr_op=L2_VTR_OP.L2_PUSH_1,
            push_dot1q=1,
            tag1=150,
        )

        if_state = self.vapi.sw_interface_dump(sw_if_index=self.pg0.sw_if_index)
        self.assertEqual(if_state[0].sw_if_index, self.pg0.sw_if_index)
        self.assertEqual(if_state[0].vtr_op, L2_VTR_OP.L2_PUSH_1)
        self.assertEqual(if_state[0].vtr_tag1, 150)
        self.assertNotEqual(if_state[0].vtr_push_dot1q, 0)

    def test_if_vtr_push_2ad(self):
        """1AD VTR push 2 on non-sub-interfaces"""
        self.vapi.l2_interface_vlan_tag_rewrite(
            sw_if_index=self.pg0.sw_if_index,
            vtr_op=L2_VTR_OP.L2_PUSH_2,
            push_dot1q=0,
            tag1=450,
            tag2=350,
        )

        if_state = self.vapi.sw_interface_dump(sw_if_index=self.pg0.sw_if_index)
        self.assertEqual(if_state[0].sw_if_index, self.pg0.sw_if_index)
        self.assertEqual(if_state[0].vtr_op, L2_VTR_OP.L2_PUSH_2)
        self.assertEqual(if_state[0].vtr_tag1, 450)  # outer
        self.assertEqual(if_state[0].vtr_tag2, 350)  # inner
        self.assertEqual(if_state[0].vtr_push_dot1q, 0)
405
406
# When executed directly, run this module's tests with the project's runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)