939cca5d8d1010ed0498e2615e41aecf4a0c6265
[vpp.git] / test / test_qos.py
1 #!/usr/bin/env python
2
3 import unittest
4
5 from framework import VppTestCase, VppTestRunner
6 from vpp_papi_provider import QOS_SOURCE
7 from vpp_ip_route import VppIpRoute, VppRoutePath
8
9 from scapy.packet import Raw
10 from scapy.layers.l2 import Ether
11 from scapy.layers.inet import IP, UDP
12 from scapy.layers.inet6 import IPv6
13 from scapy.contrib.mpls import MPLS
14
15
class TestQOS(VppTestCase):
    """ QOS Test Case """

    # Number of copies of a packet injected per check.
    _BURST = 65

    def setUp(self):
        """Create pg0..pg4, bring each up and give it resolved IPv4/IPv6."""
        super(TestQOS, self).setUp()

        self.create_pg_interfaces(range(5))

        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip4()
            intf.resolve_arp()
            intf.config_ip6()
            intf.resolve_ndp()

    def tearDown(self):
        """Strip the IP configuration from every pg interface."""
        for intf in self.pg_interfaces:
            intf.unconfig_ip4()
            intf.unconfig_ip6()

        super(TestQOS, self).tearDown()

    @staticmethod
    def _egress_rows(values):
        """Return the 'rows' argument for qos_egress_map_update.

        *values* is a sequence of single-character strings (the tests
        always pass 256 of them); they are joined into one string that is
        used as the 'outputs' entry of each of four separate dicts.
        """
        # NOTE(review): presumably one row per QoS source - confirm against
        # the qos_egress_map_update API definition.
        encoded = ''.join(values)
        return [{'outputs': encoded} for _ in range(4)]

    def _check_tos(self, pkt, out_intf, expected):
        """Inject a burst of *pkt* on pg0 and assert the IPv4 ToS of every
        packet captured on *out_intf* equals *expected*."""
        captured = self.send_and_expect(self.pg0, pkt * self._BURST, out_intf)
        for rx_pkt in captured:
            self.assertEqual(rx_pkt[IP].tos, expected)

    def _check_tc(self, pkt, out_intf, expected):
        """Inject a burst of *pkt* on pg0 and assert the IPv6 traffic class
        of every packet captured on *out_intf* equals *expected*."""
        captured = self.send_and_expect(self.pg0, pkt * self._BURST, out_intf)
        for rx_pkt in captured:
            self.assertEqual(rx_pkt[IPv6].tc, expected)

    def _check_mpls_stack(self, captured, stack):
        """Assert each packet in *captured* carries the MPLS headers in *stack*.

        *stack* is an outermost-first sequence of (label, s_bit) pairs; every
        header is additionally required to have cos == 7.
        """
        for rx_pkt in captured:
            layer = rx_pkt
            for label, s_bit in stack:
                hdr = layer[MPLS]
                self.assertEqual(hdr.cos, 7)
                self.assertEqual(hdr.label, label)
                self.assertEqual(hdr.s, s_bit)
                # step down to whatever follows this MPLS header
                layer = hdr.payload

    def test_qos_ip(self):
        """ QoS Mark IP """

        #
        # Table 1 inverts the mark: each of the 256 possible input values
        # n is mapped to 255 - n
        #
        inverted = self._egress_rows([chr(255 - n) for n in range(256)])
        self.vapi.qos_egress_map_update(1, inverted)

        #
        # Tables 2 and 3 map every input value to the table's own id
        #
        for table_id in (2, 3):
            self.vapi.qos_egress_map_update(
                table_id, self._egress_rows([chr(table_id)] * 256))

        #
        # Tables 4 to 7 all map every input value to 4
        #
        all_fours = self._egress_rows([chr(4)] * 256)
        for table_id in (4, 5, 6, 7):
            self.vapi.qos_egress_map_update(table_id, all_fours)

        self.logger.info(self.vapi.cli("sh qos eg map"))

        #
        # Bind interface pgN to table N (N = 1..4)
        #
        egress_intfs = (self.pg1, self.pg2, self.pg3, self.pg4)
        for table_id, intf in enumerate(egress_intfs, 1):
            self.vapi.qos_mark_enable_disable(intf.sw_if_index,
                                              QOS_SOURCE.IP,
                                              table_id,
                                              1)

        #
        # The packets that ingress on pg0, both carrying an input mark of 1
        #
        payload = chr(100) * 65
        pkt4 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, tos=1) /
                UDP(sport=1234, dport=1234) /
                Raw(payload))
        pkt6 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6,
                     tc=1) /
                UDP(sport=1234, dport=1234) /
                Raw(payload))

        #
        # Recording of the input QoS from the IP header is not enabled
        # yet, so the egress packets keep their original mark
        #
        self._check_tos(pkt4, self.pg1, 1)
        self._check_tc(pkt6, self.pg1, 1)

        #
        # Enable QoS recording on IP input for pg0
        #
        self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
                                            QOS_SOURCE.IP,
                                            1)

        #
        # Same packets again: the input mark of 1 is now translated by
        # pg1's table to 254
        #
        self._check_tos(pkt4, self.pg1, 254)
        self._check_tc(pkt6, self.pg1, 254)

        #
        # Other input marks to exercise the mapping
        #
        for mark_in, mark_out in ((127, 128), (254, 1)):
            pkt4[IP].tos = mark_in
            self._check_tos(pkt4, self.pg1, mark_out)
            pkt6[IPv6].tc = mark_in
            self._check_tc(pkt6, self.pg1, mark_out)

        #
        # Send out of the other interfaces to check that each one applies
        # its own table
        #
        pkt4[IP].dst = self.pg2.remote_ip4
        self._check_tos(pkt4, self.pg2, 2)

        pkt4[IP].dst = self.pg3.remote_ip4
        self._check_tos(pkt4, self.pg3, 3)

        pkt6[IPv6].dst = self.pg3.remote_ip6
        self._check_tc(pkt6, self.pg3, 3)

        #
        # Unbind the maps on pg2 and pg3; the ToS (254 on input) must now
        # pass through unchanged
        #
        for table_id, intf in ((2, self.pg2), (3, self.pg3)):
            self.vapi.qos_mark_enable_disable(intf.sw_if_index,
                                              QOS_SOURCE.IP,
                                              table_id,
                                              0)
        self.logger.info(self.vapi.cli("sh int feat pg2"))

        for intf in (self.pg2, self.pg3):
            pkt4[IP].dst = intf.remote_ip4
            self._check_tos(pkt4, intf, 254)

        #
        # pg1 is still marking
        #
        pkt4[IP].dst = self.pg1.remote_ip4
        self._check_tos(pkt4, self.pg1, 1)

        #
        # Disable the input recording on pg0 ...
        #
        self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
                                            QOS_SOURCE.IP,
                                            0)

        #
        # ... which takes us back to an unchanged ToS value
        #
        self._check_tos(pkt4, self.pg1, 254)

        #
        # Disable the egress map on pg1 and pg4
        #
        for table_id, intf in ((1, self.pg1), (4, self.pg4)):
            self.vapi.qos_mark_enable_disable(intf.sw_if_index,
                                              QOS_SOURCE.IP,
                                              table_id,
                                              0)

        #
        # Still an unchanged ToS on pg1
        #
        self._check_tos(pkt4, self.pg1, 254)

        #
        # Clean up the maps
        #
        for table_id in (1, 4, 2, 3, 5, 6, 7):
            self.vapi.qos_egress_map_delete(table_id)

    def test_qos_mpls(self):
        """ QoS Mark MPLS """

        #
        # Table 1 maps every input value to 255
        #
        self.vapi.qos_egress_map_update(
            1, self._egress_rows([chr(255)] * 256))

        #
        # Two host routes via pg1: one imposing a single MPLS label and
        # one imposing three
        #
        routes = []
        for prefix, labels in (("10.0.0.1", [32]),
                               ("10.0.0.3", [63, 33, 34])):
            route = VppIpRoute(self, prefix, 32,
                               [VppRoutePath(self.pg1.remote_ip4,
                                             self.pg1.sw_if_index,
                                             labels=labels)])
            route.add_vpp_config()
            routes.append(route)

        #
        # Enable IP QoS recording on the input pg0 and MPLS egress marking
        # on pg1
        #
        self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
                                            QOS_SOURCE.IP,
                                            1)
        self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
                                          QOS_SOURCE.MPLS,
                                          1,
                                          1)

        #
        # Packets towards the one-label and the three-label route resp.
        #
        payload = chr(100) * 65
        pkt_1_label = (Ether(src=self.pg0.remote_mac,
                             dst=self.pg0.local_mac) /
                       IP(src=self.pg0.remote_ip4, dst="10.0.0.1", tos=1) /
                       UDP(sport=1234, dport=1234) /
                       Raw(payload))
        pkt_3_labels = (Ether(src=self.pg0.remote_mac,
                              dst=self.pg0.local_mac) /
                        IP(src=self.pg0.remote_ip4, dst="10.0.0.3", tos=1) /
                        UDP(sport=1234, dport=1234) /
                        Raw(payload))

        #
        # The MPLS header only has 3 bits of QoS, hence the expected value
        # of 7; also make sure the label and the EOS bit of every header
        # have not been corrupted
        #
        captured = self.send_and_expect(self.pg0,
                                        pkt_1_label * self._BURST,
                                        self.pg1)
        self._check_mpls_stack(captured, ((32, 1),))

        captured = self.send_and_expect(self.pg0,
                                        pkt_3_labels * self._BURST,
                                        self.pg1)
        self._check_mpls_stack(captured, ((63, 0), (33, 0), (34, 1)))

        #
        # Cleanup
        #
        self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
                                            QOS_SOURCE.IP,
                                            0)
        self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
                                          QOS_SOURCE.MPLS,
                                          1,
                                          0)
        self.vapi.qos_egress_map_delete(1)
359
360
# When executed as a script, run this module's tests with the VPP runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)