5 from framework import VppTestCase, VppTestRunner
6 from vpp_papi_provider import QOS_SOURCE
7 from vpp_sub_interface import VppDot1QSubint
8 from vpp_ip import DpoProto
9 from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsRoute, \
10 VppMplsLabel, VppMplsTable
13 from scapy.packet import Raw
14 from scapy.layers.l2 import Ether, Dot1Q
15 from scapy.layers.inet import IP, UDP
16 from scapy.layers.inet6 import IPv6
17 from scapy.contrib.mpls import MPLS
20 class TestQOS(VppTestCase):
25 super(TestQOS, cls).setUpClass()
28 def tearDownClass(cls):
29 super(TestQOS, cls).tearDownClass()
32 super(TestQOS, self).setUp()
34 self.create_pg_interfaces(range(5))
36 tbl = VppMplsTable(self, 0)
39 for i in self.pg_interfaces:
48 for i in self.pg_interfaces:
53 super(TestQOS, self).tearDown()
55 def test_qos_ip(self):
56 """ QoS Mark/Record IP """
59 # for table 1 map the n=0xff possible values of input QoS mark,
62 output = [scapy.compat.chb(0)] * 256
63 for i in range(0, 255):
64 output[i] = scapy.compat.chb(255 - i)
66 rows = [{'outputs': os},
71 self.vapi.qos_egress_map_update(1, rows)
74 # For table 2 (and up) use the value n for everything
76 output = [scapy.compat.chb(2)] * 256
78 rows = [{'outputs': os},
83 self.vapi.qos_egress_map_update(2, rows)
85 output = [scapy.compat.chb(3)] * 256
87 rows = [{'outputs': os},
92 self.vapi.qos_egress_map_update(3, rows)
94 output = [scapy.compat.chb(4)] * 256
96 rows = [{'outputs': os},
100 self.vapi.qos_egress_map_update(4, rows)
101 self.vapi.qos_egress_map_update(5, rows)
102 self.vapi.qos_egress_map_update(6, rows)
103 self.vapi.qos_egress_map_update(7, rows)
105 self.logger.info(self.vapi.cli("sh qos eg map"))
108 # Bind interface pgN to table n
110 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
114 self.vapi.qos_mark_enable_disable(self.pg2.sw_if_index,
118 self.vapi.qos_mark_enable_disable(self.pg3.sw_if_index,
122 self.vapi.qos_mark_enable_disable(self.pg4.sw_if_index,
128 # packets ingress on Pg0
130 p_v4 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
131 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, tos=1) /
132 UDP(sport=1234, dport=1234) /
133 Raw(scapy.compat.chb(100) * 65))
134 p_v6 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
135 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6,
137 UDP(sport=1234, dport=1234) /
138 Raw(scapy.compat.chb(100) * 65))
141 # Since we have not yet enabled the recording of the input QoS
142 # from the input IP header, the egress packet's ToS will be unchanged
144 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
146 self.assertEqual(p[IP].tos, 1)
147 rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg1)
149 self.assertEqual(p[IPv6].tc, 1)
152 # Enable QoS recording on IP input for pg0
154 self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
159 # send the same packets, this time expect the input TOS of 1
160 # to be mapped to pg1's egress value of 254
162 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
164 self.assertEqual(p[IP].tos, 254)
165 rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg1)
167 self.assertEqual(p[IPv6].tc, 254)
170 # different input ToS to test the mapping
173 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
175 self.assertEqual(p[IP].tos, 128)
177 rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg1)
179 self.assertEqual(p[IPv6].tc, 128)
182 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
184 self.assertEqual(p[IP].tos, 1)
186 rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg1)
188 self.assertEqual(p[IPv6].tc, 1)
191 # send packets out the other interfaces to test the maps are
194 p_v4[IP].dst = self.pg2.remote_ip4
195 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg2)
197 self.assertEqual(p[IP].tos, 2)
199 p_v4[IP].dst = self.pg3.remote_ip4
200 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg3)
202 self.assertEqual(p[IP].tos, 3)
204 p_v6[IPv6].dst = self.pg3.remote_ip6
205 rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg3)
207 self.assertEqual(p[IPv6].tc, 3)
210 # remove the map on pg2 and pg3, now expect an unchanged IP tos
212 self.vapi.qos_mark_enable_disable(self.pg2.sw_if_index,
216 self.vapi.qos_mark_enable_disable(self.pg3.sw_if_index,
220 self.logger.info(self.vapi.cli("sh int feat pg2"))
222 p_v4[IP].dst = self.pg2.remote_ip4
223 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg2)
225 self.assertEqual(p[IP].tos, 254)
227 p_v4[IP].dst = self.pg3.remote_ip4
228 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg3)
230 self.assertEqual(p[IP].tos, 254)
233 # still mapping out of pg1
235 p_v4[IP].dst = self.pg1.remote_ip4
236 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
238 self.assertEqual(p[IP].tos, 1)
241 # disable the input recording on pg0
243 self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
248 # back to an unchanged TOS value
250 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
252 self.assertEqual(p[IP].tos, 254)
255 # disable the egress map on pg1 and pg4
257 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
261 self.vapi.qos_mark_enable_disable(self.pg4.sw_if_index,
267 # unchanged Tos on pg1
269 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
271 self.assertEqual(p[IP].tos, 254)
276 self.vapi.qos_egress_map_delete(1)
277 self.vapi.qos_egress_map_delete(4)
278 self.vapi.qos_egress_map_delete(2)
279 self.vapi.qos_egress_map_delete(3)
280 self.vapi.qos_egress_map_delete(5)
281 self.vapi.qos_egress_map_delete(6)
282 self.vapi.qos_egress_map_delete(7)
284 def test_qos_mpls(self):
285 """ QoS Mark/Record MPLS """
288 # 255 QoS for all input values
294 output = [scapy.compat.chb(from_ext)] * 256
295 os1 = b''.join(output)
296 output = [scapy.compat.chb(from_vlan)] * 256
297 os2 = b''.join(output)
298 output = [scapy.compat.chb(from_mpls)] * 256
299 os3 = b''.join(output)
300 output = [scapy.compat.chb(from_ip)] * 256
301 os4 = b''.join(output)
302 rows = [{'outputs': os1},
307 self.vapi.qos_egress_map_update(1, rows)
310 # a route with 1 MPLS label
312 route_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32,
313 [VppRoutePath(self.pg1.remote_ip4,
314 self.pg1.sw_if_index,
316 route_10_0_0_1.add_vpp_config()
319 # a route with 3 MPLS labels
321 route_10_0_0_3 = VppIpRoute(self, "10.0.0.3", 32,
322 [VppRoutePath(self.pg1.remote_ip4,
323 self.pg1.sw_if_index,
324 labels=[63, 33, 34])])
325 route_10_0_0_3.add_vpp_config()
328 # enable IP QoS recording on the input Pg0 and MPLS egress marking
331 self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
334 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
340 # packets that will get one label and three labels added, respectively
342 p_1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
343 IP(src=self.pg0.remote_ip4, dst="10.0.0.1", tos=1) /
344 UDP(sport=1234, dport=1234) /
345 Raw(scapy.compat.chb(100) * 65))
346 p_3 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
347 IP(src=self.pg0.remote_ip4, dst="10.0.0.3", tos=1) /
348 UDP(sport=1234, dport=1234) /
349 Raw(scapy.compat.chb(100) * 65))
351 rx = self.send_and_expect(self.pg0, p_1 * 65, self.pg1)
354 # only 3 bits of the ToS value fit in MPLS; make sure the ToS is correct
355 # and that the label and EOS bit have not been corrupted
358 self.assertEqual(p[MPLS].cos, from_ip)
359 self.assertEqual(p[MPLS].label, 32)
360 self.assertEqual(p[MPLS].s, 1)
361 rx = self.send_and_expect(self.pg0, p_3 * 65, self.pg1)
363 self.assertEqual(p[MPLS].cos, from_ip)
364 self.assertEqual(p[MPLS].label, 63)
365 self.assertEqual(p[MPLS].s, 0)
367 self.assertEqual(h[MPLS].cos, from_ip)
368 self.assertEqual(h[MPLS].label, 33)
369 self.assertEqual(h[MPLS].s, 0)
371 self.assertEqual(h[MPLS].cos, from_ip)
372 self.assertEqual(h[MPLS].label, 34)
373 self.assertEqual(h[MPLS].s, 1)
376 # enable MPLS QoS recording on the input Pg0 and IP egress marking
379 self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
382 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
388 # MPLS x-connect - COS according to pg1 map
390 route_32_eos = VppMplsRoute(self, 32, 1,
391 [VppRoutePath(self.pg1.remote_ip4,
392 self.pg1.sw_if_index,
393 labels=[VppMplsLabel(33)])])
394 route_32_eos.add_vpp_config()
396 p_m1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
397 MPLS(label=32, cos=3, ttl=2) /
398 IP(src=self.pg0.remote_ip4, dst="10.0.0.1", tos=1) /
399 UDP(sport=1234, dport=1234) /
400 Raw(scapy.compat.chb(100) * 65))
402 rx = self.send_and_expect(self.pg0, p_m1 * 65, self.pg1)
404 self.assertEqual(p[MPLS].cos, from_mpls)
405 self.assertEqual(p[MPLS].label, 33)
406 self.assertEqual(p[MPLS].s, 1)
409 # MPLS deag - COS is copied from MPLS to IP
411 route_33_eos = VppMplsRoute(self, 33, 1,
412 [VppRoutePath("0.0.0.0",
415 route_33_eos.add_vpp_config()
417 route_10_0_0_4 = VppIpRoute(self, "10.0.0.4", 32,
418 [VppRoutePath(self.pg1.remote_ip4,
419 self.pg1.sw_if_index)])
420 route_10_0_0_4.add_vpp_config()
422 p_m2 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
423 MPLS(label=33, ttl=2, cos=3) /
424 IP(src=self.pg0.remote_ip4, dst="10.0.0.4", tos=1) /
425 UDP(sport=1234, dport=1234) /
426 Raw(scapy.compat.chb(100) * 65))
428 rx = self.send_and_expect(self.pg0, p_m2 * 65, self.pg1)
431 self.assertEqual(p[IP].tos, from_mpls)
436 self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
439 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
443 self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
446 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
450 self.vapi.qos_egress_map_delete(1)
452 def test_qos_vlan(self):
453 """QoS mark/record VLAN """
456 # QoS for all input values
458 output = [scapy.compat.chb(0)] * 256
459 for i in range(0, 255):
460 output[i] = scapy.compat.chb(255 - i)
461 os = b''.join(output)
462 rows = [{'outputs': os},
467 self.vapi.qos_egress_map_update(1, rows)
469 sub_if = VppDot1QSubint(self, self.pg0, 11)
478 # enable VLAN QoS recording/marking on the input Pg0 subinterface and
480 self.vapi.qos_record_enable_disable(sub_if.sw_if_index,
483 self.vapi.qos_mark_enable_disable(sub_if.sw_if_index,
489 # IP marking/recording on pg1
491 self.vapi.qos_record_enable_disable(self.pg1.sw_if_index,
494 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
500 # routes to/from the sub-interface
502 route_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32,
503 [VppRoutePath(sub_if.remote_ip4,
504 sub_if.sw_if_index)])
505 route_10_0_0_1.add_vpp_config()
506 route_10_0_0_2 = VppIpRoute(self, "10.0.0.2", 32,
507 [VppRoutePath(self.pg1.remote_ip4,
508 self.pg1.sw_if_index)])
509 route_10_0_0_2.add_vpp_config()
510 route_2001_1 = VppIpRoute(self, "2001::1", 128,
511 [VppRoutePath(sub_if.remote_ip6,
513 proto=DpoProto.DPO_PROTO_IP6)],
515 route_2001_1.add_vpp_config()
516 route_2001_2 = VppIpRoute(self, "2001::2", 128,
517 [VppRoutePath(self.pg1.remote_ip6,
518 self.pg1.sw_if_index,
519 proto=DpoProto.DPO_PROTO_IP6)],
521 route_2001_2.add_vpp_config()
523 p_v1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
524 Dot1Q(vlan=11, prio=1) /
525 IP(src="1.1.1.1", dst="10.0.0.2", tos=1) /
526 UDP(sport=1234, dport=1234) /
527 Raw(scapy.compat.chb(100) * 65))
529 p_v2 = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
530 IP(src="1.1.1.1", dst="10.0.0.1", tos=1) /
531 UDP(sport=1234, dport=1234) /
532 Raw(scapy.compat.chb(100) * 65))
534 rx = self.send_and_expect(self.pg1, p_v2 * 65, self.pg0)
537 self.assertEqual(p[Dot1Q].prio, 6)
539 rx = self.send_and_expect(self.pg0, p_v1 * 65, self.pg1)
542 self.assertEqual(p[IP].tos, 254)
544 p_v1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
545 Dot1Q(vlan=11, prio=2) /
546 IPv6(src="2001::1", dst="2001::2", tc=1) /
547 UDP(sport=1234, dport=1234) /
548 Raw(scapy.compat.chb(100) * 65))
550 p_v2 = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
551 IPv6(src="3001::1", dst="2001::1", tc=1) /
552 UDP(sport=1234, dport=1234) /
553 Raw(scapy.compat.chb(100) * 65))
555 rx = self.send_and_expect(self.pg1, p_v2 * 65, self.pg0)
558 self.assertEqual(p[Dot1Q].prio, 6)
560 rx = self.send_and_expect(self.pg0, p_v1 * 65, self.pg1)
563 self.assertEqual(p[IPv6].tc, 253)
568 sub_if.unconfig_ip4()
569 sub_if.unconfig_ip6()
571 self.vapi.qos_record_enable_disable(sub_if.sw_if_index,
574 self.vapi.qos_mark_enable_disable(sub_if.sw_if_index,
578 self.vapi.qos_record_enable_disable(self.pg1.sw_if_index,
581 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
587 if __name__ == '__main__':
588 unittest.main(testRunner=VppTestRunner)