5 from framework import VppTestCase, VppTestRunner
6 from vpp_sub_interface import VppDot1QSubint
7 from vpp_ip import DpoProto
8 from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsRoute, \
9 VppMplsLabel, VppMplsTable
12 from scapy.packet import Raw
13 from scapy.layers.l2 import Ether, Dot1Q
14 from scapy.layers.inet import IP, UDP
15 from scapy.layers.inet6 import IPv6
16 from scapy.contrib.mpls import MPLS
17 from vpp_papi import VppEnum
22 class TestQOS(VppTestCase):
# Test case derived from VppTestCase; its test methods carry the docstrings
# "QoS Mark/Record IP", "QoS Mark/Record MPLS" and "QoS mark/record VLAN".
25 # Note: Since the enums aren't created dynamically until after
26 # the papi client attaches to VPP, we put it in a property to
27 # ensure it is the value at runtime, not at module load time.
# NOTE(review): the property's decorator and `def` line are not visible in
# this listing (the embedded numbering jumps from 27 to 30). The rest of the
# class reads this value as `self.QOS_SOURCE`.
30 return VppEnum.vl_api_qos_source_t
# Class-level setup: delegates entirely to the parent class.
# NOTE(review): the `def setUpClass` line (and any decorator) is not visible
# in this listing -- the embedded numbering jumps from 30 to 34.
34 super(TestQOS, cls).setUpClass()
37 def tearDownClass(cls):
# Class-level teardown: delegates entirely to the parent class.
# NOTE(review): takes `cls`, so presumably a classmethod; the decorator line
# is not visible in this listing -- confirm.
38 super(TestQOS, cls).tearDownClass()
# Per-test setup. NOTE(review): the `def setUp` line is not visible in this
# listing (the embedded numbering jumps from 38 to 41).
41 super(TestQOS, self).setUp()
# Create five pg interfaces; the tests refer to them as self.pg0 .. self.pg4.
43 self.create_pg_interfaces(range(5))
# NOTE(review): presumably MPLS table 0; the statement that uses `tbl`
# is not visible here.
45 tbl = VppMplsTable(self, 0)
# NOTE(review): the body of this loop (embedded lines 49-55) is not visible.
48 for i in self.pg_interfaces:
# Per-test teardown. NOTE(review): the `def tearDown` line and the body of
# the loop below (embedded lines 56 and 58-61) are not visible in this
# listing.
57 for i in self.pg_interfaces:
# Hand over to the parent class teardown last.
62 super(TestQOS, self).tearDown()
64 def test_qos_ip(self):
65 """ QoS Mark/Record IP """
# NOTE(review): this listing has gaps (see the embedded line numbers).
# Statements such as the first assignments of `os`, the remaining elements
# of `rows`, the trailing arguments of the enable/disable calls and the
# loops that bind `p` are not visible. The added comments describe only
# what the visible lines show.
68 # for table 1 map the n=0xff possible values of input QoS mark,
# Entries 0..254 are set to 255 - i; entry 255 keeps its initial value of 0.
71 output = [scapy.compat.chb(0)] * 256
72 for i in range(0, 255):
73 output[i] = scapy.compat.chb(255 - i)
75 rows = [{'outputs': os},
80 self.vapi.qos_egress_map_update(1, rows)
83 # For table 2 (and up) use the value n for everything
85 output = [scapy.compat.chb(2)] * 256
87 rows = [{'outputs': os},
92 self.vapi.qos_egress_map_update(2, rows)
94 output = [scapy.compat.chb(3)] * 256
96 rows = [{'outputs': os},
101 self.vapi.qos_egress_map_update(3, rows)
103 output = [scapy.compat.chb(4)] * 256
104 os = b''.join(output)
105 rows = [{'outputs': os},
# Tables 4 to 7 are all updated with the same `rows` object.
109 self.vapi.qos_egress_map_update(4, rows)
110 self.vapi.qos_egress_map_update(5, rows)
111 self.vapi.qos_egress_map_update(6, rows)
112 self.vapi.qos_egress_map_update(7, rows)
114 self.logger.info(self.vapi.cli("sh qos eg map"))
117 # Bind interface pgN to table n
119 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
120 self.QOS_SOURCE.QOS_API_SOURCE_IP,
123 self.vapi.qos_mark_enable_disable(self.pg2.sw_if_index,
124 self.QOS_SOURCE.QOS_API_SOURCE_IP,
127 self.vapi.qos_mark_enable_disable(self.pg3.sw_if_index,
128 self.QOS_SOURCE.QOS_API_SOURCE_IP,
131 self.vapi.qos_mark_enable_disable(self.pg4.sw_if_index,
132 self.QOS_SOURCE.QOS_API_SOURCE_IP,
137 # packets ingress on Pg0
139 p_v4 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
140 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, tos=1) /
141 UDP(sport=1234, dport=1234) /
142 Raw(scapy.compat.chb(100) * NUM_PKTS))
143 p_v6 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
144 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6,
146 UDP(sport=1234, dport=1234) /
147 Raw(scapy.compat.chb(100) * NUM_PKTS))
150 # Since we have not yet enabled the recording of the input QoS
151 # from the input IP header, the egress packet's ToS will be unchanged
153 rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
155 self.assertEqual(p[IP].tos, 1)
156 rx = self.send_and_expect(self.pg0, p_v6 * NUM_PKTS, self.pg1)
158 self.assertEqual(p[IPv6].tc, 1)
161 # Enable QoS recording on IP input for pg0
163 self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
164 self.QOS_SOURCE.QOS_API_SOURCE_IP,
168 # send the same packets, this time expect the input TOS of 1
169 # to be mapped to pg1's egress value of 254
171 rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
173 self.assertEqual(p[IP].tos, 254)
174 rx = self.send_and_expect(self.pg0, p_v6 * NUM_PKTS, self.pg1)
176 self.assertEqual(p[IPv6].tc, 254)
179 # different input ToS to test the mapping
# NOTE(review): the statements that change the input ToS/TC before each send
# below are not visible. Under table 1 (255 - n) an output of 128 corresponds
# to an input of 127 and an output of 1 to an input of 254 -- confirm
# against the original file.
182 rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
184 self.assertEqual(p[IP].tos, 128)
186 rx = self.send_and_expect(self.pg0, p_v6 * NUM_PKTS, self.pg1)
188 self.assertEqual(p[IPv6].tc, 128)
191 rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
193 self.assertEqual(p[IP].tos, 1)
195 rx = self.send_and_expect(self.pg0, p_v6 * NUM_PKTS, self.pg1)
197 self.assertEqual(p[IPv6].tc, 1)
200 # send packets out the other interfaces to test the maps are
# The expected values 2 and 3 are the constant fill values of tables 2 and 3.
203 p_v4[IP].dst = self.pg2.remote_ip4
204 rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg2)
206 self.assertEqual(p[IP].tos, 2)
208 p_v4[IP].dst = self.pg3.remote_ip4
209 rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg3)
211 self.assertEqual(p[IP].tos, 3)
213 p_v6[IPv6].dst = self.pg3.remote_ip6
214 rx = self.send_and_expect(self.pg0, p_v6 * NUM_PKTS, self.pg3)
216 self.assertEqual(p[IPv6].tc, 3)
219 # remove the map on pg2 and pg3, now expect an unchanged IP tos
221 self.vapi.qos_mark_enable_disable(self.pg2.sw_if_index,
222 self.QOS_SOURCE.QOS_API_SOURCE_IP,
225 self.vapi.qos_mark_enable_disable(self.pg3.sw_if_index,
226 self.QOS_SOURCE.QOS_API_SOURCE_IP,
229 self.logger.info(self.vapi.cli("sh int feat pg2"))
# NOTE(review): 254 is presumably the ToS currently carried by p_v4 (set in
# a line not visible here), i.e. the packet leaves unmodified -- confirm.
231 p_v4[IP].dst = self.pg2.remote_ip4
232 rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg2)
234 self.assertEqual(p[IP].tos, 254)
236 p_v4[IP].dst = self.pg3.remote_ip4
237 rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg3)
239 self.assertEqual(p[IP].tos, 254)
242 # still mapping out of pg1
244 p_v4[IP].dst = self.pg1.remote_ip4
245 rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
247 self.assertEqual(p[IP].tos, 1)
250 # disable the input recording on pg0
252 self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
253 self.QOS_SOURCE.QOS_API_SOURCE_IP,
257 # back to an unchanged TOS value
259 rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
261 self.assertEqual(p[IP].tos, 254)
264 # disable the egress map on pg1 and pg4
266 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
267 self.QOS_SOURCE.QOS_API_SOURCE_IP,
270 self.vapi.qos_mark_enable_disable(self.pg4.sw_if_index,
271 self.QOS_SOURCE.QOS_API_SOURCE_IP,
276 # unchanged Tos on pg1
278 rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
280 self.assertEqual(p[IP].tos, 254)
# Delete all seven egress maps (1-7) that were updated at the top of the test.
285 self.vapi.qos_egress_map_delete(1)
286 self.vapi.qos_egress_map_delete(4)
287 self.vapi.qos_egress_map_delete(2)
288 self.vapi.qos_egress_map_delete(3)
289 self.vapi.qos_egress_map_delete(5)
290 self.vapi.qos_egress_map_delete(6)
291 self.vapi.qos_egress_map_delete(7)
293 def test_qos_mpls(self):
294 """ QoS Mark/Record MPLS """
# NOTE(review): this listing has gaps (see the embedded line numbers).
# The definitions of from_ext, from_vlan, from_mpls and from_ip, the
# remaining elements of `rows`, the trailing arguments of several calls and
# the loops that bind `p` and `h` are not visible. The added comments
# describe only what the visible lines show.
297 # 255 QoS for all input values
# Build four 256-entry byte strings, each filled with one constant value.
303 output = [scapy.compat.chb(from_ext)] * 256
304 os1 = b''.join(output)
305 output = [scapy.compat.chb(from_vlan)] * 256
306 os2 = b''.join(output)
307 output = [scapy.compat.chb(from_mpls)] * 256
308 os3 = b''.join(output)
309 output = [scapy.compat.chb(from_ip)] * 256
310 os4 = b''.join(output)
311 rows = [{'outputs': os1},
316 self.vapi.qos_egress_map_update(1, rows)
319 # a route with 1 MPLS label
321 route_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32,
322 [VppRoutePath(self.pg1.remote_ip4,
323 self.pg1.sw_if_index,
325 route_10_0_0_1.add_vpp_config()
328 # a route with 3 MPLS labels
330 route_10_0_0_3 = VppIpRoute(self, "10.0.0.3", 32,
331 [VppRoutePath(self.pg1.remote_ip4,
332 self.pg1.sw_if_index,
333 labels=[63, 33, 34])])
334 route_10_0_0_3.add_vpp_config()
337 # enable IP QoS recording on the input Pg0 and MPLS egress marking
340 self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
341 self.QOS_SOURCE.QOS_API_SOURCE_IP,
343 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
344 self.QOS_SOURCE.QOS_API_SOURCE_MPLS,
349 # packet that will get one label added and 3 labels added resp.
351 p_1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
352 IP(src=self.pg0.remote_ip4, dst="10.0.0.1", tos=1) /
353 UDP(sport=1234, dport=1234) /
354 Raw(scapy.compat.chb(100) * NUM_PKTS))
355 p_3 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
356 IP(src=self.pg0.remote_ip4, dst="10.0.0.3", tos=1) /
357 UDP(sport=1234, dport=1234) /
358 Raw(scapy.compat.chb(100) * NUM_PKTS))
360 rx = self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg1)
363 # only 3 bits of ToS value in MPLS make sure tos is correct
364 # and the label and EOS bit have not been corrupted
# NOTE(review): label 32 is presumably set by the part of the 10.0.0.1 route
# that is not visible (embedded line 324) -- confirm.
367 self.assertEqual(p[MPLS].cos, from_ip)
368 self.assertEqual(p[MPLS].label, 32)
369 self.assertEqual(p[MPLS].s, 1)
370 rx = self.send_and_expect(self.pg0, p_3 * NUM_PKTS, self.pg1)
# The three-label route is checked label by label (63, 33, 34); only the
# last label is expected to carry s == 1.
# NOTE(review): `h` is not assigned in the visible lines; presumably it walks
# down the MPLS label stack of `p` -- confirm.
372 self.assertEqual(p[MPLS].cos, from_ip)
373 self.assertEqual(p[MPLS].label, 63)
374 self.assertEqual(p[MPLS].s, 0)
376 self.assertEqual(h[MPLS].cos, from_ip)
377 self.assertEqual(h[MPLS].label, 33)
378 self.assertEqual(h[MPLS].s, 0)
380 self.assertEqual(h[MPLS].cos, from_ip)
381 self.assertEqual(h[MPLS].label, 34)
382 self.assertEqual(h[MPLS].s, 1)
385 # enable MPLS QoS recording on the input Pg0 and IP egress marking
388 self.vapi.qos_record_enable_disable(
389 self.pg0.sw_if_index,
390 self.QOS_SOURCE.QOS_API_SOURCE_MPLS,
392 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
393 self.QOS_SOURCE.QOS_API_SOURCE_IP,
398 # MPLS x-connect - COS according to pg1 map
400 route_32_eos = VppMplsRoute(self, 32, 1,
401 [VppRoutePath(self.pg1.remote_ip4,
402 self.pg1.sw_if_index,
403 labels=[VppMplsLabel(33)])])
404 route_32_eos.add_vpp_config()
406 p_m1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
407 MPLS(label=32, cos=3, ttl=2) /
408 IP(src=self.pg0.remote_ip4, dst="10.0.0.1", tos=1) /
409 UDP(sport=1234, dport=1234) /
410 Raw(scapy.compat.chb(100) * NUM_PKTS))
412 rx = self.send_and_expect(self.pg0, p_m1 * NUM_PKTS, self.pg1)
414 self.assertEqual(p[MPLS].cos, from_mpls)
415 self.assertEqual(p[MPLS].label, 33)
416 self.assertEqual(p[MPLS].s, 1)
419 # MPLS deag - COS is copied from MPLS to IP
421 route_33_eos = VppMplsRoute(self, 33, 1,
422 [VppRoutePath("0.0.0.0",
425 route_33_eos.add_vpp_config()
427 route_10_0_0_4 = VppIpRoute(self, "10.0.0.4", 32,
428 [VppRoutePath(self.pg1.remote_ip4,
429 self.pg1.sw_if_index)])
430 route_10_0_0_4.add_vpp_config()
432 p_m2 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
433 MPLS(label=33, ttl=2, cos=3) /
434 IP(src=self.pg0.remote_ip4, dst="10.0.0.4", tos=1) /
435 UDP(sport=1234, dport=1234) /
436 Raw(scapy.compat.chb(100) * NUM_PKTS))
438 rx = self.send_and_expect(self.pg0, p_m2 * NUM_PKTS, self.pg1)
441 self.assertEqual(p[IP].tos, from_mpls)
# NOTE(review): the four calls below repeat the record/mark calls made
# earlier in this test with the same interface and source; presumably they
# pass the "disable" flag in the argument lines that are not visible --
# confirm.
446 self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
447 self.QOS_SOURCE.QOS_API_SOURCE_IP,
449 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
450 self.QOS_SOURCE.QOS_API_SOURCE_MPLS,
453 self.vapi.qos_record_enable_disable(
454 self.pg0.sw_if_index,
455 self.QOS_SOURCE.QOS_API_SOURCE_MPLS,
457 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
458 self.QOS_SOURCE.QOS_API_SOURCE_IP,
# Delete egress map 1, the only map this test updated.
461 self.vapi.qos_egress_map_delete(1)
463 def test_qos_vlan(self):
464 """QoS mark/record VLAN """
# NOTE(review): this listing has gaps (see the embedded line numbers).
# The remaining elements of `rows`, the sub-interface bring-up, the first
# and trailing arguments of several enable/disable calls and the loops that
# bind `p` are not visible. The added comments describe only what the
# visible lines show.
467 # QoS for all input values
# Entries 0..254 are set to 255 - i; entry 255 keeps its initial value of 0.
469 output = [scapy.compat.chb(0)] * 256
470 for i in range(0, 255):
471 output[i] = scapy.compat.chb(255 - i)
472 os = b''.join(output)
473 rows = [{'outputs': os},
478 self.vapi.qos_egress_map_update(1, rows)
# Dot1Q sub-interface on pg0, created with the value 11; the tagged test
# packets below use vlan=11.
480 sub_if = VppDot1QSubint(self, self.pg0, 11)
489 # enable VLAN QoS recording/marking on the input Pg0 subinterface and
491 self.vapi.qos_record_enable_disable(
493 self.QOS_SOURCE.QOS_API_SOURCE_VLAN,
495 self.vapi.qos_mark_enable_disable(sub_if.sw_if_index,
496 self.QOS_SOURCE.QOS_API_SOURCE_VLAN,
501 # IP marking/recording on pg1
503 self.vapi.qos_record_enable_disable(self.pg1.sw_if_index,
504 self.QOS_SOURCE.QOS_API_SOURCE_IP,
506 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
507 self.QOS_SOURCE.QOS_API_SOURCE_IP,
512 # routes to/from the sub-interface
514 route_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32,
515 [VppRoutePath(sub_if.remote_ip4,
516 sub_if.sw_if_index)])
517 route_10_0_0_1.add_vpp_config()
518 route_10_0_0_2 = VppIpRoute(self, "10.0.0.2", 32,
519 [VppRoutePath(self.pg1.remote_ip4,
520 self.pg1.sw_if_index)])
521 route_10_0_0_2.add_vpp_config()
522 route_2001_1 = VppIpRoute(self, "2001::1", 128,
523 [VppRoutePath(sub_if.remote_ip6,
525 proto=DpoProto.DPO_PROTO_IP6)],
527 route_2001_1.add_vpp_config()
528 route_2001_2 = VppIpRoute(self, "2001::2", 128,
529 [VppRoutePath(self.pg1.remote_ip6,
530 self.pg1.sw_if_index,
531 proto=DpoProto.DPO_PROTO_IP6)],
533 route_2001_2.add_vpp_config()
# IPv4: p_v1 is VLAN-tagged and enters on pg0; p_v2 is untagged and enters
# on pg1.
535 p_v1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
536 Dot1Q(vlan=11, prio=1) /
537 IP(src="1.1.1.1", dst="10.0.0.2", tos=1) /
538 UDP(sport=1234, dport=1234) /
539 Raw(scapy.compat.chb(100) * NUM_PKTS))
541 p_v2 = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
542 IP(src="1.1.1.1", dst="10.0.0.1", tos=1) /
543 UDP(sport=1234, dport=1234) /
544 Raw(scapy.compat.chb(100) * NUM_PKTS))
546 rx = self.send_and_expect(self.pg1, p_v2 * NUM_PKTS, self.pg0)
549 self.assertEqual(p[Dot1Q].prio, 6)
551 rx = self.send_and_expect(self.pg0, p_v1 * NUM_PKTS, self.pg1)
# 254 is consistent with the map built above applied to the Dot1Q prio of 1
# carried by p_v1 (255 - 1).
554 self.assertEqual(p[IP].tos, 254)
# IPv6 variants of the same two packets; the tagged one now uses prio=2.
556 p_v1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
557 Dot1Q(vlan=11, prio=2) /
558 IPv6(src="2001::1", dst="2001::2", tc=1) /
559 UDP(sport=1234, dport=1234) /
560 Raw(scapy.compat.chb(100) * NUM_PKTS))
562 p_v2 = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
563 IPv6(src="3001::1", dst="2001::1", tc=1) /
564 UDP(sport=1234, dport=1234) /
565 Raw(scapy.compat.chb(100) * NUM_PKTS))
567 rx = self.send_and_expect(self.pg1, p_v2 * NUM_PKTS, self.pg0)
570 self.assertEqual(p[Dot1Q].prio, 6)
572 rx = self.send_and_expect(self.pg0, p_v1 * NUM_PKTS, self.pg1)
# 253 is consistent with the map built above applied to the Dot1Q prio of 2
# (255 - 2).
575 self.assertEqual(p[IPv6].tc, 253)
# Cleanup: remove the sub-interface addresses, then repeat the record/mark
# calls made earlier in this test.
# NOTE(review): presumably these pass the "disable" flag in the argument
# lines that are not visible -- confirm.
580 sub_if.unconfig_ip4()
581 sub_if.unconfig_ip6()
583 self.vapi.qos_record_enable_disable(
585 self.QOS_SOURCE.QOS_API_SOURCE_VLAN,
587 self.vapi.qos_mark_enable_disable(sub_if.sw_if_index,
588 self.QOS_SOURCE.QOS_API_SOURCE_VLAN,
591 self.vapi.qos_record_enable_disable(self.pg1.sw_if_index,
592 self.QOS_SOURCE.QOS_API_SOURCE_IP,
594 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
595 self.QOS_SOURCE.QOS_API_SOURCE_IP,
# When the module is executed directly, run its tests with VppTestRunner.
# NOTE(review): `unittest` is not imported in the visible lines; presumably
# the import sits in the lines missing from the top of this listing (the
# embedded numbering starts at 5) -- confirm.
600 if __name__ == '__main__':
601 unittest.main(testRunner=VppTestRunner)