qos: QoS dump APIs
[vpp.git] / test / test_qos.py
1 #!/usr/bin/env python
2
3 import unittest
4
5 from framework import VppTestCase, VppTestRunner
6 from vpp_sub_interface import VppDot1QSubint
7 from vpp_ip import DpoProto
8 from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsRoute, \
9     VppMplsLabel, VppMplsTable, FibPathProto
10
11 import scapy.compat
12 from scapy.packet import Raw
13 from scapy.layers.l2 import Ether, Dot1Q
14 from scapy.layers.inet import IP, UDP
15 from scapy.layers.inet6 import IPv6
16 from scapy.contrib.mpls import MPLS
17 from vpp_papi import VppEnum
18 from vpp_qos import VppQosRecord, VppQosEgressMap, VppQosMark
19
20 NUM_PKTS = 67
21
22
23 class TestQOS(VppTestCase):
24     """ QOS Test Case """
25
26     # Note: Since the enums aren't created dynamically until after
27     #       the papi client attaches to VPP, we put it in a property to
28     #       ensure it is the value at runtime, not at module load time.
29     @property
30     def QOS_SOURCE(self):
31         return VppEnum.vl_api_qos_source_t
32
33     @classmethod
34     def setUpClass(cls):
35         super(TestQOS, cls).setUpClass()
36
37     @classmethod
38     def tearDownClass(cls):
39         super(TestQOS, cls).tearDownClass()
40
41     def setUp(self):
42         super(TestQOS, self).setUp()
43
44         self.create_pg_interfaces(range(5))
45
46         tbl = VppMplsTable(self, 0)
47         tbl.add_vpp_config()
48
49         for i in self.pg_interfaces:
50             i.admin_up()
51             i.config_ip4()
52             i.resolve_arp()
53             i.config_ip6()
54             i.resolve_ndp()
55             i.enable_mpls()
56
57     def tearDown(self):
58         for i in self.pg_interfaces:
59             i.unconfig_ip4()
60             i.unconfig_ip6()
61             i.disable_mpls()
62
63         super(TestQOS, self).tearDown()
64
65     def test_qos_ip(self):
66         """ QoS Mark/Record IP """
67
68         #
69         # for table 1 map the n=0xff possible values of input QoS mark,
70         # n to 1-n
71         #
72         output = [scapy.compat.chb(0)] * 256
73         for i in range(0, 255):
74             output[i] = scapy.compat.chb(255 - i)
75         os = b''.join(output)
76         rows = [{'outputs': os},
77                 {'outputs': os},
78                 {'outputs': os},
79                 {'outputs': os}]
80
81         qem1 = VppQosEgressMap(self, 1, rows).add_vpp_config()
82
83         #
84         # For table 2 (and up) use the value n for everything
85         #
86         output = [scapy.compat.chb(2)] * 256
87         os = b''.join(output)
88         rows = [{'outputs': os},
89                 {'outputs': os},
90                 {'outputs': os},
91                 {'outputs': os}]
92
93         qem2 = VppQosEgressMap(self, 2, rows).add_vpp_config()
94
95         output = [scapy.compat.chb(3)] * 256
96         os = b''.join(output)
97         rows = [{'outputs': os},
98                 {'outputs': os},
99                 {'outputs': os},
100                 {'outputs': os}]
101
102         qem3 = VppQosEgressMap(self, 3, rows).add_vpp_config()
103
104         output = [scapy.compat.chb(4)] * 256
105         os = b''.join(output)
106         rows = [{'outputs': os},
107                 {'outputs': os},
108                 {'outputs': os},
109                 {'outputs': os}]
110
111         qem4 = VppQosEgressMap(self, 4, rows).add_vpp_config()
112         qem5 = VppQosEgressMap(self, 5, rows).add_vpp_config()
113         qem6 = VppQosEgressMap(self, 6, rows).add_vpp_config()
114         qem7 = VppQosEgressMap(self, 7, rows).add_vpp_config()
115
116         self.assertTrue(qem7.query_vpp_config())
117         self.logger.info(self.vapi.cli("sh qos eg map"))
118
119         #
120         # Bind interface pgN to table n
121         #
122         qm1 = VppQosMark(self, self.pg1, qem1,
123                          self.QOS_SOURCE.QOS_API_SOURCE_IP).add_vpp_config()
124         qm2 = VppQosMark(self, self.pg2, qem2,
125                          self.QOS_SOURCE.QOS_API_SOURCE_IP).add_vpp_config()
126         qm3 = VppQosMark(self, self.pg3, qem3,
127                          self.QOS_SOURCE.QOS_API_SOURCE_IP).add_vpp_config()
128         qm4 = VppQosMark(self, self.pg4, qem4,
129                          self.QOS_SOURCE.QOS_API_SOURCE_IP).add_vpp_config()
130         self.assertTrue(qm3.query_vpp_config())
131
132         self.logger.info(self.vapi.cli("sh qos mark"))
133
134         #
135         # packets ingress on Pg0
136         #
137         p_v4 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
138                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, tos=1) /
139                 UDP(sport=1234, dport=1234) /
140                 Raw(scapy.compat.chb(100) * NUM_PKTS))
141         p_v6 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
142                 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6,
143                      tc=1) /
144                 UDP(sport=1234, dport=1234) /
145                 Raw(scapy.compat.chb(100) * NUM_PKTS))
146
147         #
148         # Since we have not yet enabled the recording of the input QoS
149         # from the input iP header, the egress packet's ToS will be unchanged
150         #
151         rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
152         for p in rx:
153             self.assertEqual(p[IP].tos, 1)
154         rx = self.send_and_expect(self.pg0, p_v6 * NUM_PKTS, self.pg1)
155         for p in rx:
156             self.assertEqual(p[IPv6].tc, 1)
157
158         #
159         # Enable QoS recording on IP input for pg0
160         #
161         qr1 = VppQosRecord(self, self.pg0,
162                            self.QOS_SOURCE.QOS_API_SOURCE_IP)
163         qr1.add_vpp_config()
164         self.logger.info(self.vapi.cli("sh qos record"))
165
166         #
167         # send the same packets, this time expect the input TOS of 1
168         # to be mapped to pg1's egress value of 254
169         #
170         rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
171         for p in rx:
172             self.assertEqual(p[IP].tos, 254)
173         rx = self.send_and_expect(self.pg0, p_v6 * NUM_PKTS, self.pg1)
174         for p in rx:
175             self.assertEqual(p[IPv6].tc, 254)
176
177         #
178         # different input ToS to test the mapping
179         #
180         p_v4[IP].tos = 127
181         rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
182         for p in rx:
183             self.assertEqual(p[IP].tos, 128)
184         p_v6[IPv6].tc = 127
185         rx = self.send_and_expect(self.pg0, p_v6 * NUM_PKTS, self.pg1)
186         for p in rx:
187             self.assertEqual(p[IPv6].tc, 128)
188
189         p_v4[IP].tos = 254
190         rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
191         for p in rx:
192             self.assertEqual(p[IP].tos, 1)
193         p_v6[IPv6].tc = 254
194         rx = self.send_and_expect(self.pg0, p_v6 * NUM_PKTS, self.pg1)
195         for p in rx:
196             self.assertEqual(p[IPv6].tc, 1)
197
198         #
199         # send packets out the other interfaces to test the maps are
200         # correctly applied
201         #
202         p_v4[IP].dst = self.pg2.remote_ip4
203         rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg2)
204         for p in rx:
205             self.assertEqual(p[IP].tos, 2)
206
207         p_v4[IP].dst = self.pg3.remote_ip4
208         rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg3)
209         for p in rx:
210             self.assertEqual(p[IP].tos, 3)
211
212         p_v6[IPv6].dst = self.pg3.remote_ip6
213         rx = self.send_and_expect(self.pg0, p_v6 * NUM_PKTS, self.pg3)
214         for p in rx:
215             self.assertEqual(p[IPv6].tc, 3)
216
217         #
218         # remove the map on pg2 and pg3, now expect an unchanged IP tos
219         #
220         qm2.remove_vpp_config()
221         qm3.remove_vpp_config()
222         self.logger.info(self.vapi.cli("sh qos mark"))
223
224         self.assertFalse(qm3.query_vpp_config())
225         self.logger.info(self.vapi.cli("sh int feat pg2"))
226
227         p_v4[IP].dst = self.pg2.remote_ip4
228         rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg2)
229         for p in rx:
230             self.assertEqual(p[IP].tos, 254)
231
232         p_v4[IP].dst = self.pg3.remote_ip4
233         rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg3)
234         for p in rx:
235             self.assertEqual(p[IP].tos, 254)
236
237         #
238         # still mapping out of pg1
239         #
240         p_v4[IP].dst = self.pg1.remote_ip4
241         rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
242         for p in rx:
243             self.assertEqual(p[IP].tos, 1)
244
245         #
246         # disable the input recording on pg0
247         #
248         self.assertTrue(qr1.query_vpp_config())
249         qr1.remove_vpp_config()
250
251         #
252         # back to an unchanged TOS value
253         #
254         rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
255         for p in rx:
256             self.assertEqual(p[IP].tos, 254)
257
258         #
259         # disable the egress map on pg1 and pg4
260         #
261         qm1.remove_vpp_config()
262         qm4.remove_vpp_config()
263
264         #
265         # unchanged Tos on pg1
266         #
267         rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
268         for p in rx:
269             self.assertEqual(p[IP].tos, 254)
270
271     def test_qos_mpls(self):
272         """ QoS Mark/Record MPLS """
273
274         #
275         # 255 QoS for all input values
276         #
277         from_ext = 7
278         from_ip = 6
279         from_mpls = 5
280         from_vlan = 4
281         output = [scapy.compat.chb(from_ext)] * 256
282         os1 = b''.join(output)
283         output = [scapy.compat.chb(from_vlan)] * 256
284         os2 = b''.join(output)
285         output = [scapy.compat.chb(from_mpls)] * 256
286         os3 = b''.join(output)
287         output = [scapy.compat.chb(from_ip)] * 256
288         os4 = b''.join(output)
289         rows = [{'outputs': os1},
290                 {'outputs': os2},
291                 {'outputs': os3},
292                 {'outputs': os4}]
293
294         qem1 = VppQosEgressMap(self, 1, rows).add_vpp_config()
295
296         #
297         # a route with 1 MPLS label
298         #
299         route_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32,
300                                     [VppRoutePath(self.pg1.remote_ip4,
301                                                   self.pg1.sw_if_index,
302                                                   labels=[32])])
303         route_10_0_0_1.add_vpp_config()
304
305         #
306         # a route with 3 MPLS labels
307         #
308         route_10_0_0_3 = VppIpRoute(self, "10.0.0.3", 32,
309                                     [VppRoutePath(self.pg1.remote_ip4,
310                                                   self.pg1.sw_if_index,
311                                                   labels=[63, 33, 34])])
312         route_10_0_0_3.add_vpp_config()
313
314         #
315         # enable IP QoS recording on the input Pg0 and MPLS egress marking
316         # on Pg1
317         #
318         qr1 = VppQosRecord(self, self.pg0,
319                            self.QOS_SOURCE.QOS_API_SOURCE_IP).add_vpp_config()
320         qm1 = VppQosMark(self, self.pg1, qem1,
321                          self.QOS_SOURCE.QOS_API_SOURCE_MPLS).add_vpp_config()
322
323         #
324         # packet that will get one label added and 3 labels added resp.
325         #
326         p_1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
327                IP(src=self.pg0.remote_ip4, dst="10.0.0.1", tos=1) /
328                UDP(sport=1234, dport=1234) /
329                Raw(scapy.compat.chb(100) * NUM_PKTS))
330         p_3 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
331                IP(src=self.pg0.remote_ip4, dst="10.0.0.3", tos=1) /
332                UDP(sport=1234, dport=1234) /
333                Raw(scapy.compat.chb(100) * NUM_PKTS))
334
335         rx = self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg1)
336
337         #
338         # only 3 bits of ToS value in MPLS make sure tos is correct
339         # and the label and EOS bit have not been corrupted
340         #
341         for p in rx:
342             self.assertEqual(p[MPLS].cos, from_ip)
343             self.assertEqual(p[MPLS].label, 32)
344             self.assertEqual(p[MPLS].s, 1)
345         rx = self.send_and_expect(self.pg0, p_3 * NUM_PKTS, self.pg1)
346         for p in rx:
347             self.assertEqual(p[MPLS].cos, from_ip)
348             self.assertEqual(p[MPLS].label, 63)
349             self.assertEqual(p[MPLS].s, 0)
350             h = p[MPLS].payload
351             self.assertEqual(h[MPLS].cos, from_ip)
352             self.assertEqual(h[MPLS].label, 33)
353             self.assertEqual(h[MPLS].s, 0)
354             h = h[MPLS].payload
355             self.assertEqual(h[MPLS].cos, from_ip)
356             self.assertEqual(h[MPLS].label, 34)
357             self.assertEqual(h[MPLS].s, 1)
358
359         #
360         # enable MPLS QoS recording on the input Pg0 and IP egress marking
361         # on Pg1
362         #
363         qr2 = VppQosRecord(
364             self, self.pg0,
365             self.QOS_SOURCE.QOS_API_SOURCE_MPLS).add_vpp_config()
366         qm2 = VppQosMark(
367             self, self.pg1, qem1,
368             self.QOS_SOURCE.QOS_API_SOURCE_IP).add_vpp_config()
369
370         #
371         # MPLS x-connect - COS according to pg1 map
372         #
373         route_32_eos = VppMplsRoute(self, 32, 1,
374                                     [VppRoutePath(self.pg1.remote_ip4,
375                                                   self.pg1.sw_if_index,
376                                                   labels=[VppMplsLabel(33)])])
377         route_32_eos.add_vpp_config()
378
379         p_m1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
380                 MPLS(label=32, cos=3, ttl=2) /
381                 IP(src=self.pg0.remote_ip4, dst="10.0.0.1", tos=1) /
382                 UDP(sport=1234, dport=1234) /
383                 Raw(scapy.compat.chb(100) * NUM_PKTS))
384
385         rx = self.send_and_expect(self.pg0, p_m1 * NUM_PKTS, self.pg1)
386         for p in rx:
387             self.assertEqual(p[MPLS].cos, from_mpls)
388             self.assertEqual(p[MPLS].label, 33)
389             self.assertEqual(p[MPLS].s, 1)
390
391         #
392         # MPLS deag - COS is copied from MPLS to IP
393         #
394         route_33_eos = VppMplsRoute(self, 33, 1,
395                                     [VppRoutePath("0.0.0.0",
396                                                   0xffffffff,
397                                                   nh_table_id=0)])
398         route_33_eos.add_vpp_config()
399
400         route_10_0_0_4 = VppIpRoute(self, "10.0.0.4", 32,
401                                     [VppRoutePath(self.pg1.remote_ip4,
402                                                   self.pg1.sw_if_index)])
403         route_10_0_0_4.add_vpp_config()
404
405         p_m2 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
406                 MPLS(label=33, ttl=2, cos=3) /
407                 IP(src=self.pg0.remote_ip4, dst="10.0.0.4", tos=1) /
408                 UDP(sport=1234, dport=1234) /
409                 Raw(scapy.compat.chb(100) * NUM_PKTS))
410
411         rx = self.send_and_expect(self.pg0, p_m2 * NUM_PKTS, self.pg1)
412
413         for p in rx:
414             self.assertEqual(p[IP].tos, from_mpls)
415
416     def test_qos_vlan(self):
417         """QoS mark/record VLAN """
418
419         #
420         # QoS for all input values
421         #
422         output = [scapy.compat.chb(0)] * 256
423         for i in range(0, 255):
424             output[i] = scapy.compat.chb(255 - i)
425         os = b''.join(output)
426         rows = [{'outputs': os},
427                 {'outputs': os},
428                 {'outputs': os},
429                 {'outputs': os}]
430
431         qem1 = VppQosEgressMap(self, 1, rows).add_vpp_config()
432
433         sub_if = VppDot1QSubint(self, self.pg0, 11)
434
435         sub_if.admin_up()
436         sub_if.config_ip4()
437         sub_if.resolve_arp()
438         sub_if.config_ip6()
439         sub_if.resolve_ndp()
440
441         #
442         # enable VLAN QoS recording/marking on the input Pg0 subinterface and
443         #
444         qr_v = VppQosRecord(
445             self, sub_if,
446             self.QOS_SOURCE.QOS_API_SOURCE_VLAN).add_vpp_config()
447         qm_v = VppQosMark(
448             self, sub_if, qem1,
449             self.QOS_SOURCE.QOS_API_SOURCE_VLAN).add_vpp_config()
450
451         #
452         # IP marking/recording on pg1
453         #
454         qr_ip = VppQosRecord(
455             self, self.pg1,
456             self.QOS_SOURCE.QOS_API_SOURCE_IP).add_vpp_config()
457         qm_ip = VppQosMark(
458             self, self.pg1, qem1,
459             self.QOS_SOURCE.QOS_API_SOURCE_IP).add_vpp_config()
460
461         #
462         # a routes to/from sub-interface
463         #
464         route_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32,
465                                     [VppRoutePath(sub_if.remote_ip4,
466                                                   sub_if.sw_if_index)])
467         route_10_0_0_1.add_vpp_config()
468         route_10_0_0_2 = VppIpRoute(self, "10.0.0.2", 32,
469                                     [VppRoutePath(self.pg1.remote_ip4,
470                                                   self.pg1.sw_if_index)])
471         route_10_0_0_2.add_vpp_config()
472         route_2001_1 = VppIpRoute(self, "2001::1", 128,
473                                   [VppRoutePath(sub_if.remote_ip6,
474                                                 sub_if.sw_if_index)])
475         route_2001_1.add_vpp_config()
476         route_2001_2 = VppIpRoute(self, "2001::2", 128,
477                                   [VppRoutePath(self.pg1.remote_ip6,
478                                                 self.pg1.sw_if_index)])
479         route_2001_2.add_vpp_config()
480
481         p_v1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
482                 Dot1Q(vlan=11, prio=1) /
483                 IP(src="1.1.1.1", dst="10.0.0.2", tos=1) /
484                 UDP(sport=1234, dport=1234) /
485                 Raw(scapy.compat.chb(100) * NUM_PKTS))
486
487         p_v2 = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
488                 IP(src="1.1.1.1", dst="10.0.0.1", tos=1) /
489                 UDP(sport=1234, dport=1234) /
490                 Raw(scapy.compat.chb(100) * NUM_PKTS))
491
492         rx = self.send_and_expect(self.pg1, p_v2 * NUM_PKTS, self.pg0)
493
494         for p in rx:
495             self.assertEqual(p[Dot1Q].prio, 6)
496
497         rx = self.send_and_expect(self.pg0, p_v1 * NUM_PKTS, self.pg1)
498
499         for p in rx:
500             self.assertEqual(p[IP].tos, 254)
501
502         p_v1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
503                 Dot1Q(vlan=11, prio=2) /
504                 IPv6(src="2001::1", dst="2001::2", tc=1) /
505                 UDP(sport=1234, dport=1234) /
506                 Raw(scapy.compat.chb(100) * NUM_PKTS))
507
508         p_v2 = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
509                 IPv6(src="3001::1", dst="2001::1", tc=1) /
510                 UDP(sport=1234, dport=1234) /
511                 Raw(scapy.compat.chb(100) * NUM_PKTS))
512
513         rx = self.send_and_expect(self.pg1, p_v2 * NUM_PKTS, self.pg0)
514
515         for p in rx:
516             self.assertEqual(p[Dot1Q].prio, 6)
517
518         rx = self.send_and_expect(self.pg0, p_v1 * NUM_PKTS, self.pg1)
519
520         for p in rx:
521             self.assertEqual(p[IPv6].tc, 253)
522
523         #
524         # cleanup
525         #
526         sub_if.unconfig_ip4()
527         sub_if.unconfig_ip6()
528
529
530 if __name__ == '__main__':
531     unittest.main(testRunner=VppTestRunner)