5 from framework import VppTestCase, VppTestRunner
6 from vpp_papi_provider import QOS_SOURCE
7 from vpp_sub_interface import VppDot1QSubint
8 from vpp_ip import DpoProto
9 from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsRoute, \
10 VppMplsLabel, VppMplsTable
12 from scapy.packet import Raw
13 from scapy.layers.l2 import Ether, Dot1Q
14 from scapy.layers.inet import IP, UDP
15 from scapy.layers.inet6 import IPv6
16 from scapy.contrib.mpls import MPLS
19 class TestQOS(VppTestCase):
# QoS record/mark test case built on VppTestCase.
# NOTE(review): the `def setUp`/`def tearDown` header lines and the bodies of
# both `for i in self.pg_interfaces:` loops are not visible in this extract;
# the comments below describe only the statements that are shown.
# Fixture set-up: chain to the base class first.
23 super(TestQOS, self).setUp()
# Ask the framework to create packet-generator interfaces for range(5).
25 self.create_pg_interfaces(range(5))
# Construct a VppMplsTable object for table id 0.
27 tbl = VppMplsTable(self, 0)
# Per-interface configuration loop (body not visible).
30 for i in self.pg_interfaces:
# Per-interface loop (body not visible) ahead of the base-class tearDown
# call; presumably this is the teardown counterpart - confirm.
39 for i in self.pg_interfaces:
44 super(TestQOS, self).tearDown()
46 def test_qos_ip(self):
47 """ QoS Mark/Record IP """
# Installs egress maps 1-7, calls qos_mark_enable_disable for pg1-pg4, then
# sends IPv4/IPv6 packets in on pg0 and asserts the IP ToS / IPv6 tc seen on
# the egress interface while recording and marking are toggled. Finally the
# seven egress maps are deleted.
# NOTE(review): this extract omits lines (continuation arguments, packet
# payloads, and the statements that bind `p` before each assertion); the
# added comments describe only what is visible.
50 # for table 1 map the n=0xff possible values of input QoS mark,
# 256-entry list initialised to chr(0); indices 0..254 are overwritten with
# chr(255 - i), index 255 keeps its initial chr(0).
53 output = [chr(0)] * 256
54 for i in range(0, 255):
55 output[i] = chr(255 - i)
# NOTE(review): `os` is not assigned in the visible lines - presumably it is
# built from `output`; confirm against the full file.
57 rows = [{'outputs': os},
62 self.vapi.qos_egress_map_update(1, rows)
65 # For table 2 (and up) use the value n for everything
67 output = [chr(2)] * 256
69 rows = [{'outputs': os},
74 self.vapi.qos_egress_map_update(2, rows)
76 output = [chr(3)] * 256
78 rows = [{'outputs': os},
83 self.vapi.qos_egress_map_update(3, rows)
85 output = [chr(4)] * 256
87 rows = [{'outputs': os},
# The same `rows` object is installed for map ids 4, 5, 6 and 7.
91 self.vapi.qos_egress_map_update(4, rows)
92 self.vapi.qos_egress_map_update(5, rows)
93 self.vapi.qos_egress_map_update(6, rows)
94 self.vapi.qos_egress_map_update(7, rows)
# Log the CLI output of "sh qos eg map" for debugging.
96 self.logger.info(self.vapi.cli("sh qos eg map"))
99 # Bind interface pgN to table n
101 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
105 self.vapi.qos_mark_enable_disable(self.pg2.sw_if_index,
109 self.vapi.qos_mark_enable_disable(self.pg3.sw_if_index,
113 self.vapi.qos_mark_enable_disable(self.pg4.sw_if_index,
119 # packets ingress on Pg0
# IPv4 template: pg0 -> pg1 remote address, tos=1, UDP 1234 -> 1234.
121 p_v4 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
122 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, tos=1) /
123 UDP(sport=1234, dport=1234) /
# IPv6 template: pg0 -> pg1 remote address, UDP 1234 -> 1234.
125 p_v6 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
126 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6,
128 UDP(sport=1234, dport=1234) /
132 # Since we have not yet enabled the recording of the input QoS
133 # from the input IP header, the egress packet's ToS will be unchanged
# Each send below injects 65 copies of the packet on pg0 and expects them
# on the named egress interface.
135 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
137 self.assertEqual(p[IP].tos, 1)
138 rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg1)
140 self.assertEqual(p[IPv6].tc, 1)
143 # Enable QoS recording on IP input for pg0
145 self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
150 # send the same packets, this time expect the input TOS of 1
151 # to be mapped to pg1's egress value of 254
153 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
155 self.assertEqual(p[IP].tos, 254)
156 rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg1)
158 self.assertEqual(p[IPv6].tc, 254)
161 # different input ToS to test the mapping
# NOTE(review): the lines that change the input ToS/tc before each send in
# this section are not visible; expected values 128 and 1 are consistent with
# the 255 - i map for inputs 127 and 254 respectively - confirm.
164 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
166 self.assertEqual(p[IP].tos, 128)
168 rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg1)
170 self.assertEqual(p[IPv6].tc, 128)
173 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
175 self.assertEqual(p[IP].tos, 1)
177 rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg1)
179 self.assertEqual(p[IPv6].tc, 1)
182 # send packets out the other interfaces to test the maps are
# Re-target the templates at pg2 / pg3; expected values 2 and 3 match the
# constant tables installed as maps 2 and 3 above.
185 p_v4[IP].dst = self.pg2.remote_ip4
186 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg2)
188 self.assertEqual(p[IP].tos, 2)
190 p_v4[IP].dst = self.pg3.remote_ip4
191 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg3)
193 self.assertEqual(p[IP].tos, 3)
195 p_v6[IPv6].dst = self.pg3.remote_ip6
196 rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg3)
198 self.assertEqual(p[IPv6].tc, 3)
201 # remove the map on pg2 and pg3, now expect an unchanged IP tos
203 self.vapi.qos_mark_enable_disable(self.pg2.sw_if_index,
207 self.vapi.qos_mark_enable_disable(self.pg3.sw_if_index,
211 self.logger.info(self.vapi.cli("sh int feat pg2"))
# NOTE(review): "unchanged" here is asserted as 254, so the template's tos
# is presumably 254 at this point (set on a line not visible) - confirm.
213 p_v4[IP].dst = self.pg2.remote_ip4
214 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg2)
216 self.assertEqual(p[IP].tos, 254)
218 p_v4[IP].dst = self.pg3.remote_ip4
219 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg3)
221 self.assertEqual(p[IP].tos, 254)
224 # still mapping out of pg1
226 p_v4[IP].dst = self.pg1.remote_ip4
227 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
229 self.assertEqual(p[IP].tos, 1)
232 # disable the input recording on pg0
234 self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
239 # back to an unchanged TOS value
241 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
243 self.assertEqual(p[IP].tos, 254)
246 # disable the egress map on pg1 and pg4
248 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
252 self.vapi.qos_mark_enable_disable(self.pg4.sw_if_index,
258 # unchanged Tos on pg1
260 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
262 self.assertEqual(p[IP].tos, 254)
# Clean-up: delete every egress map id (1-7) that was installed above.
267 self.vapi.qos_egress_map_delete(1)
268 self.vapi.qos_egress_map_delete(4)
269 self.vapi.qos_egress_map_delete(2)
270 self.vapi.qos_egress_map_delete(3)
271 self.vapi.qos_egress_map_delete(5)
272 self.vapi.qos_egress_map_delete(6)
273 self.vapi.qos_egress_map_delete(7)
275 def test_qos_mpls(self):
276 """ QoS Mark/Record MPLS """
# Installs egress map 1, adds labelled IP routes via pg1, and checks the MPLS
# cos / label / s fields on packets sent pg0 -> pg1; then adds MPLS routes for
# local labels 32 and 33 and checks the x-connect and deag cases.
# NOTE(review): this extract omits lines (continuation arguments, payloads,
# and the statements that bind `p` and `h`); the added comments describe
# only what is visible.
279 # 255 QoS for all input values
281 output = [chr(255)] * 256
# NOTE(review): `os` is not assigned in the visible lines - presumably it is
# built from `output`; confirm against the full file.
283 rows = [{'outputs': os},
288 self.vapi.qos_egress_map_update(1, rows)
291 # a route with 1 MPLS label
# NOTE(review): the label list for this route is on a line not shown; the
# assertion further down expects label 32.
293 route_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32,
294 [VppRoutePath(self.pg1.remote_ip4,
295 self.pg1.sw_if_index,
297 route_10_0_0_1.add_vpp_config()
300 # a route with 3 MPLS labels
302 route_10_0_0_3 = VppIpRoute(self, "10.0.0.3", 32,
303 [VppRoutePath(self.pg1.remote_ip4,
304 self.pg1.sw_if_index,
305 labels=[63, 33, 34])])
306 route_10_0_0_3.add_vpp_config()
309 # enable IP QoS recording on the input Pg0 and MPLS egress marking
312 self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
315 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
321 # packet that will get one label added and 3 labels added resp.
323 p_1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
324 IP(src=self.pg0.remote_ip4, dst="10.0.0.1", tos=1) /
325 UDP(sport=1234, dport=1234) /
327 p_3 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
328 IP(src=self.pg0.remote_ip4, dst="10.0.0.3", tos=1) /
329 UDP(sport=1234, dport=1234) /
# 65 copies of the single-label packet in on pg0, expected out of pg1.
332 rx = self.send_and_expect(self.pg0, p_1 * 65, self.pg1)
335 # only 3 bits of ToS value in MPLS make sure tos is correct
336 # and the label and EOS bit have not been corrupted
# The map outputs 255 for every input, so a 3-bit field has all bits set (7).
339 self.assertEqual(p[MPLS].cos, 7)
340 self.assertEqual(p[MPLS].label, 32)
341 self.assertEqual(p[MPLS].s, 1)
# Three-label case: labels 63, 33, 34 in order, with s set only on the last.
342 rx = self.send_and_expect(self.pg0, p_3 * 65, self.pg1)
344 self.assertEqual(p[MPLS].cos, 7)
345 self.assertEqual(p[MPLS].label, 63)
346 self.assertEqual(p[MPLS].s, 0)
# NOTE(review): `h` is bound on lines not shown - presumably the next inner
# MPLS header each time; confirm.
348 self.assertEqual(h[MPLS].cos, 7)
349 self.assertEqual(h[MPLS].label, 33)
350 self.assertEqual(h[MPLS].s, 0)
352 self.assertEqual(h[MPLS].cos, 7)
353 self.assertEqual(h[MPLS].label, 34)
354 self.assertEqual(h[MPLS].s, 1)
357 # enable MPLS QoS recording on the input Pg0 and IP egress marking
360 self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
363 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
369 # MPLS x-connect - COS is preserved
# MPLS route for local label 32 (second positional arg 1) that forwards via
# pg1 with out-label 33.
371 route_32_eos = VppMplsRoute(self, 32, 1,
372 [VppRoutePath(self.pg1.remote_ip4,
373 self.pg1.sw_if_index,
374 labels=[VppMplsLabel(33)])])
375 route_32_eos.add_vpp_config()
377 p_m1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
378 MPLS(label=32, cos=3, ttl=2) /
379 IP(src=self.pg0.remote_ip4, dst="10.0.0.1", tos=1) /
380 UDP(sport=1234, dport=1234) /
383 rx = self.send_and_expect(self.pg0, p_m1 * 65, self.pg1)
# Input cos is 3; the egress packet is asserted to carry cos 7, label 33.
385 self.assertEqual(p[MPLS].cos, 7)
386 self.assertEqual(p[MPLS].label, 33)
387 self.assertEqual(p[MPLS].s, 1)
390 # MPLS deag - COS is copied from MPLS to IP
392 route_33_eos = VppMplsRoute(self, 33, 1,
393 [VppRoutePath("0.0.0.0",
396 route_33_eos.add_vpp_config()
# Plain (unlabelled) IP route to 10.0.0.4/32 via pg1 for the deag case.
398 route_10_0_0_4 = VppIpRoute(self, "10.0.0.4", 32,
399 [VppRoutePath(self.pg1.remote_ip4,
400 self.pg1.sw_if_index)])
401 route_10_0_0_4.add_vpp_config()
403 p_m2 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
404 MPLS(label=33, ttl=2, cos=3) /
405 IP(src=self.pg0.remote_ip4, dst="10.0.0.4", tos=1) /
406 UDP(sport=1234, dport=1234) /
409 rx = self.send_and_expect(self.pg0, p_m2 * 65, self.pg1)
# Egress IP ToS is asserted to be 255, the constant output of map 1.
412 self.assertEqual(p[IP].tos, 255)
# NOTE(review): the remaining arguments of the four calls below are not
# visible - presumably they disable what was enabled above; confirm.
417 self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
420 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
424 self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
427 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
# Remove egress map 1 installed at the top of this test.
431 self.vapi.qos_egress_map_delete(1)
433 def test_qos_vlan(self):
434 """QoS mark/record VLAN """
# Installs egress map 1, creates a dot1q sub-interface (VLAN 11) on pg0,
# calls the record/mark enable APIs on the sub-interface and on pg1, adds
# IPv4/IPv6 routes in both directions, and checks the Dot1Q prio / IP tos /
# IPv6 tc of forwarded packets.
# NOTE(review): this extract omits lines (sub-interface configuration,
# continuation arguments, payloads, and the statements that bind `p`); the
# added comments describe only what is visible.
437 # QoS for all input values
# 256-entry list initialised to chr(0); indices 0..254 are overwritten with
# chr(255 - i), index 255 keeps its initial chr(0).
439 output = [chr(0)] * 256
440 for i in range(0, 255):
441 output[i] = chr(255 - i)
# NOTE(review): `os` is not assigned in the visible lines - presumably it is
# built from `output`; confirm against the full file.
443 rows = [{'outputs': os},
448 self.vapi.qos_egress_map_update(1, rows)
# Dot1Q sub-interface object on pg0 constructed with the value 11.
450 sub_if = VppDot1QSubint(self, self.pg0, 11)
459 # enable VLAN QoS recording/marking on the input Pg0 subinterface and
461 self.vapi.qos_record_enable_disable(sub_if.sw_if_index,
464 self.vapi.qos_mark_enable_disable(sub_if.sw_if_index,
470 # IP marking/recording on pg1
472 self.vapi.qos_record_enable_disable(self.pg1.sw_if_index,
475 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
481 # routes to/from sub-interface
# 10.0.0.1/32 and 2001::1/128 point out of the sub-interface;
# 10.0.0.2/32 and 2001::2/128 point out of pg1.
483 route_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32,
484 [VppRoutePath(sub_if.remote_ip4,
485 sub_if.sw_if_index)])
486 route_10_0_0_1.add_vpp_config()
487 route_10_0_0_2 = VppIpRoute(self, "10.0.0.2", 32,
488 [VppRoutePath(self.pg1.remote_ip4,
489 self.pg1.sw_if_index)])
490 route_10_0_0_2.add_vpp_config()
491 route_2001_1 = VppIpRoute(self, "2001::1", 128,
492 [VppRoutePath(sub_if.remote_ip6,
494 proto=DpoProto.DPO_PROTO_IP6)],
496 route_2001_1.add_vpp_config()
497 route_2001_2 = VppIpRoute(self, "2001::2", 128,
498 [VppRoutePath(self.pg1.remote_ip6,
499 self.pg1.sw_if_index,
500 proto=DpoProto.DPO_PROTO_IP6)],
502 route_2001_2.add_vpp_config()
# p_v1: VLAN-11 tagged (prio=1) IPv4 packet entering on pg0, routed to pg1.
504 p_v1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
505 Dot1Q(vlan=11, prio=1) /
506 IP(src="1.1.1.1", dst="10.0.0.2", tos=1) /
507 UDP(sport=1234, dport=1234) /
# p_v2: untagged IPv4 packet (tos=1) entering on pg1, routed to the sub-if.
510 p_v2 = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
511 IP(src="1.1.1.1", dst="10.0.0.1", tos=1) /
512 UDP(sport=1234, dport=1234) /
515 rx = self.send_and_expect(self.pg1, p_v2 * 65, self.pg0)
# NOTE(review): 6 presumably is the map output for input 1 (254) reduced to
# the width of the Dot1Q prio field - confirm.
518 self.assertEqual(p[Dot1Q].prio, 6)
520 rx = self.send_and_expect(self.pg0, p_v1 * 65, self.pg1)
# 254 == 255 - 1, i.e. the map output for p_v1's Dot1Q prio of 1
# (presumably the recorded source here - confirm).
523 self.assertEqual(p[IP].tos, 254)
# Same two directions again with IPv6; the tagged packet now uses prio=2.
525 p_v1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
526 Dot1Q(vlan=11, prio=2) /
527 IPv6(src="2001::1", dst="2001::2", tc=1) /
528 UDP(sport=1234, dport=1234) /
531 p_v2 = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
532 IPv6(src="3001::1", dst="2001::1", tc=1) /
533 UDP(sport=1234, dport=1234) /
536 rx = self.send_and_expect(self.pg1, p_v2 * 65, self.pg0)
539 self.assertEqual(p[Dot1Q].prio, 6)
541 rx = self.send_and_expect(self.pg0, p_v1 * 65, self.pg1)
# 253 == 255 - 2, i.e. the map output for p_v1's Dot1Q prio of 2
# (presumably the recorded source here - confirm).
544 self.assertEqual(p[IPv6].tc, 253)
# Clean-up: remove the sub-interface addresses.
549 sub_if.unconfig_ip4()
550 sub_if.unconfig_ip6()
# NOTE(review): the remaining arguments of the four calls below are not
# visible - presumably they disable what was enabled above; confirm.
552 self.vapi.qos_record_enable_disable(sub_if.sw_if_index,
555 self.vapi.qos_mark_enable_disable(sub_if.sw_if_index,
559 self.vapi.qos_record_enable_disable(self.pg1.sw_if_index,
562 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
568 if __name__ == '__main__':
# Script entry point: run the tests in this module through VppTestRunner.
# NOTE(review): the `import unittest` this relies on is not visible in this
# extract - confirm it exists at the top of the file.
569 unittest.main(testRunner=VppTestRunner)