Tests Cleanup: Fix missing calls to setUpClass/tearDownClass.
[vpp.git] / test / test_qos.py
1 #!/usr/bin/env python
2
3 import unittest
4
5 from framework import VppTestCase, VppTestRunner
6 from vpp_papi_provider import QOS_SOURCE
7 from vpp_sub_interface import VppDot1QSubint
8 from vpp_ip import DpoProto
9 from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsRoute, \
10     VppMplsLabel, VppMplsTable
11
12 import scapy.compat
13 from scapy.packet import Raw
14 from scapy.layers.l2 import Ether, Dot1Q
15 from scapy.layers.inet import IP, UDP
16 from scapy.layers.inet6 import IPv6
17 from scapy.contrib.mpls import MPLS
18
19
class TestQOS(VppTestCase):
    """ QOS Test Case

    Exercises QoS recording (capturing a packet's QoS value on input) and
    marking (rewriting it on output through an egress map) for the IP,
    MPLS and VLAN sources.
    """

    @classmethod
    def setUpClass(cls):
        super(TestQOS, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestQOS, cls).tearDownClass()

    def setUp(self):
        """Create pg0-pg4 and bring them up with IPv4, IPv6 and MPLS."""
        super(TestQOS, self).setUp()

        self.create_pg_interfaces(range(5))

        # the default MPLS table must exist before MPLS is enabled below
        tbl = VppMplsTable(self, 0)
        tbl.add_vpp_config()

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()
            i.config_ip6()
            i.resolve_ndp()
            i.enable_mpls()

    def tearDown(self):
        """Undo the per-interface addressing and MPLS set up by setUp()."""
        for i in self.pg_interfaces:
            i.unconfig_ip4()
            i.unconfig_ip6()
            i.disable_mpls()

        super(TestQOS, self).tearDown()

    #
    # private helpers
    #
    @staticmethod
    def _uniform_outputs(value):
        """Return 256 egress-map output bytes that all equal `value`."""
        return scapy.compat.chb(value) * 256

    @staticmethod
    def _inverted_outputs():
        """Return 256 egress-map output bytes mapping input n to 255 - n."""
        return b''.join(scapy.compat.chb(255 - n) for n in range(256))

    @staticmethod
    def _same_rows(outputs):
        """Return the four rows of an egress map, each using `outputs`.

        NOTE(review): the rows look to be indexed by the recorded QoS
        source (test_qos_mpls fills them as ext, vlan, mpls, ip) - confirm
        against the QoS API definition.
        """
        return [{'outputs': outputs} for _ in range(4)]

    def _send_and_check(self, tx_if, pkt, rx_if, layer, field, expected):
        """Send 65 copies of `pkt` on `tx_if`, expect them all on `rx_if`
        and assert `field` of `layer` equals `expected` in each of them.

        Returns the captured packets.
        """
        rx = self.send_and_expect(tx_if, pkt * 65, rx_if)
        for p in rx:
            self.assertEqual(getattr(p[layer], field), expected)
        return rx

    def _assert_mpls(self, hdr, cos, label, eos):
        """Assert the outermost MPLS header found in `hdr`."""
        self.assertEqual(hdr[MPLS].cos, cos)
        self.assertEqual(hdr[MPLS].label, label)
        self.assertEqual(hdr[MPLS].s, eos)

    def test_qos_ip(self):
        """ QoS Mark/Record IP

        Record the QoS value from the IP header on pg0 and check that the
        egress map bound to each output interface rewrites the IPv4 ToS /
        IPv6 traffic-class as configured, then unwind the configuration
        one step at a time.
        """

        #
        # table 1 inverts the input QoS mark: n maps to 255 - n
        #
        self.vapi.qos_egress_map_update(
            1, self._same_rows(self._inverted_outputs()))

        #
        # tables 2 and 3 map every input value to the table's own id
        #
        self.vapi.qos_egress_map_update(
            2, self._same_rows(self._uniform_outputs(2)))
        self.vapi.qos_egress_map_update(
            3, self._same_rows(self._uniform_outputs(3)))

        #
        # tables 4 to 7 all map every input value to 4
        #
        rows = self._same_rows(self._uniform_outputs(4))
        for map_id in (4, 5, 6, 7):
            self.vapi.qos_egress_map_update(map_id, rows)

        self.logger.info(self.vapi.cli("sh qos eg map"))

        #
        # Bind interface pgN to table n
        #
        for intf, map_id in ((self.pg1, 1), (self.pg2, 2),
                             (self.pg3, 3), (self.pg4, 4)):
            self.vapi.qos_mark_enable_disable(intf.sw_if_index,
                                              QOS_SOURCE.IP,
                                              map_id,
                                              1)

        #
        # packets ingress on Pg0
        #
        p_v4 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, tos=1) /
                UDP(sport=1234, dport=1234) /
                Raw(scapy.compat.chb(100) * 65))
        p_v6 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6,
                     tc=1) /
                UDP(sport=1234, dport=1234) /
                Raw(scapy.compat.chb(100) * 65))

        #
        # Since we have not yet enabled the recording of the input QoS
        # from the input IP header, the egress packet's ToS will be unchanged
        #
        self._send_and_check(self.pg0, p_v4, self.pg1, IP, 'tos', 1)
        self._send_and_check(self.pg0, p_v6, self.pg1, IPv6, 'tc', 1)

        #
        # Enable QoS recording on IP input for pg0
        #
        self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
                                            QOS_SOURCE.IP,
                                            1)

        #
        # send the same packets, this time expect the input TOS of 1
        # to be mapped to pg1's egress value of 254
        #
        self._send_and_check(self.pg0, p_v4, self.pg1, IP, 'tos', 254)
        self._send_and_check(self.pg0, p_v6, self.pg1, IPv6, 'tc', 254)

        #
        # different input ToS to test the mapping
        #
        p_v4[IP].tos = 127
        self._send_and_check(self.pg0, p_v4, self.pg1, IP, 'tos', 128)
        p_v6[IPv6].tc = 127
        self._send_and_check(self.pg0, p_v6, self.pg1, IPv6, 'tc', 128)

        p_v4[IP].tos = 254
        self._send_and_check(self.pg0, p_v4, self.pg1, IP, 'tos', 1)
        p_v6[IPv6].tc = 254
        self._send_and_check(self.pg0, p_v6, self.pg1, IPv6, 'tc', 1)

        #
        # send packets out the other interfaces to test the maps are
        # correctly applied
        #
        p_v4[IP].dst = self.pg2.remote_ip4
        self._send_and_check(self.pg0, p_v4, self.pg2, IP, 'tos', 2)

        p_v4[IP].dst = self.pg3.remote_ip4
        self._send_and_check(self.pg0, p_v4, self.pg3, IP, 'tos', 3)

        p_v6[IPv6].dst = self.pg3.remote_ip6
        self._send_and_check(self.pg0, p_v6, self.pg3, IPv6, 'tc', 3)

        #
        # remove the map on pg2 and pg3, now expect an unchanged IP tos
        #
        self.vapi.qos_mark_enable_disable(self.pg2.sw_if_index,
                                          QOS_SOURCE.IP,
                                          2,
                                          0)
        self.vapi.qos_mark_enable_disable(self.pg3.sw_if_index,
                                          QOS_SOURCE.IP,
                                          3,
                                          0)
        self.logger.info(self.vapi.cli("sh int feat pg2"))

        p_v4[IP].dst = self.pg2.remote_ip4
        self._send_and_check(self.pg0, p_v4, self.pg2, IP, 'tos', 254)

        p_v4[IP].dst = self.pg3.remote_ip4
        self._send_and_check(self.pg0, p_v4, self.pg3, IP, 'tos', 254)

        #
        # still mapping out of pg1
        #
        p_v4[IP].dst = self.pg1.remote_ip4
        self._send_and_check(self.pg0, p_v4, self.pg1, IP, 'tos', 1)

        #
        # disable the input recording on pg0
        #
        self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
                                            QOS_SOURCE.IP,
                                            0)

        #
        # back to an unchanged TOS value
        #
        self._send_and_check(self.pg0, p_v4, self.pg1, IP, 'tos', 254)

        #
        # disable the egress map on pg1 and pg4
        #
        self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
                                          QOS_SOURCE.IP,
                                          1,
                                          0)
        self.vapi.qos_mark_enable_disable(self.pg4.sw_if_index,
                                          QOS_SOURCE.IP,
                                          4,
                                          0)

        #
        # unchanged Tos on pg1
        #
        self._send_and_check(self.pg0, p_v4, self.pg1, IP, 'tos', 254)

        #
        # clean-up the maps
        #
        for map_id in (1, 4, 2, 3, 5, 6, 7):
            self.vapi.qos_egress_map_delete(map_id)

    def test_qos_mpls(self):
        """ QoS Mark/Record MPLS

        Check that a QoS value recorded from IP is marked into every
        imposed MPLS label, and that a value recorded from MPLS is marked
        into the swapped label and into the IP header after disposition.
        """

        #
        # each row of the map yields one constant for all 256 input
        # values, so the marked value tells which row was used
        #
        from_ext = 7
        from_ip = 6
        from_mpls = 5
        from_vlan = 4
        rows = [{'outputs': self._uniform_outputs(value)}
                for value in (from_ext, from_vlan, from_mpls, from_ip)]

        self.vapi.qos_egress_map_update(1, rows)

        #
        # a route with 1 MPLS label
        #
        route_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32,
                                    [VppRoutePath(self.pg1.remote_ip4,
                                                  self.pg1.sw_if_index,
                                                  labels=[32])])
        route_10_0_0_1.add_vpp_config()

        #
        # a route with 3 MPLS labels
        #
        route_10_0_0_3 = VppIpRoute(self, "10.0.0.3", 32,
                                    [VppRoutePath(self.pg1.remote_ip4,
                                                  self.pg1.sw_if_index,
                                                  labels=[63, 33, 34])])
        route_10_0_0_3.add_vpp_config()

        #
        # enable IP QoS recording on the input Pg0 and MPLS egress marking
        # on Pg1
        #
        self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
                                            QOS_SOURCE.IP,
                                            1)
        self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
                                          QOS_SOURCE.MPLS,
                                          1,
                                          1)

        #
        # packet that will get one label added and 3 labels added resp.
        #
        p_1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
               IP(src=self.pg0.remote_ip4, dst="10.0.0.1", tos=1) /
               UDP(sport=1234, dport=1234) /
               Raw(scapy.compat.chb(100) * 65))
        p_3 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
               IP(src=self.pg0.remote_ip4, dst="10.0.0.3", tos=1) /
               UDP(sport=1234, dport=1234) /
               Raw(scapy.compat.chb(100) * 65))

        #
        # only 3 bits of ToS value in MPLS make sure tos is correct
        # and the label and EOS bit have not been corrupted
        #
        rx = self.send_and_expect(self.pg0, p_1 * 65, self.pg1)
        for p in rx:
            self._assert_mpls(p, from_ip, 32, 1)

        rx = self.send_and_expect(self.pg0, p_3 * 65, self.pg1)
        for p in rx:
            # walk the label stack from the outermost header inwards
            hdr = p
            for label, eos in ((63, 0), (33, 0), (34, 1)):
                self._assert_mpls(hdr, from_ip, label, eos)
                hdr = hdr[MPLS].payload

        #
        # enable MPLS QoS recording on the input Pg0 and IP egress marking
        # on Pg1
        #
        self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
                                            QOS_SOURCE.MPLS,
                                            1)
        self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
                                          QOS_SOURCE.IP,
                                          1,
                                          1)

        #
        # MPLS x-connect - COS according to pg1 map
        #
        route_32_eos = VppMplsRoute(self, 32, 1,
                                    [VppRoutePath(self.pg1.remote_ip4,
                                                  self.pg1.sw_if_index,
                                                  labels=[VppMplsLabel(33)])])
        route_32_eos.add_vpp_config()

        p_m1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                MPLS(label=32, cos=3, ttl=2) /
                IP(src=self.pg0.remote_ip4, dst="10.0.0.1", tos=1) /
                UDP(sport=1234, dport=1234) /
                Raw(scapy.compat.chb(100) * 65))

        rx = self.send_and_expect(self.pg0, p_m1 * 65, self.pg1)
        for p in rx:
            self._assert_mpls(p, from_mpls, 33, 1)

        #
        # MPLS deag - the QoS recorded from MPLS is marked into the IP
        # header through the map's MPLS row
        #
        route_33_eos = VppMplsRoute(self, 33, 1,
                                    [VppRoutePath("0.0.0.0",
                                                  0xffffffff,
                                                  nh_table_id=0)])
        route_33_eos.add_vpp_config()

        route_10_0_0_4 = VppIpRoute(self, "10.0.0.4", 32,
                                    [VppRoutePath(self.pg1.remote_ip4,
                                                  self.pg1.sw_if_index)])
        route_10_0_0_4.add_vpp_config()

        p_m2 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                MPLS(label=33, ttl=2, cos=3) /
                IP(src=self.pg0.remote_ip4, dst="10.0.0.4", tos=1) /
                UDP(sport=1234, dport=1234) /
                Raw(scapy.compat.chb(100) * 65))

        self._send_and_check(self.pg0, p_m2, self.pg1, IP, 'tos', from_mpls)

        #
        # cleanup
        #
        self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
                                            QOS_SOURCE.IP,
                                            0)
        self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
                                          QOS_SOURCE.MPLS,
                                          1,
                                          0)
        self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
                                            QOS_SOURCE.MPLS,
                                            0)
        self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
                                          QOS_SOURCE.IP,
                                          1,
                                          0)
        self.vapi.qos_egress_map_delete(1)

    def test_qos_vlan(self):
        """QoS mark/record VLAN

        Forward IPv4 and IPv6 traffic in both directions between a dot1q
        sub-interface of pg0 and pg1, checking that the VLAN priority and
        the IP ToS/TC are each marked from the value recorded on the
        other side.
        """

        #
        # one inverting map for all sources: input n maps to 255 - n
        #
        self.vapi.qos_egress_map_update(
            1, self._same_rows(self._inverted_outputs()))

        sub_if = VppDot1QSubint(self, self.pg0, 11)

        sub_if.admin_up()
        sub_if.config_ip4()
        sub_if.resolve_arp()
        sub_if.config_ip6()
        sub_if.resolve_ndp()

        #
        # enable VLAN QoS recording and marking on the Pg0 sub-interface
        #
        self.vapi.qos_record_enable_disable(sub_if.sw_if_index,
                                            QOS_SOURCE.VLAN,
                                            1)
        self.vapi.qos_mark_enable_disable(sub_if.sw_if_index,
                                          QOS_SOURCE.VLAN,
                                          1,
                                          1)

        #
        # IP marking/recording on pg1
        #
        self.vapi.qos_record_enable_disable(self.pg1.sw_if_index,
                                            QOS_SOURCE.IP,
                                            1)
        self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
                                          QOS_SOURCE.IP,
                                          1,
                                          1)

        #
        # routes to/from the sub-interface
        #
        route_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32,
                                    [VppRoutePath(sub_if.remote_ip4,
                                                  sub_if.sw_if_index)])
        route_10_0_0_1.add_vpp_config()
        route_10_0_0_2 = VppIpRoute(self, "10.0.0.2", 32,
                                    [VppRoutePath(self.pg1.remote_ip4,
                                                  self.pg1.sw_if_index)])
        route_10_0_0_2.add_vpp_config()
        route_2001_1 = VppIpRoute(self, "2001::1", 128,
                                  [VppRoutePath(sub_if.remote_ip6,
                                                sub_if.sw_if_index,
                                                proto=DpoProto.DPO_PROTO_IP6)],
                                  is_ip6=1)
        route_2001_1.add_vpp_config()
        route_2001_2 = VppIpRoute(self, "2001::2", 128,
                                  [VppRoutePath(self.pg1.remote_ip6,
                                                self.pg1.sw_if_index,
                                                proto=DpoProto.DPO_PROTO_IP6)],
                                  is_ip6=1)
        route_2001_2.add_vpp_config()

        #
        # IPv4: tagged into pg0 (p_v1) and untagged into pg1 (p_v2)
        #
        p_v1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                Dot1Q(vlan=11, prio=1) /
                IP(src="1.1.1.1", dst="10.0.0.2", tos=1) /
                UDP(sport=1234, dport=1234) /
                Raw(scapy.compat.chb(100) * 65))

        p_v2 = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
                IP(src="1.1.1.1", dst="10.0.0.1", tos=1) /
                UDP(sport=1234, dport=1234) /
                Raw(scapy.compat.chb(100) * 65))

        # ToS 1 maps to 254; presumably only its low 3 bits (6) fit in
        # the VLAN priority field - TODO confirm
        self._send_and_check(self.pg1, p_v2, self.pg0, Dot1Q, 'prio', 6)

        # VLAN priority 1 maps to an IP ToS of 254
        self._send_and_check(self.pg0, p_v1, self.pg1, IP, 'tos', 254)

        #
        # IPv6: same again, with VLAN priority 2 on the tagged packet
        #
        p_v1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                Dot1Q(vlan=11, prio=2) /
                IPv6(src="2001::1", dst="2001::2", tc=1) /
                UDP(sport=1234, dport=1234) /
                Raw(scapy.compat.chb(100) * 65))

        p_v2 = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
                IPv6(src="3001::1", dst="2001::1", tc=1) /
                UDP(sport=1234, dport=1234) /
                Raw(scapy.compat.chb(100) * 65))

        self._send_and_check(self.pg1, p_v2, self.pg0, Dot1Q, 'prio', 6)

        # VLAN priority 2 maps to a traffic-class of 253
        self._send_and_check(self.pg0, p_v1, self.pg1, IPv6, 'tc', 253)

        #
        # cleanup
        #
        sub_if.unconfig_ip4()
        sub_if.unconfig_ip6()

        self.vapi.qos_record_enable_disable(sub_if.sw_if_index,
                                            QOS_SOURCE.VLAN,
                                            0)
        self.vapi.qos_mark_enable_disable(sub_if.sw_if_index,
                                          QOS_SOURCE.VLAN,
                                          1,
                                          0)
        self.vapi.qos_record_enable_disable(self.pg1.sw_if_index,
                                            QOS_SOURCE.IP,
                                            0)
        self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
                                          QOS_SOURCE.IP,
                                          1,
                                          0)

        # remove the egress map too, as the other tests do, so that it
        # does not outlive this test
        self.vapi.qos_egress_map_delete(1)
585
586
if __name__ == "__main__":
    # executed as a script: run this module's tests with the VPP runner
    unittest.main(testRunner=VppTestRunner)