5 from framework import VppTestCase, VppTestRunner
6 from vpp_papi_provider import QOS_SOURCE
7 from vpp_sub_interface import VppDot1QSubint
8 from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsRoute, \
9 VppMplsLabel, VppMplsTable, DpoProto
11 from scapy.packet import Raw
12 from scapy.layers.l2 import Ether, Dot1Q
13 from scapy.layers.inet import IP, UDP
14 from scapy.layers.inet6 import IPv6
15 from scapy.contrib.mpls import MPLS
18 class TestQOS(VppTestCase):
# QoS test case built on VppTestCase; the test methods visible below cover
# IP, MPLS and VLAN mark/record.
# NOTE(review): this extract drops source lines (see the gaps in the embedded
# line numbers), so the `def` lines of the fixture code below are not visible.
# Presumably setUp: chains to the base class, creates five pg interfaces
# (pg0..pg4) and builds a VppMplsTable object with table id 0 - confirm.
22 super(TestQOS, self).setUp()
24 self.create_pg_interfaces(range(5))
26 tbl = VppMplsTable(self, 0)
# Two passes over every pg interface; the loop bodies are not visible here.
29 for i in self.pg_interfaces:
38 for i in self.pg_interfaces:
# Presumably tearDown: chains to the base class tearDown - confirm.
43 super(TestQOS, self).tearDown()
45 def test_qos_ip(self):
46 """ QoS Mark/Record IP """
# Programs egress maps 1-7, enables marking on pg1..pg4, then sends IPv4 and
# IPv6 packets in on pg0 and asserts the ToS / traffic-class seen on the
# egress interface while recording and marking are switched on and off.
# NOTE(review): several source lines are missing from this extract; `os` and
# `p` are used below but never bound in the visible lines (presumably `os` is
# the joined `output` string and `p` iterates over `rx`) - confirm.
49 # for table 1 map the n=0xff possible values of input QoS mark,
# Every entry starts as chr(0); the loop fills indices 0..254 with 255 - i.
# Index 255 keeps chr(0), which equals 255 - 255, so all 256 inputs map to
# 255 - i.
52 output = [chr(0)] * 256
53 for i in range(0, 255):
54 output[i] = chr(255 - i)
56 rows = [{'outputs': os},
61 self.vapi.qos_egress_map_update(1, rows)
64 # For table 2 (and up) use the value n for everything
66 output = [chr(2)] * 256
68 rows = [{'outputs': os},
73 self.vapi.qos_egress_map_update(2, rows)
75 output = [chr(3)] * 256
77 rows = [{'outputs': os},
82 self.vapi.qos_egress_map_update(3, rows)
84 output = [chr(4)] * 256
86 rows = [{'outputs': os},
# Tables 4, 5, 6 and 7 are all updated with the same `rows` object.
90 self.vapi.qos_egress_map_update(4, rows)
91 self.vapi.qos_egress_map_update(5, rows)
92 self.vapi.qos_egress_map_update(6, rows)
93 self.vapi.qos_egress_map_update(7, rows)
# Log the resulting egress maps for debugging.
95 self.logger.info(self.vapi.cli("sh qos eg map"))
98 # Bind interface pgN to table n
# NOTE(review): the remaining arguments of these calls are on lines that are
# not visible in this extract.
100 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
104 self.vapi.qos_mark_enable_disable(self.pg2.sw_if_index,
108 self.vapi.qos_mark_enable_disable(self.pg3.sw_if_index,
112 self.vapi.qos_mark_enable_disable(self.pg4.sw_if_index,
118 # packets ingress on Pg0
# IPv4 test packet: pg0 -> pg1, ToS 1, UDP 1234 -> 1234.
120 p_v4 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
121 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, tos=1) /
122 UDP(sport=1234, dport=1234) /
# IPv6 counterpart of the packet above (remaining IPv6 arguments not visible).
124 p_v6 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
125 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6,
127 UDP(sport=1234, dport=1234) /
131 # Since we have not yet enabled the recording of the input QoS
132 # from the input IP header, the egress packet's ToS will be unchanged
# Each send uses 65 copies of the packet (`p_v4 * 65` / `p_v6 * 65`).
134 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
136 self.assertEqual(p[IP].tos, 1)
137 rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg1)
139 self.assertEqual(p[IPv6].tc, 1)
142 # Enable QoS recording on IP input for pg0
144 self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
149 # send the same packets, this time expect the input TOS of 1
150 # to be mapped to pg1's egress value of 254
152 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
154 self.assertEqual(p[IP].tos, 254)
155 rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg1)
157 self.assertEqual(p[IPv6].tc, 254)
160 # different input ToS to test the mapping
# NOTE(review): the lines that change the input ToS / tc are not visible; with
# the 255 - i map the expected 128 implies an input of 127 and the expected 1
# implies an input of 254 - confirm.
163 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
165 self.assertEqual(p[IP].tos, 128)
167 rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg1)
169 self.assertEqual(p[IPv6].tc, 128)
172 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
174 self.assertEqual(p[IP].tos, 1)
176 rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg1)
178 self.assertEqual(p[IPv6].tc, 1)
181 # send packets out the other interfaces to test the maps are
# Re-target the same packets at pg2 / pg3; the expected values 2 and 3 are
# the constants programmed into tables 2 and 3 above.
184 p_v4[IP].dst = self.pg2.remote_ip4
185 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg2)
187 self.assertEqual(p[IP].tos, 2)
189 p_v4[IP].dst = self.pg3.remote_ip4
190 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg3)
192 self.assertEqual(p[IP].tos, 3)
194 p_v6[IPv6].dst = self.pg3.remote_ip6
195 rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg3)
197 self.assertEqual(p[IPv6].tc, 3)
200 # remove the map on pg2 and pg3, now expect an unchanged IP tos
202 self.vapi.qos_mark_enable_disable(self.pg2.sw_if_index,
206 self.vapi.qos_mark_enable_disable(self.pg3.sw_if_index,
210 self.logger.info(self.vapi.cli("sh int feat pg2"))
# "Unchanged" here is asserted as 254, so the input ToS is presumably still
# 254 at this point - confirm against the missing lines.
212 p_v4[IP].dst = self.pg2.remote_ip4
213 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg2)
215 self.assertEqual(p[IP].tos, 254)
217 p_v4[IP].dst = self.pg3.remote_ip4
218 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg3)
220 self.assertEqual(p[IP].tos, 254)
223 # still mapping out of pg1
225 p_v4[IP].dst = self.pg1.remote_ip4
226 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
228 self.assertEqual(p[IP].tos, 1)
231 # disable the input recording on pg0
233 self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
238 # back to an unchanged TOS value
240 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
242 self.assertEqual(p[IP].tos, 254)
245 # disable the egress map on pg1 and pg4
247 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
251 self.vapi.qos_mark_enable_disable(self.pg4.sw_if_index,
257 # unchanged Tos on pg1
259 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
261 self.assertEqual(p[IP].tos, 254)
# Cleanup: delete every egress map created at the top of this test (ids 1-7).
266 self.vapi.qos_egress_map_delete(1)
267 self.vapi.qos_egress_map_delete(4)
268 self.vapi.qos_egress_map_delete(2)
269 self.vapi.qos_egress_map_delete(3)
270 self.vapi.qos_egress_map_delete(5)
271 self.vapi.qos_egress_map_delete(6)
272 self.vapi.qos_egress_map_delete(7)
274 def test_qos_mpls(self):
275 """ QoS Mark/Record MPLS """
# Programs egress map 1 with 255 for every input, then checks the MPLS cos on
# label imposition (one and three labels), on an MPLS cross-connect, and the
# IP ToS after an MPLS deaggregation, all for traffic pg0 -> pg1.
# NOTE(review): several source lines are missing from this extract; `os`, `p`
# and `h` are used below but never bound in the visible lines - confirm.
278 # 255 QoS for all input values
280 output = [chr(255)] * 256
282 rows = [{'outputs': os},
287 self.vapi.qos_egress_map_update(1, rows)
290 # a route with 1 MPLS label
# NOTE(review): the label list for this route is on a line that is not
# visible; the assertion further down expects label 32.
292 route_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32,
293 [VppRoutePath(self.pg1.remote_ip4,
294 self.pg1.sw_if_index,
296 route_10_0_0_1.add_vpp_config()
299 # a route with 3 MPLS labels
301 route_10_0_0_3 = VppIpRoute(self, "10.0.0.3", 32,
302 [VppRoutePath(self.pg1.remote_ip4,
303 self.pg1.sw_if_index,
304 labels=[63, 33, 34])])
305 route_10_0_0_3.add_vpp_config()
308 # enable IP QoS recording on the input Pg0 and MPLS egress marking
311 self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
314 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
320 # packet that will get one label added and 3 labels added resp.
322 p_1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
323 IP(src=self.pg0.remote_ip4, dst="10.0.0.1", tos=1) /
324 UDP(sport=1234, dport=1234) /
326 p_3 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
327 IP(src=self.pg0.remote_ip4, dst="10.0.0.3", tos=1) /
328 UDP(sport=1234, dport=1234) /
331 rx = self.send_and_expect(self.pg0, p_1 * 65, self.pg1)
334 # only 3 bits of ToS value in MPLS make sure tos is correct
335 # and the label and EOS bit have not been corrupted
# Single-label case: cos 7, label 32, bottom-of-stack bit set.
338 self.assertEqual(p[MPLS].cos, 7)
339 self.assertEqual(p[MPLS].label, 32)
340 self.assertEqual(p[MPLS].s, 1)
341 rx = self.send_and_expect(self.pg0, p_3 * 65, self.pg1)
# Three-label case: labels 63, 33, 34 in order, cos 7 on each, and only the
# last label has the bottom-of-stack bit set. `h` is presumably advanced to
# the next MPLS header on lines not visible here - confirm.
343 self.assertEqual(p[MPLS].cos, 7)
344 self.assertEqual(p[MPLS].label, 63)
345 self.assertEqual(p[MPLS].s, 0)
347 self.assertEqual(h[MPLS].cos, 7)
348 self.assertEqual(h[MPLS].label, 33)
349 self.assertEqual(h[MPLS].s, 0)
351 self.assertEqual(h[MPLS].cos, 7)
352 self.assertEqual(h[MPLS].label, 34)
353 self.assertEqual(h[MPLS].s, 1)
356 # enable MPLS QoS recording on the input Pg0 and IP egress marking
359 self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
362 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
368 # MPLS x-connect - COS is rewritten, not preserved: the packet is sent
# with cos=3 and the assertion below expects cos 7 (presumably via the
# all-255 egress map 1 - confirm)
370 route_32_eos = VppMplsRoute(self, 32, 1,
371 [VppRoutePath(self.pg1.remote_ip4,
372 self.pg1.sw_if_index,
373 labels=[VppMplsLabel(33)])])
374 route_32_eos.add_vpp_config()
376 p_m1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
377 MPLS(label=32, cos=3, ttl=2) /
378 IP(src=self.pg0.remote_ip4, dst="10.0.0.1", tos=1) /
379 UDP(sport=1234, dport=1234) /
382 rx = self.send_and_expect(self.pg0, p_m1 * 65, self.pg1)
384 self.assertEqual(p[MPLS].cos, 7)
385 self.assertEqual(p[MPLS].label, 33)
386 self.assertEqual(p[MPLS].s, 1)
389 # MPLS deag - COS is copied from MPLS to IP
# NOTE(review): the packet carries cos=3 / tos=1 and the assertion expects
# IP tos 255, i.e. the all-255 map output rather than a literal copy of the
# cos value - confirm the intended wording of the comment above.
391 route_33_eos = VppMplsRoute(self, 33, 1,
392 [VppRoutePath("0.0.0.0",
395 route_33_eos.add_vpp_config()
397 route_10_0_0_4 = VppIpRoute(self, "10.0.0.4", 32,
398 [VppRoutePath(self.pg1.remote_ip4,
399 self.pg1.sw_if_index)])
400 route_10_0_0_4.add_vpp_config()
402 p_m2 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
403 MPLS(label=33, ttl=2, cos=3) /
404 IP(src=self.pg0.remote_ip4, dst="10.0.0.4", tos=1) /
405 UDP(sport=1234, dport=1234) /
408 rx = self.send_and_expect(self.pg0, p_m2 * 65, self.pg1)
411 self.assertEqual(p[IP].tos, 255)
# Cleanup: two record and two mark calls on pg0 / pg1 (arguments not visible;
# presumably disabling what was enabled above), then delete egress map 1.
416 self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
419 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
423 self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
426 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
430 self.vapi.qos_egress_map_delete(1)
432 def test_qos_vlan(self):
433 """QoS mark/record VLAN """
# Programs egress map 1 as i -> 255 - i, creates a dot1q sub-interface
# (VLAN 11) on pg0, and checks the VLAN prio on packets leaving the
# sub-interface and the IP ToS / IPv6 tc on packets leaving pg1.
# NOTE(review): several source lines are missing from this extract; `os` and
# `p` are used below but never bound in the visible lines - confirm.
436 # QoS for all input values
# Index 255 keeps chr(0), which equals 255 - 255, so all 256 inputs map to
# 255 - i.
438 output = [chr(0)] * 256
439 for i in range(0, 255):
440 output[i] = chr(255 - i)
442 rows = [{'outputs': os},
447 self.vapi.qos_egress_map_update(1, rows)
# Sub-interface setup lines that follow in the full source are not visible.
449 sub_if = VppDot1QSubint(self, self.pg0, 11)
458 # enable VLAN QoS recording/marking on the input Pg0 subinterface and
460 self.vapi.qos_record_enable_disable(sub_if.sw_if_index,
463 self.vapi.qos_mark_enable_disable(sub_if.sw_if_index,
469 # IP marking/recording on pg1
471 self.vapi.qos_record_enable_disable(self.pg1.sw_if_index,
474 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
480 # routes to/from the sub-interface
# 10.0.0.1 and 2001::1 resolve via the sub-interface; 10.0.0.2 and 2001::2
# resolve via pg1.
482 route_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32,
483 [VppRoutePath(sub_if.remote_ip4,
484 sub_if.sw_if_index)])
485 route_10_0_0_1.add_vpp_config()
486 route_10_0_0_2 = VppIpRoute(self, "10.0.0.2", 32,
487 [VppRoutePath(self.pg1.remote_ip4,
488 self.pg1.sw_if_index)])
489 route_10_0_0_2.add_vpp_config()
490 route_2001_1 = VppIpRoute(self, "2001::1", 128,
491 [VppRoutePath(sub_if.remote_ip6,
493 proto=DpoProto.DPO_PROTO_IP6)],
495 route_2001_1.add_vpp_config()
496 route_2001_2 = VppIpRoute(self, "2001::2", 128,
497 [VppRoutePath(self.pg1.remote_ip6,
498 self.pg1.sw_if_index,
499 proto=DpoProto.DPO_PROTO_IP6)],
501 route_2001_2.add_vpp_config()
# p_v1: tagged IPv4 (VLAN 11, prio 1) entering on pg0, destined to pg1.
503 p_v1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
504 Dot1Q(vlan=11, prio=1) /
505 IP(src="1.1.1.1", dst="10.0.0.2", tos=1) /
506 UDP(sport=1234, dport=1234) /
# p_v2: untagged IPv4 entering on pg1, destined to the sub-interface.
509 p_v2 = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
510 IP(src="1.1.1.1", dst="10.0.0.1", tos=1) /
511 UDP(sport=1234, dport=1234) /
# pg1 -> sub-interface: the egress frame is expected to carry VLAN prio 6.
514 rx = self.send_and_expect(self.pg1, p_v2 * 65, self.pg0)
517 self.assertEqual(p[Dot1Q].prio, 6)
# sub-interface -> pg1: expected ToS 254, consistent with the 255 - i map
# applied to the input prio of 1 (presumably; confirm).
519 rx = self.send_and_expect(self.pg0, p_v1 * 65, self.pg1)
522 self.assertEqual(p[IP].tos, 254)
# Same two directions again with IPv6 packets; the tagged packet uses prio 2.
524 p_v1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
525 Dot1Q(vlan=11, prio=2) /
526 IPv6(src="2001::1", dst="2001::2", tc=1) /
527 UDP(sport=1234, dport=1234) /
530 p_v2 = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
531 IPv6(src="3001::1", dst="2001::1", tc=1) /
532 UDP(sport=1234, dport=1234) /
535 rx = self.send_and_expect(self.pg1, p_v2 * 65, self.pg0)
538 self.assertEqual(p[Dot1Q].prio, 6)
# Expected tc 253, consistent with the 255 - i map applied to the input prio
# of 2 (presumably; confirm).
540 rx = self.send_and_expect(self.pg0, p_v1 * 65, self.pg1)
543 self.assertEqual(p[IPv6].tc, 253)
# Cleanup: remove the sub-interface addresses, then the record/mark calls on
# the sub-interface and pg1 (arguments not visible; presumably disabling).
548 sub_if.unconfig_ip4()
549 sub_if.unconfig_ip6()
551 self.vapi.qos_record_enable_disable(sub_if.sw_if_index,
554 self.vapi.qos_mark_enable_disable(sub_if.sw_if_index,
558 self.vapi.qos_record_enable_disable(self.pg1.sw_if_index,
561 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
# Script entry point: run this module's tests through VppTestRunner.
# NOTE(review): no `import unittest` is visible in this extract (it starts at
# source line 5) - confirm the module imports it above.
567 if __name__ == '__main__':
568 unittest.main(testRunner=VppTestRunner)