5 from framework import VppTestCase, VppTestRunner
6 from vpp_papi_provider import QOS_SOURCE
7 from vpp_sub_interface import VppDot1QSubint
8 from vpp_ip import DpoProto
9 from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsRoute, \
10 VppMplsLabel, VppMplsTable
12 from scapy.packet import Raw
13 from scapy.layers.l2 import Ether, Dot1Q
14 from scapy.layers.inet import IP, UDP
15 from scapy.layers.inet6 import IPv6
16 from scapy.contrib.mpls import MPLS
19 class TestQOS(VppTestCase):
23 super(TestQOS, self).setUp()
25 self.create_pg_interfaces(range(5))
27 tbl = VppMplsTable(self, 0)
30 for i in self.pg_interfaces:
39 for i in self.pg_interfaces:
44 super(TestQOS, self).tearDown()
46 def test_qos_ip(self):
47 """ QoS Mark/Record IP """
50 # for table 1 map the n=0xff possible values of input QoS mark,
53 output = [chr(0)] * 256
54 for i in range(0, 255):
55 output[i] = chr(255 - i)
57 rows = [{'outputs': os},
62 self.vapi.qos_egress_map_update(1, rows)
65 # For table 2 (and up) use the value n for everything
67 output = [chr(2)] * 256
69 rows = [{'outputs': os},
74 self.vapi.qos_egress_map_update(2, rows)
76 output = [chr(3)] * 256
78 rows = [{'outputs': os},
83 self.vapi.qos_egress_map_update(3, rows)
85 output = [chr(4)] * 256
87 rows = [{'outputs': os},
91 self.vapi.qos_egress_map_update(4, rows)
92 self.vapi.qos_egress_map_update(5, rows)
93 self.vapi.qos_egress_map_update(6, rows)
94 self.vapi.qos_egress_map_update(7, rows)
96 self.logger.info(self.vapi.cli("sh qos eg map"))
99 # Bind interface pgN to table n
101 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
105 self.vapi.qos_mark_enable_disable(self.pg2.sw_if_index,
109 self.vapi.qos_mark_enable_disable(self.pg3.sw_if_index,
113 self.vapi.qos_mark_enable_disable(self.pg4.sw_if_index,
119 # packets ingress on Pg0
121 p_v4 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
122 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, tos=1) /
123 UDP(sport=1234, dport=1234) /
125 p_v6 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
126 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6,
128 UDP(sport=1234, dport=1234) /
132 # Since we have not yet enabled the recording of the input QoS
133 # from the input iP header, the egress packet's ToS will be unchanged
135 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
137 self.assertEqual(p[IP].tos, 1)
138 rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg1)
140 self.assertEqual(p[IPv6].tc, 1)
143 # Enable QoS recrding on IP input for pg0
145 self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
150 # send the same packets, this time expect the input TOS of 1
151 # to be mapped to pg1's egress value of 254
153 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
155 self.assertEqual(p[IP].tos, 254)
156 rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg1)
158 self.assertEqual(p[IPv6].tc, 254)
161 # different input ToS to test the mapping
164 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
166 self.assertEqual(p[IP].tos, 128)
168 rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg1)
170 self.assertEqual(p[IPv6].tc, 128)
173 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
175 self.assertEqual(p[IP].tos, 1)
177 rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg1)
179 self.assertEqual(p[IPv6].tc, 1)
182 # send packets out the other interfaces to test the maps are
185 p_v4[IP].dst = self.pg2.remote_ip4
186 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg2)
188 self.assertEqual(p[IP].tos, 2)
190 p_v4[IP].dst = self.pg3.remote_ip4
191 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg3)
193 self.assertEqual(p[IP].tos, 3)
195 p_v6[IPv6].dst = self.pg3.remote_ip6
196 rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg3)
198 self.assertEqual(p[IPv6].tc, 3)
201 # remove the map on pg2 and pg3, now expect an unchanged IP tos
203 self.vapi.qos_mark_enable_disable(self.pg2.sw_if_index,
207 self.vapi.qos_mark_enable_disable(self.pg3.sw_if_index,
211 self.logger.info(self.vapi.cli("sh int feat pg2"))
213 p_v4[IP].dst = self.pg2.remote_ip4
214 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg2)
216 self.assertEqual(p[IP].tos, 254)
218 p_v4[IP].dst = self.pg3.remote_ip4
219 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg3)
221 self.assertEqual(p[IP].tos, 254)
224 # still mapping out of pg1
226 p_v4[IP].dst = self.pg1.remote_ip4
227 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
229 self.assertEqual(p[IP].tos, 1)
232 # disable the input recording on pg0
234 self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
239 # back to an unchanged TOS value
241 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
243 self.assertEqual(p[IP].tos, 254)
246 # disable the egress map on pg1 and pg4
248 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
252 self.vapi.qos_mark_enable_disable(self.pg4.sw_if_index,
258 # unchanged Tos on pg1
260 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
262 self.assertEqual(p[IP].tos, 254)
267 self.vapi.qos_egress_map_delete(1)
268 self.vapi.qos_egress_map_delete(4)
269 self.vapi.qos_egress_map_delete(2)
270 self.vapi.qos_egress_map_delete(3)
271 self.vapi.qos_egress_map_delete(5)
272 self.vapi.qos_egress_map_delete(6)
273 self.vapi.qos_egress_map_delete(7)
275 def test_qos_mpls(self):
276 """ QoS Mark/Record MPLS """
279 # 255 QoS for all input values
285 output = [chr(from_ext)] * 256
286 os1 = ''.join(output)
287 output = [chr(from_vlan)] * 256
288 os2 = ''.join(output)
289 output = [chr(from_mpls)] * 256
290 os3 = ''.join(output)
291 output = [chr(from_ip)] * 256
292 os4 = ''.join(output)
293 rows = [{'outputs': os1},
298 self.vapi.qos_egress_map_update(1, rows)
301 # a route with 1 MPLS label
303 route_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32,
304 [VppRoutePath(self.pg1.remote_ip4,
305 self.pg1.sw_if_index,
307 route_10_0_0_1.add_vpp_config()
310 # a route with 3 MPLS labels
312 route_10_0_0_3 = VppIpRoute(self, "10.0.0.3", 32,
313 [VppRoutePath(self.pg1.remote_ip4,
314 self.pg1.sw_if_index,
315 labels=[63, 33, 34])])
316 route_10_0_0_3.add_vpp_config()
319 # enable IP QoS recording on the input Pg0 and MPLS egress marking
322 self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
325 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
331 # packet that will get one label added and 3 labels added resp.
333 p_1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
334 IP(src=self.pg0.remote_ip4, dst="10.0.0.1", tos=1) /
335 UDP(sport=1234, dport=1234) /
337 p_3 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
338 IP(src=self.pg0.remote_ip4, dst="10.0.0.3", tos=1) /
339 UDP(sport=1234, dport=1234) /
342 rx = self.send_and_expect(self.pg0, p_1 * 65, self.pg1)
345 # only 3 bits of ToS value in MPLS make sure tos is correct
346 # and the label and EOS bit have not been corrupted
349 self.assertEqual(p[MPLS].cos, from_ip)
350 self.assertEqual(p[MPLS].label, 32)
351 self.assertEqual(p[MPLS].s, 1)
352 rx = self.send_and_expect(self.pg0, p_3 * 65, self.pg1)
354 self.assertEqual(p[MPLS].cos, from_ip)
355 self.assertEqual(p[MPLS].label, 63)
356 self.assertEqual(p[MPLS].s, 0)
358 self.assertEqual(h[MPLS].cos, from_ip)
359 self.assertEqual(h[MPLS].label, 33)
360 self.assertEqual(h[MPLS].s, 0)
362 self.assertEqual(h[MPLS].cos, from_ip)
363 self.assertEqual(h[MPLS].label, 34)
364 self.assertEqual(h[MPLS].s, 1)
367 # enable MPLS QoS recording on the input Pg0 and IP egress marking
370 self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
373 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
379 # MPLS x-connect - COS according to pg1 map
381 route_32_eos = VppMplsRoute(self, 32, 1,
382 [VppRoutePath(self.pg1.remote_ip4,
383 self.pg1.sw_if_index,
384 labels=[VppMplsLabel(33)])])
385 route_32_eos.add_vpp_config()
387 p_m1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
388 MPLS(label=32, cos=3, ttl=2) /
389 IP(src=self.pg0.remote_ip4, dst="10.0.0.1", tos=1) /
390 UDP(sport=1234, dport=1234) /
393 rx = self.send_and_expect(self.pg0, p_m1 * 65, self.pg1)
395 self.assertEqual(p[MPLS].cos, from_mpls)
396 self.assertEqual(p[MPLS].label, 33)
397 self.assertEqual(p[MPLS].s, 1)
400 # MPLS deag - COS is copied from MPLS to IP
402 route_33_eos = VppMplsRoute(self, 33, 1,
403 [VppRoutePath("0.0.0.0",
406 route_33_eos.add_vpp_config()
408 route_10_0_0_4 = VppIpRoute(self, "10.0.0.4", 32,
409 [VppRoutePath(self.pg1.remote_ip4,
410 self.pg1.sw_if_index)])
411 route_10_0_0_4.add_vpp_config()
413 p_m2 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
414 MPLS(label=33, ttl=2, cos=3) /
415 IP(src=self.pg0.remote_ip4, dst="10.0.0.4", tos=1) /
416 UDP(sport=1234, dport=1234) /
419 rx = self.send_and_expect(self.pg0, p_m2 * 65, self.pg1)
422 self.assertEqual(p[IP].tos, from_mpls)
427 self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
430 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
434 self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
437 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
441 self.vapi.qos_egress_map_delete(1)
443 def test_qos_vlan(self):
444 """QoS mark/record VLAN """
447 # QoS for all input values
449 output = [chr(0)] * 256
450 for i in range(0, 255):
451 output[i] = chr(255 - i)
453 rows = [{'outputs': os},
458 self.vapi.qos_egress_map_update(1, rows)
460 sub_if = VppDot1QSubint(self, self.pg0, 11)
469 # enable VLAN QoS recording/marking on the input Pg0 subinterface and
471 self.vapi.qos_record_enable_disable(sub_if.sw_if_index,
474 self.vapi.qos_mark_enable_disable(sub_if.sw_if_index,
480 # IP marking/recording on pg1
482 self.vapi.qos_record_enable_disable(self.pg1.sw_if_index,
485 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
491 # a routes to/from sub-interface
493 route_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32,
494 [VppRoutePath(sub_if.remote_ip4,
495 sub_if.sw_if_index)])
496 route_10_0_0_1.add_vpp_config()
497 route_10_0_0_2 = VppIpRoute(self, "10.0.0.2", 32,
498 [VppRoutePath(self.pg1.remote_ip4,
499 self.pg1.sw_if_index)])
500 route_10_0_0_2.add_vpp_config()
501 route_2001_1 = VppIpRoute(self, "2001::1", 128,
502 [VppRoutePath(sub_if.remote_ip6,
504 proto=DpoProto.DPO_PROTO_IP6)],
506 route_2001_1.add_vpp_config()
507 route_2001_2 = VppIpRoute(self, "2001::2", 128,
508 [VppRoutePath(self.pg1.remote_ip6,
509 self.pg1.sw_if_index,
510 proto=DpoProto.DPO_PROTO_IP6)],
512 route_2001_2.add_vpp_config()
514 p_v1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
515 Dot1Q(vlan=11, prio=1) /
516 IP(src="1.1.1.1", dst="10.0.0.2", tos=1) /
517 UDP(sport=1234, dport=1234) /
520 p_v2 = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
521 IP(src="1.1.1.1", dst="10.0.0.1", tos=1) /
522 UDP(sport=1234, dport=1234) /
525 rx = self.send_and_expect(self.pg1, p_v2 * 65, self.pg0)
528 self.assertEqual(p[Dot1Q].prio, 6)
530 rx = self.send_and_expect(self.pg0, p_v1 * 65, self.pg1)
533 self.assertEqual(p[IP].tos, 254)
535 p_v1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
536 Dot1Q(vlan=11, prio=2) /
537 IPv6(src="2001::1", dst="2001::2", tc=1) /
538 UDP(sport=1234, dport=1234) /
541 p_v2 = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
542 IPv6(src="3001::1", dst="2001::1", tc=1) /
543 UDP(sport=1234, dport=1234) /
546 rx = self.send_and_expect(self.pg1, p_v2 * 65, self.pg0)
549 self.assertEqual(p[Dot1Q].prio, 6)
551 rx = self.send_and_expect(self.pg0, p_v1 * 65, self.pg1)
554 self.assertEqual(p[IPv6].tc, 253)
559 sub_if.unconfig_ip4()
560 sub_if.unconfig_ip6()
562 self.vapi.qos_record_enable_disable(sub_if.sw_if_index,
565 self.vapi.qos_mark_enable_disable(sub_if.sw_if_index,
569 self.vapi.qos_record_enable_disable(self.pg1.sw_if_index,
572 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
578 if __name__ == '__main__':
579 unittest.main(testRunner=VppTestRunner)