76fa8a11b6ee9c17818e94cc2a46b0854214b1c3
[vpp.git] / test / test_qos.py
1 #!/usr/bin/env python
2
3 import unittest
4
5 from framework import VppTestCase, VppTestRunner
6 from vpp_papi_provider import QOS_SOURCE
7 from vpp_sub_interface import VppDot1QSubint
8 from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsRoute, \
9     VppMplsLabel, VppMplsTable, DpoProto
10
11 from scapy.packet import Raw
12 from scapy.layers.l2 import Ether, Dot1Q
13 from scapy.layers.inet import IP, UDP
14 from scapy.layers.inet6 import IPv6
15 from scapy.contrib.mpls import MPLS
16
17
18 class TestQOS(VppTestCase):
19     """ QOS Test Case """
20
21     def setUp(self):
22         super(TestQOS, self).setUp()
23
24         self.create_pg_interfaces(range(5))
25
26         tbl = VppMplsTable(self, 0)
27         tbl.add_vpp_config()
28
29         for i in self.pg_interfaces:
30             i.admin_up()
31             i.config_ip4()
32             i.resolve_arp()
33             i.config_ip6()
34             i.resolve_ndp()
35             i.enable_mpls()
36
37     def tearDown(self):
38         for i in self.pg_interfaces:
39             i.unconfig_ip4()
40             i.unconfig_ip6()
41             i.disable_mpls()
42
43         super(TestQOS, self).tearDown()
44
45     def test_qos_ip(self):
46         """ QoS Mark/Record IP """
47
48         #
49         # for table 1 map the n=0xff possible values of input QoS mark,
50         # n to 1-n
51         #
52         output = [chr(0)] * 256
53         for i in range(0, 255):
54             output[i] = chr(255 - i)
55         os = ''.join(output)
56         rows = [{'outputs': os},
57                 {'outputs': os},
58                 {'outputs': os},
59                 {'outputs': os}]
60
61         self.vapi.qos_egress_map_update(1, rows)
62
63         #
64         # For table 2 (and up) use the value n for everything
65         #
66         output = [chr(2)] * 256
67         os = ''.join(output)
68         rows = [{'outputs': os},
69                 {'outputs': os},
70                 {'outputs': os},
71                 {'outputs': os}]
72
73         self.vapi.qos_egress_map_update(2, rows)
74
75         output = [chr(3)] * 256
76         os = ''.join(output)
77         rows = [{'outputs': os},
78                 {'outputs': os},
79                 {'outputs': os},
80                 {'outputs': os}]
81
82         self.vapi.qos_egress_map_update(3, rows)
83
84         output = [chr(4)] * 256
85         os = ''.join(output)
86         rows = [{'outputs': os},
87                 {'outputs': os},
88                 {'outputs': os},
89                 {'outputs': os}]
90         self.vapi.qos_egress_map_update(4, rows)
91         self.vapi.qos_egress_map_update(5, rows)
92         self.vapi.qos_egress_map_update(6, rows)
93         self.vapi.qos_egress_map_update(7, rows)
94
95         self.logger.info(self.vapi.cli("sh qos eg map"))
96
97         #
98         # Bind interface pgN to table n
99         #
100         self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
101                                           QOS_SOURCE.IP,
102                                           1,
103                                           1)
104         self.vapi.qos_mark_enable_disable(self.pg2.sw_if_index,
105                                           QOS_SOURCE.IP,
106                                           2,
107                                           1)
108         self.vapi.qos_mark_enable_disable(self.pg3.sw_if_index,
109                                           QOS_SOURCE.IP,
110                                           3,
111                                           1)
112         self.vapi.qos_mark_enable_disable(self.pg4.sw_if_index,
113                                           QOS_SOURCE.IP,
114                                           4,
115                                           1)
116
117         #
118         # packets ingress on Pg0
119         #
120         p_v4 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
121                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, tos=1) /
122                 UDP(sport=1234, dport=1234) /
123                 Raw(chr(100) * 65))
124         p_v6 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
125                 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6,
126                      tc=1) /
127                 UDP(sport=1234, dport=1234) /
128                 Raw(chr(100) * 65))
129
130         #
131         # Since we have not yet enabled the recording of the input QoS
132         # from the input iP header, the egress packet's ToS will be unchanged
133         #
134         rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
135         for p in rx:
136             self.assertEqual(p[IP].tos, 1)
137         rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg1)
138         for p in rx:
139             self.assertEqual(p[IPv6].tc, 1)
140
141         #
142         # Enable QoS recrding on IP input for pg0
143         #
144         self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
145                                             QOS_SOURCE.IP,
146                                             1)
147
148         #
149         # send the same packets, this time expect the input TOS of 1
150         # to be mapped to pg1's egress value of 254
151         #
152         rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
153         for p in rx:
154             self.assertEqual(p[IP].tos, 254)
155         rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg1)
156         for p in rx:
157             self.assertEqual(p[IPv6].tc, 254)
158
159         #
160         # different input ToS to test the mapping
161         #
162         p_v4[IP].tos = 127
163         rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
164         for p in rx:
165             self.assertEqual(p[IP].tos, 128)
166         p_v6[IPv6].tc = 127
167         rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg1)
168         for p in rx:
169             self.assertEqual(p[IPv6].tc, 128)
170
171         p_v4[IP].tos = 254
172         rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
173         for p in rx:
174             self.assertEqual(p[IP].tos, 1)
175         p_v6[IPv6].tc = 254
176         rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg1)
177         for p in rx:
178             self.assertEqual(p[IPv6].tc, 1)
179
180         #
181         # send packets out the other interfaces to test the maps are
182         # correctly applied
183         #
184         p_v4[IP].dst = self.pg2.remote_ip4
185         rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg2)
186         for p in rx:
187             self.assertEqual(p[IP].tos, 2)
188
189         p_v4[IP].dst = self.pg3.remote_ip4
190         rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg3)
191         for p in rx:
192             self.assertEqual(p[IP].tos, 3)
193
194         p_v6[IPv6].dst = self.pg3.remote_ip6
195         rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg3)
196         for p in rx:
197             self.assertEqual(p[IPv6].tc, 3)
198
199         #
200         # remove the map on pg2 and pg3, now expect an unchanged IP tos
201         #
202         self.vapi.qos_mark_enable_disable(self.pg2.sw_if_index,
203                                           QOS_SOURCE.IP,
204                                           2,
205                                           0)
206         self.vapi.qos_mark_enable_disable(self.pg3.sw_if_index,
207                                           QOS_SOURCE.IP,
208                                           3,
209                                           0)
210         self.logger.info(self.vapi.cli("sh int feat pg2"))
211
212         p_v4[IP].dst = self.pg2.remote_ip4
213         rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg2)
214         for p in rx:
215             self.assertEqual(p[IP].tos, 254)
216
217         p_v4[IP].dst = self.pg3.remote_ip4
218         rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg3)
219         for p in rx:
220             self.assertEqual(p[IP].tos, 254)
221
222         #
223         # still mapping out of pg1
224         #
225         p_v4[IP].dst = self.pg1.remote_ip4
226         rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
227         for p in rx:
228             self.assertEqual(p[IP].tos, 1)
229
230         #
231         # disable the input recording on pg0
232         #
233         self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
234                                             QOS_SOURCE.IP,
235                                             0)
236
237         #
238         # back to an unchanged TOS value
239         #
240         rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
241         for p in rx:
242             self.assertEqual(p[IP].tos, 254)
243
244         #
245         # disable the egress map on pg1 and pg4
246         #
247         self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
248                                           QOS_SOURCE.IP,
249                                           1,
250                                           0)
251         self.vapi.qos_mark_enable_disable(self.pg4.sw_if_index,
252                                           QOS_SOURCE.IP,
253                                           4,
254                                           0)
255
256         #
257         # unchanged Tos on pg1
258         #
259         rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
260         for p in rx:
261             self.assertEqual(p[IP].tos, 254)
262
263         #
264         # clean-up the masp
265         #
266         self.vapi.qos_egress_map_delete(1)
267         self.vapi.qos_egress_map_delete(4)
268         self.vapi.qos_egress_map_delete(2)
269         self.vapi.qos_egress_map_delete(3)
270         self.vapi.qos_egress_map_delete(5)
271         self.vapi.qos_egress_map_delete(6)
272         self.vapi.qos_egress_map_delete(7)
273
274     def test_qos_mpls(self):
275         """ QoS Mark/Record MPLS """
276
277         #
278         # 255 QoS for all input values
279         #
280         output = [chr(255)] * 256
281         os = ''.join(output)
282         rows = [{'outputs': os},
283                 {'outputs': os},
284                 {'outputs': os},
285                 {'outputs': os}]
286
287         self.vapi.qos_egress_map_update(1, rows)
288
289         #
290         # a route with 1 MPLS label
291         #
292         route_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32,
293                                     [VppRoutePath(self.pg1.remote_ip4,
294                                                   self.pg1.sw_if_index,
295                                                   labels=[32])])
296         route_10_0_0_1.add_vpp_config()
297
298         #
299         # a route with 3 MPLS labels
300         #
301         route_10_0_0_3 = VppIpRoute(self, "10.0.0.3", 32,
302                                     [VppRoutePath(self.pg1.remote_ip4,
303                                                   self.pg1.sw_if_index,
304                                                   labels=[63, 33, 34])])
305         route_10_0_0_3.add_vpp_config()
306
307         #
308         # enable IP QoS recording on the input Pg0 and MPLS egress marking
309         # on Pg1
310         #
311         self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
312                                             QOS_SOURCE.IP,
313                                             1)
314         self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
315                                           QOS_SOURCE.MPLS,
316                                           1,
317                                           1)
318
319         #
320         # packet that will get one label added and 3 labels added resp.
321         #
322         p_1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
323                IP(src=self.pg0.remote_ip4, dst="10.0.0.1", tos=1) /
324                UDP(sport=1234, dport=1234) /
325                Raw(chr(100) * 65))
326         p_3 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
327                IP(src=self.pg0.remote_ip4, dst="10.0.0.3", tos=1) /
328                UDP(sport=1234, dport=1234) /
329                Raw(chr(100) * 65))
330
331         rx = self.send_and_expect(self.pg0, p_1 * 65, self.pg1)
332
333         #
334         # only 3 bits of ToS value in MPLS make sure tos is correct
335         # and the label and EOS bit have not been corrupted
336         #
337         for p in rx:
338             self.assertEqual(p[MPLS].cos, 7)
339             self.assertEqual(p[MPLS].label, 32)
340             self.assertEqual(p[MPLS].s, 1)
341         rx = self.send_and_expect(self.pg0, p_3 * 65, self.pg1)
342         for p in rx:
343             self.assertEqual(p[MPLS].cos, 7)
344             self.assertEqual(p[MPLS].label, 63)
345             self.assertEqual(p[MPLS].s, 0)
346             h = p[MPLS].payload
347             self.assertEqual(h[MPLS].cos, 7)
348             self.assertEqual(h[MPLS].label, 33)
349             self.assertEqual(h[MPLS].s, 0)
350             h = h[MPLS].payload
351             self.assertEqual(h[MPLS].cos, 7)
352             self.assertEqual(h[MPLS].label, 34)
353             self.assertEqual(h[MPLS].s, 1)
354
355         #
356         # enable MPLS QoS recording on the input Pg0 and IP egress marking
357         # on Pg1
358         #
359         self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
360                                             QOS_SOURCE.MPLS,
361                                             1)
362         self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
363                                           QOS_SOURCE.IP,
364                                           1,
365                                           1)
366
367         #
368         # MPLS x-connect - COS is preserved
369         #
370         route_32_eos = VppMplsRoute(self, 32, 1,
371                                     [VppRoutePath(self.pg1.remote_ip4,
372                                                   self.pg1.sw_if_index,
373                                                   labels=[VppMplsLabel(33)])])
374         route_32_eos.add_vpp_config()
375
376         p_m1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
377                 MPLS(label=32, cos=3, ttl=2) /
378                 IP(src=self.pg0.remote_ip4, dst="10.0.0.1", tos=1) /
379                 UDP(sport=1234, dport=1234) /
380                 Raw(chr(100) * 65))
381
382         rx = self.send_and_expect(self.pg0, p_m1 * 65, self.pg1)
383         for p in rx:
384             self.assertEqual(p[MPLS].cos, 7)
385             self.assertEqual(p[MPLS].label, 33)
386             self.assertEqual(p[MPLS].s, 1)
387
388         #
389         # MPLS deag - COS is copied from MPLS to IP
390         #
391         route_33_eos = VppMplsRoute(self, 33, 1,
392                                     [VppRoutePath("0.0.0.0",
393                                                   0xffffffff,
394                                                   nh_table_id=0)])
395         route_33_eos.add_vpp_config()
396
397         route_10_0_0_4 = VppIpRoute(self, "10.0.0.4", 32,
398                                     [VppRoutePath(self.pg1.remote_ip4,
399                                                   self.pg1.sw_if_index)])
400         route_10_0_0_4.add_vpp_config()
401
402         p_m2 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
403                 MPLS(label=33, ttl=2, cos=3) /
404                 IP(src=self.pg0.remote_ip4, dst="10.0.0.4", tos=1) /
405                 UDP(sport=1234, dport=1234) /
406                 Raw(chr(100) * 65))
407
408         rx = self.send_and_expect(self.pg0, p_m2 * 65, self.pg1)
409
410         for p in rx:
411             self.assertEqual(p[IP].tos, 255)
412
413         #
414         # cleanup
415         #
416         self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
417                                             QOS_SOURCE.IP,
418                                             0)
419         self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
420                                           QOS_SOURCE.MPLS,
421                                           1,
422                                           0)
423         self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
424                                             QOS_SOURCE.MPLS,
425                                             0)
426         self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
427                                           QOS_SOURCE.IP,
428                                           1,
429                                           0)
430         self.vapi.qos_egress_map_delete(1)
431
432     def test_qos_vlan(self):
433         """QoS mark/record VLAN """
434
435         #
436         # QoS for all input values
437         #
438         output = [chr(0)] * 256
439         for i in range(0, 255):
440             output[i] = chr(255 - i)
441         os = ''.join(output)
442         rows = [{'outputs': os},
443                 {'outputs': os},
444                 {'outputs': os},
445                 {'outputs': os}]
446
447         self.vapi.qos_egress_map_update(1, rows)
448
449         sub_if = VppDot1QSubint(self, self.pg0, 11)
450
451         sub_if.admin_up()
452         sub_if.config_ip4()
453         sub_if.resolve_arp()
454         sub_if.config_ip6()
455         sub_if.resolve_ndp()
456
457         #
458         # enable VLAN QoS recording/marking on the input Pg0 subinterface and
459         #
460         self.vapi.qos_record_enable_disable(sub_if.sw_if_index,
461                                             QOS_SOURCE.VLAN,
462                                             1)
463         self.vapi.qos_mark_enable_disable(sub_if.sw_if_index,
464                                           QOS_SOURCE.VLAN,
465                                           1,
466                                           1)
467
468         #
469         # IP marking/recording on pg1
470         #
471         self.vapi.qos_record_enable_disable(self.pg1.sw_if_index,
472                                             QOS_SOURCE.IP,
473                                             1)
474         self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
475                                           QOS_SOURCE.IP,
476                                           1,
477                                           1)
478
479         #
480         # a routes to/from sub-interface
481         #
482         route_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32,
483                                     [VppRoutePath(sub_if.remote_ip4,
484                                                   sub_if.sw_if_index)])
485         route_10_0_0_1.add_vpp_config()
486         route_10_0_0_2 = VppIpRoute(self, "10.0.0.2", 32,
487                                     [VppRoutePath(self.pg1.remote_ip4,
488                                                   self.pg1.sw_if_index)])
489         route_10_0_0_2.add_vpp_config()
490         route_2001_1 = VppIpRoute(self, "2001::1", 128,
491                                   [VppRoutePath(sub_if.remote_ip6,
492                                                 sub_if.sw_if_index,
493                                                 proto=DpoProto.DPO_PROTO_IP6)],
494                                   is_ip6=1)
495         route_2001_1.add_vpp_config()
496         route_2001_2 = VppIpRoute(self, "2001::2", 128,
497                                   [VppRoutePath(self.pg1.remote_ip6,
498                                                 self.pg1.sw_if_index,
499                                                 proto=DpoProto.DPO_PROTO_IP6)],
500                                   is_ip6=1)
501         route_2001_2.add_vpp_config()
502
503         p_v1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
504                 Dot1Q(vlan=11, prio=1) /
505                 IP(src="1.1.1.1", dst="10.0.0.2", tos=1) /
506                 UDP(sport=1234, dport=1234) /
507                 Raw(chr(100) * 65))
508
509         p_v2 = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
510                 IP(src="1.1.1.1", dst="10.0.0.1", tos=1) /
511                 UDP(sport=1234, dport=1234) /
512                 Raw(chr(100) * 65))
513
514         rx = self.send_and_expect(self.pg1, p_v2 * 65, self.pg0)
515
516         for p in rx:
517             self.assertEqual(p[Dot1Q].prio, 6)
518
519         rx = self.send_and_expect(self.pg0, p_v1 * 65, self.pg1)
520
521         for p in rx:
522             self.assertEqual(p[IP].tos, 254)
523
524         p_v1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
525                 Dot1Q(vlan=11, prio=2) /
526                 IPv6(src="2001::1", dst="2001::2", tc=1) /
527                 UDP(sport=1234, dport=1234) /
528                 Raw(chr(100) * 65))
529
530         p_v2 = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
531                 IPv6(src="3001::1", dst="2001::1", tc=1) /
532                 UDP(sport=1234, dport=1234) /
533                 Raw(chr(100) * 65))
534
535         rx = self.send_and_expect(self.pg1, p_v2 * 65, self.pg0)
536
537         for p in rx:
538             self.assertEqual(p[Dot1Q].prio, 6)
539
540         rx = self.send_and_expect(self.pg0, p_v1 * 65, self.pg1)
541
542         for p in rx:
543             self.assertEqual(p[IPv6].tc, 253)
544
545         #
546         # cleanup
547         #
548         sub_if.unconfig_ip4()
549         sub_if.unconfig_ip6()
550
551         self.vapi.qos_record_enable_disable(sub_if.sw_if_index,
552                                             QOS_SOURCE.VLAN,
553                                             0)
554         self.vapi.qos_mark_enable_disable(sub_if.sw_if_index,
555                                           QOS_SOURCE.VLAN,
556                                           1,
557                                           0)
558         self.vapi.qos_record_enable_disable(self.pg1.sw_if_index,
559                                             QOS_SOURCE.IP,
560                                             0)
561         self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
562                                           QOS_SOURCE.IP,
563                                           1,
564                                           0)
565
566
567 if __name__ == '__main__':
568     unittest.main(testRunner=VppTestRunner)