5 from framework import VppTestCase, VppTestRunner
6 from vpp_papi_provider import QOS_SOURCE
7 from vpp_sub_interface import VppDot1QSubint
8 from vpp_ip import DpoProto
9 from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsRoute, \
10 VppMplsLabel, VppMplsTable
13 from scapy.packet import Raw
14 from scapy.layers.l2 import Ether, Dot1Q
15 from scapy.layers.inet import IP, UDP
16 from scapy.layers.inet6 import IPv6
17 from scapy.contrib.mpls import MPLS
22 class TestQOS(VppTestCase):
27 super(TestQOS, cls).setUpClass()
30 def tearDownClass(cls):
31 super(TestQOS, cls).tearDownClass()
34 super(TestQOS, self).setUp()
36 self.create_pg_interfaces(range(5))
38 tbl = VppMplsTable(self, 0)
41 for i in self.pg_interfaces:
50 for i in self.pg_interfaces:
55 super(TestQOS, self).tearDown()
57 def test_qos_ip(self):
58 """ QoS Mark/Record IP """
61 # for table 1 map the n=0xff possible values of input QoS mark,
64 output = [scapy.compat.chb(0)] * 256
65 for i in range(0, 255):
66 output[i] = scapy.compat.chb(255 - i)
68 rows = [{'outputs': os},
73 self.vapi.qos_egress_map_update(1, rows)
76 # For table 2 (and up) use the value n for everything
78 output = [scapy.compat.chb(2)] * 256
80 rows = [{'outputs': os},
85 self.vapi.qos_egress_map_update(2, rows)
87 output = [scapy.compat.chb(3)] * 256
89 rows = [{'outputs': os},
94 self.vapi.qos_egress_map_update(3, rows)
96 output = [scapy.compat.chb(4)] * 256
98 rows = [{'outputs': os},
102 self.vapi.qos_egress_map_update(4, rows)
103 self.vapi.qos_egress_map_update(5, rows)
104 self.vapi.qos_egress_map_update(6, rows)
105 self.vapi.qos_egress_map_update(7, rows)
107 self.logger.info(self.vapi.cli("sh qos eg map"))
110 # Bind interface pgN to table n
112 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
116 self.vapi.qos_mark_enable_disable(self.pg2.sw_if_index,
120 self.vapi.qos_mark_enable_disable(self.pg3.sw_if_index,
124 self.vapi.qos_mark_enable_disable(self.pg4.sw_if_index,
130 # packets ingress on Pg0
132 p_v4 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
133 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, tos=1) /
134 UDP(sport=1234, dport=1234) /
135 Raw(scapy.compat.chb(100) * NUM_PKTS))
136 p_v6 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
137 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6,
139 UDP(sport=1234, dport=1234) /
140 Raw(scapy.compat.chb(100) * NUM_PKTS))
143 # Since we have not yet enabled the recording of the input QoS
144 # from the input IP header, the egress packet's ToS will be unchanged
146 rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
148 self.assertEqual(p[IP].tos, 1)
149 rx = self.send_and_expect(self.pg0, p_v6 * NUM_PKTS, self.pg1)
151 self.assertEqual(p[IPv6].tc, 1)
154 # Enable QoS recording on IP input for pg0
156 self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
161 # send the same packets, this time expect the input TOS of 1
162 # to be mapped to pg1's egress value of 254
164 rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
166 self.assertEqual(p[IP].tos, 254)
167 rx = self.send_and_expect(self.pg0, p_v6 * NUM_PKTS, self.pg1)
169 self.assertEqual(p[IPv6].tc, 254)
172 # different input ToS to test the mapping
175 rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
177 self.assertEqual(p[IP].tos, 128)
179 rx = self.send_and_expect(self.pg0, p_v6 * NUM_PKTS, self.pg1)
181 self.assertEqual(p[IPv6].tc, 128)
184 rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
186 self.assertEqual(p[IP].tos, 1)
188 rx = self.send_and_expect(self.pg0, p_v6 * NUM_PKTS, self.pg1)
190 self.assertEqual(p[IPv6].tc, 1)
193 # send packets out the other interfaces to test the maps are
196 p_v4[IP].dst = self.pg2.remote_ip4
197 rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg2)
199 self.assertEqual(p[IP].tos, 2)
201 p_v4[IP].dst = self.pg3.remote_ip4
202 rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg3)
204 self.assertEqual(p[IP].tos, 3)
206 p_v6[IPv6].dst = self.pg3.remote_ip6
207 rx = self.send_and_expect(self.pg0, p_v6 * NUM_PKTS, self.pg3)
209 self.assertEqual(p[IPv6].tc, 3)
212 # remove the map on pg2 and pg3, now expect an unchanged IP tos
214 self.vapi.qos_mark_enable_disable(self.pg2.sw_if_index,
218 self.vapi.qos_mark_enable_disable(self.pg3.sw_if_index,
222 self.logger.info(self.vapi.cli("sh int feat pg2"))
224 p_v4[IP].dst = self.pg2.remote_ip4
225 rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg2)
227 self.assertEqual(p[IP].tos, 254)
229 p_v4[IP].dst = self.pg3.remote_ip4
230 rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg3)
232 self.assertEqual(p[IP].tos, 254)
235 # still mapping out of pg1
237 p_v4[IP].dst = self.pg1.remote_ip4
238 rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
240 self.assertEqual(p[IP].tos, 1)
243 # disable the input recording on pg0
245 self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
250 # back to an unchanged TOS value
252 rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
254 self.assertEqual(p[IP].tos, 254)
257 # disable the egress map on pg1 and pg4
259 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
263 self.vapi.qos_mark_enable_disable(self.pg4.sw_if_index,
269 # unchanged ToS on pg1
271 rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
273 self.assertEqual(p[IP].tos, 254)
278 self.vapi.qos_egress_map_delete(1)
279 self.vapi.qos_egress_map_delete(4)
280 self.vapi.qos_egress_map_delete(2)
281 self.vapi.qos_egress_map_delete(3)
282 self.vapi.qos_egress_map_delete(5)
283 self.vapi.qos_egress_map_delete(6)
284 self.vapi.qos_egress_map_delete(7)
286 def test_qos_mpls(self):
287 """ QoS Mark/Record MPLS """
290 # 255 QoS for all input values
296 output = [scapy.compat.chb(from_ext)] * 256
297 os1 = b''.join(output)
298 output = [scapy.compat.chb(from_vlan)] * 256
299 os2 = b''.join(output)
300 output = [scapy.compat.chb(from_mpls)] * 256
301 os3 = b''.join(output)
302 output = [scapy.compat.chb(from_ip)] * 256
303 os4 = b''.join(output)
304 rows = [{'outputs': os1},
309 self.vapi.qos_egress_map_update(1, rows)
312 # a route with 1 MPLS label
314 route_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32,
315 [VppRoutePath(self.pg1.remote_ip4,
316 self.pg1.sw_if_index,
318 route_10_0_0_1.add_vpp_config()
321 # a route with 3 MPLS labels
323 route_10_0_0_3 = VppIpRoute(self, "10.0.0.3", 32,
324 [VppRoutePath(self.pg1.remote_ip4,
325 self.pg1.sw_if_index,
326 labels=[63, 33, 34])])
327 route_10_0_0_3.add_vpp_config()
330 # enable IP QoS recording on the input Pg0 and MPLS egress marking
333 self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
336 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
342 # packets that will get one label and three labels added, respectively
344 p_1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
345 IP(src=self.pg0.remote_ip4, dst="10.0.0.1", tos=1) /
346 UDP(sport=1234, dport=1234) /
347 Raw(scapy.compat.chb(100) * NUM_PKTS))
348 p_3 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
349 IP(src=self.pg0.remote_ip4, dst="10.0.0.3", tos=1) /
350 UDP(sport=1234, dport=1234) /
351 Raw(scapy.compat.chb(100) * NUM_PKTS))
353 rx = self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg1)
356 # only 3 bits of the ToS value fit in MPLS; make sure the ToS is correct
357 # and the label and EOS bit have not been corrupted
360 self.assertEqual(p[MPLS].cos, from_ip)
361 self.assertEqual(p[MPLS].label, 32)
362 self.assertEqual(p[MPLS].s, 1)
363 rx = self.send_and_expect(self.pg0, p_3 * NUM_PKTS, self.pg1)
365 self.assertEqual(p[MPLS].cos, from_ip)
366 self.assertEqual(p[MPLS].label, 63)
367 self.assertEqual(p[MPLS].s, 0)
369 self.assertEqual(h[MPLS].cos, from_ip)
370 self.assertEqual(h[MPLS].label, 33)
371 self.assertEqual(h[MPLS].s, 0)
373 self.assertEqual(h[MPLS].cos, from_ip)
374 self.assertEqual(h[MPLS].label, 34)
375 self.assertEqual(h[MPLS].s, 1)
378 # enable MPLS QoS recording on the input Pg0 and IP egress marking
381 self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
384 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
390 # MPLS x-connect - COS according to pg1 map
392 route_32_eos = VppMplsRoute(self, 32, 1,
393 [VppRoutePath(self.pg1.remote_ip4,
394 self.pg1.sw_if_index,
395 labels=[VppMplsLabel(33)])])
396 route_32_eos.add_vpp_config()
398 p_m1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
399 MPLS(label=32, cos=3, ttl=2) /
400 IP(src=self.pg0.remote_ip4, dst="10.0.0.1", tos=1) /
401 UDP(sport=1234, dport=1234) /
402 Raw(scapy.compat.chb(100) * NUM_PKTS))
404 rx = self.send_and_expect(self.pg0, p_m1 * NUM_PKTS, self.pg1)
406 self.assertEqual(p[MPLS].cos, from_mpls)
407 self.assertEqual(p[MPLS].label, 33)
408 self.assertEqual(p[MPLS].s, 1)
411 # MPLS deag - COS is copied from MPLS to IP
413 route_33_eos = VppMplsRoute(self, 33, 1,
414 [VppRoutePath("0.0.0.0",
417 route_33_eos.add_vpp_config()
419 route_10_0_0_4 = VppIpRoute(self, "10.0.0.4", 32,
420 [VppRoutePath(self.pg1.remote_ip4,
421 self.pg1.sw_if_index)])
422 route_10_0_0_4.add_vpp_config()
424 p_m2 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
425 MPLS(label=33, ttl=2, cos=3) /
426 IP(src=self.pg0.remote_ip4, dst="10.0.0.4", tos=1) /
427 UDP(sport=1234, dport=1234) /
428 Raw(scapy.compat.chb(100) * NUM_PKTS))
430 rx = self.send_and_expect(self.pg0, p_m2 * NUM_PKTS, self.pg1)
433 self.assertEqual(p[IP].tos, from_mpls)
438 self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
441 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
445 self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
448 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
452 self.vapi.qos_egress_map_delete(1)
454 def test_qos_vlan(self):
455 """QoS mark/record VLAN """
458 # QoS for all input values
460 output = [scapy.compat.chb(0)] * 256
461 for i in range(0, 255):
462 output[i] = scapy.compat.chb(255 - i)
463 os = b''.join(output)
464 rows = [{'outputs': os},
469 self.vapi.qos_egress_map_update(1, rows)
471 sub_if = VppDot1QSubint(self, self.pg0, 11)
480 # enable VLAN QoS recording/marking on the input Pg0 subinterface and
482 self.vapi.qos_record_enable_disable(sub_if.sw_if_index,
485 self.vapi.qos_mark_enable_disable(sub_if.sw_if_index,
491 # IP marking/recording on pg1
493 self.vapi.qos_record_enable_disable(self.pg1.sw_if_index,
496 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
502 # routes to/from the sub-interface
504 route_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32,
505 [VppRoutePath(sub_if.remote_ip4,
506 sub_if.sw_if_index)])
507 route_10_0_0_1.add_vpp_config()
508 route_10_0_0_2 = VppIpRoute(self, "10.0.0.2", 32,
509 [VppRoutePath(self.pg1.remote_ip4,
510 self.pg1.sw_if_index)])
511 route_10_0_0_2.add_vpp_config()
512 route_2001_1 = VppIpRoute(self, "2001::1", 128,
513 [VppRoutePath(sub_if.remote_ip6,
515 proto=DpoProto.DPO_PROTO_IP6)],
517 route_2001_1.add_vpp_config()
518 route_2001_2 = VppIpRoute(self, "2001::2", 128,
519 [VppRoutePath(self.pg1.remote_ip6,
520 self.pg1.sw_if_index,
521 proto=DpoProto.DPO_PROTO_IP6)],
523 route_2001_2.add_vpp_config()
525 p_v1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
526 Dot1Q(vlan=11, prio=1) /
527 IP(src="1.1.1.1", dst="10.0.0.2", tos=1) /
528 UDP(sport=1234, dport=1234) /
529 Raw(scapy.compat.chb(100) * NUM_PKTS))
531 p_v2 = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
532 IP(src="1.1.1.1", dst="10.0.0.1", tos=1) /
533 UDP(sport=1234, dport=1234) /
534 Raw(scapy.compat.chb(100) * NUM_PKTS))
536 rx = self.send_and_expect(self.pg1, p_v2 * NUM_PKTS, self.pg0)
539 self.assertEqual(p[Dot1Q].prio, 6)
541 rx = self.send_and_expect(self.pg0, p_v1 * NUM_PKTS, self.pg1)
544 self.assertEqual(p[IP].tos, 254)
546 p_v1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
547 Dot1Q(vlan=11, prio=2) /
548 IPv6(src="2001::1", dst="2001::2", tc=1) /
549 UDP(sport=1234, dport=1234) /
550 Raw(scapy.compat.chb(100) * NUM_PKTS))
552 p_v2 = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
553 IPv6(src="3001::1", dst="2001::1", tc=1) /
554 UDP(sport=1234, dport=1234) /
555 Raw(scapy.compat.chb(100) * NUM_PKTS))
557 rx = self.send_and_expect(self.pg1, p_v2 * NUM_PKTS, self.pg0)
560 self.assertEqual(p[Dot1Q].prio, 6)
562 rx = self.send_and_expect(self.pg0, p_v1 * NUM_PKTS, self.pg1)
565 self.assertEqual(p[IPv6].tc, 253)
570 sub_if.unconfig_ip4()
571 sub_if.unconfig_ip6()
573 self.vapi.qos_record_enable_disable(sub_if.sw_if_index,
576 self.vapi.qos_mark_enable_disable(sub_if.sw_if_index,
580 self.vapi.qos_record_enable_disable(self.pg1.sw_if_index,
583 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
589 if __name__ == '__main__':
590 unittest.main(testRunner=VppTestRunner)