qos: Store function
[vpp.git] / test / test_qos.py
1 #!/usr/bin/env python
2
3 import unittest
4
5 from framework import VppTestCase, VppTestRunner
6 from vpp_sub_interface import VppDot1QSubint
7 from vpp_ip import DpoProto
8 from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsRoute, \
9     VppMplsLabel, VppMplsTable, FibPathProto
10
11 import scapy.compat
12 from scapy.packet import Raw
13 from scapy.layers.l2 import Ether, Dot1Q
14 from scapy.layers.inet import IP, UDP
15 from scapy.layers.inet6 import IPv6
16 from scapy.contrib.mpls import MPLS
17 from vpp_papi import VppEnum
18 from vpp_qos import VppQosRecord, VppQosEgressMap, VppQosMark, VppQosStore
19
20 NUM_PKTS = 67
21
22
23 class TestQOS(VppTestCase):
24     """ QOS Test Case """
25
    # Note: The enums are only created dynamically once the papi client
    #       has attached to VPP, so the lookup is wrapped in a property to
    #       ensure it is resolved at runtime, not at module load time.
    @property
    def QOS_SOURCE(self):
        """Return the ``vl_api_qos_source_t`` enum looked up on VppEnum."""
        return VppEnum.vl_api_qos_source_t
32
    @classmethod
    def setUpClass(cls):
        """Run the base class's class-level setup; nothing is added here."""
        super(TestQOS, cls).setUpClass()
36
    @classmethod
    def tearDownClass(cls):
        """Run the base class's class-level teardown; nothing is added here."""
        super(TestQOS, cls).tearDownClass()
40
41     def setUp(self):
42         super(TestQOS, self).setUp()
43
44         self.create_pg_interfaces(range(5))
45
46         tbl = VppMplsTable(self, 0)
47         tbl.add_vpp_config()
48
49         for i in self.pg_interfaces:
50             i.admin_up()
51             i.config_ip4()
52             i.resolve_arp()
53             i.config_ip6()
54             i.resolve_ndp()
55             i.enable_mpls()
56
57     def tearDown(self):
58         for i in self.pg_interfaces:
59             i.unconfig_ip4()
60             i.unconfig_ip6()
61             i.disable_mpls()
62
63         super(TestQOS, self).tearDown()
64
65     def test_qos_ip(self):
66         """ QoS Mark/Record/Store IP """
67
68         #
69         # for table 1 map the n=0xff possible values of input QoS mark,
70         # n to 1-n
71         #
72         output = [scapy.compat.chb(0)] * 256
73         for i in range(0, 255):
74             output[i] = scapy.compat.chb(255 - i)
75         os = b''.join(output)
76         rows = [{'outputs': os},
77                 {'outputs': os},
78                 {'outputs': os},
79                 {'outputs': os}]
80
81         qem1 = VppQosEgressMap(self, 1, rows).add_vpp_config()
82
83         #
84         # For table 2 (and up) use the value n for everything
85         #
86         output = [scapy.compat.chb(2)] * 256
87         os = b''.join(output)
88         rows = [{'outputs': os},
89                 {'outputs': os},
90                 {'outputs': os},
91                 {'outputs': os}]
92
93         qem2 = VppQosEgressMap(self, 2, rows).add_vpp_config()
94
95         output = [scapy.compat.chb(3)] * 256
96         os = b''.join(output)
97         rows = [{'outputs': os},
98                 {'outputs': os},
99                 {'outputs': os},
100                 {'outputs': os}]
101
102         qem3 = VppQosEgressMap(self, 3, rows).add_vpp_config()
103
104         output = [scapy.compat.chb(4)] * 256
105         os = b''.join(output)
106         rows = [{'outputs': os},
107                 {'outputs': os},
108                 {'outputs': os},
109                 {'outputs': os}]
110
111         qem4 = VppQosEgressMap(self, 4, rows).add_vpp_config()
112         qem5 = VppQosEgressMap(self, 5, rows).add_vpp_config()
113         qem6 = VppQosEgressMap(self, 6, rows).add_vpp_config()
114         qem7 = VppQosEgressMap(self, 7, rows).add_vpp_config()
115
116         self.assertTrue(qem7.query_vpp_config())
117         self.logger.info(self.vapi.cli("sh qos eg map"))
118
119         #
120         # Bind interface pgN to table n
121         #
122         qm1 = VppQosMark(self, self.pg1, qem1,
123                          self.QOS_SOURCE.QOS_API_SOURCE_IP).add_vpp_config()
124         qm2 = VppQosMark(self, self.pg2, qem2,
125                          self.QOS_SOURCE.QOS_API_SOURCE_IP).add_vpp_config()
126         qm3 = VppQosMark(self, self.pg3, qem3,
127                          self.QOS_SOURCE.QOS_API_SOURCE_IP).add_vpp_config()
128         qm4 = VppQosMark(self, self.pg4, qem4,
129                          self.QOS_SOURCE.QOS_API_SOURCE_IP).add_vpp_config()
130         self.assertTrue(qm3.query_vpp_config())
131
132         self.logger.info(self.vapi.cli("sh qos mark"))
133
134         #
135         # packets ingress on Pg0
136         #
137         p_v4 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
138                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, tos=1) /
139                 UDP(sport=1234, dport=1234) /
140                 Raw(scapy.compat.chb(100) * NUM_PKTS))
141         p_v6 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
142                 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6,
143                      tc=1) /
144                 UDP(sport=1234, dport=1234) /
145                 Raw(scapy.compat.chb(100) * NUM_PKTS))
146
147         #
148         # Since we have not yet enabled the recording of the input QoS
149         # from the input iP header, the egress packet's ToS will be unchanged
150         #
151         rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
152         for p in rx:
153             self.assertEqual(p[IP].tos, 1)
154         rx = self.send_and_expect(self.pg0, p_v6 * NUM_PKTS, self.pg1)
155         for p in rx:
156             self.assertEqual(p[IPv6].tc, 1)
157
158         #
159         # Enable QoS recording on IP input for pg0
160         #
161         qr1 = VppQosRecord(self, self.pg0,
162                            self.QOS_SOURCE.QOS_API_SOURCE_IP)
163         qr1.add_vpp_config()
164         self.logger.info(self.vapi.cli("sh qos record"))
165
166         #
167         # send the same packets, this time expect the input TOS of 1
168         # to be mapped to pg1's egress value of 254
169         #
170         rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
171         for p in rx:
172             self.assertEqual(p[IP].tos, 254)
173         rx = self.send_and_expect(self.pg0, p_v6 * NUM_PKTS, self.pg1)
174         for p in rx:
175             self.assertEqual(p[IPv6].tc, 254)
176
177         #
178         # different input ToS to test the mapping
179         #
180         p_v4[IP].tos = 127
181         rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
182         for p in rx:
183             self.assertEqual(p[IP].tos, 128)
184         p_v6[IPv6].tc = 127
185         rx = self.send_and_expect(self.pg0, p_v6 * NUM_PKTS, self.pg1)
186         for p in rx:
187             self.assertEqual(p[IPv6].tc, 128)
188
189         p_v4[IP].tos = 254
190         rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
191         for p in rx:
192             self.assertEqual(p[IP].tos, 1)
193         p_v6[IPv6].tc = 254
194         rx = self.send_and_expect(self.pg0, p_v6 * NUM_PKTS, self.pg1)
195         for p in rx:
196             self.assertEqual(p[IPv6].tc, 1)
197
198         #
199         # send packets out the other interfaces to test the maps are
200         # correctly applied
201         #
202         p_v4[IP].dst = self.pg2.remote_ip4
203         rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg2)
204         for p in rx:
205             self.assertEqual(p[IP].tos, 2)
206
207         p_v4[IP].dst = self.pg3.remote_ip4
208         rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg3)
209         for p in rx:
210             self.assertEqual(p[IP].tos, 3)
211
212         p_v6[IPv6].dst = self.pg3.remote_ip6
213         rx = self.send_and_expect(self.pg0, p_v6 * NUM_PKTS, self.pg3)
214         for p in rx:
215             self.assertEqual(p[IPv6].tc, 3)
216
217         #
218         # remove the map on pg2 and pg3, now expect an unchanged IP tos
219         #
220         qm2.remove_vpp_config()
221         qm3.remove_vpp_config()
222         self.logger.info(self.vapi.cli("sh qos mark"))
223
224         self.assertFalse(qm3.query_vpp_config())
225         self.logger.info(self.vapi.cli("sh int feat pg2"))
226
227         p_v4[IP].dst = self.pg2.remote_ip4
228         rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg2)
229         for p in rx:
230             self.assertEqual(p[IP].tos, 254)
231
232         p_v4[IP].dst = self.pg3.remote_ip4
233         rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg3)
234         for p in rx:
235             self.assertEqual(p[IP].tos, 254)
236
237         #
238         # still mapping out of pg1
239         #
240         p_v4[IP].dst = self.pg1.remote_ip4
241         rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
242         for p in rx:
243             self.assertEqual(p[IP].tos, 1)
244
245         #
246         # disable the input recording on pg0
247         #
248         self.assertTrue(qr1.query_vpp_config())
249         qr1.remove_vpp_config()
250
251         #
252         # back to an unchanged TOS value
253         #
254         rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
255         for p in rx:
256             self.assertEqual(p[IP].tos, 254)
257
258         #
259         # enable QoS stroe instead of record
260         #
261         qst1 = VppQosStore(self, self.pg0,
262                            self.QOS_SOURCE.QOS_API_SOURCE_IP,
263                            5).add_vpp_config()
264         self.logger.info(self.vapi.cli("sh qos store"))
265
266         p_v4[IP].dst = self.pg1.remote_ip4
267         rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
268         for p in rx:
269             self.assertEqual(p[IP].tos, 250)
270
271         #
272         # disable the input storing on pg0
273         #
274         self.assertTrue(qst1.query_vpp_config())
275         qst1.remove_vpp_config()
276
277         #
278         # back to an unchanged TOS value
279         #
280         rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
281         for p in rx:
282             self.assertEqual(p[IP].tos, 254)
283
284         #
285         # disable the egress map on pg1 and pg4
286         #
287         qm1.remove_vpp_config()
288         qm4.remove_vpp_config()
289
290         #
291         # unchanged Tos on pg1
292         #
293         rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
294         for p in rx:
295             self.assertEqual(p[IP].tos, 254)
296
297     def test_qos_mpls(self):
298         """ QoS Mark/Record MPLS """
299
300         #
301         # 255 QoS for all input values
302         #
303         from_ext = 7
304         from_ip = 6
305         from_mpls = 5
306         from_vlan = 4
307         output = [scapy.compat.chb(from_ext)] * 256
308         os1 = b''.join(output)
309         output = [scapy.compat.chb(from_vlan)] * 256
310         os2 = b''.join(output)
311         output = [scapy.compat.chb(from_mpls)] * 256
312         os3 = b''.join(output)
313         output = [scapy.compat.chb(from_ip)] * 256
314         os4 = b''.join(output)
315         rows = [{'outputs': os1},
316                 {'outputs': os2},
317                 {'outputs': os3},
318                 {'outputs': os4}]
319
320         qem1 = VppQosEgressMap(self, 1, rows).add_vpp_config()
321
322         #
323         # a route with 1 MPLS label
324         #
325         route_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32,
326                                     [VppRoutePath(self.pg1.remote_ip4,
327                                                   self.pg1.sw_if_index,
328                                                   labels=[32])])
329         route_10_0_0_1.add_vpp_config()
330
331         #
332         # a route with 3 MPLS labels
333         #
334         route_10_0_0_3 = VppIpRoute(self, "10.0.0.3", 32,
335                                     [VppRoutePath(self.pg1.remote_ip4,
336                                                   self.pg1.sw_if_index,
337                                                   labels=[63, 33, 34])])
338         route_10_0_0_3.add_vpp_config()
339
340         #
341         # enable IP QoS recording on the input Pg0 and MPLS egress marking
342         # on Pg1
343         #
344         qr1 = VppQosRecord(self, self.pg0,
345                            self.QOS_SOURCE.QOS_API_SOURCE_IP).add_vpp_config()
346         qm1 = VppQosMark(self, self.pg1, qem1,
347                          self.QOS_SOURCE.QOS_API_SOURCE_MPLS).add_vpp_config()
348
349         #
350         # packet that will get one label added and 3 labels added resp.
351         #
352         p_1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
353                IP(src=self.pg0.remote_ip4, dst="10.0.0.1", tos=1) /
354                UDP(sport=1234, dport=1234) /
355                Raw(scapy.compat.chb(100) * NUM_PKTS))
356         p_3 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
357                IP(src=self.pg0.remote_ip4, dst="10.0.0.3", tos=1) /
358                UDP(sport=1234, dport=1234) /
359                Raw(scapy.compat.chb(100) * NUM_PKTS))
360
361         rx = self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg1)
362
363         #
364         # only 3 bits of ToS value in MPLS make sure tos is correct
365         # and the label and EOS bit have not been corrupted
366         #
367         for p in rx:
368             self.assertEqual(p[MPLS].cos, from_ip)
369             self.assertEqual(p[MPLS].label, 32)
370             self.assertEqual(p[MPLS].s, 1)
371         rx = self.send_and_expect(self.pg0, p_3 * NUM_PKTS, self.pg1)
372         for p in rx:
373             self.assertEqual(p[MPLS].cos, from_ip)
374             self.assertEqual(p[MPLS].label, 63)
375             self.assertEqual(p[MPLS].s, 0)
376             h = p[MPLS].payload
377             self.assertEqual(h[MPLS].cos, from_ip)
378             self.assertEqual(h[MPLS].label, 33)
379             self.assertEqual(h[MPLS].s, 0)
380             h = h[MPLS].payload
381             self.assertEqual(h[MPLS].cos, from_ip)
382             self.assertEqual(h[MPLS].label, 34)
383             self.assertEqual(h[MPLS].s, 1)
384
385         #
386         # enable MPLS QoS recording on the input Pg0 and IP egress marking
387         # on Pg1
388         #
389         qr2 = VppQosRecord(
390             self, self.pg0,
391             self.QOS_SOURCE.QOS_API_SOURCE_MPLS).add_vpp_config()
392         qm2 = VppQosMark(
393             self, self.pg1, qem1,
394             self.QOS_SOURCE.QOS_API_SOURCE_IP).add_vpp_config()
395
396         #
397         # MPLS x-connect - COS according to pg1 map
398         #
399         route_32_eos = VppMplsRoute(self, 32, 1,
400                                     [VppRoutePath(self.pg1.remote_ip4,
401                                                   self.pg1.sw_if_index,
402                                                   labels=[VppMplsLabel(33)])])
403         route_32_eos.add_vpp_config()
404
405         p_m1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
406                 MPLS(label=32, cos=3, ttl=2) /
407                 IP(src=self.pg0.remote_ip4, dst="10.0.0.1", tos=1) /
408                 UDP(sport=1234, dport=1234) /
409                 Raw(scapy.compat.chb(100) * NUM_PKTS))
410
411         rx = self.send_and_expect(self.pg0, p_m1 * NUM_PKTS, self.pg1)
412         for p in rx:
413             self.assertEqual(p[MPLS].cos, from_mpls)
414             self.assertEqual(p[MPLS].label, 33)
415             self.assertEqual(p[MPLS].s, 1)
416
417         #
418         # MPLS deag - COS is copied from MPLS to IP
419         #
420         route_33_eos = VppMplsRoute(self, 33, 1,
421                                     [VppRoutePath("0.0.0.0",
422                                                   0xffffffff,
423                                                   nh_table_id=0)])
424         route_33_eos.add_vpp_config()
425
426         route_10_0_0_4 = VppIpRoute(self, "10.0.0.4", 32,
427                                     [VppRoutePath(self.pg1.remote_ip4,
428                                                   self.pg1.sw_if_index)])
429         route_10_0_0_4.add_vpp_config()
430
431         p_m2 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
432                 MPLS(label=33, ttl=2, cos=3) /
433                 IP(src=self.pg0.remote_ip4, dst="10.0.0.4", tos=1) /
434                 UDP(sport=1234, dport=1234) /
435                 Raw(scapy.compat.chb(100) * NUM_PKTS))
436
437         rx = self.send_and_expect(self.pg0, p_m2 * NUM_PKTS, self.pg1)
438
439         for p in rx:
440             self.assertEqual(p[IP].tos, from_mpls)
441
442     def test_qos_vlan(self):
443         """QoS mark/record VLAN """
444
445         #
446         # QoS for all input values
447         #
448         output = [scapy.compat.chb(0)] * 256
449         for i in range(0, 255):
450             output[i] = scapy.compat.chb(255 - i)
451         os = b''.join(output)
452         rows = [{'outputs': os},
453                 {'outputs': os},
454                 {'outputs': os},
455                 {'outputs': os}]
456
457         qem1 = VppQosEgressMap(self, 1, rows).add_vpp_config()
458
459         sub_if = VppDot1QSubint(self, self.pg0, 11)
460
461         sub_if.admin_up()
462         sub_if.config_ip4()
463         sub_if.resolve_arp()
464         sub_if.config_ip6()
465         sub_if.resolve_ndp()
466
467         #
468         # enable VLAN QoS recording/marking on the input Pg0 subinterface and
469         #
470         qr_v = VppQosRecord(
471             self, sub_if,
472             self.QOS_SOURCE.QOS_API_SOURCE_VLAN).add_vpp_config()
473         qm_v = VppQosMark(
474             self, sub_if, qem1,
475             self.QOS_SOURCE.QOS_API_SOURCE_VLAN).add_vpp_config()
476
477         #
478         # IP marking/recording on pg1
479         #
480         qr_ip = VppQosRecord(
481             self, self.pg1,
482             self.QOS_SOURCE.QOS_API_SOURCE_IP).add_vpp_config()
483         qm_ip = VppQosMark(
484             self, self.pg1, qem1,
485             self.QOS_SOURCE.QOS_API_SOURCE_IP).add_vpp_config()
486
487         #
488         # a routes to/from sub-interface
489         #
490         route_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32,
491                                     [VppRoutePath(sub_if.remote_ip4,
492                                                   sub_if.sw_if_index)])
493         route_10_0_0_1.add_vpp_config()
494         route_10_0_0_2 = VppIpRoute(self, "10.0.0.2", 32,
495                                     [VppRoutePath(self.pg1.remote_ip4,
496                                                   self.pg1.sw_if_index)])
497         route_10_0_0_2.add_vpp_config()
498         route_2001_1 = VppIpRoute(self, "2001::1", 128,
499                                   [VppRoutePath(sub_if.remote_ip6,
500                                                 sub_if.sw_if_index)])
501         route_2001_1.add_vpp_config()
502         route_2001_2 = VppIpRoute(self, "2001::2", 128,
503                                   [VppRoutePath(self.pg1.remote_ip6,
504                                                 self.pg1.sw_if_index)])
505         route_2001_2.add_vpp_config()
506
507         p_v1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
508                 Dot1Q(vlan=11, prio=1) /
509                 IP(src="1.1.1.1", dst="10.0.0.2", tos=1) /
510                 UDP(sport=1234, dport=1234) /
511                 Raw(scapy.compat.chb(100) * NUM_PKTS))
512
513         p_v2 = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
514                 IP(src="1.1.1.1", dst="10.0.0.1", tos=1) /
515                 UDP(sport=1234, dport=1234) /
516                 Raw(scapy.compat.chb(100) * NUM_PKTS))
517
518         rx = self.send_and_expect(self.pg1, p_v2 * NUM_PKTS, self.pg0)
519
520         for p in rx:
521             self.assertEqual(p[Dot1Q].prio, 6)
522
523         rx = self.send_and_expect(self.pg0, p_v1 * NUM_PKTS, self.pg1)
524
525         for p in rx:
526             self.assertEqual(p[IP].tos, 254)
527
528         p_v1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
529                 Dot1Q(vlan=11, prio=2) /
530                 IPv6(src="2001::1", dst="2001::2", tc=1) /
531                 UDP(sport=1234, dport=1234) /
532                 Raw(scapy.compat.chb(100) * NUM_PKTS))
533
534         p_v2 = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
535                 IPv6(src="3001::1", dst="2001::1", tc=1) /
536                 UDP(sport=1234, dport=1234) /
537                 Raw(scapy.compat.chb(100) * NUM_PKTS))
538
539         rx = self.send_and_expect(self.pg1, p_v2 * NUM_PKTS, self.pg0)
540
541         for p in rx:
542             self.assertEqual(p[Dot1Q].prio, 6)
543
544         rx = self.send_and_expect(self.pg0, p_v1 * NUM_PKTS, self.pg1)
545
546         for p in rx:
547             self.assertEqual(p[IPv6].tc, 253)
548
549         #
550         # cleanup
551         #
552         sub_if.unconfig_ip4()
553         sub_if.unconfig_ip6()
554
555
# When executed directly as a script, run this module's tests through
# the VPP test runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)