QoS recording and marking
[vpp.git] / test / test_qos.py
1 #!/usr/bin/env python
2
3 import unittest
4 import socket
5 import struct
6
7 from framework import VppTestCase, VppTestRunner
8 from vpp_object import VppObject
9 from vpp_papi_provider import QOS_SOURCE
10 from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsRoute
11 from vpp_sub_interface import VppSubInterface, VppDot1QSubint
12
13 from scapy.packet import Raw
14 from scapy.layers.l2 import Ether, Dot1Q
15 from scapy.layers.inet import IP, UDP
16 from scapy.layers.inet6 import IPv6
17 from scapy.contrib.mpls import MPLS
18
19
class TestQOS(VppTestCase):
    """ QOS Test Case """

    # Number of copies of the probe packet sent for every check.
    _QOS_N_PKTS = 65

    # UDP payload of every probe packet: 65 bytes of value 100 ('d').
    # A bytes literal is the same object type as the former chr(100) * 65
    # on Python 2 and stays a byte string on Python 3.
    _QOS_PAYLOAD = b'd' * 65

    def setUp(self):
        """Create pg0..pg4, bring them up and give them IPv4/IPv6 config."""
        super(TestQOS, self).setUp()

        self.create_pg_interfaces(range(5))

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()
            i.config_ip6()
            i.resolve_ndp()

    def tearDown(self):
        """Remove the IPv4/IPv6 config from every pg interface."""
        for i in self.pg_interfaces:
            i.unconfig_ip4()
            i.unconfig_ip6()

        super(TestQOS, self).tearDown()

    @staticmethod
    def _qos_rows(table):
        """Return the 'rows' argument for qos_egress_map_update.

        The same 256-entry output table is used for each of the four rows.
        NOTE(review): presumably one row per QoS source type - confirm
        against the qos_egress_map_update API definition.

        :param table: byte string of 256 output values, indexed by the
            input QoS value.
        :returns: list of four distinct dicts, each {'outputs': table}.
        """
        return [{'outputs': table} for _ in range(4)]

    @classmethod
    def _qos_uniform_rows(cls, value):
        """Return map rows that output ``value`` for every input value."""
        return cls._qos_rows(bytes(bytearray([value] * 256)))

    def _qos_expect_tos(self, pkt, rx_intf, tos):
        """Send ``pkt`` into pg0 and check the IPv4 ToS seen on ``rx_intf``.

        :param pkt: IPv4 probe packet to replicate and send.
        :param rx_intf: pg interface all the packets must egress from.
        :param tos: ToS value every received packet must carry.
        """
        rx = self.send_and_expect(self.pg0, pkt * self._QOS_N_PKTS, rx_intf)
        for p in rx:
            self.assertEqual(p[IP].tos, tos)

    def _qos_expect_tc(self, pkt, rx_intf, tc):
        """Send ``pkt`` into pg0 and check the IPv6 TC seen on ``rx_intf``.

        :param pkt: IPv6 probe packet to replicate and send.
        :param rx_intf: pg interface all the packets must egress from.
        :param tc: traffic-class value every received packet must carry.
        """
        rx = self.send_and_expect(self.pg0, pkt * self._QOS_N_PKTS, rx_intf)
        for p in rx:
            self.assertEqual(p[IPv6].tc, tc)

    def _qos_verify_mpls_stack(self, pkt, labels, cos):
        """Check every header of a received packet's MPLS label stack.

        Each header must carry ``cos`` and the expected label value, and
        only the last header may have the bottom-of-stack (s) bit set, so
        a marking that corrupted the label or s bit is caught.

        :param pkt: received packet whose outermost MPLS header is first.
        :param labels: expected label values, outermost first.
        :param cos: expected value of the cos field in every header.
        """
        last = len(labels) - 1
        hdr = pkt[MPLS]
        for idx, label in enumerate(labels):
            if idx:
                # step to the next MPLS header down the stack
                hdr = hdr.payload[MPLS]
            self.assertEqual(hdr.cos, cos)
            self.assertEqual(hdr.label, label)
            self.assertEqual(hdr.s, 1 if idx == last else 0)

    def test_qos_ip(self):
        """ QoS Mark IP """

        #
        # Table 1 inverts the mark: each of the 256 possible input
        # values n maps to 255 - n
        #
        inverted = bytes(bytearray([255 - n for n in range(256)]))
        self.vapi.qos_egress_map_update(1, self._qos_rows(inverted))

        #
        # Tables 2, 3 and 4 output their own table id for every input;
        # tables 5, 6 and 7 reuse the rows of table 4
        #
        for map_id, value in ((2, 2), (3, 3), (4, 4),
                              (5, 4), (6, 4), (7, 4)):
            self.vapi.qos_egress_map_update(map_id,
                                            self._qos_uniform_rows(value))

        self.logger.info(self.vapi.cli("sh qos eg map"))

        #
        # Bind interface pgN to table N
        #
        for intf, map_id in ((self.pg1, 1), (self.pg2, 2),
                             (self.pg3, 3), (self.pg4, 4)):
            self.vapi.qos_mark_enable_disable(intf.sw_if_index,
                                              QOS_SOURCE.IP,
                                              map_id,
                                              1)

        #
        # packets ingress on pg0
        #
        p_v4 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, tos=1) /
                UDP(sport=1234, dport=1234) /
                Raw(self._QOS_PAYLOAD))
        p_v6 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6,
                     tc=1) /
                UDP(sport=1234, dport=1234) /
                Raw(self._QOS_PAYLOAD))

        #
        # Since we have not yet enabled the recording of the input QoS
        # from the input IP header, the egress packet's ToS is unchanged
        #
        self._qos_expect_tos(p_v4, self.pg1, 1)
        self._qos_expect_tc(p_v6, self.pg1, 1)

        #
        # Enable QoS recording on IP input for pg0
        #
        self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
                                            QOS_SOURCE.IP,
                                            1)

        #
        # send the same packets, this time expect the input ToS of 1
        # to be mapped to pg1's egress value of 254
        #
        self._qos_expect_tos(p_v4, self.pg1, 254)
        self._qos_expect_tc(p_v6, self.pg1, 254)

        #
        # different input ToS values to test the mapping
        #
        for mark_in, mark_out in ((127, 128), (254, 1)):
            p_v4[IP].tos = mark_in
            self._qos_expect_tos(p_v4, self.pg1, mark_out)
            p_v6[IPv6].tc = mark_in
            self._qos_expect_tc(p_v6, self.pg1, mark_out)

        #
        # send packets out the other interfaces to test the maps are
        # correctly applied
        #
        p_v4[IP].dst = self.pg2.remote_ip4
        self._qos_expect_tos(p_v4, self.pg2, 2)

        p_v4[IP].dst = self.pg3.remote_ip4
        self._qos_expect_tos(p_v4, self.pg3, 3)

        p_v6[IPv6].dst = self.pg3.remote_ip6
        self._qos_expect_tc(p_v6, self.pg3, 3)

        #
        # remove the map on pg2 and pg3, now expect an unchanged IP ToS
        # (the probe packet is still sent with ToS 254)
        #
        for intf, map_id in ((self.pg2, 2), (self.pg3, 3)):
            self.vapi.qos_mark_enable_disable(intf.sw_if_index,
                                              QOS_SOURCE.IP,
                                              map_id,
                                              0)
        self.logger.info(self.vapi.cli("sh int feat pg2"))

        p_v4[IP].dst = self.pg2.remote_ip4
        self._qos_expect_tos(p_v4, self.pg2, 254)

        p_v4[IP].dst = self.pg3.remote_ip4
        self._qos_expect_tos(p_v4, self.pg3, 254)

        #
        # still mapping out of pg1
        #
        p_v4[IP].dst = self.pg1.remote_ip4
        self._qos_expect_tos(p_v4, self.pg1, 1)

        #
        # disable the input recording on pg0
        #
        self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
                                            QOS_SOURCE.IP,
                                            0)

        #
        # back to an unchanged ToS value
        #
        self._qos_expect_tos(p_v4, self.pg1, 254)

        #
        # disable the egress map on pg1 and pg4
        #
        for intf, map_id in ((self.pg1, 1), (self.pg4, 4)):
            self.vapi.qos_mark_enable_disable(intf.sw_if_index,
                                              QOS_SOURCE.IP,
                                              map_id,
                                              0)

        #
        # unchanged ToS on pg1
        #
        self._qos_expect_tos(p_v4, self.pg1, 254)

        #
        # clean-up the maps
        #
        for map_id in (1, 4, 2, 3, 5, 6, 7):
            self.vapi.qos_egress_map_delete(map_id)

    def test_qos_mpls(self):
        """ QoS Mark MPLS """

        #
        # 255 QoS for all input values
        #
        self.vapi.qos_egress_map_update(1, self._qos_uniform_rows(255))

        #
        # a route with 1 MPLS label
        #
        route_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32,
                                    [VppRoutePath(self.pg1.remote_ip4,
                                                  self.pg1.sw_if_index,
                                                  labels=[32])])
        route_10_0_0_1.add_vpp_config()

        #
        # a route with 3 MPLS labels
        #
        route_10_0_0_3 = VppIpRoute(self, "10.0.0.3", 32,
                                    [VppRoutePath(self.pg1.remote_ip4,
                                                  self.pg1.sw_if_index,
                                                  labels=[63, 33, 34])])
        route_10_0_0_3.add_vpp_config()

        #
        # enable IP QoS recording on the input pg0 and MPLS egress marking
        # on pg1
        #
        self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
                                            QOS_SOURCE.IP,
                                            1)
        self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
                                          QOS_SOURCE.MPLS,
                                          1,
                                          1)

        #
        # packets that will get one label added and 3 labels added resp.
        #
        p_1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
               IP(src=self.pg0.remote_ip4, dst="10.0.0.1", tos=1) /
               UDP(sport=1234, dport=1234) /
               Raw(self._QOS_PAYLOAD))
        p_3 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
               IP(src=self.pg0.remote_ip4, dst="10.0.0.3", tos=1) /
               UDP(sport=1234, dport=1234) /
               Raw(self._QOS_PAYLOAD))

        #
        # only 3 bits of ToS value in MPLS; make sure the cos is correct
        # and the label and EOS bit have not been corrupted
        #
        rx = self.send_and_expect(self.pg0, p_1 * self._QOS_N_PKTS, self.pg1)
        for p in rx:
            self._qos_verify_mpls_stack(p, [32], 7)

        rx = self.send_and_expect(self.pg0, p_3 * self._QOS_N_PKTS, self.pg1)
        for p in rx:
            self._qos_verify_mpls_stack(p, [63, 33, 34], 7)

        #
        # cleanup
        #
        self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
                                            QOS_SOURCE.IP,
                                            0)
        self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
                                          QOS_SOURCE.MPLS,
                                          1,
                                          0)
        self.vapi.qos_egress_map_delete(1)
363
364
# Script entry point: when this file is executed directly (rather than
# imported), run its test cases through unittest using VppTestRunner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)