http: fix client parse error handling
[vpp.git] / test / test_qos.py
1 #!/usr/bin/env python3
2
3 import unittest
4
5 from framework import VppTestCase
6 from asfframework import VppTestRunner
7 from vpp_sub_interface import VppDot1QSubint
8 from vpp_ip_route import (
9     VppIpRoute,
10     VppRoutePath,
11     VppMplsRoute,
12     VppMplsLabel,
13     VppMplsTable,
14 )
15
16 import scapy.compat
17 from scapy.packet import Raw
18 from scapy.layers.l2 import Ether, Dot1Q
19 from scapy.layers.inet import IP, UDP
20 from scapy.layers.inet6 import IPv6
21 from scapy.contrib.mpls import MPLS
22 from vpp_papi import VppEnum
23 from vpp_qos import VppQosRecord, VppQosEgressMap, VppQosMark, VppQosStore
24
25 NUM_PKTS = 67
26
27
28 class TestQOS(VppTestCase):
29     """QOS Test Case"""
30
31     # Note: Since the enums aren't created dynamically until after
32     #       the papi client attaches to VPP, we put it in a property to
33     #       ensure it is the value at runtime, not at module load time.
34     @property
35     def QOS_SOURCE(self):
36         return VppEnum.vl_api_qos_source_t
37
38     @classmethod
39     def setUpClass(cls):
40         super(TestQOS, cls).setUpClass()
41
42     @classmethod
43     def tearDownClass(cls):
44         super(TestQOS, cls).tearDownClass()
45
46     def setUp(self):
47         super(TestQOS, self).setUp()
48
49         self.create_pg_interfaces(range(5))
50
51         tbl = VppMplsTable(self, 0)
52         tbl.add_vpp_config()
53
54         for i in self.pg_interfaces:
55             i.admin_up()
56             i.config_ip4()
57             i.resolve_arp()
58             i.config_ip6()
59             i.resolve_ndp()
60             i.enable_mpls()
61
62     def tearDown(self):
63         for i in self.pg_interfaces:
64             i.unconfig_ip4()
65             i.unconfig_ip6()
66             i.disable_mpls()
67
68         super(TestQOS, self).tearDown()
69
70     def test_qos_ip(self):
71         """QoS Mark/Record/Store IP"""
72
73         #
74         # for table 1 map the n=0xff possible values of input QoS mark,
75         # n to 1-n
76         #
77         output = [scapy.compat.chb(0)] * 256
78         for i in range(0, 255):
79             output[i] = scapy.compat.chb(255 - i)
80         os = b"".join(output)
81         rows = [{"outputs": os}, {"outputs": os}, {"outputs": os}, {"outputs": os}]
82
83         qem1 = VppQosEgressMap(self, 1, rows).add_vpp_config()
84
85         #
86         # For table 2 (and up) use the value n for everything
87         #
88         output = [scapy.compat.chb(2)] * 256
89         os = b"".join(output)
90         rows = [{"outputs": os}, {"outputs": os}, {"outputs": os}, {"outputs": os}]
91
92         qem2 = VppQosEgressMap(self, 2, rows).add_vpp_config()
93
94         output = [scapy.compat.chb(3)] * 256
95         os = b"".join(output)
96         rows = [{"outputs": os}, {"outputs": os}, {"outputs": os}, {"outputs": os}]
97
98         qem3 = VppQosEgressMap(self, 3, rows).add_vpp_config()
99
100         output = [scapy.compat.chb(4)] * 256
101         os = b"".join(output)
102         rows = [{"outputs": os}, {"outputs": os}, {"outputs": os}, {"outputs": os}]
103
104         qem4 = VppQosEgressMap(self, 4, rows).add_vpp_config()
105         qem5 = VppQosEgressMap(self, 5, rows).add_vpp_config()
106         qem6 = VppQosEgressMap(self, 6, rows).add_vpp_config()
107         qem7 = VppQosEgressMap(self, 7, rows).add_vpp_config()
108
109         self.assertTrue(qem7.query_vpp_config())
110         self.logger.info(self.vapi.cli("sh qos eg map"))
111
112         #
113         # Bind interface pgN to table n
114         #
115         qm1 = VppQosMark(
116             self, self.pg1, qem1, self.QOS_SOURCE.QOS_API_SOURCE_IP
117         ).add_vpp_config()
118         qm2 = VppQosMark(
119             self, self.pg2, qem2, self.QOS_SOURCE.QOS_API_SOURCE_IP
120         ).add_vpp_config()
121         qm3 = VppQosMark(
122             self, self.pg3, qem3, self.QOS_SOURCE.QOS_API_SOURCE_IP
123         ).add_vpp_config()
124         qm4 = VppQosMark(
125             self, self.pg4, qem4, self.QOS_SOURCE.QOS_API_SOURCE_IP
126         ).add_vpp_config()
127         self.assertTrue(qm3.query_vpp_config())
128
129         self.logger.info(self.vapi.cli("sh qos mark"))
130
131         #
132         # packets ingress on Pg0
133         #
134         p_v4 = (
135             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
136             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, tos=1)
137             / UDP(sport=1234, dport=1234)
138             / Raw(scapy.compat.chb(100) * NUM_PKTS)
139         )
140         p_v6 = (
141             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
142             / IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6, tc=1)
143             / UDP(sport=1234, dport=1234)
144             / Raw(scapy.compat.chb(100) * NUM_PKTS)
145         )
146
147         #
148         # Since we have not yet enabled the recording of the input QoS
149         # from the input iP header, the egress packet's ToS will be unchanged
150         #
151         rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
152         for p in rx:
153             self.assertEqual(p[IP].tos, 1)
154         rx = self.send_and_expect(self.pg0, p_v6 * NUM_PKTS, self.pg1)
155         for p in rx:
156             self.assertEqual(p[IPv6].tc, 1)
157
158         #
159         # Enable QoS recording on IP input for pg0
160         #
161         qr1 = VppQosRecord(self, self.pg0, self.QOS_SOURCE.QOS_API_SOURCE_IP)
162         qr1.add_vpp_config()
163         self.logger.info(self.vapi.cli("sh qos record"))
164
165         #
166         # send the same packets, this time expect the input TOS of 1
167         # to be mapped to pg1's egress value of 254
168         #
169         rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
170         for p in rx:
171             self.assertEqual(p[IP].tos, 254)
172         rx = self.send_and_expect(self.pg0, p_v6 * NUM_PKTS, self.pg1)
173         for p in rx:
174             self.assertEqual(p[IPv6].tc, 254)
175
176         #
177         # different input ToS to test the mapping
178         #
179         p_v4[IP].tos = 127
180         rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
181         for p in rx:
182             self.assertEqual(p[IP].tos, 128)
183         p_v6[IPv6].tc = 127
184         rx = self.send_and_expect(self.pg0, p_v6 * NUM_PKTS, self.pg1)
185         for p in rx:
186             self.assertEqual(p[IPv6].tc, 128)
187
188         p_v4[IP].tos = 254
189         rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
190         for p in rx:
191             self.assertEqual(p[IP].tos, 1)
192         p_v6[IPv6].tc = 254
193         rx = self.send_and_expect(self.pg0, p_v6 * NUM_PKTS, self.pg1)
194         for p in rx:
195             self.assertEqual(p[IPv6].tc, 1)
196
197         #
198         # send packets out the other interfaces to test the maps are
199         # correctly applied
200         #
201         p_v4[IP].dst = self.pg2.remote_ip4
202         rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg2)
203         for p in rx:
204             self.assertEqual(p[IP].tos, 2)
205
206         p_v4[IP].dst = self.pg3.remote_ip4
207         rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg3)
208         for p in rx:
209             self.assertEqual(p[IP].tos, 3)
210
211         p_v6[IPv6].dst = self.pg3.remote_ip6
212         rx = self.send_and_expect(self.pg0, p_v6 * NUM_PKTS, self.pg3)
213         for p in rx:
214             self.assertEqual(p[IPv6].tc, 3)
215
216         #
217         # remove the map on pg2 and pg3, now expect an unchanged IP tos
218         #
219         qm2.remove_vpp_config()
220         qm3.remove_vpp_config()
221         self.logger.info(self.vapi.cli("sh qos mark"))
222
223         self.assertFalse(qm3.query_vpp_config())
224         self.logger.info(self.vapi.cli("sh int feat pg2"))
225
226         p_v4[IP].dst = self.pg2.remote_ip4
227         rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg2)
228         for p in rx:
229             self.assertEqual(p[IP].tos, 254)
230
231         p_v4[IP].dst = self.pg3.remote_ip4
232         rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg3)
233         for p in rx:
234             self.assertEqual(p[IP].tos, 254)
235
236         #
237         # still mapping out of pg1
238         #
239         p_v4[IP].dst = self.pg1.remote_ip4
240         rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
241         for p in rx:
242             self.assertEqual(p[IP].tos, 1)
243
244         #
245         # disable the input recording on pg0
246         #
247         self.assertTrue(qr1.query_vpp_config())
248         qr1.remove_vpp_config()
249
250         #
251         # back to an unchanged TOS value
252         #
253         rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
254         for p in rx:
255             self.assertEqual(p[IP].tos, 254)
256
257         #
258         # enable QoS stroe instead of record
259         #
260         qst1 = VppQosStore(
261             self, self.pg0, self.QOS_SOURCE.QOS_API_SOURCE_IP, 5
262         ).add_vpp_config()
263         self.logger.info(self.vapi.cli("sh qos store"))
264
265         p_v4[IP].dst = self.pg1.remote_ip4
266         rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
267         for p in rx:
268             self.assertEqual(p[IP].tos, 250)
269
270         #
271         # disable the input storing on pg0
272         #
273         self.assertTrue(qst1.query_vpp_config())
274         qst1.remove_vpp_config()
275
276         #
277         # back to an unchanged TOS value
278         #
279         rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
280         for p in rx:
281             self.assertEqual(p[IP].tos, 254)
282
283         #
284         # disable the egress map on pg1 and pg4
285         #
286         qm1.remove_vpp_config()
287         qm4.remove_vpp_config()
288
289         #
290         # unchanged Tos on pg1
291         #
292         rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
293         for p in rx:
294             self.assertEqual(p[IP].tos, 254)
295
296     def test_qos_mpls(self):
297         """QoS Mark/Record MPLS"""
298
299         #
300         # 255 QoS for all input values
301         #
302         from_ext = 7
303         from_ip = 6
304         from_mpls = 5
305         from_vlan = 4
306         output = [scapy.compat.chb(from_ext)] * 256
307         os1 = b"".join(output)
308         output = [scapy.compat.chb(from_vlan)] * 256
309         os2 = b"".join(output)
310         output = [scapy.compat.chb(from_mpls)] * 256
311         os3 = b"".join(output)
312         output = [scapy.compat.chb(from_ip)] * 256
313         os4 = b"".join(output)
314         rows = [{"outputs": os1}, {"outputs": os2}, {"outputs": os3}, {"outputs": os4}]
315
316         qem1 = VppQosEgressMap(self, 1, rows).add_vpp_config()
317
318         #
319         # a route with 1 MPLS label
320         #
321         route_10_0_0_1 = VppIpRoute(
322             self,
323             "10.0.0.1",
324             32,
325             [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index, labels=[32])],
326         )
327         route_10_0_0_1.add_vpp_config()
328
329         #
330         # a route with 3 MPLS labels
331         #
332         route_10_0_0_3 = VppIpRoute(
333             self,
334             "10.0.0.3",
335             32,
336             [
337                 VppRoutePath(
338                     self.pg1.remote_ip4, self.pg1.sw_if_index, labels=[63, 33, 34]
339                 )
340             ],
341         )
342         route_10_0_0_3.add_vpp_config()
343
344         #
345         # enable IP QoS recording on the input Pg0 and MPLS egress marking
346         # on Pg1
347         #
348         qr1 = VppQosRecord(
349             self, self.pg0, self.QOS_SOURCE.QOS_API_SOURCE_IP
350         ).add_vpp_config()
351         qm1 = VppQosMark(
352             self, self.pg1, qem1, self.QOS_SOURCE.QOS_API_SOURCE_MPLS
353         ).add_vpp_config()
354
355         #
356         # packet that will get one label added and 3 labels added resp.
357         #
358         p_1 = (
359             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
360             / IP(src=self.pg0.remote_ip4, dst="10.0.0.1", tos=1)
361             / UDP(sport=1234, dport=1234)
362             / Raw(scapy.compat.chb(100) * NUM_PKTS)
363         )
364         p_3 = (
365             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
366             / IP(src=self.pg0.remote_ip4, dst="10.0.0.3", tos=1)
367             / UDP(sport=1234, dport=1234)
368             / Raw(scapy.compat.chb(100) * NUM_PKTS)
369         )
370
371         rx = self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg1)
372
373         #
374         # only 3 bits of ToS value in MPLS make sure tos is correct
375         # and the label and EOS bit have not been corrupted
376         #
377         for p in rx:
378             self.assertEqual(p[MPLS].cos, from_ip)
379             self.assertEqual(p[MPLS].label, 32)
380             self.assertEqual(p[MPLS].s, 1)
381         rx = self.send_and_expect(self.pg0, p_3 * NUM_PKTS, self.pg1)
382         for p in rx:
383             self.assertEqual(p[MPLS].cos, from_ip)
384             self.assertEqual(p[MPLS].label, 63)
385             self.assertEqual(p[MPLS].s, 0)
386             h = p[MPLS].payload
387             self.assertEqual(h[MPLS].cos, from_ip)
388             self.assertEqual(h[MPLS].label, 33)
389             self.assertEqual(h[MPLS].s, 0)
390             h = h[MPLS].payload
391             self.assertEqual(h[MPLS].cos, from_ip)
392             self.assertEqual(h[MPLS].label, 34)
393             self.assertEqual(h[MPLS].s, 1)
394
395         #
396         # enable MPLS QoS recording on the input Pg0 and IP egress marking
397         # on Pg1
398         #
399         qr2 = VppQosRecord(
400             self, self.pg0, self.QOS_SOURCE.QOS_API_SOURCE_MPLS
401         ).add_vpp_config()
402         qm2 = VppQosMark(
403             self, self.pg1, qem1, self.QOS_SOURCE.QOS_API_SOURCE_IP
404         ).add_vpp_config()
405
406         #
407         # MPLS x-connect - COS according to pg1 map
408         #
409         route_32_eos = VppMplsRoute(
410             self,
411             32,
412             1,
413             [
414                 VppRoutePath(
415                     self.pg1.remote_ip4, self.pg1.sw_if_index, labels=[VppMplsLabel(33)]
416                 )
417             ],
418         )
419         route_32_eos.add_vpp_config()
420
421         p_m1 = (
422             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
423             / MPLS(label=32, cos=3, ttl=2)
424             / IP(src=self.pg0.remote_ip4, dst="10.0.0.1", tos=1)
425             / UDP(sport=1234, dport=1234)
426             / Raw(scapy.compat.chb(100) * NUM_PKTS)
427         )
428
429         rx = self.send_and_expect(self.pg0, p_m1 * NUM_PKTS, self.pg1)
430         for p in rx:
431             self.assertEqual(p[MPLS].cos, from_mpls)
432             self.assertEqual(p[MPLS].label, 33)
433             self.assertEqual(p[MPLS].s, 1)
434
435         #
436         # MPLS deag - COS is copied from MPLS to IP
437         #
438         route_33_eos = VppMplsRoute(
439             self, 33, 1, [VppRoutePath("0.0.0.0", 0xFFFFFFFF, nh_table_id=0)]
440         )
441         route_33_eos.add_vpp_config()
442
443         route_10_0_0_4 = VppIpRoute(
444             self,
445             "10.0.0.4",
446             32,
447             [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)],
448         )
449         route_10_0_0_4.add_vpp_config()
450
451         p_m2 = (
452             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
453             / MPLS(label=33, ttl=2, cos=3)
454             / IP(src=self.pg0.remote_ip4, dst="10.0.0.4", tos=1)
455             / UDP(sport=1234, dport=1234)
456             / Raw(scapy.compat.chb(100) * NUM_PKTS)
457         )
458
459         rx = self.send_and_expect(self.pg0, p_m2 * NUM_PKTS, self.pg1)
460
461         for p in rx:
462             self.assertEqual(p[IP].tos, from_mpls)
463
464     def test_qos_vlan(self):
465         """QoS mark/record VLAN"""
466
467         #
468         # QoS for all input values
469         #
470         output = [scapy.compat.chb(0)] * 256
471         for i in range(0, 255):
472             output[i] = scapy.compat.chb(255 - i)
473         os = b"".join(output)
474         rows = [{"outputs": os}, {"outputs": os}, {"outputs": os}, {"outputs": os}]
475
476         qem1 = VppQosEgressMap(self, 1, rows).add_vpp_config()
477
478         sub_if = VppDot1QSubint(self, self.pg0, 11)
479
480         sub_if.admin_up()
481         sub_if.config_ip4()
482         sub_if.resolve_arp()
483         sub_if.config_ip6()
484         sub_if.resolve_ndp()
485
486         #
487         # enable VLAN QoS recording/marking on the input Pg0 subinterface and
488         #
489         qr_v = VppQosRecord(
490             self, sub_if, self.QOS_SOURCE.QOS_API_SOURCE_VLAN
491         ).add_vpp_config()
492         qm_v = VppQosMark(
493             self, sub_if, qem1, self.QOS_SOURCE.QOS_API_SOURCE_VLAN
494         ).add_vpp_config()
495
496         #
497         # IP marking/recording on pg1
498         #
499         qr_ip = VppQosRecord(
500             self, self.pg1, self.QOS_SOURCE.QOS_API_SOURCE_IP
501         ).add_vpp_config()
502         qm_ip = VppQosMark(
503             self, self.pg1, qem1, self.QOS_SOURCE.QOS_API_SOURCE_IP
504         ).add_vpp_config()
505
506         #
507         # a routes to/from sub-interface
508         #
509         route_10_0_0_1 = VppIpRoute(
510             self, "10.0.0.1", 32, [VppRoutePath(sub_if.remote_ip4, sub_if.sw_if_index)]
511         )
512         route_10_0_0_1.add_vpp_config()
513         route_10_0_0_2 = VppIpRoute(
514             self,
515             "10.0.0.2",
516             32,
517             [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)],
518         )
519         route_10_0_0_2.add_vpp_config()
520         route_2001_1 = VppIpRoute(
521             self, "2001::1", 128, [VppRoutePath(sub_if.remote_ip6, sub_if.sw_if_index)]
522         )
523         route_2001_1.add_vpp_config()
524         route_2001_2 = VppIpRoute(
525             self,
526             "2001::2",
527             128,
528             [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
529         )
530         route_2001_2.add_vpp_config()
531
532         p_v1 = (
533             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
534             / Dot1Q(vlan=11, prio=1)
535             / IP(src="1.1.1.1", dst="10.0.0.2", tos=1)
536             / UDP(sport=1234, dport=1234)
537             / Raw(scapy.compat.chb(100) * NUM_PKTS)
538         )
539
540         p_v2 = (
541             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
542             / IP(src="1.1.1.1", dst="10.0.0.1", tos=1)
543             / UDP(sport=1234, dport=1234)
544             / Raw(scapy.compat.chb(100) * NUM_PKTS)
545         )
546
547         p_v3 = (
548             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
549             / Dot1Q(vlan=11, prio=1, id=1)
550             / IP(src="1.1.1.1", dst="10.0.0.2", tos=2)
551             / UDP(sport=1234, dport=1234)
552             / Raw(scapy.compat.chb(100) * NUM_PKTS)
553         )
554
555         rx = self.send_and_expect(self.pg1, p_v2 * NUM_PKTS, self.pg0)
556
557         for p in rx:
558             self.assertEqual(p[Dot1Q].prio, 7)
559             self.assertEqual(p[Dot1Q].id, 0)
560
561         rx = self.send_and_expect(self.pg0, p_v3 * NUM_PKTS, self.pg1)
562
563         for p in rx:
564             self.assertEqual(p[IP].tos, 252)
565
566         rx = self.send_and_expect(self.pg0, p_v1 * NUM_PKTS, self.pg1)
567
568         for p in rx:
569             self.assertEqual(p[IP].tos, 253)
570
571         p_v1 = (
572             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
573             / Dot1Q(vlan=11, prio=2)
574             / IPv6(src="2001::1", dst="2001::2", tc=1)
575             / UDP(sport=1234, dport=1234)
576             / Raw(scapy.compat.chb(100) * NUM_PKTS)
577         )
578
579         p_v2 = (
580             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
581             / IPv6(src="3001::1", dst="2001::1", tc=1)
582             / UDP(sport=1234, dport=1234)
583             / Raw(scapy.compat.chb(100) * NUM_PKTS)
584         )
585
586         rx = self.send_and_expect(self.pg1, p_v2 * NUM_PKTS, self.pg0)
587
588         for p in rx:
589             self.assertEqual(p[Dot1Q].prio, 7)
590             self.assertEqual(p[Dot1Q].id, 0)
591
592         rx = self.send_and_expect(self.pg0, p_v1 * NUM_PKTS, self.pg1)
593
594         for p in rx:
595             self.assertEqual(p[IPv6].tc, 251)
596
597         #
598         # cleanup
599         #
600         sub_if.unconfig_ip4()
601         sub_if.unconfig_ip6()
602
603
604 if __name__ == "__main__":
605     unittest.main(testRunner=VppTestRunner)