7 from framework import VppTestCase, VppTestRunner
8 from vpp_object import VppObject
9 from vpp_papi_provider import QOS_SOURCE
10 from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsRoute
11 from vpp_sub_interface import VppSubInterface, VppDot1QSubint
13 from scapy.packet import Raw
14 from scapy.layers.l2 import Ether, Dot1Q
15 from scapy.layers.inet import IP, UDP
16 from scapy.layers.inet6 import IPv6
17 from scapy.contrib.mpls import MPLS
20 class TestQOS(VppTestCase):
24 super(TestQOS, self).setUp()
26 self.create_pg_interfaces(range(5))
28 for i in self.pg_interfaces:
36 for i in self.pg_interfaces:
40 super(TestQOS, self).tearDown()
42 def test_qos_ip(self):
46 # for table 1 map the n=0xff possible values of input QoS mark,
49 output = [chr(0)] * 256
50 for i in range(0, 255):
51 output[i] = chr(255 - i)
53 rows = [{'outputs': os},
58 self.vapi.qos_egress_map_update(1, rows)
61 # For table 2 (and up) use the value n for everything
63 output = [chr(2)] * 256
65 rows = [{'outputs': os},
70 self.vapi.qos_egress_map_update(2, rows)
72 output = [chr(3)] * 256
74 rows = [{'outputs': os},
79 self.vapi.qos_egress_map_update(3, rows)
81 output = [chr(4)] * 256
83 rows = [{'outputs': os},
87 self.vapi.qos_egress_map_update(4, rows)
88 self.vapi.qos_egress_map_update(5, rows)
89 self.vapi.qos_egress_map_update(6, rows)
90 self.vapi.qos_egress_map_update(7, rows)
92 self.logger.info(self.vapi.cli("sh qos eg map"))
95 # Bind interface pgN to table n
97 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
101 self.vapi.qos_mark_enable_disable(self.pg2.sw_if_index,
105 self.vapi.qos_mark_enable_disable(self.pg3.sw_if_index,
109 self.vapi.qos_mark_enable_disable(self.pg4.sw_if_index,
115 # packets ingress on Pg0
117 p_v4 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
118 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, tos=1) /
119 UDP(sport=1234, dport=1234) /
121 p_v6 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
122 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6,
124 UDP(sport=1234, dport=1234) /
128 # Since we have not yet enabled the recording of the input QoS
129 # from the input IP header, the egress packet's ToS will be unchanged.
131 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
133 self.assertEqual(p[IP].tos, 1)
134 rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg1)
136 self.assertEqual(p[IPv6].tc, 1)
139 # Enable QoS recording on IP input for pg0
141 self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
146 # send the same packets, this time expect the input TOS of 1
147 # to be mapped to pg1's egress value of 254
149 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
151 self.assertEqual(p[IP].tos, 254)
152 rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg1)
154 self.assertEqual(p[IPv6].tc, 254)
157 # different input ToS to test the mapping
160 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
162 self.assertEqual(p[IP].tos, 128)
164 rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg1)
166 self.assertEqual(p[IPv6].tc, 128)
169 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
171 self.assertEqual(p[IP].tos, 1)
173 rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg1)
175 self.assertEqual(p[IPv6].tc, 1)
178 # send packets out the other interfaces to test the maps are
181 p_v4[IP].dst = self.pg2.remote_ip4
182 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg2)
184 self.assertEqual(p[IP].tos, 2)
186 p_v4[IP].dst = self.pg3.remote_ip4
187 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg3)
189 self.assertEqual(p[IP].tos, 3)
191 p_v6[IPv6].dst = self.pg3.remote_ip6
192 rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg3)
194 self.assertEqual(p[IPv6].tc, 3)
197 # remove the map on pg2 and pg3, now expect an unchanged IP tos
199 self.vapi.qos_mark_enable_disable(self.pg2.sw_if_index,
203 self.vapi.qos_mark_enable_disable(self.pg3.sw_if_index,
207 self.logger.info(self.vapi.cli("sh int feat pg2"))
209 p_v4[IP].dst = self.pg2.remote_ip4
210 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg2)
212 self.assertEqual(p[IP].tos, 254)
214 p_v4[IP].dst = self.pg3.remote_ip4
215 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg3)
217 self.assertEqual(p[IP].tos, 254)
220 # still mapping out of pg1
222 p_v4[IP].dst = self.pg1.remote_ip4
223 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
225 self.assertEqual(p[IP].tos, 1)
228 # disable the input recording on pg0
230 self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
235 # back to an unchanged TOS value
237 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
239 self.assertEqual(p[IP].tos, 254)
242 # disable the egress map on pg1 and pg4
244 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
248 self.vapi.qos_mark_enable_disable(self.pg4.sw_if_index,
254 # unchanged ToS on pg1
256 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
258 self.assertEqual(p[IP].tos, 254)
263 self.vapi.qos_egress_map_delete(1)
264 self.vapi.qos_egress_map_delete(4)
265 self.vapi.qos_egress_map_delete(2)
266 self.vapi.qos_egress_map_delete(3)
267 self.vapi.qos_egress_map_delete(5)
268 self.vapi.qos_egress_map_delete(6)
269 self.vapi.qos_egress_map_delete(7)
271 def test_qos_mpls(self):
272 """ QoS Mark MPLS """
275 # 255 QoS for all input values
277 output = [chr(255)] * 256
279 rows = [{'outputs': os},
284 self.vapi.qos_egress_map_update(1, rows)
287 # a route with 1 MPLS label
289 route_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32,
290 [VppRoutePath(self.pg1.remote_ip4,
291 self.pg1.sw_if_index,
293 route_10_0_0_1.add_vpp_config()
296 # a route with 3 MPLS labels
298 route_10_0_0_3 = VppIpRoute(self, "10.0.0.3", 32,
299 [VppRoutePath(self.pg1.remote_ip4,
300 self.pg1.sw_if_index,
301 labels=[63, 33, 34])])
302 route_10_0_0_3.add_vpp_config()
305 # enable IP QoS recording on the input Pg0 and MPLS egress marking
308 self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
311 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
317 # packet that will get one label added and 3 labels added resp.
319 p_1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
320 IP(src=self.pg0.remote_ip4, dst="10.0.0.1", tos=1) /
321 UDP(sport=1234, dport=1234) /
323 p_3 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
324 IP(src=self.pg0.remote_ip4, dst="10.0.0.3", tos=1) /
325 UDP(sport=1234, dport=1234) /
328 rx = self.send_and_expect(self.pg0, p_1 * 65, self.pg1)
331 # only 3 bits of ToS value in MPLS; make sure the ToS is correct
332 # and the label and EOS bit have not been corrupted
335 self.assertEqual(p[MPLS].cos, 7)
336 self.assertEqual(p[MPLS].label, 32)
337 self.assertEqual(p[MPLS].s, 1)
338 rx = self.send_and_expect(self.pg0, p_3 * 65, self.pg1)
340 self.assertEqual(p[MPLS].cos, 7)
341 self.assertEqual(p[MPLS].label, 63)
342 self.assertEqual(p[MPLS].s, 0)
344 self.assertEqual(h[MPLS].cos, 7)
345 self.assertEqual(h[MPLS].label, 33)
346 self.assertEqual(h[MPLS].s, 0)
348 self.assertEqual(h[MPLS].cos, 7)
349 self.assertEqual(h[MPLS].label, 34)
350 self.assertEqual(h[MPLS].s, 1)
355 self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
358 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
362 self.vapi.qos_egress_map_delete(1)
365 if __name__ == '__main__':
366 unittest.main(testRunner=VppTestRunner)