MAP: Convert from DPO to input feature.
[vpp.git] / test / test_qos.py
1 #!/usr/bin/env python
2
3 import unittest
4
5 from framework import VppTestCase, VppTestRunner
6 from vpp_papi_provider import QOS_SOURCE
7 from vpp_sub_interface import VppDot1QSubint
8 from vpp_ip import DpoProto
9 from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsRoute, \
10     VppMplsLabel, VppMplsTable
11
12 from scapy.packet import Raw
13 from scapy.layers.l2 import Ether, Dot1Q
14 from scapy.layers.inet import IP, UDP
15 from scapy.layers.inet6 import IPv6
16 from scapy.contrib.mpls import MPLS
17
18
class TestQOS(VppTestCase):
    """ QOS Test Case """

    @staticmethod
    def _qos_constant_rows(marks):
        """Build egress-map rows that ignore the input value.

        :param marks: one integer (0-255) per row; row k maps each of the
                      256 possible input values to marks[k].
        :returns: list of ``{'outputs': <256-char string>}`` dicts in the
                  form passed to ``qos_egress_map_update``.
        """
        return [{'outputs': chr(mark) * 256} for mark in marks]

    @staticmethod
    def _qos_inverting_rows():
        """Build four identical egress-map rows mapping input n to 255 - n.

        :returns: list of four ``{'outputs': <256-char string>}`` dicts.
        """
        outputs = ''.join(chr(255 - n) for n in range(256))
        return [{'outputs': outputs} for _ in range(4)]

    def _qos_send_and_check(self, tx_intf, pkt, rx_intf, layer, field,
                            expected):
        """Send a burst of 65 copies of pkt and verify one header field.

        :param tx_intf: pg interface the burst is injected on.
        :param pkt: scapy packet to replicate.
        :param rx_intf: pg interface the burst is expected on.
        :param layer: scapy layer class to inspect in each received packet.
        :param field: name of the field of that layer to compare.
        :param expected: value the field must have in every packet.
        """
        for rx in self.send_and_expect(tx_intf, pkt * 65, rx_intf):
            self.assertEqual(getattr(rx[layer], field), expected)

    def _qos_check_label_stack(self, pkt, labels, cos):
        """Verify the MPLS label stack of one received packet.

        Every label must carry the expected COS bits, and only the last
        label may have the bottom-of-stack bit set, so a wrong COS write
        that spills into the label or S bit is detected.

        :param pkt: received scapy packet.
        :param labels: expected label values, outermost first.
        :param cos: expected COS value of every label in the stack.
        """
        layer = pkt
        bottom = len(labels) - 1
        for depth, label in enumerate(labels):
            hdr = layer[MPLS]
            self.assertEqual(hdr.cos, cos)
            self.assertEqual(hdr.label, label)
            self.assertEqual(hdr.s, 1 if depth == bottom else 0)
            layer = hdr.payload

    def setUp(self):
        """Create pg0-pg4 and MPLS table 0, then bring every interface up
        with IPv4, IPv6 and MPLS enabled."""
        super(TestQOS, self).setUp()

        self.create_pg_interfaces(range(5))

        mpls_table = VppMplsTable(self, 0)
        mpls_table.add_vpp_config()

        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip4()
            intf.resolve_arp()
            intf.config_ip6()
            intf.resolve_ndp()
            intf.enable_mpls()

    def tearDown(self):
        """Remove IPv4, IPv6 and MPLS from every pg interface, then run
        the framework's own tear-down."""
        for intf in self.pg_interfaces:
            intf.unconfig_ip4()
            intf.unconfig_ip6()
            intf.disable_mpls()

        super(TestQOS, self).tearDown()

    def test_qos_ip(self):
        """ QoS Mark/Record IP """

        #
        # Table 1 inverts the mark: each input value n maps to 255 - n
        #
        self.vapi.qos_egress_map_update(1, self._qos_inverting_rows())

        #
        # Tables 2 and 3 map every input to the table's own ID;
        # tables 4 to 7 all map every input to 4
        #
        for table_id in (2, 3):
            self.vapi.qos_egress_map_update(
                table_id, self._qos_constant_rows([table_id] * 4))

        rows = self._qos_constant_rows([4] * 4)
        for table_id in (4, 5, 6, 7):
            self.vapi.qos_egress_map_update(table_id, rows)

        self.logger.info(self.vapi.cli("sh qos eg map"))

        #
        # Bind interface pgN to table N for IP marking on egress
        #
        marked = ((self.pg1, 1), (self.pg2, 2), (self.pg3, 3), (self.pg4, 4))
        for intf, table_id in marked:
            self.vapi.qos_mark_enable_disable(intf.sw_if_index,
                                              QOS_SOURCE.IP,
                                              table_id,
                                              1)

        #
        # packets ingress on pg0
        #
        p_v4 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, tos=1) /
                UDP(sport=1234, dport=1234) /
                Raw(chr(100) * 65))
        p_v6 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6,
                     tc=1) /
                UDP(sport=1234, dport=1234) /
                Raw(chr(100) * 65))

        # The packets are modified in place below, so these shorthands
        # always send the current state of p_v4 / p_v6.
        def check_v4(rx_intf, tos):
            self._qos_send_and_check(self.pg0, p_v4, rx_intf, IP, 'tos', tos)

        def check_v6(rx_intf, tc):
            self._qos_send_and_check(self.pg0, p_v6, rx_intf, IPv6, 'tc', tc)

        #
        # Recording of the input QoS from the IP header is not enabled
        # yet, so the egress packet's ToS/TC is unchanged
        #
        check_v4(self.pg1, 1)
        check_v6(self.pg1, 1)

        #
        # Enable QoS recording on IP input for pg0
        #
        self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
                                            QOS_SOURCE.IP,
                                            1)

        #
        # send the same packets, this time expect the input ToS of 1
        # to be mapped to pg1's egress value of 254
        #
        check_v4(self.pg1, 254)
        check_v6(self.pg1, 254)

        #
        # different input ToS values to exercise the inverting map
        #
        for mark_in, mark_out in ((127, 128), (254, 1)):
            p_v4[IP].tos = mark_in
            check_v4(self.pg1, mark_out)
            p_v6[IPv6].tc = mark_in
            check_v6(self.pg1, mark_out)

        #
        # send packets out of the other interfaces to test that each
        # interface applies its own map
        #
        p_v4[IP].dst = self.pg2.remote_ip4
        check_v4(self.pg2, 2)

        p_v4[IP].dst = self.pg3.remote_ip4
        check_v4(self.pg3, 3)

        p_v6[IPv6].dst = self.pg3.remote_ip6
        check_v6(self.pg3, 3)

        #
        # remove the map on pg2 and pg3, now expect an unchanged IP ToS
        #
        for intf, table_id in ((self.pg2, 2), (self.pg3, 3)):
            self.vapi.qos_mark_enable_disable(intf.sw_if_index,
                                              QOS_SOURCE.IP,
                                              table_id,
                                              0)
        self.logger.info(self.vapi.cli("sh int feat pg2"))

        p_v4[IP].dst = self.pg2.remote_ip4
        check_v4(self.pg2, 254)

        p_v4[IP].dst = self.pg3.remote_ip4
        check_v4(self.pg3, 254)

        #
        # still mapping out of pg1
        #
        p_v4[IP].dst = self.pg1.remote_ip4
        check_v4(self.pg1, 1)

        #
        # disable the input recording on pg0
        #
        self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
                                            QOS_SOURCE.IP,
                                            0)

        #
        # back to an unchanged ToS value
        #
        check_v4(self.pg1, 254)

        #
        # disable the egress map on pg1 and pg4
        #
        for intf, table_id in ((self.pg1, 1), (self.pg4, 4)):
            self.vapi.qos_mark_enable_disable(intf.sw_if_index,
                                              QOS_SOURCE.IP,
                                              table_id,
                                              0)

        #
        # unchanged ToS on pg1
        #
        check_v4(self.pg1, 254)

        #
        # clean-up the maps
        #
        for table_id in (1, 4, 2, 3, 5, 6, 7):
            self.vapi.qos_egress_map_delete(table_id)

    def test_qos_mpls(self):
        """ QoS Mark/Record MPLS """

        #
        # Each row of the map outputs one distinct constant for all input
        # values, so the egress mark reveals which row was consulted.
        # The expectations below show the row tracks the recorded source.
        #
        from_ext = 7
        from_ip = 6
        from_mpls = 5
        from_vlan = 4
        rows = self._qos_constant_rows(
            (from_ext, from_vlan, from_mpls, from_ip))

        self.vapi.qos_egress_map_update(1, rows)

        #
        # a route with 1 MPLS label
        #
        route_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32,
                                    [VppRoutePath(self.pg1.remote_ip4,
                                                  self.pg1.sw_if_index,
                                                  labels=[32])])
        route_10_0_0_1.add_vpp_config()

        #
        # a route with 3 MPLS labels
        #
        route_10_0_0_3 = VppIpRoute(self, "10.0.0.3", 32,
                                    [VppRoutePath(self.pg1.remote_ip4,
                                                  self.pg1.sw_if_index,
                                                  labels=[63, 33, 34])])
        route_10_0_0_3.add_vpp_config()

        #
        # enable IP QoS recording on the input pg0 and MPLS egress marking
        # on pg1
        #
        self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
                                            QOS_SOURCE.IP,
                                            1)
        self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
                                          QOS_SOURCE.MPLS,
                                          1,
                                          1)

        #
        # packets that will get one label and three labels added resp.
        #
        p_1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
               IP(src=self.pg0.remote_ip4, dst="10.0.0.1", tos=1) /
               UDP(sport=1234, dport=1234) /
               Raw(chr(100) * 65))
        p_3 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
               IP(src=self.pg0.remote_ip4, dst="10.0.0.3", tos=1) /
               UDP(sport=1234, dport=1234) /
               Raw(chr(100) * 65))

        #
        # only 3 bits of ToS value fit in MPLS; make sure the COS is
        # correct and the label and EOS bit have not been corrupted
        #
        for rx in self.send_and_expect(self.pg0, p_1 * 65, self.pg1):
            self._qos_check_label_stack(rx, [32], from_ip)

        for rx in self.send_and_expect(self.pg0, p_3 * 65, self.pg1):
            self._qos_check_label_stack(rx, [63, 33, 34], from_ip)

        #
        # enable MPLS QoS recording on the input pg0 and IP egress marking
        # on pg1
        #
        self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
                                            QOS_SOURCE.MPLS,
                                            1)
        self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
                                          QOS_SOURCE.IP,
                                          1,
                                          1)

        #
        # MPLS x-connect - COS according to pg1 map
        #
        route_32_eos = VppMplsRoute(self, 32, 1,
                                    [VppRoutePath(self.pg1.remote_ip4,
                                                  self.pg1.sw_if_index,
                                                  labels=[VppMplsLabel(33)])])
        route_32_eos.add_vpp_config()

        p_m1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                MPLS(label=32, cos=3, ttl=2) /
                IP(src=self.pg0.remote_ip4, dst="10.0.0.1", tos=1) /
                UDP(sport=1234, dport=1234) /
                Raw(chr(100) * 65))

        for rx in self.send_and_expect(self.pg0, p_m1 * 65, self.pg1):
            self._qos_check_label_stack(rx, [33], from_mpls)

        #
        # MPLS deag - the mark recorded from MPLS is written to the IP ToS
        #
        route_33_eos = VppMplsRoute(self, 33, 1,
                                    [VppRoutePath("0.0.0.0",
                                                  0xffffffff,
                                                  nh_table_id=0)])
        route_33_eos.add_vpp_config()

        route_10_0_0_4 = VppIpRoute(self, "10.0.0.4", 32,
                                    [VppRoutePath(self.pg1.remote_ip4,
                                                  self.pg1.sw_if_index)])
        route_10_0_0_4.add_vpp_config()

        p_m2 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                MPLS(label=33, ttl=2, cos=3) /
                IP(src=self.pg0.remote_ip4, dst="10.0.0.4", tos=1) /
                UDP(sport=1234, dport=1234) /
                Raw(chr(100) * 65))

        self._qos_send_and_check(self.pg0, p_m2, self.pg1,
                                 IP, 'tos', from_mpls)

        #
        # cleanup: undo each record/mark pairing, then drop the map
        #
        for recorded, marked in ((QOS_SOURCE.IP, QOS_SOURCE.MPLS),
                                 (QOS_SOURCE.MPLS, QOS_SOURCE.IP)):
            self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
                                                recorded,
                                                0)
            self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
                                              marked,
                                              1,
                                              0)
        self.vapi.qos_egress_map_delete(1)

    def test_qos_vlan(self):
        """QoS mark/record VLAN """

        #
        # Map 1 inverts the mark: each input value n maps to 255 - n
        #
        self.vapi.qos_egress_map_update(1, self._qos_inverting_rows())

        sub_if = VppDot1QSubint(self, self.pg0, 11)

        sub_if.admin_up()
        sub_if.config_ip4()
        sub_if.resolve_arp()
        sub_if.config_ip6()
        sub_if.resolve_ndp()

        #
        # enable VLAN QoS recording and marking on the pg0 sub-interface
        #
        self.vapi.qos_record_enable_disable(sub_if.sw_if_index,
                                            QOS_SOURCE.VLAN,
                                            1)
        self.vapi.qos_mark_enable_disable(sub_if.sw_if_index,
                                          QOS_SOURCE.VLAN,
                                          1,
                                          1)

        #
        # IP marking/recording on pg1
        #
        self.vapi.qos_record_enable_disable(self.pg1.sw_if_index,
                                            QOS_SOURCE.IP,
                                            1)
        self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
                                          QOS_SOURCE.IP,
                                          1,
                                          1)

        #
        # routes to/from the sub-interface
        #
        route_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32,
                                    [VppRoutePath(sub_if.remote_ip4,
                                                  sub_if.sw_if_index)])
        route_10_0_0_1.add_vpp_config()
        route_10_0_0_2 = VppIpRoute(self, "10.0.0.2", 32,
                                    [VppRoutePath(self.pg1.remote_ip4,
                                                  self.pg1.sw_if_index)])
        route_10_0_0_2.add_vpp_config()
        route_2001_1 = VppIpRoute(self, "2001::1", 128,
                                  [VppRoutePath(sub_if.remote_ip6,
                                                sub_if.sw_if_index,
                                                proto=DpoProto.DPO_PROTO_IP6)],
                                  is_ip6=1)
        route_2001_1.add_vpp_config()
        route_2001_2 = VppIpRoute(self, "2001::2", 128,
                                  [VppRoutePath(self.pg1.remote_ip6,
                                                self.pg1.sw_if_index,
                                                proto=DpoProto.DPO_PROTO_IP6)],
                                  is_ip6=1)
        route_2001_2.add_vpp_config()

        #
        # IPv4: tagged packet in on pg0.11, untagged packet in on pg1
        #
        p4_from_vlan = (Ether(src=self.pg0.remote_mac,
                              dst=self.pg0.local_mac) /
                        Dot1Q(vlan=11, prio=1) /
                        IP(src="1.1.1.1", dst="10.0.0.2", tos=1) /
                        UDP(sport=1234, dport=1234) /
                        Raw(chr(100) * 65))

        p4_to_vlan = (Ether(src=self.pg1.remote_mac,
                            dst=self.pg1.local_mac) /
                      IP(src="1.1.1.1", dst="10.0.0.1", tos=1) /
                      UDP(sport=1234, dport=1234) /
                      Raw(chr(100) * 65))

        # ToS 1 maps to 254, of which the 3-bit VLAN priority shows 6
        self._qos_send_and_check(self.pg1, p4_to_vlan, self.pg0,
                                 Dot1Q, 'prio', 6)

        # VLAN priority 1 maps to 254 in the IP ToS
        self._qos_send_and_check(self.pg0, p4_from_vlan, self.pg1,
                                 IP, 'tos', 254)

        #
        # IPv6: same two directions
        #
        p6_from_vlan = (Ether(src=self.pg0.remote_mac,
                              dst=self.pg0.local_mac) /
                        Dot1Q(vlan=11, prio=2) /
                        IPv6(src="2001::1", dst="2001::2", tc=1) /
                        UDP(sport=1234, dport=1234) /
                        Raw(chr(100) * 65))

        p6_to_vlan = (Ether(src=self.pg1.remote_mac,
                            dst=self.pg1.local_mac) /
                      IPv6(src="3001::1", dst="2001::1", tc=1) /
                      UDP(sport=1234, dport=1234) /
                      Raw(chr(100) * 65))

        self._qos_send_and_check(self.pg1, p6_to_vlan, self.pg0,
                                 Dot1Q, 'prio', 6)

        # VLAN priority 2 maps to 253 in the IPv6 traffic class
        self._qos_send_and_check(self.pg0, p6_from_vlan, self.pg1,
                                 IPv6, 'tc', 253)

        #
        # cleanup
        #
        sub_if.unconfig_ip4()
        sub_if.unconfig_ip6()

        self.vapi.qos_record_enable_disable(sub_if.sw_if_index,
                                            QOS_SOURCE.VLAN,
                                            0)
        self.vapi.qos_mark_enable_disable(sub_if.sw_if_index,
                                          QOS_SOURCE.VLAN,
                                          1,
                                          0)
        self.vapi.qos_record_enable_disable(self.pg1.sw_if_index,
                                            QOS_SOURCE.IP,
                                            0)
        self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
                                          QOS_SOURCE.IP,
                                          1,
                                          0)

        # Delete the map once nothing marks with it any more, as the
        # other tests do, so it does not linger in VPP after this test.
        self.vapi.qos_egress_map_delete(1)
576
577
# When executed directly (rather than imported), run this module's tests
# with the VPP test runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)