5 from framework import VppTestCase, VppTestRunner
6 from vpp_papi_provider import QOS_SOURCE
7 from vpp_sub_interface import VppDot1QSubint
8 from vpp_ip import DpoProto
9 from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsRoute, \
10 VppMplsLabel, VppMplsTable
13 from scapy.packet import Raw
14 from scapy.layers.l2 import Ether, Dot1Q
15 from scapy.layers.inet import IP, UDP
16 from scapy.layers.inet6 import IPv6
17 from scapy.contrib.mpls import MPLS
20 class TestQOS(VppTestCase):
24 super(TestQOS, self).setUp()
26 self.create_pg_interfaces(range(5))
28 tbl = VppMplsTable(self, 0)
31 for i in self.pg_interfaces:
40 for i in self.pg_interfaces:
45 super(TestQOS, self).tearDown()
47 def test_qos_ip(self):
48 """ QoS Mark/Record IP """
51 # for table 1 map the n=0xff possible values of input QoS mark,
54 output = [scapy.compat.chb(0)] * 256
55 for i in range(0, 255):
56 output[i] = scapy.compat.chb(255 - i)
58 rows = [{'outputs': os},
63 self.vapi.qos_egress_map_update(1, rows)
66 # For table 2 (and up) use the value n for everything
68 output = [scapy.compat.chb(2)] * 256
70 rows = [{'outputs': os},
75 self.vapi.qos_egress_map_update(2, rows)
77 output = [scapy.compat.chb(3)] * 256
79 rows = [{'outputs': os},
84 self.vapi.qos_egress_map_update(3, rows)
86 output = [scapy.compat.chb(4)] * 256
88 rows = [{'outputs': os},
92 self.vapi.qos_egress_map_update(4, rows)
93 self.vapi.qos_egress_map_update(5, rows)
94 self.vapi.qos_egress_map_update(6, rows)
95 self.vapi.qos_egress_map_update(7, rows)
97 self.logger.info(self.vapi.cli("sh qos eg map"))
100 # Bind interface pgN to table n
102 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
106 self.vapi.qos_mark_enable_disable(self.pg2.sw_if_index,
110 self.vapi.qos_mark_enable_disable(self.pg3.sw_if_index,
114 self.vapi.qos_mark_enable_disable(self.pg4.sw_if_index,
120 # packets ingress on Pg0
122 p_v4 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
123 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, tos=1) /
124 UDP(sport=1234, dport=1234) /
125 Raw(scapy.compat.chb(100) * 65))
126 p_v6 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
127 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6,
129 UDP(sport=1234, dport=1234) /
130 Raw(scapy.compat.chb(100) * 65))
133 # Since we have not yet enabled the recording of the input QoS
134 # from the input iP header, the egress packet's ToS will be unchanged
136 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
138 self.assertEqual(p[IP].tos, 1)
139 rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg1)
141 self.assertEqual(p[IPv6].tc, 1)
144 # Enable QoS recording on IP input for pg0
146 self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
151 # send the same packets, this time expect the input TOS of 1
152 # to be mapped to pg1's egress value of 254
154 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
156 self.assertEqual(p[IP].tos, 254)
157 rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg1)
159 self.assertEqual(p[IPv6].tc, 254)
162 # different input ToS to test the mapping
165 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
167 self.assertEqual(p[IP].tos, 128)
169 rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg1)
171 self.assertEqual(p[IPv6].tc, 128)
174 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
176 self.assertEqual(p[IP].tos, 1)
178 rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg1)
180 self.assertEqual(p[IPv6].tc, 1)
183 # send packets out the other interfaces to test the maps are
186 p_v4[IP].dst = self.pg2.remote_ip4
187 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg2)
189 self.assertEqual(p[IP].tos, 2)
191 p_v4[IP].dst = self.pg3.remote_ip4
192 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg3)
194 self.assertEqual(p[IP].tos, 3)
196 p_v6[IPv6].dst = self.pg3.remote_ip6
197 rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg3)
199 self.assertEqual(p[IPv6].tc, 3)
202 # remove the map on pg2 and pg3, now expect an unchanged IP tos
204 self.vapi.qos_mark_enable_disable(self.pg2.sw_if_index,
208 self.vapi.qos_mark_enable_disable(self.pg3.sw_if_index,
212 self.logger.info(self.vapi.cli("sh int feat pg2"))
214 p_v4[IP].dst = self.pg2.remote_ip4
215 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg2)
217 self.assertEqual(p[IP].tos, 254)
219 p_v4[IP].dst = self.pg3.remote_ip4
220 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg3)
222 self.assertEqual(p[IP].tos, 254)
225 # still mapping out of pg1
227 p_v4[IP].dst = self.pg1.remote_ip4
228 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
230 self.assertEqual(p[IP].tos, 1)
233 # disable the input recording on pg0
235 self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
240 # back to an unchanged TOS value
242 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
244 self.assertEqual(p[IP].tos, 254)
247 # disable the egress map on pg1 and pg4
249 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
253 self.vapi.qos_mark_enable_disable(self.pg4.sw_if_index,
259 # unchanged Tos on pg1
261 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
263 self.assertEqual(p[IP].tos, 254)
268 self.vapi.qos_egress_map_delete(1)
269 self.vapi.qos_egress_map_delete(4)
270 self.vapi.qos_egress_map_delete(2)
271 self.vapi.qos_egress_map_delete(3)
272 self.vapi.qos_egress_map_delete(5)
273 self.vapi.qos_egress_map_delete(6)
274 self.vapi.qos_egress_map_delete(7)
276 def test_qos_mpls(self):
277 """ QoS Mark/Record MPLS """
280 # 255 QoS for all input values
286 output = [scapy.compat.chb(from_ext)] * 256
287 os1 = b''.join(output)
288 output = [scapy.compat.chb(from_vlan)] * 256
289 os2 = b''.join(output)
290 output = [scapy.compat.chb(from_mpls)] * 256
291 os3 = b''.join(output)
292 output = [scapy.compat.chb(from_ip)] * 256
293 os4 = b''.join(output)
294 rows = [{'outputs': os1},
299 self.vapi.qos_egress_map_update(1, rows)
302 # a route with 1 MPLS label
304 route_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32,
305 [VppRoutePath(self.pg1.remote_ip4,
306 self.pg1.sw_if_index,
308 route_10_0_0_1.add_vpp_config()
311 # a route with 3 MPLS labels
313 route_10_0_0_3 = VppIpRoute(self, "10.0.0.3", 32,
314 [VppRoutePath(self.pg1.remote_ip4,
315 self.pg1.sw_if_index,
316 labels=[63, 33, 34])])
317 route_10_0_0_3.add_vpp_config()
320 # enable IP QoS recording on the input Pg0 and MPLS egress marking
323 self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
326 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
332 # packet that will get one label added and 3 labels added resp.
334 p_1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
335 IP(src=self.pg0.remote_ip4, dst="10.0.0.1", tos=1) /
336 UDP(sport=1234, dport=1234) /
337 Raw(scapy.compat.chb(100) * 65))
338 p_3 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
339 IP(src=self.pg0.remote_ip4, dst="10.0.0.3", tos=1) /
340 UDP(sport=1234, dport=1234) /
341 Raw(scapy.compat.chb(100) * 65))
343 rx = self.send_and_expect(self.pg0, p_1 * 65, self.pg1)
346 # only 3 bits of ToS value in MPLS make sure tos is correct
347 # and the label and EOS bit have not been corrupted
350 self.assertEqual(p[MPLS].cos, from_ip)
351 self.assertEqual(p[MPLS].label, 32)
352 self.assertEqual(p[MPLS].s, 1)
353 rx = self.send_and_expect(self.pg0, p_3 * 65, self.pg1)
355 self.assertEqual(p[MPLS].cos, from_ip)
356 self.assertEqual(p[MPLS].label, 63)
357 self.assertEqual(p[MPLS].s, 0)
359 self.assertEqual(h[MPLS].cos, from_ip)
360 self.assertEqual(h[MPLS].label, 33)
361 self.assertEqual(h[MPLS].s, 0)
363 self.assertEqual(h[MPLS].cos, from_ip)
364 self.assertEqual(h[MPLS].label, 34)
365 self.assertEqual(h[MPLS].s, 1)
368 # enable MPLS QoS recording on the input Pg0 and IP egress marking
371 self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
374 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
380 # MPLS x-connect - COS according to pg1 map
382 route_32_eos = VppMplsRoute(self, 32, 1,
383 [VppRoutePath(self.pg1.remote_ip4,
384 self.pg1.sw_if_index,
385 labels=[VppMplsLabel(33)])])
386 route_32_eos.add_vpp_config()
388 p_m1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
389 MPLS(label=32, cos=3, ttl=2) /
390 IP(src=self.pg0.remote_ip4, dst="10.0.0.1", tos=1) /
391 UDP(sport=1234, dport=1234) /
392 Raw(scapy.compat.chb(100) * 65))
394 rx = self.send_and_expect(self.pg0, p_m1 * 65, self.pg1)
396 self.assertEqual(p[MPLS].cos, from_mpls)
397 self.assertEqual(p[MPLS].label, 33)
398 self.assertEqual(p[MPLS].s, 1)
401 # MPLS deag - COS is copied from MPLS to IP
403 route_33_eos = VppMplsRoute(self, 33, 1,
404 [VppRoutePath("0.0.0.0",
407 route_33_eos.add_vpp_config()
409 route_10_0_0_4 = VppIpRoute(self, "10.0.0.4", 32,
410 [VppRoutePath(self.pg1.remote_ip4,
411 self.pg1.sw_if_index)])
412 route_10_0_0_4.add_vpp_config()
414 p_m2 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
415 MPLS(label=33, ttl=2, cos=3) /
416 IP(src=self.pg0.remote_ip4, dst="10.0.0.4", tos=1) /
417 UDP(sport=1234, dport=1234) /
418 Raw(scapy.compat.chb(100) * 65))
420 rx = self.send_and_expect(self.pg0, p_m2 * 65, self.pg1)
423 self.assertEqual(p[IP].tos, from_mpls)
428 self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
431 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
435 self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
438 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
442 self.vapi.qos_egress_map_delete(1)
444 def test_qos_vlan(self):
445 """QoS mark/record VLAN """
448 # QoS for all input values
450 output = [scapy.compat.chb(0)] * 256
451 for i in range(0, 255):
452 output[i] = scapy.compat.chb(255 - i)
453 os = b''.join(output)
454 rows = [{'outputs': os},
459 self.vapi.qos_egress_map_update(1, rows)
461 sub_if = VppDot1QSubint(self, self.pg0, 11)
470 # enable VLAN QoS recording/marking on the input Pg0 subinterface and
472 self.vapi.qos_record_enable_disable(sub_if.sw_if_index,
475 self.vapi.qos_mark_enable_disable(sub_if.sw_if_index,
481 # IP marking/recording on pg1
483 self.vapi.qos_record_enable_disable(self.pg1.sw_if_index,
486 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
492 # a routes to/from sub-interface
494 route_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32,
495 [VppRoutePath(sub_if.remote_ip4,
496 sub_if.sw_if_index)])
497 route_10_0_0_1.add_vpp_config()
498 route_10_0_0_2 = VppIpRoute(self, "10.0.0.2", 32,
499 [VppRoutePath(self.pg1.remote_ip4,
500 self.pg1.sw_if_index)])
501 route_10_0_0_2.add_vpp_config()
502 route_2001_1 = VppIpRoute(self, "2001::1", 128,
503 [VppRoutePath(sub_if.remote_ip6,
505 proto=DpoProto.DPO_PROTO_IP6)],
507 route_2001_1.add_vpp_config()
508 route_2001_2 = VppIpRoute(self, "2001::2", 128,
509 [VppRoutePath(self.pg1.remote_ip6,
510 self.pg1.sw_if_index,
511 proto=DpoProto.DPO_PROTO_IP6)],
513 route_2001_2.add_vpp_config()
515 p_v1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
516 Dot1Q(vlan=11, prio=1) /
517 IP(src="1.1.1.1", dst="10.0.0.2", tos=1) /
518 UDP(sport=1234, dport=1234) /
519 Raw(scapy.compat.chb(100) * 65))
521 p_v2 = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
522 IP(src="1.1.1.1", dst="10.0.0.1", tos=1) /
523 UDP(sport=1234, dport=1234) /
524 Raw(scapy.compat.chb(100) * 65))
526 rx = self.send_and_expect(self.pg1, p_v2 * 65, self.pg0)
529 self.assertEqual(p[Dot1Q].prio, 6)
531 rx = self.send_and_expect(self.pg0, p_v1 * 65, self.pg1)
534 self.assertEqual(p[IP].tos, 254)
536 p_v1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
537 Dot1Q(vlan=11, prio=2) /
538 IPv6(src="2001::1", dst="2001::2", tc=1) /
539 UDP(sport=1234, dport=1234) /
540 Raw(scapy.compat.chb(100) * 65))
542 p_v2 = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
543 IPv6(src="3001::1", dst="2001::1", tc=1) /
544 UDP(sport=1234, dport=1234) /
545 Raw(scapy.compat.chb(100) * 65))
547 rx = self.send_and_expect(self.pg1, p_v2 * 65, self.pg0)
550 self.assertEqual(p[Dot1Q].prio, 6)
552 rx = self.send_and_expect(self.pg0, p_v1 * 65, self.pg1)
555 self.assertEqual(p[IPv6].tc, 253)
560 sub_if.unconfig_ip4()
561 sub_if.unconfig_ip6()
563 self.vapi.qos_record_enable_disable(sub_if.sw_if_index,
566 self.vapi.qos_mark_enable_disable(sub_if.sw_if_index,
570 self.vapi.qos_record_enable_disable(self.pg1.sw_if_index,
573 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
579 if __name__ == '__main__':
580 unittest.main(testRunner=VppTestRunner)