tests: replace pycodestyle with black
[vpp.git] / test / test_qos.py
1 #!/usr/bin/env python3
2
3 import unittest
4
5 from framework import VppTestCase, VppTestRunner
6 from vpp_sub_interface import VppDot1QSubint
7 from vpp_ip import DpoProto
8 from vpp_ip_route import (
9     VppIpRoute,
10     VppRoutePath,
11     VppMplsRoute,
12     VppMplsLabel,
13     VppMplsTable,
14     FibPathProto,
15 )
16
17 import scapy.compat
18 from scapy.packet import Raw
19 from scapy.layers.l2 import Ether, Dot1Q
20 from scapy.layers.inet import IP, UDP
21 from scapy.layers.inet6 import IPv6
22 from scapy.contrib.mpls import MPLS
23 from vpp_papi import VppEnum
24 from vpp_qos import VppQosRecord, VppQosEgressMap, VppQosMark, VppQosStore
25
26 NUM_PKTS = 67
27
28
29 class TestQOS(VppTestCase):
30     """QOS Test Case"""
31
32     # Note: Since the enums aren't created dynamically until after
33     #       the papi client attaches to VPP, we put it in a property to
34     #       ensure it is the value at runtime, not at module load time.
35     @property
36     def QOS_SOURCE(self):
37         return VppEnum.vl_api_qos_source_t
38
39     @classmethod
40     def setUpClass(cls):
41         super(TestQOS, cls).setUpClass()
42
43     @classmethod
44     def tearDownClass(cls):
45         super(TestQOS, cls).tearDownClass()
46
47     def setUp(self):
48         super(TestQOS, self).setUp()
49
50         self.create_pg_interfaces(range(5))
51
52         tbl = VppMplsTable(self, 0)
53         tbl.add_vpp_config()
54
55         for i in self.pg_interfaces:
56             i.admin_up()
57             i.config_ip4()
58             i.resolve_arp()
59             i.config_ip6()
60             i.resolve_ndp()
61             i.enable_mpls()
62
63     def tearDown(self):
64         for i in self.pg_interfaces:
65             i.unconfig_ip4()
66             i.unconfig_ip6()
67             i.disable_mpls()
68
69         super(TestQOS, self).tearDown()
70
71     def test_qos_ip(self):
72         """QoS Mark/Record/Store IP"""
73
74         #
75         # for table 1 map the n=0xff possible values of input QoS mark,
76         # n to 1-n
77         #
78         output = [scapy.compat.chb(0)] * 256
79         for i in range(0, 255):
80             output[i] = scapy.compat.chb(255 - i)
81         os = b"".join(output)
82         rows = [{"outputs": os}, {"outputs": os}, {"outputs": os}, {"outputs": os}]
83
84         qem1 = VppQosEgressMap(self, 1, rows).add_vpp_config()
85
86         #
87         # For table 2 (and up) use the value n for everything
88         #
89         output = [scapy.compat.chb(2)] * 256
90         os = b"".join(output)
91         rows = [{"outputs": os}, {"outputs": os}, {"outputs": os}, {"outputs": os}]
92
93         qem2 = VppQosEgressMap(self, 2, rows).add_vpp_config()
94
95         output = [scapy.compat.chb(3)] * 256
96         os = b"".join(output)
97         rows = [{"outputs": os}, {"outputs": os}, {"outputs": os}, {"outputs": os}]
98
99         qem3 = VppQosEgressMap(self, 3, rows).add_vpp_config()
100
101         output = [scapy.compat.chb(4)] * 256
102         os = b"".join(output)
103         rows = [{"outputs": os}, {"outputs": os}, {"outputs": os}, {"outputs": os}]
104
105         qem4 = VppQosEgressMap(self, 4, rows).add_vpp_config()
106         qem5 = VppQosEgressMap(self, 5, rows).add_vpp_config()
107         qem6 = VppQosEgressMap(self, 6, rows).add_vpp_config()
108         qem7 = VppQosEgressMap(self, 7, rows).add_vpp_config()
109
110         self.assertTrue(qem7.query_vpp_config())
111         self.logger.info(self.vapi.cli("sh qos eg map"))
112
113         #
114         # Bind interface pgN to table n
115         #
116         qm1 = VppQosMark(
117             self, self.pg1, qem1, self.QOS_SOURCE.QOS_API_SOURCE_IP
118         ).add_vpp_config()
119         qm2 = VppQosMark(
120             self, self.pg2, qem2, self.QOS_SOURCE.QOS_API_SOURCE_IP
121         ).add_vpp_config()
122         qm3 = VppQosMark(
123             self, self.pg3, qem3, self.QOS_SOURCE.QOS_API_SOURCE_IP
124         ).add_vpp_config()
125         qm4 = VppQosMark(
126             self, self.pg4, qem4, self.QOS_SOURCE.QOS_API_SOURCE_IP
127         ).add_vpp_config()
128         self.assertTrue(qm3.query_vpp_config())
129
130         self.logger.info(self.vapi.cli("sh qos mark"))
131
132         #
133         # packets ingress on Pg0
134         #
135         p_v4 = (
136             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
137             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, tos=1)
138             / UDP(sport=1234, dport=1234)
139             / Raw(scapy.compat.chb(100) * NUM_PKTS)
140         )
141         p_v6 = (
142             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
143             / IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6, tc=1)
144             / UDP(sport=1234, dport=1234)
145             / Raw(scapy.compat.chb(100) * NUM_PKTS)
146         )
147
148         #
149         # Since we have not yet enabled the recording of the input QoS
150         # from the input iP header, the egress packet's ToS will be unchanged
151         #
152         rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
153         for p in rx:
154             self.assertEqual(p[IP].tos, 1)
155         rx = self.send_and_expect(self.pg0, p_v6 * NUM_PKTS, self.pg1)
156         for p in rx:
157             self.assertEqual(p[IPv6].tc, 1)
158
159         #
160         # Enable QoS recording on IP input for pg0
161         #
162         qr1 = VppQosRecord(self, self.pg0, self.QOS_SOURCE.QOS_API_SOURCE_IP)
163         qr1.add_vpp_config()
164         self.logger.info(self.vapi.cli("sh qos record"))
165
166         #
167         # send the same packets, this time expect the input TOS of 1
168         # to be mapped to pg1's egress value of 254
169         #
170         rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
171         for p in rx:
172             self.assertEqual(p[IP].tos, 254)
173         rx = self.send_and_expect(self.pg0, p_v6 * NUM_PKTS, self.pg1)
174         for p in rx:
175             self.assertEqual(p[IPv6].tc, 254)
176
177         #
178         # different input ToS to test the mapping
179         #
180         p_v4[IP].tos = 127
181         rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
182         for p in rx:
183             self.assertEqual(p[IP].tos, 128)
184         p_v6[IPv6].tc = 127
185         rx = self.send_and_expect(self.pg0, p_v6 * NUM_PKTS, self.pg1)
186         for p in rx:
187             self.assertEqual(p[IPv6].tc, 128)
188
189         p_v4[IP].tos = 254
190         rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
191         for p in rx:
192             self.assertEqual(p[IP].tos, 1)
193         p_v6[IPv6].tc = 254
194         rx = self.send_and_expect(self.pg0, p_v6 * NUM_PKTS, self.pg1)
195         for p in rx:
196             self.assertEqual(p[IPv6].tc, 1)
197
198         #
199         # send packets out the other interfaces to test the maps are
200         # correctly applied
201         #
202         p_v4[IP].dst = self.pg2.remote_ip4
203         rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg2)
204         for p in rx:
205             self.assertEqual(p[IP].tos, 2)
206
207         p_v4[IP].dst = self.pg3.remote_ip4
208         rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg3)
209         for p in rx:
210             self.assertEqual(p[IP].tos, 3)
211
212         p_v6[IPv6].dst = self.pg3.remote_ip6
213         rx = self.send_and_expect(self.pg0, p_v6 * NUM_PKTS, self.pg3)
214         for p in rx:
215             self.assertEqual(p[IPv6].tc, 3)
216
217         #
218         # remove the map on pg2 and pg3, now expect an unchanged IP tos
219         #
220         qm2.remove_vpp_config()
221         qm3.remove_vpp_config()
222         self.logger.info(self.vapi.cli("sh qos mark"))
223
224         self.assertFalse(qm3.query_vpp_config())
225         self.logger.info(self.vapi.cli("sh int feat pg2"))
226
227         p_v4[IP].dst = self.pg2.remote_ip4
228         rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg2)
229         for p in rx:
230             self.assertEqual(p[IP].tos, 254)
231
232         p_v4[IP].dst = self.pg3.remote_ip4
233         rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg3)
234         for p in rx:
235             self.assertEqual(p[IP].tos, 254)
236
237         #
238         # still mapping out of pg1
239         #
240         p_v4[IP].dst = self.pg1.remote_ip4
241         rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
242         for p in rx:
243             self.assertEqual(p[IP].tos, 1)
244
245         #
246         # disable the input recording on pg0
247         #
248         self.assertTrue(qr1.query_vpp_config())
249         qr1.remove_vpp_config()
250
251         #
252         # back to an unchanged TOS value
253         #
254         rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
255         for p in rx:
256             self.assertEqual(p[IP].tos, 254)
257
258         #
259         # enable QoS stroe instead of record
260         #
261         qst1 = VppQosStore(
262             self, self.pg0, self.QOS_SOURCE.QOS_API_SOURCE_IP, 5
263         ).add_vpp_config()
264         self.logger.info(self.vapi.cli("sh qos store"))
265
266         p_v4[IP].dst = self.pg1.remote_ip4
267         rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
268         for p in rx:
269             self.assertEqual(p[IP].tos, 250)
270
271         #
272         # disable the input storing on pg0
273         #
274         self.assertTrue(qst1.query_vpp_config())
275         qst1.remove_vpp_config()
276
277         #
278         # back to an unchanged TOS value
279         #
280         rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
281         for p in rx:
282             self.assertEqual(p[IP].tos, 254)
283
284         #
285         # disable the egress map on pg1 and pg4
286         #
287         qm1.remove_vpp_config()
288         qm4.remove_vpp_config()
289
290         #
291         # unchanged Tos on pg1
292         #
293         rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
294         for p in rx:
295             self.assertEqual(p[IP].tos, 254)
296
297     def test_qos_mpls(self):
298         """QoS Mark/Record MPLS"""
299
300         #
301         # 255 QoS for all input values
302         #
303         from_ext = 7
304         from_ip = 6
305         from_mpls = 5
306         from_vlan = 4
307         output = [scapy.compat.chb(from_ext)] * 256
308         os1 = b"".join(output)
309         output = [scapy.compat.chb(from_vlan)] * 256
310         os2 = b"".join(output)
311         output = [scapy.compat.chb(from_mpls)] * 256
312         os3 = b"".join(output)
313         output = [scapy.compat.chb(from_ip)] * 256
314         os4 = b"".join(output)
315         rows = [{"outputs": os1}, {"outputs": os2}, {"outputs": os3}, {"outputs": os4}]
316
317         qem1 = VppQosEgressMap(self, 1, rows).add_vpp_config()
318
319         #
320         # a route with 1 MPLS label
321         #
322         route_10_0_0_1 = VppIpRoute(
323             self,
324             "10.0.0.1",
325             32,
326             [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index, labels=[32])],
327         )
328         route_10_0_0_1.add_vpp_config()
329
330         #
331         # a route with 3 MPLS labels
332         #
333         route_10_0_0_3 = VppIpRoute(
334             self,
335             "10.0.0.3",
336             32,
337             [
338                 VppRoutePath(
339                     self.pg1.remote_ip4, self.pg1.sw_if_index, labels=[63, 33, 34]
340                 )
341             ],
342         )
343         route_10_0_0_3.add_vpp_config()
344
345         #
346         # enable IP QoS recording on the input Pg0 and MPLS egress marking
347         # on Pg1
348         #
349         qr1 = VppQosRecord(
350             self, self.pg0, self.QOS_SOURCE.QOS_API_SOURCE_IP
351         ).add_vpp_config()
352         qm1 = VppQosMark(
353             self, self.pg1, qem1, self.QOS_SOURCE.QOS_API_SOURCE_MPLS
354         ).add_vpp_config()
355
356         #
357         # packet that will get one label added and 3 labels added resp.
358         #
359         p_1 = (
360             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
361             / IP(src=self.pg0.remote_ip4, dst="10.0.0.1", tos=1)
362             / UDP(sport=1234, dport=1234)
363             / Raw(scapy.compat.chb(100) * NUM_PKTS)
364         )
365         p_3 = (
366             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
367             / IP(src=self.pg0.remote_ip4, dst="10.0.0.3", tos=1)
368             / UDP(sport=1234, dport=1234)
369             / Raw(scapy.compat.chb(100) * NUM_PKTS)
370         )
371
372         rx = self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg1)
373
374         #
375         # only 3 bits of ToS value in MPLS make sure tos is correct
376         # and the label and EOS bit have not been corrupted
377         #
378         for p in rx:
379             self.assertEqual(p[MPLS].cos, from_ip)
380             self.assertEqual(p[MPLS].label, 32)
381             self.assertEqual(p[MPLS].s, 1)
382         rx = self.send_and_expect(self.pg0, p_3 * NUM_PKTS, self.pg1)
383         for p in rx:
384             self.assertEqual(p[MPLS].cos, from_ip)
385             self.assertEqual(p[MPLS].label, 63)
386             self.assertEqual(p[MPLS].s, 0)
387             h = p[MPLS].payload
388             self.assertEqual(h[MPLS].cos, from_ip)
389             self.assertEqual(h[MPLS].label, 33)
390             self.assertEqual(h[MPLS].s, 0)
391             h = h[MPLS].payload
392             self.assertEqual(h[MPLS].cos, from_ip)
393             self.assertEqual(h[MPLS].label, 34)
394             self.assertEqual(h[MPLS].s, 1)
395
396         #
397         # enable MPLS QoS recording on the input Pg0 and IP egress marking
398         # on Pg1
399         #
400         qr2 = VppQosRecord(
401             self, self.pg0, self.QOS_SOURCE.QOS_API_SOURCE_MPLS
402         ).add_vpp_config()
403         qm2 = VppQosMark(
404             self, self.pg1, qem1, self.QOS_SOURCE.QOS_API_SOURCE_IP
405         ).add_vpp_config()
406
407         #
408         # MPLS x-connect - COS according to pg1 map
409         #
410         route_32_eos = VppMplsRoute(
411             self,
412             32,
413             1,
414             [
415                 VppRoutePath(
416                     self.pg1.remote_ip4, self.pg1.sw_if_index, labels=[VppMplsLabel(33)]
417                 )
418             ],
419         )
420         route_32_eos.add_vpp_config()
421
422         p_m1 = (
423             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
424             / MPLS(label=32, cos=3, ttl=2)
425             / IP(src=self.pg0.remote_ip4, dst="10.0.0.1", tos=1)
426             / UDP(sport=1234, dport=1234)
427             / Raw(scapy.compat.chb(100) * NUM_PKTS)
428         )
429
430         rx = self.send_and_expect(self.pg0, p_m1 * NUM_PKTS, self.pg1)
431         for p in rx:
432             self.assertEqual(p[MPLS].cos, from_mpls)
433             self.assertEqual(p[MPLS].label, 33)
434             self.assertEqual(p[MPLS].s, 1)
435
436         #
437         # MPLS deag - COS is copied from MPLS to IP
438         #
439         route_33_eos = VppMplsRoute(
440             self, 33, 1, [VppRoutePath("0.0.0.0", 0xFFFFFFFF, nh_table_id=0)]
441         )
442         route_33_eos.add_vpp_config()
443
444         route_10_0_0_4 = VppIpRoute(
445             self,
446             "10.0.0.4",
447             32,
448             [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)],
449         )
450         route_10_0_0_4.add_vpp_config()
451
452         p_m2 = (
453             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
454             / MPLS(label=33, ttl=2, cos=3)
455             / IP(src=self.pg0.remote_ip4, dst="10.0.0.4", tos=1)
456             / UDP(sport=1234, dport=1234)
457             / Raw(scapy.compat.chb(100) * NUM_PKTS)
458         )
459
460         rx = self.send_and_expect(self.pg0, p_m2 * NUM_PKTS, self.pg1)
461
462         for p in rx:
463             self.assertEqual(p[IP].tos, from_mpls)
464
465     def test_qos_vlan(self):
466         """QoS mark/record VLAN"""
467
468         #
469         # QoS for all input values
470         #
471         output = [scapy.compat.chb(0)] * 256
472         for i in range(0, 255):
473             output[i] = scapy.compat.chb(255 - i)
474         os = b"".join(output)
475         rows = [{"outputs": os}, {"outputs": os}, {"outputs": os}, {"outputs": os}]
476
477         qem1 = VppQosEgressMap(self, 1, rows).add_vpp_config()
478
479         sub_if = VppDot1QSubint(self, self.pg0, 11)
480
481         sub_if.admin_up()
482         sub_if.config_ip4()
483         sub_if.resolve_arp()
484         sub_if.config_ip6()
485         sub_if.resolve_ndp()
486
487         #
488         # enable VLAN QoS recording/marking on the input Pg0 subinterface and
489         #
490         qr_v = VppQosRecord(
491             self, sub_if, self.QOS_SOURCE.QOS_API_SOURCE_VLAN
492         ).add_vpp_config()
493         qm_v = VppQosMark(
494             self, sub_if, qem1, self.QOS_SOURCE.QOS_API_SOURCE_VLAN
495         ).add_vpp_config()
496
497         #
498         # IP marking/recording on pg1
499         #
500         qr_ip = VppQosRecord(
501             self, self.pg1, self.QOS_SOURCE.QOS_API_SOURCE_IP
502         ).add_vpp_config()
503         qm_ip = VppQosMark(
504             self, self.pg1, qem1, self.QOS_SOURCE.QOS_API_SOURCE_IP
505         ).add_vpp_config()
506
507         #
508         # a routes to/from sub-interface
509         #
510         route_10_0_0_1 = VppIpRoute(
511             self, "10.0.0.1", 32, [VppRoutePath(sub_if.remote_ip4, sub_if.sw_if_index)]
512         )
513         route_10_0_0_1.add_vpp_config()
514         route_10_0_0_2 = VppIpRoute(
515             self,
516             "10.0.0.2",
517             32,
518             [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)],
519         )
520         route_10_0_0_2.add_vpp_config()
521         route_2001_1 = VppIpRoute(
522             self, "2001::1", 128, [VppRoutePath(sub_if.remote_ip6, sub_if.sw_if_index)]
523         )
524         route_2001_1.add_vpp_config()
525         route_2001_2 = VppIpRoute(
526             self,
527             "2001::2",
528             128,
529             [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
530         )
531         route_2001_2.add_vpp_config()
532
533         p_v1 = (
534             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
535             / Dot1Q(vlan=11, prio=1)
536             / IP(src="1.1.1.1", dst="10.0.0.2", tos=1)
537             / UDP(sport=1234, dport=1234)
538             / Raw(scapy.compat.chb(100) * NUM_PKTS)
539         )
540
541         p_v2 = (
542             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
543             / IP(src="1.1.1.1", dst="10.0.0.1", tos=1)
544             / UDP(sport=1234, dport=1234)
545             / Raw(scapy.compat.chb(100) * NUM_PKTS)
546         )
547
548         p_v3 = (
549             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
550             / Dot1Q(vlan=11, prio=1, id=1)
551             / IP(src="1.1.1.1", dst="10.0.0.2", tos=2)
552             / UDP(sport=1234, dport=1234)
553             / Raw(scapy.compat.chb(100) * NUM_PKTS)
554         )
555
556         rx = self.send_and_expect(self.pg1, p_v2 * NUM_PKTS, self.pg0)
557
558         for p in rx:
559             self.assertEqual(p[Dot1Q].prio, 7)
560             self.assertEqual(p[Dot1Q].id, 0)
561
562         rx = self.send_and_expect(self.pg0, p_v3 * NUM_PKTS, self.pg1)
563
564         for p in rx:
565             self.assertEqual(p[IP].tos, 252)
566
567         rx = self.send_and_expect(self.pg0, p_v1 * NUM_PKTS, self.pg1)
568
569         for p in rx:
570             self.assertEqual(p[IP].tos, 253)
571
572         p_v1 = (
573             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
574             / Dot1Q(vlan=11, prio=2)
575             / IPv6(src="2001::1", dst="2001::2", tc=1)
576             / UDP(sport=1234, dport=1234)
577             / Raw(scapy.compat.chb(100) * NUM_PKTS)
578         )
579
580         p_v2 = (
581             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
582             / IPv6(src="3001::1", dst="2001::1", tc=1)
583             / UDP(sport=1234, dport=1234)
584             / Raw(scapy.compat.chb(100) * NUM_PKTS)
585         )
586
587         rx = self.send_and_expect(self.pg1, p_v2 * NUM_PKTS, self.pg0)
588
589         for p in rx:
590             self.assertEqual(p[Dot1Q].prio, 7)
591             self.assertEqual(p[Dot1Q].id, 0)
592
593         rx = self.send_and_expect(self.pg0, p_v1 * NUM_PKTS, self.pg1)
594
595         for p in rx:
596             self.assertEqual(p[IPv6].tc, 251)
597
598         #
599         # cleanup
600         #
601         sub_if.unconfig_ip4()
602         sub_if.unconfig_ip6()
603
604
605 if __name__ == "__main__":
606     unittest.main(testRunner=VppTestRunner)