IGMP: proxy device
[vpp.git] / test / test_qos.py
1 #!/usr/bin/env python
2
3 import unittest
4
5 from framework import VppTestCase, VppTestRunner
6 from vpp_papi_provider import QOS_SOURCE
7 from vpp_sub_interface import VppDot1QSubint
8 from vpp_ip import DpoProto
9 from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsRoute, \
10     VppMplsLabel, VppMplsTable
11
12 from scapy.packet import Raw
13 from scapy.layers.l2 import Ether, Dot1Q
14 from scapy.layers.inet import IP, UDP
15 from scapy.layers.inet6 import IPv6
16 from scapy.contrib.mpls import MPLS
17
18
19 class TestQOS(VppTestCase):
20     """ QOS Test Case """
21
22     def setUp(self):
23         super(TestQOS, self).setUp()
24
25         self.create_pg_interfaces(range(5))
26
27         tbl = VppMplsTable(self, 0)
28         tbl.add_vpp_config()
29
30         for i in self.pg_interfaces:
31             i.admin_up()
32             i.config_ip4()
33             i.resolve_arp()
34             i.config_ip6()
35             i.resolve_ndp()
36             i.enable_mpls()
37
38     def tearDown(self):
39         for i in self.pg_interfaces:
40             i.unconfig_ip4()
41             i.unconfig_ip6()
42             i.disable_mpls()
43
44         super(TestQOS, self).tearDown()
45
46     def test_qos_ip(self):
47         """ QoS Mark/Record IP """
48
49         #
50         # for table 1 map the n=0xff possible values of input QoS mark,
51         # n to 1-n
52         #
53         output = [chr(0)] * 256
54         for i in range(0, 255):
55             output[i] = chr(255 - i)
56         os = ''.join(output)
57         rows = [{'outputs': os},
58                 {'outputs': os},
59                 {'outputs': os},
60                 {'outputs': os}]
61
62         self.vapi.qos_egress_map_update(1, rows)
63
64         #
65         # For table 2 (and up) use the value n for everything
66         #
67         output = [chr(2)] * 256
68         os = ''.join(output)
69         rows = [{'outputs': os},
70                 {'outputs': os},
71                 {'outputs': os},
72                 {'outputs': os}]
73
74         self.vapi.qos_egress_map_update(2, rows)
75
76         output = [chr(3)] * 256
77         os = ''.join(output)
78         rows = [{'outputs': os},
79                 {'outputs': os},
80                 {'outputs': os},
81                 {'outputs': os}]
82
83         self.vapi.qos_egress_map_update(3, rows)
84
85         output = [chr(4)] * 256
86         os = ''.join(output)
87         rows = [{'outputs': os},
88                 {'outputs': os},
89                 {'outputs': os},
90                 {'outputs': os}]
91         self.vapi.qos_egress_map_update(4, rows)
92         self.vapi.qos_egress_map_update(5, rows)
93         self.vapi.qos_egress_map_update(6, rows)
94         self.vapi.qos_egress_map_update(7, rows)
95
96         self.logger.info(self.vapi.cli("sh qos eg map"))
97
98         #
99         # Bind interface pgN to table n
100         #
101         self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
102                                           QOS_SOURCE.IP,
103                                           1,
104                                           1)
105         self.vapi.qos_mark_enable_disable(self.pg2.sw_if_index,
106                                           QOS_SOURCE.IP,
107                                           2,
108                                           1)
109         self.vapi.qos_mark_enable_disable(self.pg3.sw_if_index,
110                                           QOS_SOURCE.IP,
111                                           3,
112                                           1)
113         self.vapi.qos_mark_enable_disable(self.pg4.sw_if_index,
114                                           QOS_SOURCE.IP,
115                                           4,
116                                           1)
117
118         #
119         # packets ingress on Pg0
120         #
121         p_v4 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
122                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, tos=1) /
123                 UDP(sport=1234, dport=1234) /
124                 Raw(chr(100) * 65))
125         p_v6 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
126                 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6,
127                      tc=1) /
128                 UDP(sport=1234, dport=1234) /
129                 Raw(chr(100) * 65))
130
131         #
132         # Since we have not yet enabled the recording of the input QoS
133         # from the input iP header, the egress packet's ToS will be unchanged
134         #
135         rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
136         for p in rx:
137             self.assertEqual(p[IP].tos, 1)
138         rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg1)
139         for p in rx:
140             self.assertEqual(p[IPv6].tc, 1)
141
142         #
143         # Enable QoS recrding on IP input for pg0
144         #
145         self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
146                                             QOS_SOURCE.IP,
147                                             1)
148
149         #
150         # send the same packets, this time expect the input TOS of 1
151         # to be mapped to pg1's egress value of 254
152         #
153         rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
154         for p in rx:
155             self.assertEqual(p[IP].tos, 254)
156         rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg1)
157         for p in rx:
158             self.assertEqual(p[IPv6].tc, 254)
159
160         #
161         # different input ToS to test the mapping
162         #
163         p_v4[IP].tos = 127
164         rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
165         for p in rx:
166             self.assertEqual(p[IP].tos, 128)
167         p_v6[IPv6].tc = 127
168         rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg1)
169         for p in rx:
170             self.assertEqual(p[IPv6].tc, 128)
171
172         p_v4[IP].tos = 254
173         rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
174         for p in rx:
175             self.assertEqual(p[IP].tos, 1)
176         p_v6[IPv6].tc = 254
177         rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg1)
178         for p in rx:
179             self.assertEqual(p[IPv6].tc, 1)
180
181         #
182         # send packets out the other interfaces to test the maps are
183         # correctly applied
184         #
185         p_v4[IP].dst = self.pg2.remote_ip4
186         rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg2)
187         for p in rx:
188             self.assertEqual(p[IP].tos, 2)
189
190         p_v4[IP].dst = self.pg3.remote_ip4
191         rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg3)
192         for p in rx:
193             self.assertEqual(p[IP].tos, 3)
194
195         p_v6[IPv6].dst = self.pg3.remote_ip6
196         rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg3)
197         for p in rx:
198             self.assertEqual(p[IPv6].tc, 3)
199
200         #
201         # remove the map on pg2 and pg3, now expect an unchanged IP tos
202         #
203         self.vapi.qos_mark_enable_disable(self.pg2.sw_if_index,
204                                           QOS_SOURCE.IP,
205                                           2,
206                                           0)
207         self.vapi.qos_mark_enable_disable(self.pg3.sw_if_index,
208                                           QOS_SOURCE.IP,
209                                           3,
210                                           0)
211         self.logger.info(self.vapi.cli("sh int feat pg2"))
212
213         p_v4[IP].dst = self.pg2.remote_ip4
214         rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg2)
215         for p in rx:
216             self.assertEqual(p[IP].tos, 254)
217
218         p_v4[IP].dst = self.pg3.remote_ip4
219         rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg3)
220         for p in rx:
221             self.assertEqual(p[IP].tos, 254)
222
223         #
224         # still mapping out of pg1
225         #
226         p_v4[IP].dst = self.pg1.remote_ip4
227         rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
228         for p in rx:
229             self.assertEqual(p[IP].tos, 1)
230
231         #
232         # disable the input recording on pg0
233         #
234         self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
235                                             QOS_SOURCE.IP,
236                                             0)
237
238         #
239         # back to an unchanged TOS value
240         #
241         rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
242         for p in rx:
243             self.assertEqual(p[IP].tos, 254)
244
245         #
246         # disable the egress map on pg1 and pg4
247         #
248         self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
249                                           QOS_SOURCE.IP,
250                                           1,
251                                           0)
252         self.vapi.qos_mark_enable_disable(self.pg4.sw_if_index,
253                                           QOS_SOURCE.IP,
254                                           4,
255                                           0)
256
257         #
258         # unchanged Tos on pg1
259         #
260         rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
261         for p in rx:
262             self.assertEqual(p[IP].tos, 254)
263
264         #
265         # clean-up the masp
266         #
267         self.vapi.qos_egress_map_delete(1)
268         self.vapi.qos_egress_map_delete(4)
269         self.vapi.qos_egress_map_delete(2)
270         self.vapi.qos_egress_map_delete(3)
271         self.vapi.qos_egress_map_delete(5)
272         self.vapi.qos_egress_map_delete(6)
273         self.vapi.qos_egress_map_delete(7)
274
275     def test_qos_mpls(self):
276         """ QoS Mark/Record MPLS """
277
278         #
279         # 255 QoS for all input values
280         #
281         output = [chr(255)] * 256
282         os = ''.join(output)
283         rows = [{'outputs': os},
284                 {'outputs': os},
285                 {'outputs': os},
286                 {'outputs': os}]
287
288         self.vapi.qos_egress_map_update(1, rows)
289
290         #
291         # a route with 1 MPLS label
292         #
293         route_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32,
294                                     [VppRoutePath(self.pg1.remote_ip4,
295                                                   self.pg1.sw_if_index,
296                                                   labels=[32])])
297         route_10_0_0_1.add_vpp_config()
298
299         #
300         # a route with 3 MPLS labels
301         #
302         route_10_0_0_3 = VppIpRoute(self, "10.0.0.3", 32,
303                                     [VppRoutePath(self.pg1.remote_ip4,
304                                                   self.pg1.sw_if_index,
305                                                   labels=[63, 33, 34])])
306         route_10_0_0_3.add_vpp_config()
307
308         #
309         # enable IP QoS recording on the input Pg0 and MPLS egress marking
310         # on Pg1
311         #
312         self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
313                                             QOS_SOURCE.IP,
314                                             1)
315         self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
316                                           QOS_SOURCE.MPLS,
317                                           1,
318                                           1)
319
320         #
321         # packet that will get one label added and 3 labels added resp.
322         #
323         p_1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
324                IP(src=self.pg0.remote_ip4, dst="10.0.0.1", tos=1) /
325                UDP(sport=1234, dport=1234) /
326                Raw(chr(100) * 65))
327         p_3 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
328                IP(src=self.pg0.remote_ip4, dst="10.0.0.3", tos=1) /
329                UDP(sport=1234, dport=1234) /
330                Raw(chr(100) * 65))
331
332         rx = self.send_and_expect(self.pg0, p_1 * 65, self.pg1)
333
334         #
335         # only 3 bits of ToS value in MPLS make sure tos is correct
336         # and the label and EOS bit have not been corrupted
337         #
338         for p in rx:
339             self.assertEqual(p[MPLS].cos, 7)
340             self.assertEqual(p[MPLS].label, 32)
341             self.assertEqual(p[MPLS].s, 1)
342         rx = self.send_and_expect(self.pg0, p_3 * 65, self.pg1)
343         for p in rx:
344             self.assertEqual(p[MPLS].cos, 7)
345             self.assertEqual(p[MPLS].label, 63)
346             self.assertEqual(p[MPLS].s, 0)
347             h = p[MPLS].payload
348             self.assertEqual(h[MPLS].cos, 7)
349             self.assertEqual(h[MPLS].label, 33)
350             self.assertEqual(h[MPLS].s, 0)
351             h = h[MPLS].payload
352             self.assertEqual(h[MPLS].cos, 7)
353             self.assertEqual(h[MPLS].label, 34)
354             self.assertEqual(h[MPLS].s, 1)
355
356         #
357         # enable MPLS QoS recording on the input Pg0 and IP egress marking
358         # on Pg1
359         #
360         self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
361                                             QOS_SOURCE.MPLS,
362                                             1)
363         self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
364                                           QOS_SOURCE.IP,
365                                           1,
366                                           1)
367
368         #
369         # MPLS x-connect - COS is preserved
370         #
371         route_32_eos = VppMplsRoute(self, 32, 1,
372                                     [VppRoutePath(self.pg1.remote_ip4,
373                                                   self.pg1.sw_if_index,
374                                                   labels=[VppMplsLabel(33)])])
375         route_32_eos.add_vpp_config()
376
377         p_m1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
378                 MPLS(label=32, cos=3, ttl=2) /
379                 IP(src=self.pg0.remote_ip4, dst="10.0.0.1", tos=1) /
380                 UDP(sport=1234, dport=1234) /
381                 Raw(chr(100) * 65))
382
383         rx = self.send_and_expect(self.pg0, p_m1 * 65, self.pg1)
384         for p in rx:
385             self.assertEqual(p[MPLS].cos, 7)
386             self.assertEqual(p[MPLS].label, 33)
387             self.assertEqual(p[MPLS].s, 1)
388
389         #
390         # MPLS deag - COS is copied from MPLS to IP
391         #
392         route_33_eos = VppMplsRoute(self, 33, 1,
393                                     [VppRoutePath("0.0.0.0",
394                                                   0xffffffff,
395                                                   nh_table_id=0)])
396         route_33_eos.add_vpp_config()
397
398         route_10_0_0_4 = VppIpRoute(self, "10.0.0.4", 32,
399                                     [VppRoutePath(self.pg1.remote_ip4,
400                                                   self.pg1.sw_if_index)])
401         route_10_0_0_4.add_vpp_config()
402
403         p_m2 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
404                 MPLS(label=33, ttl=2, cos=3) /
405                 IP(src=self.pg0.remote_ip4, dst="10.0.0.4", tos=1) /
406                 UDP(sport=1234, dport=1234) /
407                 Raw(chr(100) * 65))
408
409         rx = self.send_and_expect(self.pg0, p_m2 * 65, self.pg1)
410
411         for p in rx:
412             self.assertEqual(p[IP].tos, 255)
413
414         #
415         # cleanup
416         #
417         self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
418                                             QOS_SOURCE.IP,
419                                             0)
420         self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
421                                           QOS_SOURCE.MPLS,
422                                           1,
423                                           0)
424         self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
425                                             QOS_SOURCE.MPLS,
426                                             0)
427         self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
428                                           QOS_SOURCE.IP,
429                                           1,
430                                           0)
431         self.vapi.qos_egress_map_delete(1)
432
433     def test_qos_vlan(self):
434         """QoS mark/record VLAN """
435
436         #
437         # QoS for all input values
438         #
439         output = [chr(0)] * 256
440         for i in range(0, 255):
441             output[i] = chr(255 - i)
442         os = ''.join(output)
443         rows = [{'outputs': os},
444                 {'outputs': os},
445                 {'outputs': os},
446                 {'outputs': os}]
447
448         self.vapi.qos_egress_map_update(1, rows)
449
450         sub_if = VppDot1QSubint(self, self.pg0, 11)
451
452         sub_if.admin_up()
453         sub_if.config_ip4()
454         sub_if.resolve_arp()
455         sub_if.config_ip6()
456         sub_if.resolve_ndp()
457
458         #
459         # enable VLAN QoS recording/marking on the input Pg0 subinterface and
460         #
461         self.vapi.qos_record_enable_disable(sub_if.sw_if_index,
462                                             QOS_SOURCE.VLAN,
463                                             1)
464         self.vapi.qos_mark_enable_disable(sub_if.sw_if_index,
465                                           QOS_SOURCE.VLAN,
466                                           1,
467                                           1)
468
469         #
470         # IP marking/recording on pg1
471         #
472         self.vapi.qos_record_enable_disable(self.pg1.sw_if_index,
473                                             QOS_SOURCE.IP,
474                                             1)
475         self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
476                                           QOS_SOURCE.IP,
477                                           1,
478                                           1)
479
480         #
481         # a routes to/from sub-interface
482         #
483         route_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32,
484                                     [VppRoutePath(sub_if.remote_ip4,
485                                                   sub_if.sw_if_index)])
486         route_10_0_0_1.add_vpp_config()
487         route_10_0_0_2 = VppIpRoute(self, "10.0.0.2", 32,
488                                     [VppRoutePath(self.pg1.remote_ip4,
489                                                   self.pg1.sw_if_index)])
490         route_10_0_0_2.add_vpp_config()
491         route_2001_1 = VppIpRoute(self, "2001::1", 128,
492                                   [VppRoutePath(sub_if.remote_ip6,
493                                                 sub_if.sw_if_index,
494                                                 proto=DpoProto.DPO_PROTO_IP6)],
495                                   is_ip6=1)
496         route_2001_1.add_vpp_config()
497         route_2001_2 = VppIpRoute(self, "2001::2", 128,
498                                   [VppRoutePath(self.pg1.remote_ip6,
499                                                 self.pg1.sw_if_index,
500                                                 proto=DpoProto.DPO_PROTO_IP6)],
501                                   is_ip6=1)
502         route_2001_2.add_vpp_config()
503
504         p_v1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
505                 Dot1Q(vlan=11, prio=1) /
506                 IP(src="1.1.1.1", dst="10.0.0.2", tos=1) /
507                 UDP(sport=1234, dport=1234) /
508                 Raw(chr(100) * 65))
509
510         p_v2 = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
511                 IP(src="1.1.1.1", dst="10.0.0.1", tos=1) /
512                 UDP(sport=1234, dport=1234) /
513                 Raw(chr(100) * 65))
514
515         rx = self.send_and_expect(self.pg1, p_v2 * 65, self.pg0)
516
517         for p in rx:
518             self.assertEqual(p[Dot1Q].prio, 6)
519
520         rx = self.send_and_expect(self.pg0, p_v1 * 65, self.pg1)
521
522         for p in rx:
523             self.assertEqual(p[IP].tos, 254)
524
525         p_v1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
526                 Dot1Q(vlan=11, prio=2) /
527                 IPv6(src="2001::1", dst="2001::2", tc=1) /
528                 UDP(sport=1234, dport=1234) /
529                 Raw(chr(100) * 65))
530
531         p_v2 = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
532                 IPv6(src="3001::1", dst="2001::1", tc=1) /
533                 UDP(sport=1234, dport=1234) /
534                 Raw(chr(100) * 65))
535
536         rx = self.send_and_expect(self.pg1, p_v2 * 65, self.pg0)
537
538         for p in rx:
539             self.assertEqual(p[Dot1Q].prio, 6)
540
541         rx = self.send_and_expect(self.pg0, p_v1 * 65, self.pg1)
542
543         for p in rx:
544             self.assertEqual(p[IPv6].tc, 253)
545
546         #
547         # cleanup
548         #
549         sub_if.unconfig_ip4()
550         sub_if.unconfig_ip6()
551
552         self.vapi.qos_record_enable_disable(sub_if.sw_if_index,
553                                             QOS_SOURCE.VLAN,
554                                             0)
555         self.vapi.qos_mark_enable_disable(sub_if.sw_if_index,
556                                           QOS_SOURCE.VLAN,
557                                           1,
558                                           0)
559         self.vapi.qos_record_enable_disable(self.pg1.sw_if_index,
560                                             QOS_SOURCE.IP,
561                                             0)
562         self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
563                                           QOS_SOURCE.IP,
564                                           1,
565                                           0)
566
567
if __name__ == '__main__':
    # when executed as a script, run this module's tests with VppTestRunner
    unittest.main(testRunner=VppTestRunner)