tests: refactor. Replace literal constant w/ named constant.
[vpp.git] / test / test_qos.py
1 #!/usr/bin/env python
2
3 import unittest
4
5 from framework import VppTestCase, VppTestRunner
6 from vpp_papi_provider import QOS_SOURCE
7 from vpp_sub_interface import VppDot1QSubint
8 from vpp_ip import DpoProto
9 from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsRoute, \
10     VppMplsLabel, VppMplsTable
11
12 import scapy.compat
13 from scapy.packet import Raw
14 from scapy.layers.l2 import Ether, Dot1Q
15 from scapy.layers.inet import IP, UDP
16 from scapy.layers.inet6 import IPv6
17 from scapy.contrib.mpls import MPLS
18
19 NUM_PKTS = 67
20
21
22 class TestQOS(VppTestCase):
23     """ QOS Test Case """
24
25     @classmethod
26     def setUpClass(cls):
27         super(TestQOS, cls).setUpClass()
28
29     @classmethod
30     def tearDownClass(cls):
31         super(TestQOS, cls).tearDownClass()
32
33     def setUp(self):
34         super(TestQOS, self).setUp()
35
36         self.create_pg_interfaces(range(5))
37
38         tbl = VppMplsTable(self, 0)
39         tbl.add_vpp_config()
40
41         for i in self.pg_interfaces:
42             i.admin_up()
43             i.config_ip4()
44             i.resolve_arp()
45             i.config_ip6()
46             i.resolve_ndp()
47             i.enable_mpls()
48
49     def tearDown(self):
50         for i in self.pg_interfaces:
51             i.unconfig_ip4()
52             i.unconfig_ip6()
53             i.disable_mpls()
54
55         super(TestQOS, self).tearDown()
56
57     def test_qos_ip(self):
58         """ QoS Mark/Record IP """
59
60         #
61         # for table 1 map the n=0xff possible values of input QoS mark,
62         # n to 1-n
63         #
64         output = [scapy.compat.chb(0)] * 256
65         for i in range(0, 255):
66             output[i] = scapy.compat.chb(255 - i)
67         os = b''.join(output)
68         rows = [{'outputs': os},
69                 {'outputs': os},
70                 {'outputs': os},
71                 {'outputs': os}]
72
73         self.vapi.qos_egress_map_update(1, rows)
74
75         #
76         # For table 2 (and up) use the value n for everything
77         #
78         output = [scapy.compat.chb(2)] * 256
79         os = b''.join(output)
80         rows = [{'outputs': os},
81                 {'outputs': os},
82                 {'outputs': os},
83                 {'outputs': os}]
84
85         self.vapi.qos_egress_map_update(2, rows)
86
87         output = [scapy.compat.chb(3)] * 256
88         os = b''.join(output)
89         rows = [{'outputs': os},
90                 {'outputs': os},
91                 {'outputs': os},
92                 {'outputs': os}]
93
94         self.vapi.qos_egress_map_update(3, rows)
95
96         output = [scapy.compat.chb(4)] * 256
97         os = b''.join(output)
98         rows = [{'outputs': os},
99                 {'outputs': os},
100                 {'outputs': os},
101                 {'outputs': os}]
102         self.vapi.qos_egress_map_update(4, rows)
103         self.vapi.qos_egress_map_update(5, rows)
104         self.vapi.qos_egress_map_update(6, rows)
105         self.vapi.qos_egress_map_update(7, rows)
106
107         self.logger.info(self.vapi.cli("sh qos eg map"))
108
109         #
110         # Bind interface pgN to table n
111         #
112         self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
113                                           QOS_SOURCE.IP,
114                                           1,
115                                           1)
116         self.vapi.qos_mark_enable_disable(self.pg2.sw_if_index,
117                                           QOS_SOURCE.IP,
118                                           2,
119                                           1)
120         self.vapi.qos_mark_enable_disable(self.pg3.sw_if_index,
121                                           QOS_SOURCE.IP,
122                                           3,
123                                           1)
124         self.vapi.qos_mark_enable_disable(self.pg4.sw_if_index,
125                                           QOS_SOURCE.IP,
126                                           4,
127                                           1)
128
129         #
130         # packets ingress on Pg0
131         #
132         p_v4 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
133                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, tos=1) /
134                 UDP(sport=1234, dport=1234) /
135                 Raw(scapy.compat.chb(100) * NUM_PKTS))
136         p_v6 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
137                 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6,
138                      tc=1) /
139                 UDP(sport=1234, dport=1234) /
140                 Raw(scapy.compat.chb(100) * NUM_PKTS))
141
142         #
143         # Since we have not yet enabled the recording of the input QoS
144         # from the input iP header, the egress packet's ToS will be unchanged
145         #
146         rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
147         for p in rx:
148             self.assertEqual(p[IP].tos, 1)
149         rx = self.send_and_expect(self.pg0, p_v6 * NUM_PKTS, self.pg1)
150         for p in rx:
151             self.assertEqual(p[IPv6].tc, 1)
152
153         #
154         # Enable QoS recording on IP input for pg0
155         #
156         self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
157                                             QOS_SOURCE.IP,
158                                             1)
159
160         #
161         # send the same packets, this time expect the input TOS of 1
162         # to be mapped to pg1's egress value of 254
163         #
164         rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
165         for p in rx:
166             self.assertEqual(p[IP].tos, 254)
167         rx = self.send_and_expect(self.pg0, p_v6 * NUM_PKTS, self.pg1)
168         for p in rx:
169             self.assertEqual(p[IPv6].tc, 254)
170
171         #
172         # different input ToS to test the mapping
173         #
174         p_v4[IP].tos = 127
175         rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
176         for p in rx:
177             self.assertEqual(p[IP].tos, 128)
178         p_v6[IPv6].tc = 127
179         rx = self.send_and_expect(self.pg0, p_v6 * NUM_PKTS, self.pg1)
180         for p in rx:
181             self.assertEqual(p[IPv6].tc, 128)
182
183         p_v4[IP].tos = 254
184         rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
185         for p in rx:
186             self.assertEqual(p[IP].tos, 1)
187         p_v6[IPv6].tc = 254
188         rx = self.send_and_expect(self.pg0, p_v6 * NUM_PKTS, self.pg1)
189         for p in rx:
190             self.assertEqual(p[IPv6].tc, 1)
191
192         #
193         # send packets out the other interfaces to test the maps are
194         # correctly applied
195         #
196         p_v4[IP].dst = self.pg2.remote_ip4
197         rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg2)
198         for p in rx:
199             self.assertEqual(p[IP].tos, 2)
200
201         p_v4[IP].dst = self.pg3.remote_ip4
202         rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg3)
203         for p in rx:
204             self.assertEqual(p[IP].tos, 3)
205
206         p_v6[IPv6].dst = self.pg3.remote_ip6
207         rx = self.send_and_expect(self.pg0, p_v6 * NUM_PKTS, self.pg3)
208         for p in rx:
209             self.assertEqual(p[IPv6].tc, 3)
210
211         #
212         # remove the map on pg2 and pg3, now expect an unchanged IP tos
213         #
214         self.vapi.qos_mark_enable_disable(self.pg2.sw_if_index,
215                                           QOS_SOURCE.IP,
216                                           2,
217                                           0)
218         self.vapi.qos_mark_enable_disable(self.pg3.sw_if_index,
219                                           QOS_SOURCE.IP,
220                                           3,
221                                           0)
222         self.logger.info(self.vapi.cli("sh int feat pg2"))
223
224         p_v4[IP].dst = self.pg2.remote_ip4
225         rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg2)
226         for p in rx:
227             self.assertEqual(p[IP].tos, 254)
228
229         p_v4[IP].dst = self.pg3.remote_ip4
230         rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg3)
231         for p in rx:
232             self.assertEqual(p[IP].tos, 254)
233
234         #
235         # still mapping out of pg1
236         #
237         p_v4[IP].dst = self.pg1.remote_ip4
238         rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
239         for p in rx:
240             self.assertEqual(p[IP].tos, 1)
241
242         #
243         # disable the input recording on pg0
244         #
245         self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
246                                             QOS_SOURCE.IP,
247                                             0)
248
249         #
250         # back to an unchanged TOS value
251         #
252         rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
253         for p in rx:
254             self.assertEqual(p[IP].tos, 254)
255
256         #
257         # disable the egress map on pg1 and pg4
258         #
259         self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
260                                           QOS_SOURCE.IP,
261                                           1,
262                                           0)
263         self.vapi.qos_mark_enable_disable(self.pg4.sw_if_index,
264                                           QOS_SOURCE.IP,
265                                           4,
266                                           0)
267
268         #
269         # unchanged Tos on pg1
270         #
271         rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
272         for p in rx:
273             self.assertEqual(p[IP].tos, 254)
274
275         #
276         # clean-up the map
277         #
278         self.vapi.qos_egress_map_delete(1)
279         self.vapi.qos_egress_map_delete(4)
280         self.vapi.qos_egress_map_delete(2)
281         self.vapi.qos_egress_map_delete(3)
282         self.vapi.qos_egress_map_delete(5)
283         self.vapi.qos_egress_map_delete(6)
284         self.vapi.qos_egress_map_delete(7)
285
286     def test_qos_mpls(self):
287         """ QoS Mark/Record MPLS """
288
289         #
290         # 255 QoS for all input values
291         #
292         from_ext = 7
293         from_ip = 6
294         from_mpls = 5
295         from_vlan = 4
296         output = [scapy.compat.chb(from_ext)] * 256
297         os1 = b''.join(output)
298         output = [scapy.compat.chb(from_vlan)] * 256
299         os2 = b''.join(output)
300         output = [scapy.compat.chb(from_mpls)] * 256
301         os3 = b''.join(output)
302         output = [scapy.compat.chb(from_ip)] * 256
303         os4 = b''.join(output)
304         rows = [{'outputs': os1},
305                 {'outputs': os2},
306                 {'outputs': os3},
307                 {'outputs': os4}]
308
309         self.vapi.qos_egress_map_update(1, rows)
310
311         #
312         # a route with 1 MPLS label
313         #
314         route_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32,
315                                     [VppRoutePath(self.pg1.remote_ip4,
316                                                   self.pg1.sw_if_index,
317                                                   labels=[32])])
318         route_10_0_0_1.add_vpp_config()
319
320         #
321         # a route with 3 MPLS labels
322         #
323         route_10_0_0_3 = VppIpRoute(self, "10.0.0.3", 32,
324                                     [VppRoutePath(self.pg1.remote_ip4,
325                                                   self.pg1.sw_if_index,
326                                                   labels=[63, 33, 34])])
327         route_10_0_0_3.add_vpp_config()
328
329         #
330         # enable IP QoS recording on the input Pg0 and MPLS egress marking
331         # on Pg1
332         #
333         self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
334                                             QOS_SOURCE.IP,
335                                             1)
336         self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
337                                           QOS_SOURCE.MPLS,
338                                           1,
339                                           1)
340
341         #
342         # packet that will get one label added and 3 labels added resp.
343         #
344         p_1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
345                IP(src=self.pg0.remote_ip4, dst="10.0.0.1", tos=1) /
346                UDP(sport=1234, dport=1234) /
347                Raw(scapy.compat.chb(100) * NUM_PKTS))
348         p_3 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
349                IP(src=self.pg0.remote_ip4, dst="10.0.0.3", tos=1) /
350                UDP(sport=1234, dport=1234) /
351                Raw(scapy.compat.chb(100) * NUM_PKTS))
352
353         rx = self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg1)
354
355         #
356         # only 3 bits of ToS value in MPLS make sure tos is correct
357         # and the label and EOS bit have not been corrupted
358         #
359         for p in rx:
360             self.assertEqual(p[MPLS].cos, from_ip)
361             self.assertEqual(p[MPLS].label, 32)
362             self.assertEqual(p[MPLS].s, 1)
363         rx = self.send_and_expect(self.pg0, p_3 * NUM_PKTS, self.pg1)
364         for p in rx:
365             self.assertEqual(p[MPLS].cos, from_ip)
366             self.assertEqual(p[MPLS].label, 63)
367             self.assertEqual(p[MPLS].s, 0)
368             h = p[MPLS].payload
369             self.assertEqual(h[MPLS].cos, from_ip)
370             self.assertEqual(h[MPLS].label, 33)
371             self.assertEqual(h[MPLS].s, 0)
372             h = h[MPLS].payload
373             self.assertEqual(h[MPLS].cos, from_ip)
374             self.assertEqual(h[MPLS].label, 34)
375             self.assertEqual(h[MPLS].s, 1)
376
377         #
378         # enable MPLS QoS recording on the input Pg0 and IP egress marking
379         # on Pg1
380         #
381         self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
382                                             QOS_SOURCE.MPLS,
383                                             1)
384         self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
385                                           QOS_SOURCE.IP,
386                                           1,
387                                           1)
388
389         #
390         # MPLS x-connect - COS according to pg1 map
391         #
392         route_32_eos = VppMplsRoute(self, 32, 1,
393                                     [VppRoutePath(self.pg1.remote_ip4,
394                                                   self.pg1.sw_if_index,
395                                                   labels=[VppMplsLabel(33)])])
396         route_32_eos.add_vpp_config()
397
398         p_m1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
399                 MPLS(label=32, cos=3, ttl=2) /
400                 IP(src=self.pg0.remote_ip4, dst="10.0.0.1", tos=1) /
401                 UDP(sport=1234, dport=1234) /
402                 Raw(scapy.compat.chb(100) * NUM_PKTS))
403
404         rx = self.send_and_expect(self.pg0, p_m1 * NUM_PKTS, self.pg1)
405         for p in rx:
406             self.assertEqual(p[MPLS].cos, from_mpls)
407             self.assertEqual(p[MPLS].label, 33)
408             self.assertEqual(p[MPLS].s, 1)
409
410         #
411         # MPLS deag - COS is copied from MPLS to IP
412         #
413         route_33_eos = VppMplsRoute(self, 33, 1,
414                                     [VppRoutePath("0.0.0.0",
415                                                   0xffffffff,
416                                                   nh_table_id=0)])
417         route_33_eos.add_vpp_config()
418
419         route_10_0_0_4 = VppIpRoute(self, "10.0.0.4", 32,
420                                     [VppRoutePath(self.pg1.remote_ip4,
421                                                   self.pg1.sw_if_index)])
422         route_10_0_0_4.add_vpp_config()
423
424         p_m2 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
425                 MPLS(label=33, ttl=2, cos=3) /
426                 IP(src=self.pg0.remote_ip4, dst="10.0.0.4", tos=1) /
427                 UDP(sport=1234, dport=1234) /
428                 Raw(scapy.compat.chb(100) * NUM_PKTS))
429
430         rx = self.send_and_expect(self.pg0, p_m2 * NUM_PKTS, self.pg1)
431
432         for p in rx:
433             self.assertEqual(p[IP].tos, from_mpls)
434
435         #
436         # cleanup
437         #
438         self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
439                                             QOS_SOURCE.IP,
440                                             0)
441         self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
442                                           QOS_SOURCE.MPLS,
443                                           1,
444                                           0)
445         self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
446                                             QOS_SOURCE.MPLS,
447                                             0)
448         self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
449                                           QOS_SOURCE.IP,
450                                           1,
451                                           0)
452         self.vapi.qos_egress_map_delete(1)
453
454     def test_qos_vlan(self):
455         """QoS mark/record VLAN """
456
457         #
458         # QoS for all input values
459         #
460         output = [scapy.compat.chb(0)] * 256
461         for i in range(0, 255):
462             output[i] = scapy.compat.chb(255 - i)
463         os = b''.join(output)
464         rows = [{'outputs': os},
465                 {'outputs': os},
466                 {'outputs': os},
467                 {'outputs': os}]
468
469         self.vapi.qos_egress_map_update(1, rows)
470
471         sub_if = VppDot1QSubint(self, self.pg0, 11)
472
473         sub_if.admin_up()
474         sub_if.config_ip4()
475         sub_if.resolve_arp()
476         sub_if.config_ip6()
477         sub_if.resolve_ndp()
478
479         #
480         # enable VLAN QoS recording/marking on the input Pg0 subinterface and
481         #
482         self.vapi.qos_record_enable_disable(sub_if.sw_if_index,
483                                             QOS_SOURCE.VLAN,
484                                             1)
485         self.vapi.qos_mark_enable_disable(sub_if.sw_if_index,
486                                           QOS_SOURCE.VLAN,
487                                           1,
488                                           1)
489
490         #
491         # IP marking/recording on pg1
492         #
493         self.vapi.qos_record_enable_disable(self.pg1.sw_if_index,
494                                             QOS_SOURCE.IP,
495                                             1)
496         self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
497                                           QOS_SOURCE.IP,
498                                           1,
499                                           1)
500
501         #
502         # a routes to/from sub-interface
503         #
504         route_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32,
505                                     [VppRoutePath(sub_if.remote_ip4,
506                                                   sub_if.sw_if_index)])
507         route_10_0_0_1.add_vpp_config()
508         route_10_0_0_2 = VppIpRoute(self, "10.0.0.2", 32,
509                                     [VppRoutePath(self.pg1.remote_ip4,
510                                                   self.pg1.sw_if_index)])
511         route_10_0_0_2.add_vpp_config()
512         route_2001_1 = VppIpRoute(self, "2001::1", 128,
513                                   [VppRoutePath(sub_if.remote_ip6,
514                                                 sub_if.sw_if_index,
515                                                 proto=DpoProto.DPO_PROTO_IP6)],
516                                   is_ip6=1)
517         route_2001_1.add_vpp_config()
518         route_2001_2 = VppIpRoute(self, "2001::2", 128,
519                                   [VppRoutePath(self.pg1.remote_ip6,
520                                                 self.pg1.sw_if_index,
521                                                 proto=DpoProto.DPO_PROTO_IP6)],
522                                   is_ip6=1)
523         route_2001_2.add_vpp_config()
524
525         p_v1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
526                 Dot1Q(vlan=11, prio=1) /
527                 IP(src="1.1.1.1", dst="10.0.0.2", tos=1) /
528                 UDP(sport=1234, dport=1234) /
529                 Raw(scapy.compat.chb(100) * NUM_PKTS))
530
531         p_v2 = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
532                 IP(src="1.1.1.1", dst="10.0.0.1", tos=1) /
533                 UDP(sport=1234, dport=1234) /
534                 Raw(scapy.compat.chb(100) * NUM_PKTS))
535
536         rx = self.send_and_expect(self.pg1, p_v2 * NUM_PKTS, self.pg0)
537
538         for p in rx:
539             self.assertEqual(p[Dot1Q].prio, 6)
540
541         rx = self.send_and_expect(self.pg0, p_v1 * NUM_PKTS, self.pg1)
542
543         for p in rx:
544             self.assertEqual(p[IP].tos, 254)
545
546         p_v1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
547                 Dot1Q(vlan=11, prio=2) /
548                 IPv6(src="2001::1", dst="2001::2", tc=1) /
549                 UDP(sport=1234, dport=1234) /
550                 Raw(scapy.compat.chb(100) * NUM_PKTS))
551
552         p_v2 = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
553                 IPv6(src="3001::1", dst="2001::1", tc=1) /
554                 UDP(sport=1234, dport=1234) /
555                 Raw(scapy.compat.chb(100) * NUM_PKTS))
556
557         rx = self.send_and_expect(self.pg1, p_v2 * NUM_PKTS, self.pg0)
558
559         for p in rx:
560             self.assertEqual(p[Dot1Q].prio, 6)
561
562         rx = self.send_and_expect(self.pg0, p_v1 * NUM_PKTS, self.pg1)
563
564         for p in rx:
565             self.assertEqual(p[IPv6].tc, 253)
566
567         #
568         # cleanup
569         #
570         sub_if.unconfig_ip4()
571         sub_if.unconfig_ip6()
572
573         self.vapi.qos_record_enable_disable(sub_if.sw_if_index,
574                                             QOS_SOURCE.VLAN,
575                                             0)
576         self.vapi.qos_mark_enable_disable(sub_if.sw_if_index,
577                                           QOS_SOURCE.VLAN,
578                                           1,
579                                           0)
580         self.vapi.qos_record_enable_disable(self.pg1.sw_if_index,
581                                             QOS_SOURCE.IP,
582                                             0)
583         self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
584                                           QOS_SOURCE.IP,
585                                           1,
586                                           0)
587
588
589 if __name__ == '__main__':
590     unittest.main(testRunner=VppTestRunner)