5 from framework import VppTestCase, VppTestRunner
6 from vpp_sub_interface import VppDot1QSubint
7 from vpp_ip import DpoProto
8 from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsRoute, \
9 VppMplsLabel, VppMplsTable, FibPathProto
12 from scapy.packet import Raw
13 from scapy.layers.l2 import Ether, Dot1Q
14 from scapy.layers.inet import IP, UDP
15 from scapy.layers.inet6 import IPv6
16 from scapy.contrib.mpls import MPLS
17 from vpp_papi import VppEnum
18 from vpp_qos import VppQosRecord, VppQosEgressMap, VppQosMark, VppQosStore
23 class TestQOS(VppTestCase):
26 # Note: Since the enums aren't created dynamically until after
27 # the papi client attaches to VPP, we put it in a property to
28 # ensure it is the value at runtime, not at module load time.
31 return VppEnum.vl_api_qos_source_t
35 super(TestQOS, cls).setUpClass()
38 def tearDownClass(cls):
39 super(TestQOS, cls).tearDownClass()
42 super(TestQOS, self).setUp()
44 self.create_pg_interfaces(range(5))
46 tbl = VppMplsTable(self, 0)
49 for i in self.pg_interfaces:
58 for i in self.pg_interfaces:
63 super(TestQOS, self).tearDown()
65 def test_qos_ip(self):
66 """ QoS Mark/Record/Store IP """
69 # for table 1 map the n=0xff possible values of input QoS mark,
72 output = [scapy.compat.chb(0)] * 256
73 for i in range(0, 255):
74 output[i] = scapy.compat.chb(255 - i)
76 rows = [{'outputs': os},
81 qem1 = VppQosEgressMap(self, 1, rows).add_vpp_config()
84 # For table 2 (and up) use the value n for everything
86 output = [scapy.compat.chb(2)] * 256
88 rows = [{'outputs': os},
93 qem2 = VppQosEgressMap(self, 2, rows).add_vpp_config()
95 output = [scapy.compat.chb(3)] * 256
97 rows = [{'outputs': os},
102 qem3 = VppQosEgressMap(self, 3, rows).add_vpp_config()
104 output = [scapy.compat.chb(4)] * 256
105 os = b''.join(output)
106 rows = [{'outputs': os},
111 qem4 = VppQosEgressMap(self, 4, rows).add_vpp_config()
112 qem5 = VppQosEgressMap(self, 5, rows).add_vpp_config()
113 qem6 = VppQosEgressMap(self, 6, rows).add_vpp_config()
114 qem7 = VppQosEgressMap(self, 7, rows).add_vpp_config()
116 self.assertTrue(qem7.query_vpp_config())
117 self.logger.info(self.vapi.cli("sh qos eg map"))
120 # Bind interface pgN to table n
122 qm1 = VppQosMark(self, self.pg1, qem1,
123 self.QOS_SOURCE.QOS_API_SOURCE_IP).add_vpp_config()
124 qm2 = VppQosMark(self, self.pg2, qem2,
125 self.QOS_SOURCE.QOS_API_SOURCE_IP).add_vpp_config()
126 qm3 = VppQosMark(self, self.pg3, qem3,
127 self.QOS_SOURCE.QOS_API_SOURCE_IP).add_vpp_config()
128 qm4 = VppQosMark(self, self.pg4, qem4,
129 self.QOS_SOURCE.QOS_API_SOURCE_IP).add_vpp_config()
130 self.assertTrue(qm3.query_vpp_config())
132 self.logger.info(self.vapi.cli("sh qos mark"))
135 # packets ingress on Pg0
137 p_v4 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
138 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, tos=1) /
139 UDP(sport=1234, dport=1234) /
140 Raw(scapy.compat.chb(100) * NUM_PKTS))
141 p_v6 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
142 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6,
144 UDP(sport=1234, dport=1234) /
145 Raw(scapy.compat.chb(100) * NUM_PKTS))
148 # Since we have not yet enabled the recording of the input QoS
149 # from the input iP header, the egress packet's ToS will be unchanged
151 rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
153 self.assertEqual(p[IP].tos, 1)
154 rx = self.send_and_expect(self.pg0, p_v6 * NUM_PKTS, self.pg1)
156 self.assertEqual(p[IPv6].tc, 1)
159 # Enable QoS recording on IP input for pg0
161 qr1 = VppQosRecord(self, self.pg0,
162 self.QOS_SOURCE.QOS_API_SOURCE_IP)
164 self.logger.info(self.vapi.cli("sh qos record"))
167 # send the same packets, this time expect the input TOS of 1
168 # to be mapped to pg1's egress value of 254
170 rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
172 self.assertEqual(p[IP].tos, 254)
173 rx = self.send_and_expect(self.pg0, p_v6 * NUM_PKTS, self.pg1)
175 self.assertEqual(p[IPv6].tc, 254)
178 # different input ToS to test the mapping
181 rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
183 self.assertEqual(p[IP].tos, 128)
185 rx = self.send_and_expect(self.pg0, p_v6 * NUM_PKTS, self.pg1)
187 self.assertEqual(p[IPv6].tc, 128)
190 rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
192 self.assertEqual(p[IP].tos, 1)
194 rx = self.send_and_expect(self.pg0, p_v6 * NUM_PKTS, self.pg1)
196 self.assertEqual(p[IPv6].tc, 1)
199 # send packets out the other interfaces to test the maps are
202 p_v4[IP].dst = self.pg2.remote_ip4
203 rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg2)
205 self.assertEqual(p[IP].tos, 2)
207 p_v4[IP].dst = self.pg3.remote_ip4
208 rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg3)
210 self.assertEqual(p[IP].tos, 3)
212 p_v6[IPv6].dst = self.pg3.remote_ip6
213 rx = self.send_and_expect(self.pg0, p_v6 * NUM_PKTS, self.pg3)
215 self.assertEqual(p[IPv6].tc, 3)
218 # remove the map on pg2 and pg3, now expect an unchanged IP tos
220 qm2.remove_vpp_config()
221 qm3.remove_vpp_config()
222 self.logger.info(self.vapi.cli("sh qos mark"))
224 self.assertFalse(qm3.query_vpp_config())
225 self.logger.info(self.vapi.cli("sh int feat pg2"))
227 p_v4[IP].dst = self.pg2.remote_ip4
228 rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg2)
230 self.assertEqual(p[IP].tos, 254)
232 p_v4[IP].dst = self.pg3.remote_ip4
233 rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg3)
235 self.assertEqual(p[IP].tos, 254)
238 # still mapping out of pg1
240 p_v4[IP].dst = self.pg1.remote_ip4
241 rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
243 self.assertEqual(p[IP].tos, 1)
246 # disable the input recording on pg0
248 self.assertTrue(qr1.query_vpp_config())
249 qr1.remove_vpp_config()
252 # back to an unchanged TOS value
254 rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
256 self.assertEqual(p[IP].tos, 254)
259 # enable QoS stroe instead of record
261 qst1 = VppQosStore(self, self.pg0,
262 self.QOS_SOURCE.QOS_API_SOURCE_IP,
264 self.logger.info(self.vapi.cli("sh qos store"))
266 p_v4[IP].dst = self.pg1.remote_ip4
267 rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
269 self.assertEqual(p[IP].tos, 250)
272 # disable the input storing on pg0
274 self.assertTrue(qst1.query_vpp_config())
275 qst1.remove_vpp_config()
278 # back to an unchanged TOS value
280 rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
282 self.assertEqual(p[IP].tos, 254)
285 # disable the egress map on pg1 and pg4
287 qm1.remove_vpp_config()
288 qm4.remove_vpp_config()
291 # unchanged Tos on pg1
293 rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
295 self.assertEqual(p[IP].tos, 254)
297 def test_qos_mpls(self):
298 """ QoS Mark/Record MPLS """
301 # 255 QoS for all input values
307 output = [scapy.compat.chb(from_ext)] * 256
308 os1 = b''.join(output)
309 output = [scapy.compat.chb(from_vlan)] * 256
310 os2 = b''.join(output)
311 output = [scapy.compat.chb(from_mpls)] * 256
312 os3 = b''.join(output)
313 output = [scapy.compat.chb(from_ip)] * 256
314 os4 = b''.join(output)
315 rows = [{'outputs': os1},
320 qem1 = VppQosEgressMap(self, 1, rows).add_vpp_config()
323 # a route with 1 MPLS label
325 route_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32,
326 [VppRoutePath(self.pg1.remote_ip4,
327 self.pg1.sw_if_index,
329 route_10_0_0_1.add_vpp_config()
332 # a route with 3 MPLS labels
334 route_10_0_0_3 = VppIpRoute(self, "10.0.0.3", 32,
335 [VppRoutePath(self.pg1.remote_ip4,
336 self.pg1.sw_if_index,
337 labels=[63, 33, 34])])
338 route_10_0_0_3.add_vpp_config()
341 # enable IP QoS recording on the input Pg0 and MPLS egress marking
344 qr1 = VppQosRecord(self, self.pg0,
345 self.QOS_SOURCE.QOS_API_SOURCE_IP).add_vpp_config()
346 qm1 = VppQosMark(self, self.pg1, qem1,
347 self.QOS_SOURCE.QOS_API_SOURCE_MPLS).add_vpp_config()
350 # packet that will get one label added and 3 labels added resp.
352 p_1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
353 IP(src=self.pg0.remote_ip4, dst="10.0.0.1", tos=1) /
354 UDP(sport=1234, dport=1234) /
355 Raw(scapy.compat.chb(100) * NUM_PKTS))
356 p_3 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
357 IP(src=self.pg0.remote_ip4, dst="10.0.0.3", tos=1) /
358 UDP(sport=1234, dport=1234) /
359 Raw(scapy.compat.chb(100) * NUM_PKTS))
361 rx = self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg1)
364 # only 3 bits of ToS value in MPLS make sure tos is correct
365 # and the label and EOS bit have not been corrupted
368 self.assertEqual(p[MPLS].cos, from_ip)
369 self.assertEqual(p[MPLS].label, 32)
370 self.assertEqual(p[MPLS].s, 1)
371 rx = self.send_and_expect(self.pg0, p_3 * NUM_PKTS, self.pg1)
373 self.assertEqual(p[MPLS].cos, from_ip)
374 self.assertEqual(p[MPLS].label, 63)
375 self.assertEqual(p[MPLS].s, 0)
377 self.assertEqual(h[MPLS].cos, from_ip)
378 self.assertEqual(h[MPLS].label, 33)
379 self.assertEqual(h[MPLS].s, 0)
381 self.assertEqual(h[MPLS].cos, from_ip)
382 self.assertEqual(h[MPLS].label, 34)
383 self.assertEqual(h[MPLS].s, 1)
386 # enable MPLS QoS recording on the input Pg0 and IP egress marking
391 self.QOS_SOURCE.QOS_API_SOURCE_MPLS).add_vpp_config()
393 self, self.pg1, qem1,
394 self.QOS_SOURCE.QOS_API_SOURCE_IP).add_vpp_config()
397 # MPLS x-connect - COS according to pg1 map
399 route_32_eos = VppMplsRoute(self, 32, 1,
400 [VppRoutePath(self.pg1.remote_ip4,
401 self.pg1.sw_if_index,
402 labels=[VppMplsLabel(33)])])
403 route_32_eos.add_vpp_config()
405 p_m1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
406 MPLS(label=32, cos=3, ttl=2) /
407 IP(src=self.pg0.remote_ip4, dst="10.0.0.1", tos=1) /
408 UDP(sport=1234, dport=1234) /
409 Raw(scapy.compat.chb(100) * NUM_PKTS))
411 rx = self.send_and_expect(self.pg0, p_m1 * NUM_PKTS, self.pg1)
413 self.assertEqual(p[MPLS].cos, from_mpls)
414 self.assertEqual(p[MPLS].label, 33)
415 self.assertEqual(p[MPLS].s, 1)
418 # MPLS deag - COS is copied from MPLS to IP
420 route_33_eos = VppMplsRoute(self, 33, 1,
421 [VppRoutePath("0.0.0.0",
424 route_33_eos.add_vpp_config()
426 route_10_0_0_4 = VppIpRoute(self, "10.0.0.4", 32,
427 [VppRoutePath(self.pg1.remote_ip4,
428 self.pg1.sw_if_index)])
429 route_10_0_0_4.add_vpp_config()
431 p_m2 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
432 MPLS(label=33, ttl=2, cos=3) /
433 IP(src=self.pg0.remote_ip4, dst="10.0.0.4", tos=1) /
434 UDP(sport=1234, dport=1234) /
435 Raw(scapy.compat.chb(100) * NUM_PKTS))
437 rx = self.send_and_expect(self.pg0, p_m2 * NUM_PKTS, self.pg1)
440 self.assertEqual(p[IP].tos, from_mpls)
442 def test_qos_vlan(self):
443 """QoS mark/record VLAN """
446 # QoS for all input values
448 output = [scapy.compat.chb(0)] * 256
449 for i in range(0, 255):
450 output[i] = scapy.compat.chb(255 - i)
451 os = b''.join(output)
452 rows = [{'outputs': os},
457 qem1 = VppQosEgressMap(self, 1, rows).add_vpp_config()
459 sub_if = VppDot1QSubint(self, self.pg0, 11)
468 # enable VLAN QoS recording/marking on the input Pg0 subinterface and
472 self.QOS_SOURCE.QOS_API_SOURCE_VLAN).add_vpp_config()
475 self.QOS_SOURCE.QOS_API_SOURCE_VLAN).add_vpp_config()
478 # IP marking/recording on pg1
480 qr_ip = VppQosRecord(
482 self.QOS_SOURCE.QOS_API_SOURCE_IP).add_vpp_config()
484 self, self.pg1, qem1,
485 self.QOS_SOURCE.QOS_API_SOURCE_IP).add_vpp_config()
488 # a routes to/from sub-interface
490 route_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32,
491 [VppRoutePath(sub_if.remote_ip4,
492 sub_if.sw_if_index)])
493 route_10_0_0_1.add_vpp_config()
494 route_10_0_0_2 = VppIpRoute(self, "10.0.0.2", 32,
495 [VppRoutePath(self.pg1.remote_ip4,
496 self.pg1.sw_if_index)])
497 route_10_0_0_2.add_vpp_config()
498 route_2001_1 = VppIpRoute(self, "2001::1", 128,
499 [VppRoutePath(sub_if.remote_ip6,
500 sub_if.sw_if_index)])
501 route_2001_1.add_vpp_config()
502 route_2001_2 = VppIpRoute(self, "2001::2", 128,
503 [VppRoutePath(self.pg1.remote_ip6,
504 self.pg1.sw_if_index)])
505 route_2001_2.add_vpp_config()
507 p_v1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
508 Dot1Q(vlan=11, prio=1) /
509 IP(src="1.1.1.1", dst="10.0.0.2", tos=1) /
510 UDP(sport=1234, dport=1234) /
511 Raw(scapy.compat.chb(100) * NUM_PKTS))
513 p_v2 = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
514 IP(src="1.1.1.1", dst="10.0.0.1", tos=1) /
515 UDP(sport=1234, dport=1234) /
516 Raw(scapy.compat.chb(100) * NUM_PKTS))
518 p_v3 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
519 Dot1Q(vlan=11, prio=1, id=1) /
520 IP(src="1.1.1.1", dst="10.0.0.2", tos=2) /
521 UDP(sport=1234, dport=1234) /
522 Raw(scapy.compat.chb(100) * NUM_PKTS))
524 rx = self.send_and_expect(self.pg1, p_v2 * NUM_PKTS, self.pg0)
527 self.assertEqual(p[Dot1Q].prio, 7)
528 self.assertEqual(p[Dot1Q].id, 0)
530 rx = self.send_and_expect(self.pg0, p_v3 * NUM_PKTS, self.pg1)
533 self.assertEqual(p[IP].tos, 252)
535 rx = self.send_and_expect(self.pg0, p_v1 * NUM_PKTS, self.pg1)
538 self.assertEqual(p[IP].tos, 253)
540 p_v1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
541 Dot1Q(vlan=11, prio=2) /
542 IPv6(src="2001::1", dst="2001::2", tc=1) /
543 UDP(sport=1234, dport=1234) /
544 Raw(scapy.compat.chb(100) * NUM_PKTS))
546 p_v2 = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
547 IPv6(src="3001::1", dst="2001::1", tc=1) /
548 UDP(sport=1234, dport=1234) /
549 Raw(scapy.compat.chb(100) * NUM_PKTS))
551 rx = self.send_and_expect(self.pg1, p_v2 * NUM_PKTS, self.pg0)
554 self.assertEqual(p[Dot1Q].prio, 7)
555 self.assertEqual(p[Dot1Q].id, 0)
557 rx = self.send_and_expect(self.pg0, p_v1 * NUM_PKTS, self.pg1)
560 self.assertEqual(p[IPv6].tc, 251)
565 sub_if.unconfig_ip4()
566 sub_if.unconfig_ip6()
569 if __name__ == '__main__':
570 unittest.main(testRunner=VppTestRunner)