6 from scapy.packet import Raw
7 from scapy.layers.l2 import Ether, Dot1Q
8 from scapy.layers.inet import IP, UDP
11 from framework import VppTestCase
12 from asfframework import VppTestRunner
13 from vpp_sub_interface import L2_VTR_OP, VppDot1QSubint, VppDot1ADSubint
14 from collections import namedtuple
# One VLAN tag as used by the tests below: `dot1` is the ethertype value that
# announces the tag (written into the preceding header's `type` field) and
# `vlan` is the VLAN id carried in the tag itself.
Tag = namedtuple("Tag", "dot1 vlan")
21 class TestVtr(VppTestCase):
26 super(TestVtr, cls).setUpClass()
30 cls.mac_entries_count = 5
33 cls.dot1ad_sub_id = 20
37 cls.create_pg_interfaces(ifs)
39 cls.sub_interfaces = [
40 VppDot1ADSubint(cls, cls.pg1, cls.dot1ad_sub_id, cls.Btag, cls.Atag),
41 VppDot1QSubint(cls, cls.pg2, cls.Btag),
44 interfaces = list(cls.pg_interfaces)
45 interfaces.extend(cls.sub_interfaces)
47 # Create BD with MAC learning enabled and put interfaces and
48 # sub-interfaces to this BD
49 for pg_if in cls.pg_interfaces:
51 pg_if.sub_if.sw_if_index
52 if hasattr(pg_if, "sub_if")
53 else pg_if.sw_if_index
55 cls.vapi.sw_interface_set_l2_bridge(
56 rx_sw_if_index=sw_if_index, bd_id=cls.bd_id
59 # setup all interfaces
63 # mapping between packet-generator index and lists of test hosts
64 cls.hosts_by_pg_idx = dict()
66 # create test host entries and inject packets to learn MAC entries
67 # in the bridge-domain
68 cls.create_hosts_and_learn(cls.mac_entries_count)
69 cls.logger.info(cls.vapi.ppcli("show l2fib"))
72 super(TestVtr, cls).tearDownClass()
def tearDownClass(cls):
    """Run the parent test case's class-level teardown; nothing extra is done here."""
    super(TestVtr, cls).tearDownClass()
81 Clear trace and packet infos before running each test.
83 super(TestVtr, self).setUp()
84 self.reset_packet_infos()
88 Show various debug prints after each test.
90 super(TestVtr, self).tearDown()
def show_commands_at_teardown(self):
    """Log the L2 FIB and the test bridge-domain state via the debug CLI.

    Two CLI commands are issued through ``self.vapi.ppcli`` and each result
    is written to the test logger at info level.
    """
    # Verbose dump of the L2 forwarding table.
    l2fib_dump = self.vapi.ppcli("show l2fib verbose")
    self.logger.info(l2fib_dump)

    # Detailed view of the bridge domain identified by self.bd_id.
    bd_dump = self.vapi.ppcli("show bridge-domain %s detail" % self.bd_id)
    self.logger.info(bd_dump)
97 def create_hosts_and_learn(cls, count):
98 for pg_if in cls.pg_interfaces:
99 cls.hosts_by_pg_idx[pg_if.sw_if_index] = []
100 hosts = cls.hosts_by_pg_idx[pg_if.sw_if_index]
102 for j in range(1, count + 1):
104 "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
105 "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
107 packet = Ether(dst="ff:ff:ff:ff:ff:ff", src=host.mac)
109 if hasattr(pg_if, "sub_if"):
110 packet = pg_if.sub_if.add_dot1_layer(packet)
111 packets.append(packet)
112 pg_if.add_stream(packets)
113 cls.logger.info("Sending broadcast eth frames for MAC learning")
114 cls.pg_enable_capture(cls.pg_interfaces)
117 def create_packet(self, src_if, dst_if, do_dot1=True):
118 packet_sizes = [64, 512, 1518, 9018]
119 dst_host = random.choice(self.hosts_by_pg_idx[dst_if.sw_if_index])
120 src_host = random.choice(self.hosts_by_pg_idx[src_if.sw_if_index])
121 pkt_info = self.create_packet_info(src_if, dst_if)
122 payload = self.info_to_payload(pkt_info)
124 Ether(dst=dst_host.mac, src=src_host.mac)
125 / IP(src=src_host.ip4, dst=dst_host.ip4)
126 / UDP(sport=1234, dport=1234)
129 pkt_info.data = p.copy()
130 if do_dot1 and hasattr(src_if, "sub_if"):
131 p = src_if.sub_if.add_dot1_layer(p)
132 size = random.choice(packet_sizes)
133 self.extend_packet(p, size)
136 def _add_tag(self, packet, vlan, tag_type):
137 payload = packet.payload
138 inner_type = packet.type
139 packet.remove_payload()
140 packet.add_payload(Dot1Q(vlan=vlan) / payload)
141 packet.payload.type = inner_type
142 packet.payload.vlan = vlan
143 packet.type = tag_type
146 def _remove_tag(self, packet, vlan=None, tag_type=None):
148 self.assertEqual(packet.type, tag_type)
150 payload = packet.payload
152 self.assertEqual(payload.vlan, vlan)
153 inner_type = payload.type
154 payload = payload.payload
155 packet.remove_payload()
156 packet.add_payload(payload)
157 packet.type = inner_type
def add_tags(self, packet, tags):
    """Push every tag in *tags* onto *packet*, modifying it in place.

    :param packet: packet to tag; handed unchanged to ``self._add_tag``.
    :param tags: sequence of objects exposing ``vlan`` and ``dot1``
        attributes (e.g. ``Tag`` tuples).
    """
    # Walk the sequence back to front so that the first entry of `tags` is
    # the last one handed to _add_tag.
    for tag in reversed(tags):
        vlan_id, ethertype = tag.vlan, tag.dot1
        self._add_tag(packet, vlan_id, ethertype)
163 def remove_tags(self, packet, tags):
165 self._remove_tag(packet, t.vlan, t.dot1)
167 def vtr_test(self, swif, tags):
168 p = self.create_packet(swif, self.pg0)
170 self.pg_enable_capture(self.pg_interfaces)
172 rx = self.pg0.get_capture(1)
175 self.remove_tags(rx[0], tags)
176 self.assertTrue(Dot1Q not in rx[0])
181 i = VppDot1QSubint(self, self.pg0, tags[0].vlan)
182 self.vapi.sw_interface_set_l2_bridge(
183 rx_sw_if_index=i.sw_if_index, bd_id=self.bd_id, enable=1
187 p = self.create_packet(self.pg0, swif, do_dot1=False)
188 self.add_tags(p, tags)
189 self.pg0.add_stream(p)
190 self.pg_enable_capture(self.pg_interfaces)
192 rx = swif.get_capture(1)
193 swif.sub_if.remove_dot1_layer(rx[0])
194 self.assertTrue(Dot1Q not in rx[0])
196 self.vapi.sw_interface_set_l2_bridge(
197 rx_sw_if_index=i.sw_if_index, bd_id=self.bd_id, enable=0
199 i.remove_vpp_config()
def test_1ad_vtr_pop_1(self):
    """1AD VTR pop 1 test"""
    port = self.pg1
    # Configure the pg1 sub-interface to pop one tag.
    port.sub_if.set_vtr(L2_VTR_OP.L2_POP_1)
    # Tag stack handed to vtr_test: a single 802.1Q tag, VLAN 100.
    tag_stack = [Tag(dot1=DOT1Q, vlan=100)]
    self.vtr_test(port, tag_stack)
def test_1ad_vtr_pop_2(self):
    """1AD VTR pop 2 test"""
    port = self.pg1
    # Configure the pg1 sub-interface to pop two tags.
    port.sub_if.set_vtr(L2_VTR_OP.L2_POP_2)
    # An empty tag stack is handed to vtr_test.
    tag_stack = []
    self.vtr_test(port, tag_stack)
211 def test_1ad_vtr_push_1ad(self):
212 """1AD VTR push 1 1AD test"""
213 self.pg1.sub_if.set_vtr(L2_VTR_OP.L2_PUSH_1, tag=300)
217 Tag(dot1=DOT1AD, vlan=300),
218 Tag(dot1=DOT1AD, vlan=200),
219 Tag(dot1=DOT1Q, vlan=100),
223 def test_1ad_vtr_push_2ad(self):
224 """1AD VTR push 2 1AD test"""
225 self.pg1.sub_if.set_vtr(L2_VTR_OP.L2_PUSH_2, outer=400, inner=300)
229 Tag(dot1=DOT1AD, vlan=400),
230 Tag(dot1=DOT1Q, vlan=300),
231 Tag(dot1=DOT1AD, vlan=200),
232 Tag(dot1=DOT1Q, vlan=100),
236 def test_1ad_vtr_push_1q(self):
237 """1AD VTR push 1 1Q test"""
238 self.pg1.sub_if.set_vtr(L2_VTR_OP.L2_PUSH_1, tag=300, push1q=1)
242 Tag(dot1=DOT1Q, vlan=300),
243 Tag(dot1=DOT1AD, vlan=200),
244 Tag(dot1=DOT1Q, vlan=100),
248 def test_1ad_vtr_push_2q(self):
249 """1AD VTR push 2 1Q test"""
250 self.pg1.sub_if.set_vtr(L2_VTR_OP.L2_PUSH_2, outer=400, inner=300, push1q=1)
254 Tag(dot1=DOT1Q, vlan=400),
255 Tag(dot1=DOT1Q, vlan=300),
256 Tag(dot1=DOT1AD, vlan=200),
257 Tag(dot1=DOT1Q, vlan=100),
def test_1ad_vtr_translate_1_1ad(self):
    """1AD VTR translate 1 -> 1 1AD test"""
    port = self.pg1
    # Configure a 1 -> 1 translation on the pg1 sub-interface with tag 300.
    port.sub_if.set_vtr(L2_VTR_OP.L2_TRANSLATE_1_1, tag=300)
    # Tag stack handed to vtr_test: 802.1ad VLAN 300 followed by 802.1Q VLAN 100.
    tag_stack = [
        Tag(dot1=DOT1AD, vlan=300),
        Tag(dot1=DOT1Q, vlan=100),
    ]
    self.vtr_test(port, tag_stack)
266 def test_1ad_vtr_translate_1_2ad(self):
267 """1AD VTR translate 1 -> 2 1AD test"""
268 self.pg1.sub_if.set_vtr(L2_VTR_OP.L2_TRANSLATE_1_2, inner=300, outer=400)
272 Tag(dot1=DOT1AD, vlan=400),
273 Tag(dot1=DOT1Q, vlan=300),
274 Tag(dot1=DOT1Q, vlan=100),
def test_1ad_vtr_translate_2_1ad(self):
    """1AD VTR translate 2 -> 1 1AD test"""
    port = self.pg1
    # Configure a 2 -> 1 translation on the pg1 sub-interface with tag 300.
    port.sub_if.set_vtr(L2_VTR_OP.L2_TRANSLATE_2_1, tag=300)
    # Tag stack handed to vtr_test: a single 802.1ad tag, VLAN 300.
    tag_stack = [Tag(dot1=DOT1AD, vlan=300)]
    self.vtr_test(port, tag_stack)
def test_1ad_vtr_translate_2_2ad(self):
    """1AD VTR translate 2 -> 2 1AD test"""
    port = self.pg1
    # Configure a 2 -> 2 translation on the pg1 sub-interface
    # (inner tag 300, outer tag 400).
    port.sub_if.set_vtr(L2_VTR_OP.L2_TRANSLATE_2_2, inner=300, outer=400)
    # Tag stack handed to vtr_test: 802.1ad VLAN 400 followed by 802.1Q VLAN 300.
    tag_stack = [
        Tag(dot1=DOT1AD, vlan=400),
        Tag(dot1=DOT1Q, vlan=300),
    ]
    self.vtr_test(port, tag_stack)
def test_1ad_vtr_translate_1_1q(self):
    """1AD VTR translate 1 -> 1 1Q test"""
    port = self.pg1
    # Configure a 1 -> 1 translation on the pg1 sub-interface with tag 300,
    # passing push1q=1.
    port.sub_if.set_vtr(L2_VTR_OP.L2_TRANSLATE_1_1, tag=300, push1q=1)
    # Tag stack handed to vtr_test: 802.1Q VLAN 300 followed by 802.1Q VLAN 100.
    tag_stack = [
        Tag(dot1=DOT1Q, vlan=300),
        Tag(dot1=DOT1Q, vlan=100),
    ]
    self.vtr_test(port, tag_stack)
293 def test_1ad_vtr_translate_1_2q(self):
294 """1AD VTR translate 1 -> 2 1Q test"""
295 self.pg1.sub_if.set_vtr(
296 L2_VTR_OP.L2_TRANSLATE_1_2, inner=300, outer=400, push1q=1
301 Tag(dot1=DOT1Q, vlan=400),
302 Tag(dot1=DOT1Q, vlan=300),
303 Tag(dot1=DOT1Q, vlan=100),
def test_1ad_vtr_translate_2_1q(self):
    """1AD VTR translate 2 -> 1 1Q test"""
    port = self.pg1
    # Configure a 2 -> 1 translation on the pg1 sub-interface with tag 300,
    # passing push1q=1.
    port.sub_if.set_vtr(L2_VTR_OP.L2_TRANSLATE_2_1, tag=300, push1q=1)
    # Tag stack handed to vtr_test: a single 802.1Q tag, VLAN 300.
    tag_stack = [Tag(dot1=DOT1Q, vlan=300)]
    self.vtr_test(port, tag_stack)
312 def test_1ad_vtr_translate_2_2q(self):
313 """1AD VTR translate 2 -> 2 1Q test"""
314 self.pg1.sub_if.set_vtr(
315 L2_VTR_OP.L2_TRANSLATE_2_2, inner=300, outer=400, push1q=1
317 self.vtr_test(self.pg1, [Tag(dot1=DOT1Q, vlan=400), Tag(dot1=DOT1Q, vlan=300)])
def test_1q_vtr_pop_1(self):
    """1Q VTR pop 1 test"""
    port = self.pg2
    # Configure the pg2 sub-interface to pop one tag.
    port.sub_if.set_vtr(L2_VTR_OP.L2_POP_1)
    # An empty tag stack is handed to vtr_test.
    tag_stack = []
    self.vtr_test(port, tag_stack)
def test_1q_vtr_push_1(self):
    """1Q VTR push 1 test"""
    port = self.pg2
    # Configure the pg2 sub-interface to push one tag, VLAN 300.
    port.sub_if.set_vtr(L2_VTR_OP.L2_PUSH_1, tag=300)
    # Tag stack handed to vtr_test: 802.1ad VLAN 300 followed by 802.1Q VLAN 200.
    tag_stack = [
        Tag(dot1=DOT1AD, vlan=300),
        Tag(dot1=DOT1Q, vlan=200),
    ]
    self.vtr_test(port, tag_stack)
329 def test_1q_vtr_push_2(self):
330 """1Q VTR push 2 test"""
331 self.pg2.sub_if.set_vtr(L2_VTR_OP.L2_PUSH_2, outer=400, inner=300)
335 Tag(dot1=DOT1AD, vlan=400),
336 Tag(dot1=DOT1Q, vlan=300),
337 Tag(dot1=DOT1Q, vlan=200),
def test_1q_vtr_translate_1_1(self):
    """1Q VTR translate 1 -> 1 test"""
    port = self.pg2
    # Configure a 1 -> 1 translation on the pg2 sub-interface with tag 300.
    port.sub_if.set_vtr(L2_VTR_OP.L2_TRANSLATE_1_1, tag=300)
    # Tag stack handed to vtr_test: a single 802.1ad tag, VLAN 300.
    tag_stack = [Tag(dot1=DOT1AD, vlan=300)]
    self.vtr_test(port, tag_stack)
def test_1q_vtr_translate_1_2(self):
    """1Q VTR translate 1 -> 2 test"""
    port = self.pg2
    # Configure a 1 -> 2 translation on the pg2 sub-interface
    # (inner tag 300, outer tag 400).
    port.sub_if.set_vtr(L2_VTR_OP.L2_TRANSLATE_1_2, inner=300, outer=400)
    # Tag stack handed to vtr_test: 802.1ad VLAN 400 followed by 802.1Q VLAN 300.
    tag_stack = [
        Tag(dot1=DOT1AD, vlan=400),
        Tag(dot1=DOT1Q, vlan=300),
    ]
    self.vtr_test(port, tag_stack)
351 def test_if_vtr_disable(self):
352 """Disable VTR on non-sub-interfaces"""
353 # First set the VTR fields to junk
354 self.vapi.l2_interface_vlan_tag_rewrite(
355 sw_if_index=self.pg0.sw_if_index,
356 vtr_op=L2_VTR_OP.L2_PUSH_2,
362 if_state = self.vapi.sw_interface_dump(sw_if_index=self.pg0.sw_if_index)
363 self.assertEqual(if_state[0].sw_if_index, self.pg0.sw_if_index)
364 self.assertNotEqual(if_state[0].vtr_op, L2_VTR_OP.L2_DISABLED)
366 # Then ensure that a request to disable VTR is honored.
367 self.vapi.l2_interface_vlan_tag_rewrite(
368 sw_if_index=self.pg0.sw_if_index, vtr_op=L2_VTR_OP.L2_DISABLED
371 if_state = self.vapi.sw_interface_dump(sw_if_index=self.pg0.sw_if_index)
372 self.assertEqual(if_state[0].sw_if_index, self.pg0.sw_if_index)
373 self.assertEqual(if_state[0].vtr_op, L2_VTR_OP.L2_DISABLED)
375 def test_if_vtr_push_1q(self):
376 """1Q VTR push 1 on non-sub-interfaces"""
377 self.vapi.l2_interface_vlan_tag_rewrite(
378 sw_if_index=self.pg0.sw_if_index,
379 vtr_op=L2_VTR_OP.L2_PUSH_1,
384 if_state = self.vapi.sw_interface_dump(sw_if_index=self.pg0.sw_if_index)
385 self.assertEqual(if_state[0].sw_if_index, self.pg0.sw_if_index)
386 self.assertEqual(if_state[0].vtr_op, L2_VTR_OP.L2_PUSH_1)
387 self.assertEqual(if_state[0].vtr_tag1, 150)
388 self.assertNotEqual(if_state[0].vtr_push_dot1q, 0)
390 def test_if_vtr_push_2ad(self):
391 """1AD VTR push 2 on non-sub-interfaces"""
392 self.vapi.l2_interface_vlan_tag_rewrite(
393 sw_if_index=self.pg0.sw_if_index,
394 vtr_op=L2_VTR_OP.L2_PUSH_2,
400 if_state = self.vapi.sw_interface_dump(sw_if_index=self.pg0.sw_if_index)
401 self.assertEqual(if_state[0].sw_if_index, self.pg0.sw_if_index)
402 self.assertEqual(if_state[0].vtr_op, L2_VTR_OP.L2_PUSH_2)
403 self.assertEqual(if_state[0].vtr_tag1, 450) # outer
404 self.assertEqual(if_state[0].vtr_tag2, 350) # inner
405 self.assertEqual(if_state[0].vtr_push_dot1q, 0)
# When executed directly as a script, run this module's tests through
# unittest using VppTestRunner as the test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)