5 from framework import VppTestCase, VppTestRunner
6 from vpp_sub_interface import VppDot1QSubint
7 from vpp_ip import DpoProto
8 from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsRoute, \
9 VppMplsLabel, VppMplsTable, FibPathProto
12 from scapy.packet import Raw
13 from scapy.layers.l2 import Ether, Dot1Q
14 from scapy.layers.inet import IP, UDP
15 from scapy.layers.inet6 import IPv6
16 from scapy.contrib.mpls import MPLS
17 from vpp_papi import VppEnum
18 from vpp_qos import VppQosRecord, VppQosEgressMap, VppQosMark
23 class TestQOS(VppTestCase):
26 # Note: Since the enums aren't created dynamically until after
27 # the papi client attaches to VPP, we put it in a property to
28 # ensure it is the value at runtime, not at module load time.
31 return VppEnum.vl_api_qos_source_t
35 super(TestQOS, cls).setUpClass()
38 def tearDownClass(cls):
# Nothing class-specific to release here: class-level teardown is
# delegated entirely to the parent class.
39 super(TestQOS, cls).tearDownClass()
42 super(TestQOS, self).setUp()
44 self.create_pg_interfaces(range(5))
46 tbl = VppMplsTable(self, 0)
49 for i in self.pg_interfaces:
58 for i in self.pg_interfaces:
63 super(TestQOS, self).tearDown()
65 def test_qos_ip(self):
66 """ QoS Mark/Record IP """
# Builds egress maps 1-7, binds maps 1-4 to pg1-pg4 as IP marks, then
# sends IPv4/IPv6 packets in on pg0 and checks the ToS / traffic class
# seen on the egress interface while the pg0 record and the egress marks
# are added and later removed.
69 # for table 1 map the n=0xff possible values of input QoS mark,
# each input i in 0..254 is mapped to 255 - i; entry 255 keeps its
# initial value of 0
72 output = [scapy.compat.chb(0)] * 256
73 for i in range(0, 255):
74 output[i] = scapy.compat.chb(255 - i)
76 rows = [{'outputs': os},
81 qem1 = VppQosEgressMap(self, 1, rows).add_vpp_config()
84 # For table 2 (and up) use the value n for everything
86 output = [scapy.compat.chb(2)] * 256
88 rows = [{'outputs': os},
93 qem2 = VppQosEgressMap(self, 2, rows).add_vpp_config()
95 output = [scapy.compat.chb(3)] * 256
97 rows = [{'outputs': os},
102 qem3 = VppQosEgressMap(self, 3, rows).add_vpp_config()
104 output = [scapy.compat.chb(4)] * 256
105 os = b''.join(output)
106 rows = [{'outputs': os},
111 qem4 = VppQosEgressMap(self, 4, rows).add_vpp_config()
# tables 5, 6 and 7 reuse the rows that were built for table 4
112 qem5 = VppQosEgressMap(self, 5, rows).add_vpp_config()
113 qem6 = VppQosEgressMap(self, 6, rows).add_vpp_config()
114 qem7 = VppQosEgressMap(self, 7, rows).add_vpp_config()
116 self.assertTrue(qem7.query_vpp_config())
117 self.logger.info(self.vapi.cli("sh qos eg map"))
120 # Bind interface pgN to table n
122 qm1 = VppQosMark(self, self.pg1, qem1,
123 self.QOS_SOURCE.QOS_API_SOURCE_IP).add_vpp_config()
124 qm2 = VppQosMark(self, self.pg2, qem2,
125 self.QOS_SOURCE.QOS_API_SOURCE_IP).add_vpp_config()
126 qm3 = VppQosMark(self, self.pg3, qem3,
127 self.QOS_SOURCE.QOS_API_SOURCE_IP).add_vpp_config()
128 qm4 = VppQosMark(self, self.pg4, qem4,
129 self.QOS_SOURCE.QOS_API_SOURCE_IP).add_vpp_config()
130 self.assertTrue(qm3.query_vpp_config())
132 self.logger.info(self.vapi.cli("sh qos mark"))
135 # packets ingress on Pg0
137 p_v4 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
138 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, tos=1) /
139 UDP(sport=1234, dport=1234) /
140 Raw(scapy.compat.chb(100) * NUM_PKTS))
141 p_v6 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
142 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6,
144 UDP(sport=1234, dport=1234) /
145 Raw(scapy.compat.chb(100) * NUM_PKTS))
148 # Since we have not yet enabled the recording of the input QoS
149 # from the input IP header, the egress packet's ToS will be unchanged
151 rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
153 self.assertEqual(p[IP].tos, 1)
154 rx = self.send_and_expect(self.pg0, p_v6 * NUM_PKTS, self.pg1)
156 self.assertEqual(p[IPv6].tc, 1)
159 # Enable QoS recording on IP input for pg0
161 qr1 = VppQosRecord(self, self.pg0,
162 self.QOS_SOURCE.QOS_API_SOURCE_IP)
164 self.logger.info(self.vapi.cli("sh qos record"))
167 # send the same packets, this time expect the input TOS of 1
168 # to be mapped to pg1's egress value of 254
170 rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
172 self.assertEqual(p[IP].tos, 254)
173 rx = self.send_and_expect(self.pg0, p_v6 * NUM_PKTS, self.pg1)
175 self.assertEqual(p[IPv6].tc, 254)
178 # different input ToS to test the mapping
181 rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
183 self.assertEqual(p[IP].tos, 128)
185 rx = self.send_and_expect(self.pg0, p_v6 * NUM_PKTS, self.pg1)
187 self.assertEqual(p[IPv6].tc, 128)
190 rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
192 self.assertEqual(p[IP].tos, 1)
194 rx = self.send_and_expect(self.pg0, p_v6 * NUM_PKTS, self.pg1)
196 self.assertEqual(p[IPv6].tc, 1)
199 # send packets out the other interfaces to test the maps are
# pg2 and pg3 are bound to tables 2 and 3, whose outputs are the
# constants 2 and 3
202 p_v4[IP].dst = self.pg2.remote_ip4
203 rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg2)
205 self.assertEqual(p[IP].tos, 2)
207 p_v4[IP].dst = self.pg3.remote_ip4
208 rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg3)
210 self.assertEqual(p[IP].tos, 3)
212 p_v6[IPv6].dst = self.pg3.remote_ip6
213 rx = self.send_and_expect(self.pg0, p_v6 * NUM_PKTS, self.pg3)
215 self.assertEqual(p[IPv6].tc, 3)
218 # remove the map on pg2 and pg3, now expect an unchanged IP tos
220 qm2.remove_vpp_config()
221 qm3.remove_vpp_config()
222 self.logger.info(self.vapi.cli("sh qos mark"))
# qm3 was removed just above, so it must no longer be found in VPP
224 self.assertFalse(qm3.query_vpp_config())
225 self.logger.info(self.vapi.cli("sh int feat pg2"))
227 p_v4[IP].dst = self.pg2.remote_ip4
228 rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg2)
230 self.assertEqual(p[IP].tos, 254)
232 p_v4[IP].dst = self.pg3.remote_ip4
233 rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg3)
235 self.assertEqual(p[IP].tos, 254)
238 # still mapping out of pg1
240 p_v4[IP].dst = self.pg1.remote_ip4
241 rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
243 self.assertEqual(p[IP].tos, 1)
246 # disable the input recording on pg0
248 self.assertTrue(qr1.query_vpp_config())
249 qr1.remove_vpp_config()
252 # back to an unchanged TOS value
254 rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
256 self.assertEqual(p[IP].tos, 254)
259 # disable the egress map on pg1 and pg4
261 qm1.remove_vpp_config()
262 qm4.remove_vpp_config()
265 # unchanged Tos on pg1
267 rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
269 self.assertEqual(p[IP].tos, 254)
271 def test_qos_mpls(self):
272 """ QoS Mark/Record MPLS """
# Builds egress map 1 from four constant-valued output tables, installs
# labelled routes via pg1 and checks the MPLS cos / IP ToS of packets
# egressing pg1: first with IP recording on pg0 and MPLS marking on pg1,
# then with MPLS recording on pg0 and IP marking on pg1.
275 # one constant output value, repeated for all 256 input values, per table
281 output = [scapy.compat.chb(from_ext)] * 256
282 os1 = b''.join(output)
283 output = [scapy.compat.chb(from_vlan)] * 256
284 os2 = b''.join(output)
285 output = [scapy.compat.chb(from_mpls)] * 256
286 os3 = b''.join(output)
287 output = [scapy.compat.chb(from_ip)] * 256
288 os4 = b''.join(output)
289 rows = [{'outputs': os1},
294 qem1 = VppQosEgressMap(self, 1, rows).add_vpp_config()
297 # a route with 1 MPLS label
299 route_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32,
300 [VppRoutePath(self.pg1.remote_ip4,
301 self.pg1.sw_if_index,
303 route_10_0_0_1.add_vpp_config()
306 # a route with 3 MPLS labels
308 route_10_0_0_3 = VppIpRoute(self, "10.0.0.3", 32,
309 [VppRoutePath(self.pg1.remote_ip4,
310 self.pg1.sw_if_index,
311 labels=[63, 33, 34])])
312 route_10_0_0_3.add_vpp_config()
315 # enable IP QoS recording on the input Pg0 and MPLS egress marking
318 qr1 = VppQosRecord(self, self.pg0,
319 self.QOS_SOURCE.QOS_API_SOURCE_IP).add_vpp_config()
320 qm1 = VppQosMark(self, self.pg1, qem1,
321 self.QOS_SOURCE.QOS_API_SOURCE_MPLS).add_vpp_config()
324 # packet that will get one label added and 3 labels added resp.
326 p_1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
327 IP(src=self.pg0.remote_ip4, dst="10.0.0.1", tos=1) /
328 UDP(sport=1234, dport=1234) /
329 Raw(scapy.compat.chb(100) * NUM_PKTS))
330 p_3 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
331 IP(src=self.pg0.remote_ip4, dst="10.0.0.3", tos=1) /
332 UDP(sport=1234, dport=1234) /
333 Raw(scapy.compat.chb(100) * NUM_PKTS))
335 rx = self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg1)
338 # only 3 bits of ToS value in MPLS make sure tos is correct
339 # and the label and EOS bit have not been corrupted
342 self.assertEqual(p[MPLS].cos, from_ip)
343 self.assertEqual(p[MPLS].label, 32)
344 self.assertEqual(p[MPLS].s, 1)
345 rx = self.send_and_expect(self.pg0, p_3 * NUM_PKTS, self.pg1)
# the labels are expected in the order given on the route (63, 33, 34),
# each carrying the same cos; only the last one has the s bit set
347 self.assertEqual(p[MPLS].cos, from_ip)
348 self.assertEqual(p[MPLS].label, 63)
349 self.assertEqual(p[MPLS].s, 0)
351 self.assertEqual(h[MPLS].cos, from_ip)
352 self.assertEqual(h[MPLS].label, 33)
353 self.assertEqual(h[MPLS].s, 0)
355 self.assertEqual(h[MPLS].cos, from_ip)
356 self.assertEqual(h[MPLS].label, 34)
357 self.assertEqual(h[MPLS].s, 1)
360 # enable MPLS QoS recording on the input Pg0 and IP egress marking
365 self.QOS_SOURCE.QOS_API_SOURCE_MPLS).add_vpp_config()
367 self, self.pg1, qem1,
368 self.QOS_SOURCE.QOS_API_SOURCE_IP).add_vpp_config()
371 # MPLS x-connect - COS according to pg1 map
373 route_32_eos = VppMplsRoute(self, 32, 1,
374 [VppRoutePath(self.pg1.remote_ip4,
375 self.pg1.sw_if_index,
376 labels=[VppMplsLabel(33)])])
377 route_32_eos.add_vpp_config()
379 p_m1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
380 MPLS(label=32, cos=3, ttl=2) /
381 IP(src=self.pg0.remote_ip4, dst="10.0.0.1", tos=1) /
382 UDP(sport=1234, dport=1234) /
383 Raw(scapy.compat.chb(100) * NUM_PKTS))
385 rx = self.send_and_expect(self.pg0, p_m1 * NUM_PKTS, self.pg1)
387 self.assertEqual(p[MPLS].cos, from_mpls)
388 self.assertEqual(p[MPLS].label, 33)
389 self.assertEqual(p[MPLS].s, 1)
392 # MPLS deag - COS is copied from MPLS to IP
394 route_33_eos = VppMplsRoute(self, 33, 1,
395 [VppRoutePath("0.0.0.0",
398 route_33_eos.add_vpp_config()
400 route_10_0_0_4 = VppIpRoute(self, "10.0.0.4", 32,
401 [VppRoutePath(self.pg1.remote_ip4,
402 self.pg1.sw_if_index)])
403 route_10_0_0_4.add_vpp_config()
405 p_m2 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
406 MPLS(label=33, ttl=2, cos=3) /
407 IP(src=self.pg0.remote_ip4, dst="10.0.0.4", tos=1) /
408 UDP(sport=1234, dport=1234) /
409 Raw(scapy.compat.chb(100) * NUM_PKTS))
411 rx = self.send_and_expect(self.pg0, p_m2 * NUM_PKTS, self.pg1)
414 self.assertEqual(p[IP].tos, from_mpls)
416 def test_qos_vlan(self):
417 """QoS mark/record VLAN """
# Creates dot1q sub-interface 11 on pg0, enables VLAN and IP QoS
# record/mark, routes traffic between the sub-interface and pg1 in both
# directions (IPv4 then IPv6) and checks the VLAN priority, IP ToS and
# IPv6 traffic class seen on egress.
420 # QoS for all input values
# each input i in 0..254 is mapped to 255 - i; entry 255 keeps its
# initial value of 0
422 output = [scapy.compat.chb(0)] * 256
423 for i in range(0, 255):
424 output[i] = scapy.compat.chb(255 - i)
425 os = b''.join(output)
426 rows = [{'outputs': os},
431 qem1 = VppQosEgressMap(self, 1, rows).add_vpp_config()
433 sub_if = VppDot1QSubint(self, self.pg0, 11)
442 # enable VLAN QoS recording/marking on the input Pg0 subinterface and
446 self.QOS_SOURCE.QOS_API_SOURCE_VLAN).add_vpp_config()
449 self.QOS_SOURCE.QOS_API_SOURCE_VLAN).add_vpp_config()
452 # IP marking/recording on pg1
454 qr_ip = VppQosRecord(
456 self.QOS_SOURCE.QOS_API_SOURCE_IP).add_vpp_config()
458 self, self.pg1, qem1,
459 self.QOS_SOURCE.QOS_API_SOURCE_IP).add_vpp_config()
462 # routes to/from the sub-interface
464 route_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32,
465 [VppRoutePath(sub_if.remote_ip4,
466 sub_if.sw_if_index)])
467 route_10_0_0_1.add_vpp_config()
468 route_10_0_0_2 = VppIpRoute(self, "10.0.0.2", 32,
469 [VppRoutePath(self.pg1.remote_ip4,
470 self.pg1.sw_if_index)])
471 route_10_0_0_2.add_vpp_config()
472 route_2001_1 = VppIpRoute(self, "2001::1", 128,
473 [VppRoutePath(sub_if.remote_ip6,
474 sub_if.sw_if_index)])
475 route_2001_1.add_vpp_config()
476 route_2001_2 = VppIpRoute(self, "2001::2", 128,
477 [VppRoutePath(self.pg1.remote_ip6,
478 self.pg1.sw_if_index)])
479 route_2001_2.add_vpp_config()
# p_v1: VLAN-tagged packet in on pg0 and routed out of pg1;
# p_v2: untagged packet in on pg1 and routed out of the sub-interface
481 p_v1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
482 Dot1Q(vlan=11, prio=1) /
483 IP(src="1.1.1.1", dst="10.0.0.2", tos=1) /
484 UDP(sport=1234, dport=1234) /
485 Raw(scapy.compat.chb(100) * NUM_PKTS))
487 p_v2 = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
488 IP(src="1.1.1.1", dst="10.0.0.1", tos=1) /
489 UDP(sport=1234, dport=1234) /
490 Raw(scapy.compat.chb(100) * NUM_PKTS))
492 rx = self.send_and_expect(self.pg1, p_v2 * NUM_PKTS, self.pg0)
495 self.assertEqual(p[Dot1Q].prio, 6)
497 rx = self.send_and_expect(self.pg0, p_v1 * NUM_PKTS, self.pg1)
# 254 is 255 - 1 under the map built above; p_v1 carries prio=1
500 self.assertEqual(p[IP].tos, 254)
# same two directions again, this time with IPv6 packets
502 p_v1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
503 Dot1Q(vlan=11, prio=2) /
504 IPv6(src="2001::1", dst="2001::2", tc=1) /
505 UDP(sport=1234, dport=1234) /
506 Raw(scapy.compat.chb(100) * NUM_PKTS))
508 p_v2 = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
509 IPv6(src="3001::1", dst="2001::1", tc=1) /
510 UDP(sport=1234, dport=1234) /
511 Raw(scapy.compat.chb(100) * NUM_PKTS))
513 rx = self.send_and_expect(self.pg1, p_v2 * NUM_PKTS, self.pg0)
516 self.assertEqual(p[Dot1Q].prio, 6)
518 rx = self.send_and_expect(self.pg0, p_v1 * NUM_PKTS, self.pg1)
# 253 is 255 - 2 under the map built above; this p_v1 carries prio=2
521 self.assertEqual(p[IPv6].tc, 253)
# clean up: unconfigure IPv4 and IPv6 on the sub-interface created above
526 sub_if.unconfig_ip4()
527 sub_if.unconfig_ip6()
# When executed as a script, run this module's tests with VppTestRunner.
530 if __name__ == '__main__':
531 unittest.main(testRunner=VppTestRunner)