Typos. A bunch of typos I've been collecting.
[vpp.git] / test / test_qos.py
1 #!/usr/bin/env python
2
3 import unittest
4
5 from framework import VppTestCase, VppTestRunner
6 from vpp_papi_provider import QOS_SOURCE
7 from vpp_sub_interface import VppDot1QSubint
8 from vpp_ip import DpoProto
9 from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsRoute, \
10     VppMplsLabel, VppMplsTable
11
12 import scapy.compat
13 from scapy.packet import Raw
14 from scapy.layers.l2 import Ether, Dot1Q
15 from scapy.layers.inet import IP, UDP
16 from scapy.layers.inet6 import IPv6
17 from scapy.contrib.mpls import MPLS
18
19
20 class TestQOS(VppTestCase):
21     """ QOS Test Case """
22
23     def setUp(self):
24         super(TestQOS, self).setUp()
25
26         self.create_pg_interfaces(range(5))
27
28         tbl = VppMplsTable(self, 0)
29         tbl.add_vpp_config()
30
31         for i in self.pg_interfaces:
32             i.admin_up()
33             i.config_ip4()
34             i.resolve_arp()
35             i.config_ip6()
36             i.resolve_ndp()
37             i.enable_mpls()
38
39     def tearDown(self):
40         for i in self.pg_interfaces:
41             i.unconfig_ip4()
42             i.unconfig_ip6()
43             i.disable_mpls()
44
45         super(TestQOS, self).tearDown()
46
47     def test_qos_ip(self):
48         """ QoS Mark/Record IP """
49
50         #
51         # for table 1 map the n=0xff possible values of input QoS mark,
52         # n to 1-n
53         #
54         output = [scapy.compat.chb(0)] * 256
55         for i in range(0, 255):
56             output[i] = scapy.compat.chb(255 - i)
57         os = b''.join(output)
58         rows = [{'outputs': os},
59                 {'outputs': os},
60                 {'outputs': os},
61                 {'outputs': os}]
62
63         self.vapi.qos_egress_map_update(1, rows)
64
65         #
66         # For table 2 (and up) use the value n for everything
67         #
68         output = [scapy.compat.chb(2)] * 256
69         os = b''.join(output)
70         rows = [{'outputs': os},
71                 {'outputs': os},
72                 {'outputs': os},
73                 {'outputs': os}]
74
75         self.vapi.qos_egress_map_update(2, rows)
76
77         output = [scapy.compat.chb(3)] * 256
78         os = b''.join(output)
79         rows = [{'outputs': os},
80                 {'outputs': os},
81                 {'outputs': os},
82                 {'outputs': os}]
83
84         self.vapi.qos_egress_map_update(3, rows)
85
86         output = [scapy.compat.chb(4)] * 256
87         os = b''.join(output)
88         rows = [{'outputs': os},
89                 {'outputs': os},
90                 {'outputs': os},
91                 {'outputs': os}]
92         self.vapi.qos_egress_map_update(4, rows)
93         self.vapi.qos_egress_map_update(5, rows)
94         self.vapi.qos_egress_map_update(6, rows)
95         self.vapi.qos_egress_map_update(7, rows)
96
97         self.logger.info(self.vapi.cli("sh qos eg map"))
98
99         #
100         # Bind interface pgN to table n
101         #
102         self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
103                                           QOS_SOURCE.IP,
104                                           1,
105                                           1)
106         self.vapi.qos_mark_enable_disable(self.pg2.sw_if_index,
107                                           QOS_SOURCE.IP,
108                                           2,
109                                           1)
110         self.vapi.qos_mark_enable_disable(self.pg3.sw_if_index,
111                                           QOS_SOURCE.IP,
112                                           3,
113                                           1)
114         self.vapi.qos_mark_enable_disable(self.pg4.sw_if_index,
115                                           QOS_SOURCE.IP,
116                                           4,
117                                           1)
118
119         #
120         # packets ingress on Pg0
121         #
122         p_v4 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
123                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, tos=1) /
124                 UDP(sport=1234, dport=1234) /
125                 Raw(scapy.compat.chb(100) * 65))
126         p_v6 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
127                 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6,
128                      tc=1) /
129                 UDP(sport=1234, dport=1234) /
130                 Raw(scapy.compat.chb(100) * 65))
131
132         #
133         # Since we have not yet enabled the recording of the input QoS
134         # from the input iP header, the egress packet's ToS will be unchanged
135         #
136         rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
137         for p in rx:
138             self.assertEqual(p[IP].tos, 1)
139         rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg1)
140         for p in rx:
141             self.assertEqual(p[IPv6].tc, 1)
142
143         #
144         # Enable QoS recording on IP input for pg0
145         #
146         self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
147                                             QOS_SOURCE.IP,
148                                             1)
149
150         #
151         # send the same packets, this time expect the input TOS of 1
152         # to be mapped to pg1's egress value of 254
153         #
154         rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
155         for p in rx:
156             self.assertEqual(p[IP].tos, 254)
157         rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg1)
158         for p in rx:
159             self.assertEqual(p[IPv6].tc, 254)
160
161         #
162         # different input ToS to test the mapping
163         #
164         p_v4[IP].tos = 127
165         rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
166         for p in rx:
167             self.assertEqual(p[IP].tos, 128)
168         p_v6[IPv6].tc = 127
169         rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg1)
170         for p in rx:
171             self.assertEqual(p[IPv6].tc, 128)
172
173         p_v4[IP].tos = 254
174         rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
175         for p in rx:
176             self.assertEqual(p[IP].tos, 1)
177         p_v6[IPv6].tc = 254
178         rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg1)
179         for p in rx:
180             self.assertEqual(p[IPv6].tc, 1)
181
182         #
183         # send packets out the other interfaces to test the maps are
184         # correctly applied
185         #
186         p_v4[IP].dst = self.pg2.remote_ip4
187         rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg2)
188         for p in rx:
189             self.assertEqual(p[IP].tos, 2)
190
191         p_v4[IP].dst = self.pg3.remote_ip4
192         rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg3)
193         for p in rx:
194             self.assertEqual(p[IP].tos, 3)
195
196         p_v6[IPv6].dst = self.pg3.remote_ip6
197         rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg3)
198         for p in rx:
199             self.assertEqual(p[IPv6].tc, 3)
200
201         #
202         # remove the map on pg2 and pg3, now expect an unchanged IP tos
203         #
204         self.vapi.qos_mark_enable_disable(self.pg2.sw_if_index,
205                                           QOS_SOURCE.IP,
206                                           2,
207                                           0)
208         self.vapi.qos_mark_enable_disable(self.pg3.sw_if_index,
209                                           QOS_SOURCE.IP,
210                                           3,
211                                           0)
212         self.logger.info(self.vapi.cli("sh int feat pg2"))
213
214         p_v4[IP].dst = self.pg2.remote_ip4
215         rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg2)
216         for p in rx:
217             self.assertEqual(p[IP].tos, 254)
218
219         p_v4[IP].dst = self.pg3.remote_ip4
220         rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg3)
221         for p in rx:
222             self.assertEqual(p[IP].tos, 254)
223
224         #
225         # still mapping out of pg1
226         #
227         p_v4[IP].dst = self.pg1.remote_ip4
228         rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
229         for p in rx:
230             self.assertEqual(p[IP].tos, 1)
231
232         #
233         # disable the input recording on pg0
234         #
235         self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
236                                             QOS_SOURCE.IP,
237                                             0)
238
239         #
240         # back to an unchanged TOS value
241         #
242         rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
243         for p in rx:
244             self.assertEqual(p[IP].tos, 254)
245
246         #
247         # disable the egress map on pg1 and pg4
248         #
249         self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
250                                           QOS_SOURCE.IP,
251                                           1,
252                                           0)
253         self.vapi.qos_mark_enable_disable(self.pg4.sw_if_index,
254                                           QOS_SOURCE.IP,
255                                           4,
256                                           0)
257
258         #
259         # unchanged Tos on pg1
260         #
261         rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
262         for p in rx:
263             self.assertEqual(p[IP].tos, 254)
264
265         #
266         # clean-up the map
267         #
268         self.vapi.qos_egress_map_delete(1)
269         self.vapi.qos_egress_map_delete(4)
270         self.vapi.qos_egress_map_delete(2)
271         self.vapi.qos_egress_map_delete(3)
272         self.vapi.qos_egress_map_delete(5)
273         self.vapi.qos_egress_map_delete(6)
274         self.vapi.qos_egress_map_delete(7)
275
276     def test_qos_mpls(self):
277         """ QoS Mark/Record MPLS """
278
279         #
280         # 255 QoS for all input values
281         #
282         from_ext = 7
283         from_ip = 6
284         from_mpls = 5
285         from_vlan = 4
286         output = [scapy.compat.chb(from_ext)] * 256
287         os1 = b''.join(output)
288         output = [scapy.compat.chb(from_vlan)] * 256
289         os2 = b''.join(output)
290         output = [scapy.compat.chb(from_mpls)] * 256
291         os3 = b''.join(output)
292         output = [scapy.compat.chb(from_ip)] * 256
293         os4 = b''.join(output)
294         rows = [{'outputs': os1},
295                 {'outputs': os2},
296                 {'outputs': os3},
297                 {'outputs': os4}]
298
299         self.vapi.qos_egress_map_update(1, rows)
300
301         #
302         # a route with 1 MPLS label
303         #
304         route_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32,
305                                     [VppRoutePath(self.pg1.remote_ip4,
306                                                   self.pg1.sw_if_index,
307                                                   labels=[32])])
308         route_10_0_0_1.add_vpp_config()
309
310         #
311         # a route with 3 MPLS labels
312         #
313         route_10_0_0_3 = VppIpRoute(self, "10.0.0.3", 32,
314                                     [VppRoutePath(self.pg1.remote_ip4,
315                                                   self.pg1.sw_if_index,
316                                                   labels=[63, 33, 34])])
317         route_10_0_0_3.add_vpp_config()
318
319         #
320         # enable IP QoS recording on the input Pg0 and MPLS egress marking
321         # on Pg1
322         #
323         self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
324                                             QOS_SOURCE.IP,
325                                             1)
326         self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
327                                           QOS_SOURCE.MPLS,
328                                           1,
329                                           1)
330
331         #
332         # packet that will get one label added and 3 labels added resp.
333         #
334         p_1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
335                IP(src=self.pg0.remote_ip4, dst="10.0.0.1", tos=1) /
336                UDP(sport=1234, dport=1234) /
337                Raw(scapy.compat.chb(100) * 65))
338         p_3 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
339                IP(src=self.pg0.remote_ip4, dst="10.0.0.3", tos=1) /
340                UDP(sport=1234, dport=1234) /
341                Raw(scapy.compat.chb(100) * 65))
342
343         rx = self.send_and_expect(self.pg0, p_1 * 65, self.pg1)
344
345         #
346         # only 3 bits of ToS value in MPLS make sure tos is correct
347         # and the label and EOS bit have not been corrupted
348         #
349         for p in rx:
350             self.assertEqual(p[MPLS].cos, from_ip)
351             self.assertEqual(p[MPLS].label, 32)
352             self.assertEqual(p[MPLS].s, 1)
353         rx = self.send_and_expect(self.pg0, p_3 * 65, self.pg1)
354         for p in rx:
355             self.assertEqual(p[MPLS].cos, from_ip)
356             self.assertEqual(p[MPLS].label, 63)
357             self.assertEqual(p[MPLS].s, 0)
358             h = p[MPLS].payload
359             self.assertEqual(h[MPLS].cos, from_ip)
360             self.assertEqual(h[MPLS].label, 33)
361             self.assertEqual(h[MPLS].s, 0)
362             h = h[MPLS].payload
363             self.assertEqual(h[MPLS].cos, from_ip)
364             self.assertEqual(h[MPLS].label, 34)
365             self.assertEqual(h[MPLS].s, 1)
366
367         #
368         # enable MPLS QoS recording on the input Pg0 and IP egress marking
369         # on Pg1
370         #
371         self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
372                                             QOS_SOURCE.MPLS,
373                                             1)
374         self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
375                                           QOS_SOURCE.IP,
376                                           1,
377                                           1)
378
379         #
380         # MPLS x-connect - COS according to pg1 map
381         #
382         route_32_eos = VppMplsRoute(self, 32, 1,
383                                     [VppRoutePath(self.pg1.remote_ip4,
384                                                   self.pg1.sw_if_index,
385                                                   labels=[VppMplsLabel(33)])])
386         route_32_eos.add_vpp_config()
387
388         p_m1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
389                 MPLS(label=32, cos=3, ttl=2) /
390                 IP(src=self.pg0.remote_ip4, dst="10.0.0.1", tos=1) /
391                 UDP(sport=1234, dport=1234) /
392                 Raw(scapy.compat.chb(100) * 65))
393
394         rx = self.send_and_expect(self.pg0, p_m1 * 65, self.pg1)
395         for p in rx:
396             self.assertEqual(p[MPLS].cos, from_mpls)
397             self.assertEqual(p[MPLS].label, 33)
398             self.assertEqual(p[MPLS].s, 1)
399
400         #
401         # MPLS deag - COS is copied from MPLS to IP
402         #
403         route_33_eos = VppMplsRoute(self, 33, 1,
404                                     [VppRoutePath("0.0.0.0",
405                                                   0xffffffff,
406                                                   nh_table_id=0)])
407         route_33_eos.add_vpp_config()
408
409         route_10_0_0_4 = VppIpRoute(self, "10.0.0.4", 32,
410                                     [VppRoutePath(self.pg1.remote_ip4,
411                                                   self.pg1.sw_if_index)])
412         route_10_0_0_4.add_vpp_config()
413
414         p_m2 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
415                 MPLS(label=33, ttl=2, cos=3) /
416                 IP(src=self.pg0.remote_ip4, dst="10.0.0.4", tos=1) /
417                 UDP(sport=1234, dport=1234) /
418                 Raw(scapy.compat.chb(100) * 65))
419
420         rx = self.send_and_expect(self.pg0, p_m2 * 65, self.pg1)
421
422         for p in rx:
423             self.assertEqual(p[IP].tos, from_mpls)
424
425         #
426         # cleanup
427         #
428         self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
429                                             QOS_SOURCE.IP,
430                                             0)
431         self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
432                                           QOS_SOURCE.MPLS,
433                                           1,
434                                           0)
435         self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
436                                             QOS_SOURCE.MPLS,
437                                             0)
438         self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
439                                           QOS_SOURCE.IP,
440                                           1,
441                                           0)
442         self.vapi.qos_egress_map_delete(1)
443
444     def test_qos_vlan(self):
445         """QoS mark/record VLAN """
446
447         #
448         # QoS for all input values
449         #
450         output = [scapy.compat.chb(0)] * 256
451         for i in range(0, 255):
452             output[i] = scapy.compat.chb(255 - i)
453         os = b''.join(output)
454         rows = [{'outputs': os},
455                 {'outputs': os},
456                 {'outputs': os},
457                 {'outputs': os}]
458
459         self.vapi.qos_egress_map_update(1, rows)
460
461         sub_if = VppDot1QSubint(self, self.pg0, 11)
462
463         sub_if.admin_up()
464         sub_if.config_ip4()
465         sub_if.resolve_arp()
466         sub_if.config_ip6()
467         sub_if.resolve_ndp()
468
469         #
470         # enable VLAN QoS recording/marking on the input Pg0 subinterface and
471         #
472         self.vapi.qos_record_enable_disable(sub_if.sw_if_index,
473                                             QOS_SOURCE.VLAN,
474                                             1)
475         self.vapi.qos_mark_enable_disable(sub_if.sw_if_index,
476                                           QOS_SOURCE.VLAN,
477                                           1,
478                                           1)
479
480         #
481         # IP marking/recording on pg1
482         #
483         self.vapi.qos_record_enable_disable(self.pg1.sw_if_index,
484                                             QOS_SOURCE.IP,
485                                             1)
486         self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
487                                           QOS_SOURCE.IP,
488                                           1,
489                                           1)
490
491         #
492         # a routes to/from sub-interface
493         #
494         route_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32,
495                                     [VppRoutePath(sub_if.remote_ip4,
496                                                   sub_if.sw_if_index)])
497         route_10_0_0_1.add_vpp_config()
498         route_10_0_0_2 = VppIpRoute(self, "10.0.0.2", 32,
499                                     [VppRoutePath(self.pg1.remote_ip4,
500                                                   self.pg1.sw_if_index)])
501         route_10_0_0_2.add_vpp_config()
502         route_2001_1 = VppIpRoute(self, "2001::1", 128,
503                                   [VppRoutePath(sub_if.remote_ip6,
504                                                 sub_if.sw_if_index,
505                                                 proto=DpoProto.DPO_PROTO_IP6)],
506                                   is_ip6=1)
507         route_2001_1.add_vpp_config()
508         route_2001_2 = VppIpRoute(self, "2001::2", 128,
509                                   [VppRoutePath(self.pg1.remote_ip6,
510                                                 self.pg1.sw_if_index,
511                                                 proto=DpoProto.DPO_PROTO_IP6)],
512                                   is_ip6=1)
513         route_2001_2.add_vpp_config()
514
515         p_v1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
516                 Dot1Q(vlan=11, prio=1) /
517                 IP(src="1.1.1.1", dst="10.0.0.2", tos=1) /
518                 UDP(sport=1234, dport=1234) /
519                 Raw(scapy.compat.chb(100) * 65))
520
521         p_v2 = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
522                 IP(src="1.1.1.1", dst="10.0.0.1", tos=1) /
523                 UDP(sport=1234, dport=1234) /
524                 Raw(scapy.compat.chb(100) * 65))
525
526         rx = self.send_and_expect(self.pg1, p_v2 * 65, self.pg0)
527
528         for p in rx:
529             self.assertEqual(p[Dot1Q].prio, 6)
530
531         rx = self.send_and_expect(self.pg0, p_v1 * 65, self.pg1)
532
533         for p in rx:
534             self.assertEqual(p[IP].tos, 254)
535
536         p_v1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
537                 Dot1Q(vlan=11, prio=2) /
538                 IPv6(src="2001::1", dst="2001::2", tc=1) /
539                 UDP(sport=1234, dport=1234) /
540                 Raw(scapy.compat.chb(100) * 65))
541
542         p_v2 = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
543                 IPv6(src="3001::1", dst="2001::1", tc=1) /
544                 UDP(sport=1234, dport=1234) /
545                 Raw(scapy.compat.chb(100) * 65))
546
547         rx = self.send_and_expect(self.pg1, p_v2 * 65, self.pg0)
548
549         for p in rx:
550             self.assertEqual(p[Dot1Q].prio, 6)
551
552         rx = self.send_and_expect(self.pg0, p_v1 * 65, self.pg1)
553
554         for p in rx:
555             self.assertEqual(p[IPv6].tc, 253)
556
557         #
558         # cleanup
559         #
560         sub_if.unconfig_ip4()
561         sub_if.unconfig_ip6()
562
563         self.vapi.qos_record_enable_disable(sub_if.sw_if_index,
564                                             QOS_SOURCE.VLAN,
565                                             0)
566         self.vapi.qos_mark_enable_disable(sub_if.sw_if_index,
567                                           QOS_SOURCE.VLAN,
568                                           1,
569                                           0)
570         self.vapi.qos_record_enable_disable(self.pg1.sw_if_index,
571                                             QOS_SOURCE.IP,
572                                             0)
573         self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
574                                           QOS_SOURCE.IP,
575                                           1,
576                                           0)
577
578
579 if __name__ == '__main__':
580     unittest.main(testRunner=VppTestRunner)