tests: Use errno value rather than a specific int
[vpp.git] / test / test_srmpls.py
1 #!/usr/bin/env python3
2
3 import unittest
4
5 from framework import VppTestCase
6 from asfframework import VppTestRunner
7 from vpp_ip_route import (
8     VppIpRoute,
9     VppRoutePath,
10     VppMplsRoute,
11     VppMplsTable,
12     VppMplsLabel,
13 )
14 from vpp_mpls_tunnel_interface import VppMPLSTunnelInterface
15
16 from scapy.packet import Raw
17 from scapy.layers.l2 import Ether
18 from scapy.layers.inet import IP, UDP
19 from scapy.layers.inet6 import IPv6
20 from scapy.contrib.mpls import MPLS
21
22
def verify_filter(capture, sent):
    """Drop IPv6 packets from *capture* when its length differs from *sent*.

    When the two lengths already match, the capture is returned untouched.
    Otherwise every packet for which ``haslayer(IPv6)`` is true is removed
    from *capture* in place.

    :param capture: mutable sequence of received packets; modified in place.
    :param sent: sequence of transmitted packets; only its length is used.
    :returns: the same *capture* object that was passed in.
    """
    if len(capture) != len(sent):
        # filter out any IPv6 RAs from the capture.
        # Walk a snapshot: removing from the sequence that is being iterated
        # shifts the remaining items left, so the packet that follows each
        # removed one would be skipped and back-to-back RAs would survive.
        for p in list(capture):
            if p.haslayer(IPv6):
                capture.remove(p)
    return capture
30
31
def verify_mpls_stack(tst, rx, mpls_labels):
    """Assert that the MPLS headers of *rx* match *mpls_labels*, outermost first.

    :param tst: object providing ``assertEqual`` (the running test case).
    :param rx: received packet; indexed with ``Ether`` and ``MPLS``.
    :param mpls_labels: sequence of expected labels, each exposing
        ``value``, ``exp`` and ``ttl``.
    """
    # the frame must carry ethertype 0x8847
    tst.assertEqual(rx[Ether].type, 0x8847)

    layer = rx[MPLS]
    bottom = len(mpls_labels) - 1

    for depth, expected in enumerate(mpls_labels):
        tst.assertEqual(layer.label, expected.value)
        tst.assertEqual(layer.cos, expected.exp)
        tst.assertEqual(layer.ttl, expected.ttl)

        if depth != bottom:
            # more labels follow: the bottom-of-stack bit must be clear
            tst.assertEqual(layer.s, 0)
            # step past this header to expose the next one
            layer = layer[MPLS].payload
        else:
            # last expected label: the bottom-of-stack bit must be set
            tst.assertEqual(layer.s, 1)
51
52
class TestSRMPLS(VppTestCase):
    """SR-MPLS Test Case"""

    # Exercises SR-MPLS policies (binding SIDs): IP routes and an MPLS tunnel
    # are resolved through binding SID 999 and the label stack of the
    # forwarded packets is checked on pg0.

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

    def setUp(self):
        super().setUp()

        # create 4 pg interfaces
        self.create_pg_interfaces(range(4))

        # create MPLS table 0 and keep a reference to it
        self.tables = []

        tbl = VppMplsTable(self, 0)
        tbl.add_vpp_config()
        self.tables.append(tbl)

        # bring every interface up with IPv4, IPv6 and MPLS enabled
        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()
            i.config_ip6()
            i.resolve_ndp()
            i.enable_mpls()

    def tearDown(self):
        # undo the per-interface configuration applied in setUp
        for i in self.pg_interfaces:
            i.unconfig_ip4()
            i.unconfig_ip6()
            i.disable_mpls()
            i.admin_down()
        super().tearDown()

    def create_stream_ip4(self, src_if, dst_ip, ip_ttl=64, ip_dscp=0):
        """Build 257 IPv4/UDP packets to be injected on *src_if*.

        :param src_if: interface the packets are sent from; supplies the
            Ethernet addresses and the IP source address.
        :param dst_ip: destination IPv4 address.
        :param ip_ttl: TTL placed in the IP header.
        :param ip_dscp: value placed in the IP ``tos`` field.
        :returns: list of the generated packets.
        """
        self.reset_packet_infos()
        pkts = []
        for _ in range(257):
            info = self.create_packet_info(src_if, src_if)
            payload = self.info_to_payload(info)
            p = (
                Ether(dst=src_if.local_mac, src=src_if.remote_mac)
                / IP(src=src_if.remote_ip4, dst=dst_ip, ttl=ip_ttl, tos=ip_dscp)
                / UDP(sport=1234, dport=1234)
                / Raw(payload)
            )
            info.data = p.copy()
            pkts.append(p)
        return pkts

    def verify_capture_labelled_ip4(
        self, src_if, capture, sent, mpls_labels, ip_ttl=None
    ):
        """Check that each captured packet is its sent packet plus *mpls_labels*.

        :param src_if: unused; kept so existing callers keep working.
        :param capture: packets received; IPv6 packets are filtered out first
            if the count does not match *sent*.
        :param sent: packets transmitted, in the same order as *capture*.
        :param mpls_labels: expected label stack, outermost first.
        :param ip_ttl: expected IP TTL of the received packets.  When left as
            ``None`` the TTL is expected to be the sent TTL minus one.
        """
        capture = verify_filter(capture, sent)

        self.assertEqual(len(capture), len(sent))

        for tx, rx in zip(sent, capture):
            tx_ip = tx[IP]
            rx_ip = rx[IP]

            verify_mpls_stack(self, rx, mpls_labels)

            self.assertEqual(rx_ip.src, tx_ip.src)
            self.assertEqual(rx_ip.dst, tx_ip.dst)
            # compare against None so an explicit ip_ttl=0 is honoured
            if ip_ttl is None:
                # IP processing has decremented the TTL
                self.assertEqual(rx_ip.ttl + 1, tx_ip.ttl)
            else:
                self.assertEqual(rx_ip.ttl, ip_ttl)

    def verify_capture_tunneled_ip4(self, src_if, capture, sent, mpls_labels):
        """Check packets forwarded through an MPLS tunnel.

        The checks are the same as for a labelled route with no explicit TTL:
        matching count, the expected label stack, unchanged IP addresses and
        an IP TTL decremented by one.

        :param src_if: unused; kept so existing callers keep working.
        :param capture: packets received.
        :param sent: packets transmitted, in the same order as *capture*.
        :param mpls_labels: expected label stack, outermost first.
        """
        self.verify_capture_labelled_ip4(src_if, capture, sent, mpls_labels)

    def test_sr_mpls(self):
        """SR MPLS"""

        #
        # A simple MPLS xconnect - neos label in label out
        #
        route_32_eos = VppMplsRoute(
            self,
            32,
            0,
            [
                VppRoutePath(
                    self.pg0.remote_ip4, self.pg0.sw_if_index, labels=[VppMplsLabel(32)]
                )
            ],
        )
        route_32_eos.add_vpp_config()

        #
        # A binding SID with only one label
        #
        self.vapi.sr_mpls_policy_add(999, 1, 0, [32])

        #
        # A labeled IP route that resolves thru the binding SID
        #
        ip_10_0_0_1 = VppIpRoute(
            self,
            "10.0.0.1",
            32,
            [
                VppRoutePath(
                    "0.0.0.0", 0xFFFFFFFF, nh_via_label=999, labels=[VppMplsLabel(55)]
                )
            ],
        )
        ip_10_0_0_1.add_vpp_config()

        tx = self.create_stream_ip4(self.pg1, "10.0.0.1")
        rx = self.send_and_expect(self.pg1, tx, self.pg0)
        self.verify_capture_labelled_ip4(
            self.pg0, rx, tx, [VppMplsLabel(32), VppMplsLabel(55)]
        )

        #
        # An unlabeled IP route that resolves thru the binding SID
        #
        ip_10_0_0_2 = VppIpRoute(
            self,
            "10.0.0.2",
            32,
            [VppRoutePath("0.0.0.0", 0xFFFFFFFF, nh_via_label=999)],
        )
        ip_10_0_0_2.add_vpp_config()

        tx = self.create_stream_ip4(self.pg1, "10.0.0.2")
        rx = self.send_and_expect(self.pg1, tx, self.pg0)
        self.verify_capture_labelled_ip4(self.pg0, rx, tx, [VppMplsLabel(32)])

        self.vapi.sr_mpls_policy_del(999)

        #
        # this time the SID has many labels pushed
        #
        self.vapi.sr_mpls_policy_add(999, 1, 0, [32, 33, 34])

        tx = self.create_stream_ip4(self.pg1, "10.0.0.1")
        rx = self.send_and_expect(self.pg1, tx, self.pg0)
        self.verify_capture_labelled_ip4(
            self.pg0,
            rx,
            tx,
            [VppMplsLabel(32), VppMplsLabel(33), VppMplsLabel(34), VppMplsLabel(55)],
        )
        tx = self.create_stream_ip4(self.pg1, "10.0.0.2")
        rx = self.send_and_expect(self.pg1, tx, self.pg0)
        self.verify_capture_labelled_ip4(
            self.pg0, rx, tx, [VppMplsLabel(32), VppMplsLabel(33), VppMplsLabel(34)]
        )

        #
        # Resolve an MPLS tunnel via the SID
        #
        mpls_tun = VppMPLSTunnelInterface(
            self,
            [
                VppRoutePath(
                    "0.0.0.0",
                    0xFFFFFFFF,
                    nh_via_label=999,
                    labels=[VppMplsLabel(44), VppMplsLabel(46)],
                )
            ],
        )
        mpls_tun.add_vpp_config()
        mpls_tun.admin_up()

        #
        # add an unlabelled route through the new tunnel
        #
        route_10_0_0_3 = VppIpRoute(
            self, "10.0.0.3", 32, [VppRoutePath("0.0.0.0", mpls_tun._sw_if_index)]
        )
        route_10_0_0_3.add_vpp_config()
        # debug output only; nothing is asserted on it
        self.logger.info(self.vapi.cli("sh mpls tun 0"))
        self.logger.info(self.vapi.cli("sh adj 21"))

        tx = self.create_stream_ip4(self.pg1, "10.0.0.3")
        rx = self.send_and_expect(self.pg1, tx, self.pg0)
        self.verify_capture_tunneled_ip4(
            self.pg0,
            rx,
            tx,
            [
                VppMplsLabel(32),
                VppMplsLabel(33),
                VppMplsLabel(34),
                VppMplsLabel(44),
                VppMplsLabel(46),
            ],
        )

        #
        # add a labelled route through the new tunnel
        #
        route_10_0_0_4 = VppIpRoute(
            self,
            "10.0.0.4",
            32,
            [VppRoutePath("0.0.0.0", mpls_tun._sw_if_index, labels=[VppMplsLabel(55)])],
        )
        route_10_0_0_4.add_vpp_config()

        tx = self.create_stream_ip4(self.pg1, "10.0.0.4")
        rx = self.send_and_expect(self.pg1, tx, self.pg0)
        self.verify_capture_tunneled_ip4(
            self.pg0,
            rx,
            tx,
            [
                VppMplsLabel(32),
                VppMplsLabel(33),
                VppMplsLabel(34),
                VppMplsLabel(44),
                VppMplsLabel(46),
                VppMplsLabel(55),
            ],
        )

        self.vapi.sr_mpls_policy_del(999)
311
# When executed directly as a script, run this module's tests with VppTestRunner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)