tests: replace pycodestyle with black
[vpp.git] / test / test_srmpls.py
1 #!/usr/bin/env python3
2
3 import unittest
4 import socket
5
6 from framework import VppTestCase, VppTestRunner
7 from vpp_ip import DpoProto
8 from vpp_ip_route import (
9     VppIpRoute,
10     VppRoutePath,
11     VppMplsRoute,
12     VppIpTable,
13     VppMplsTable,
14     VppMplsLabel,
15 )
16 from vpp_mpls_tunnel_interface import VppMPLSTunnelInterface
17
18 from scapy.packet import Raw
19 from scapy.layers.l2 import Ether
20 from scapy.layers.inet import IP, UDP, ICMP
21 from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded
22 from scapy.contrib.mpls import MPLS
23
24
def verify_filter(capture, sent):
    """Strip IPv6 packets from ``capture`` when its size differs from ``sent``.

    When both sequences have the same length the capture is returned
    untouched.  Otherwise every packet for which ``haslayer(IPv6)`` is true
    is removed from ``capture`` in place (the intent being to discard
    unsolicited IPv6 router advertisements picked up on the wire).

    :param capture: mutable sequence of received packets; must support
        iteration, ``len()`` and ``remove()``.
    :param sent: sequence of transmitted packets; only its length is used.
    :returns: the same ``capture`` object, possibly with packets removed.
    """
    if len(capture) != len(sent):
        # filter out any IPv6 RAs from the capture.
        # Walk a snapshot: removing from the sequence being iterated shifts
        # the remaining items left and makes the loop skip the packet that
        # follows each removed one, leaving back-to-back RAs in place.
        for p in list(capture):
            if p.haslayer(IPv6):
                capture.remove(p)
    return capture
32
33
def verify_mpls_stack(tst, rx, mpls_labels):
    """Assert that ``rx`` carries exactly the expected MPLS label stack.

    The Ethernet type must be 0x8847, and each MPLS header, walked from the
    outermost inwards, must match the corresponding entry of ``mpls_labels``
    in label value, EXP/COS bits and TTL.  The bottom-of-stack bit must be
    set on the last expected label only.

    :param tst: object providing ``assertEqual`` (the running test case).
    :param rx: received packet, indexable by the ``Ether`` and ``MPLS`` layers.
    :param mpls_labels: expected labels, outermost first; each entry exposes
        ``value``, ``exp`` and ``ttl`` attributes.
    """
    # the frame must be announced as MPLS at the Ethernet level
    tst.assertEqual(rx[Ether].type, 0x8847)

    layer = rx[MPLS]
    bottom = len(mpls_labels) - 1

    for depth, expected in enumerate(mpls_labels):
        tst.assertEqual(layer.label, expected.value)
        tst.assertEqual(layer.cos, expected.exp)
        tst.assertEqual(layer.ttl, expected.ttl)

        if depth != bottom:
            # more labels follow: S bit clear, then step to the next header
            tst.assertEqual(layer.s, 0)
            layer = layer[MPLS].payload
        else:
            # innermost expected label must be flagged bottom-of-stack
            tst.assertEqual(layer.s, 1)
53
54
class TestSRMPLS(VppTestCase):
    """SR-MPLS Test Case"""

    # NOTE: the class docstring above and the docstring of test_sr_mpls are
    # kept verbatim; presumably the test runner prints them as the
    # suite/test names -- confirm before rewording.

    @classmethod
    def setUpClass(cls):
        super(TestSRMPLS, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestSRMPLS, cls).tearDownClass()

    def setUp(self):
        """Create four pg interfaces and enable IPv4, IPv6 and MPLS on each."""
        super(TestSRMPLS, self).setUp()

        # create 4 pg interfaces
        self.create_pg_interfaces(range(4))

        # MPLS table 0 is configured before MPLS is enabled on any interface
        self.tables = []

        tbl = VppMplsTable(self, 0)
        tbl.add_vpp_config()
        self.tables.append(tbl)

        # bring every interface up with addresses resolved and MPLS enabled
        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()
            i.config_ip6()
            i.resolve_ndp()
            i.enable_mpls()

    def tearDown(self):
        """Undo the per-interface configuration applied by setUp."""
        for i in self.pg_interfaces:
            i.unconfig_ip4()
            i.unconfig_ip6()
            i.disable_mpls()
            i.admin_down()
        super(TestSRMPLS, self).tearDown()

    def create_stream_ip4(self, src_if, dst_ip, ip_ttl=64, ip_dscp=0):
        """Build 257 IPv4/UDP packets from ``src_if``'s remote host to ``dst_ip``.

        :param src_if: interface the packets will be injected on; supplies
            the MAC addresses and the source IPv4 address.
        :param dst_ip: destination IPv4 address.
        :param ip_ttl: TTL written into the IP header.
        :param ip_dscp: value written into the IP ``tos`` field.
        :returns: list of scapy packets.
        """
        self.reset_packet_infos()
        pkts = []
        for _ in range(257):
            info = self.create_packet_info(src_if, src_if)
            payload = self.info_to_payload(info)
            p = (
                Ether(dst=src_if.local_mac, src=src_if.remote_mac)
                / IP(src=src_if.remote_ip4, dst=dst_ip, ttl=ip_ttl, tos=ip_dscp)
                / UDP(sport=1234, dport=1234)
                / Raw(payload)
            )
            info.data = p.copy()
            pkts.append(p)
        return pkts

    def verify_capture_labelled_ip4(
        self, src_if, capture, sent, mpls_labels, ip_ttl=None
    ):
        """Check that each captured packet is the sent one plus ``mpls_labels``.

        :param src_if: not used by the checks; kept so existing call sites
            keep working.
        :param capture: received packets (filtered through ``verify_filter``).
        :param sent: transmitted packets, in the same order as ``capture``.
        :param mpls_labels: expected label stack, outermost first.
        :param ip_ttl: expected IP TTL of the received packet; when ``None``
            the TTL must be exactly one less than the transmitted TTL.
        """
        capture = verify_filter(capture, sent)

        self.assertEqual(len(capture), len(sent))

        # lengths were just asserted equal, so zip pairs every packet
        for tx, rx in zip(sent, capture):
            tx_ip = tx[IP]
            rx_ip = rx[IP]

            verify_mpls_stack(self, rx, mpls_labels)

            self.assertEqual(rx_ip.src, tx_ip.src)
            self.assertEqual(rx_ip.dst, tx_ip.dst)
            # compare against None so that an explicit ip_ttl=0 is honoured
            if ip_ttl is None:
                # IP processing has decremented the TTL
                self.assertEqual(rx_ip.ttl + 1, tx_ip.ttl)
            else:
                self.assertEqual(rx_ip.ttl, ip_ttl)

    def verify_capture_tunneled_ip4(self, src_if, capture, sent, mpls_labels):
        """Check packets sent through an MPLS tunnel.

        The checks are the same as ``verify_capture_labelled_ip4`` with no
        explicit TTL: matching label stack, unchanged IP addresses and an IP
        TTL decremented by one.

        :param src_if: not used by the checks; kept so existing call sites
            keep working.
        :param capture: received packets.
        :param sent: transmitted packets, in the same order as ``capture``.
        :param mpls_labels: expected label stack, outermost first.
        """
        self.verify_capture_labelled_ip4(src_if, capture, sent, mpls_labels)

    def test_sr_mpls(self):
        """SR MPLS"""

        #
        # A simple MPLS xconnect - neos label in label out
        #
        route_32_neos = VppMplsRoute(
            self,
            32,
            0,
            [
                VppRoutePath(
                    self.pg0.remote_ip4, self.pg0.sw_if_index, labels=[VppMplsLabel(32)]
                )
            ],
        )
        route_32_neos.add_vpp_config()

        #
        # A binding SID with only one label
        #
        self.vapi.sr_mpls_policy_add(999, 1, 0, [32])

        #
        # A labeled IP route that resolves thru the binding SID
        #
        ip_10_0_0_1 = VppIpRoute(
            self,
            "10.0.0.1",
            32,
            [
                VppRoutePath(
                    "0.0.0.0", 0xFFFFFFFF, nh_via_label=999, labels=[VppMplsLabel(55)]
                )
            ],
        )
        ip_10_0_0_1.add_vpp_config()

        tx = self.create_stream_ip4(self.pg1, "10.0.0.1")
        rx = self.send_and_expect(self.pg1, tx, self.pg0)
        self.verify_capture_labelled_ip4(
            self.pg0, rx, tx, [VppMplsLabel(32), VppMplsLabel(55)]
        )

        #
        # An unlabeled IP route that resolves thru the binding SID
        #
        ip_10_0_0_2 = VppIpRoute(
            self,
            "10.0.0.2",
            32,
            [VppRoutePath("0.0.0.0", 0xFFFFFFFF, nh_via_label=999)],
        )
        ip_10_0_0_2.add_vpp_config()

        tx = self.create_stream_ip4(self.pg1, "10.0.0.2")
        rx = self.send_and_expect(self.pg1, tx, self.pg0)
        self.verify_capture_labelled_ip4(self.pg0, rx, tx, [VppMplsLabel(32)])

        self.vapi.sr_mpls_policy_del(999)

        #
        # this time the SID has many labels pushed
        #
        self.vapi.sr_mpls_policy_add(999, 1, 0, [32, 33, 34])

        tx = self.create_stream_ip4(self.pg1, "10.0.0.1")
        rx = self.send_and_expect(self.pg1, tx, self.pg0)
        self.verify_capture_labelled_ip4(
            self.pg0,
            rx,
            tx,
            [VppMplsLabel(32), VppMplsLabel(33), VppMplsLabel(34), VppMplsLabel(55)],
        )
        tx = self.create_stream_ip4(self.pg1, "10.0.0.2")
        rx = self.send_and_expect(self.pg1, tx, self.pg0)
        self.verify_capture_labelled_ip4(
            self.pg0, rx, tx, [VppMplsLabel(32), VppMplsLabel(33), VppMplsLabel(34)]
        )

        #
        # Resolve an MPLS tunnel via the SID
        #
        mpls_tun = VppMPLSTunnelInterface(
            self,
            [
                VppRoutePath(
                    "0.0.0.0",
                    0xFFFFFFFF,
                    nh_via_label=999,
                    labels=[VppMplsLabel(44), VppMplsLabel(46)],
                )
            ],
        )
        mpls_tun.add_vpp_config()
        mpls_tun.admin_up()

        #
        # add an unlabelled route through the new tunnel
        #
        route_10_0_0_3 = VppIpRoute(
            self, "10.0.0.3", 32, [VppRoutePath("0.0.0.0", mpls_tun._sw_if_index)]
        )
        route_10_0_0_3.add_vpp_config()
        # debug aids only; the output is logged, never asserted on.
        # NOTE(review): tunnel 0 / adjacency 21 are hard-coded indices.
        self.logger.info(self.vapi.cli("sh mpls tun 0"))
        self.logger.info(self.vapi.cli("sh adj 21"))

        tx = self.create_stream_ip4(self.pg1, "10.0.0.3")
        rx = self.send_and_expect(self.pg1, tx, self.pg0)
        self.verify_capture_tunneled_ip4(
            self.pg0,
            rx,
            tx,
            [
                VppMplsLabel(32),
                VppMplsLabel(33),
                VppMplsLabel(34),
                VppMplsLabel(44),
                VppMplsLabel(46),
            ],
        )

        #
        # add a labelled route through the new tunnel
        #
        route_10_0_0_4 = VppIpRoute(
            self,
            "10.0.0.4",
            32,
            [VppRoutePath("0.0.0.0", mpls_tun._sw_if_index, labels=[VppMplsLabel(55)])],
        )
        route_10_0_0_4.add_vpp_config()

        tx = self.create_stream_ip4(self.pg1, "10.0.0.4")
        rx = self.send_and_expect(self.pg1, tx, self.pg0)
        self.verify_capture_tunneled_ip4(
            self.pg0,
            rx,
            tx,
            [
                VppMplsLabel(32),
                VppMplsLabel(33),
                VppMplsLabel(34),
                VppMplsLabel(44),
                VppMplsLabel(46),
                VppMplsLabel(55),
            ],
        )

        self.vapi.sr_mpls_policy_del(999)
312
313
if __name__ == "__main__":
    # executed as a script: hand this module's tests to VppTestRunner
    unittest.main(testRunner=VppTestRunner)