tests: Use errno value rather than a specific int
[vpp.git] / test / test_srv6_mobile.py
1 #!/usr/bin/env python3
2
3 from framework import VppTestCase
4 from ipaddress import IPv4Address
5 from ipaddress import IPv6Address
6 from vpp_ip_route import VppIpRoute, VppRoutePath, FibPathProto, VppIpTable
7
8 from vpp_srv6_mobile import (
9     SRv6MobileNhtype,
10     VppSRv6MobilePolicy,
11     VppSRv6MobileLocalSID,
12 )
13
14 from scapy.contrib.gtp import *
15 from scapy.all import *
16
17
class TestSRv6EndMGTP4E(VppTestCase):
    """SRv6 End.M.GTP4.E (SRv6 -> GTP-U)

    Sends SRv6 packets whose outer IPv6 addresses embed an IPv4 destination,
    an IPv4 source and a TEID, then checks that the packets captured on pg1
    carry an IPv4 header and a GTP-U header with exactly those values.
    """

    @classmethod
    def setUpClass(cls):
        """Create two pg interfaces and address them.

        ``pg_if_i`` (first interface) gets an IPv6 address and ``pg_if_o``
        (second interface) gets an IPv4 address.  If anything fails, the
        base-class teardown is run before the exception is re-raised.
        """
        super().setUpClass()
        try:
            cls.create_pg_interfaces(range(2))
            cls.pg_if_i, cls.pg_if_o = cls.pg_interfaces[0], cls.pg_interfaces[1]

            cls.pg_if_i.config_ip6()
            cls.pg_if_o.config_ip4()

            # The expected IPv4 destination is the peer of the second
            # interface; the expected IPv4 source is a fixed address rather
            # than that interface's own local_ip4.
            cls.ip4_dst = cls.pg_if_o.remote_ip4
            cls.ip4_src = "192.168.192.10"

            for intf in cls.pg_interfaces:
                intf.admin_up()
                intf.resolve_arp()

        except Exception:
            super().tearDownClass()
            raise

    def create_packets(self, inner):
        """Build one SRv6 packet per ``(dst, src)`` pair in ``inner``.

        :param inner: iterable of ``(dst, src)`` IPv6 address strings used
            for the inner IPv6 header.
        :returns: list of scapy packets
            ``Ether / IPv6 / SRH / IPv6 / UDP(sport=1000, dport=23)``.
        """
        v4_dst = IPv4Address(str(self.ip4_dst))
        # Outer destination layout:
        # 32bit prefix + 32bit IPv4 DA + 8bit + 32bit TEID + 24bit
        sid_bytes = b"\xaa" * 4 + v4_dst.packed + b"\x11" + b"\xbb" * 4 + b"\x11" * 3
        remote_srgw = IPv6Address(sid_bytes)

        v4_src = IPv4Address(str(self.ip4_src))
        # Outer source layout:
        # 64bit prefix + 32bit IPv4 SA + 16 bit port + 16bit
        src_bytes = b"\xcc" * 8 + v4_src.packed + b"\xdd" * 2 + b"\x11" * 2
        local_srgw = IPv6Address(src_bytes)

        self.logger.info(f"ip4 dst: {v4_dst}")
        self.logger.info(f"ip4 src: {v4_src}")
        self.logger.info(f"ip6 dst (remote srgw): {remote_srgw}")
        self.logger.info(f"ip6 src (local  srgw): {local_srgw}")

        outer_dst, outer_src = str(remote_srgw), str(local_srgw)

        built = []
        for inner_dst, inner_src in inner:
            outer_hdr = IPv6(dst=outer_dst, src=outer_src)
            inner_hdr = IPv6(dst=inner_dst, src=inner_src)
            frame = (
                Ether()
                / outer_hdr
                / IPv6ExtHdrSegmentRouting()
                / inner_hdr
                / UDP(sport=1000, dport=23)
            )
            self.logger.info(frame.show2(dump=True))
            built.append(frame)

        return built

    def test_srv6_mobile(self):
        """test_srv6_mobile

        Installs an ``end.m.gtp4.e`` local SID, replays the SRv6 stream on
        pg0 and verifies the IPv4 addresses and TEID of every packet
        captured on pg1.
        """
        stream = self.create_packets([("A::1", "B::1"), ("C::1", "D::1")])
        outer_sid = stream[0]["IPv6"].dst

        # Equivalent CLI:
        # "sr localsid address <outer_sid> behavior end.m.gtp4.e
        #  v4src_position 64 fib-table 0"
        gtp4e_sid = VppSRv6MobileLocalSID(
            self,
            # address params case is length 0
            localsid_prefix=f"{outer_sid}/0",
            behavior="end.m.gtp4.e",
            v4src_position=64,
            fib_table=0,
        )
        gtp4e_sid.add_vpp_config()

        # Dump the configured local SIDs into the test log.
        self.logger.info(self.vapi.cli("show sr localsid"))

        self.vapi.cli("clear errors")

        self.pg0.add_stream(stream)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        self.logger.info(self.vapi.cli("show errors"))
        self.logger.info(self.vapi.cli("show int address"))

        received = self.pg1.get_capture(len(stream))

        # 0xBBBBBBBB is the 32-bit TEID field written into the outer
        # destination address by create_packets().
        expected_teid = 0xBBBBBBBB
        for rx in received:
            self.logger.info(rx.show2(dump=True))
            self.assertEqual(rx[IP].dst, self.ip4_dst)
            self.assertEqual(rx[IP].src, self.ip4_src)
            self.assertEqual(rx[GTP_U_Header].teid, expected_teid)
109
110
class TestSRv6TMGTP4D(VppTestCase):
    """SRv6 T.M.GTP4.D (GTP-U -> SRv6)

    Sends IPv4/GTP-U packets on pg0, steers them through a ``t.m.gtp4.d``
    SR policy and checks the first segment of the segment routing header on
    the packets captured on pg1.
    """

    @classmethod
    def setUpClass(cls):
        """Create two pg interfaces, both dual-stack (IPv4 and IPv6).

        If anything fails, the base-class teardown is run before the
        exception is re-raised.
        """
        super().setUpClass()
        try:
            cls.create_pg_interfaces(range(2))
            cls.pg_if_i, cls.pg_if_o = cls.pg_interfaces[0], cls.pg_interfaces[1]

            for intf in (cls.pg_if_i, cls.pg_if_o):
                intf.config_ip4()
                intf.config_ip6()

            # Outer IPv4 addresses of the GTP-U packets that will be sent.
            cls.ip4_dst = "1.1.1.1"
            cls.ip4_src = "2.2.2.2"

            # IPv6 next hop: the peer of the second interface.
            cls.ip6_dst = cls.pg_if_o.remote_ip6

            for intf in cls.pg_interfaces:
                intf.admin_up()
                intf.resolve_arp()
                intf.resolve_ndp(timeout=5)

        except Exception:
            super().tearDownClass()
            raise

    def create_packets(self, inner):
        """Build one GTP-U packet per ``(dst, src)`` pair in ``inner``.

        :param inner: iterable of ``(dst, src)`` IPv6 address strings used
            for the IPv6 header carried inside GTP-U.
        :returns: list of scapy packets
            ``Ether / IP / UDP(2152) / GTP-U(teid=200) / IPv6 / UDP``.
        """
        v4_dst = IPv4Address(str(self.ip4_dst))
        v4_src = IPv4Address(str(self.ip4_src))

        self.logger.info(f"ip4 dst: {v4_dst}")
        self.logger.info(f"ip4 src: {v4_src}")

        outer_dst, outer_src = str(v4_dst), str(v4_src)

        built = []
        for inner_dst, inner_src in inner:
            outer_hdr = IP(dst=outer_dst, src=outer_src)
            inner_hdr = IPv6(dst=inner_dst, src=inner_src)
            frame = (
                Ether()
                / outer_hdr
                / UDP(sport=2152, dport=2152)
                / GTP_U_Header(gtp_type="g_pdu", teid=200)
                / inner_hdr
                / UDP(sport=1000, dport=23)
            )
            self.logger.info(frame.show2(dump=True))
            built.append(frame)

        return built

    def test_srv6_mobile(self):
        """test_srv6_mobile

        Configures the SR encapsulation source, a plain SR policy, the
        ``t.m.gtp4.d`` policy, steering and an IPv6 route, then replays the
        GTP-U stream and checks the first SRH segment of each captured
        packet.
        """
        stream = self.create_packets([("A::1", "B::1"), ("C::1", "D::1")])

        self.vapi.cli("set sr encaps source addr A1::1")
        self.vapi.cli("sr policy add bsid D4:: next D2:: next D3::")

        # Equivalent CLI:
        # "sr policy add bsid D5:: behavior t.m.gtp4.d D4::/32
        #  v6src_prefix C1::/64 nhtype ipv6 fib-table 0 drop-in"
        gtp4d_policy = VppSRv6MobilePolicy(
            self,
            bsid_addr="D5::",
            behavior="t.m.gtp4.d",
            sr_prefix="D4::/32",
            v6src_prefix="C1::/64",
            nhtype=SRv6MobileNhtype.SRV6_NHTYPE_API_IPV6,
            fib_table=0,
            drop_in=1,
        )
        gtp4d_policy.add_vpp_config()

        self.vapi.cli(f"sr steer l3 {self.ip4_dst}/32 via bsid D5::")

        # Equivalent CLI: "ip route add D2::/32 via <self.ip6_dst>"
        next_hop = VppRoutePath(self.ip6_dst, self.pg1.sw_if_index)
        d2_route = VppIpRoute(self, "D2::", 32, [next_hop])
        d2_route.add_vpp_config()

        for show_cmd in ("show ip6 fib", "show sr steer", "show sr policies"):
            self.logger.info(self.vapi.cli(show_cmd))

        self.vapi.cli("clear errors")

        self.pg0.add_stream(stream)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        self.logger.info(self.vapi.cli("show errors"))
        self.logger.info(self.vapi.cli("show int address"))

        received = self.pg1.get_capture(len(stream))

        # NOTE(review): the expected segment looks like the D4::/32 prefix
        # followed by the IPv4 destination 1.1.1.1 (0101:0101) and the TEID
        # 200 (0xc8) - confirm the exact bit layout against the plugin.
        expected_sid = "d4:0:101:101::c800:0"
        for rx in received:
            self.logger.info(rx.show2(dump=True))
            first_sid = str(rx[IPv6ExtHdrSegmentRouting].addresses[0])
            self.logger.info(f"GTP4.D Address={first_sid}")
            self.assertEqual(first_sid, expected_sid)
216
217
class TestSRv6EndMGTP6E(VppTestCase):
    """SRv6 End.M.GTP6.E

    Sends SRv6 packets whose outer destination embeds a TEID, then checks
    that the packets captured on pg1 are addressed to the remaining segment
    and carry a GTP-U header with that TEID.
    """

    @classmethod
    def setUpClass(cls):
        """Create two pg interfaces, both configured for IPv6.

        If anything fails, the base-class teardown is run before the
        exception is re-raised.
        """
        super().setUpClass()
        try:
            cls.create_pg_interfaces(range(2))
            cls.pg_if_i, cls.pg_if_o = cls.pg_interfaces[0], cls.pg_interfaces[1]

            cls.pg_if_i.config_ip6()
            cls.pg_if_o.config_ip6()

            # IPv6 next hop: the peer of the second interface.
            cls.ip6_nhop = cls.pg_if_o.remote_ip6

            for intf in cls.pg_interfaces:
                intf.admin_up()
                intf.resolve_ndp(timeout=5)

        except Exception:
            super().tearDownClass()
            raise

    def create_packets(self, inner):
        """Build one SRv6 packet per ``(dst, src)`` pair in ``inner``.

        Also stores the outer addresses on the instance as ``self.ip6_dst``
        and ``self.ip6_src`` (``IPv6Address`` objects) for later assertions.

        :param inner: iterable of ``(dst, src)`` IPv6 address strings used
            for the inner IPv6 header.
        :returns: list of scapy packets
            ``Ether / IPv6 / SRH(addresses=["a1::1"]) / IPv6 / UDP``.
        """
        # Outer destination layout:
        # 64bit prefix + 8bit QFI + 32bit TEID + 24bit
        sid_bytes = b"\xaa" * 8 + b"\x00" + b"\xbb" * 4 + b"\x00" * 3
        outer_dst_addr = IPv6Address(sid_bytes)
        self.ip6_dst = outer_dst_addr

        src_bytes = b"\xcc" * 8 + b"\xdd" * 4 + b"\x11" * 4
        outer_src_addr = IPv6Address(src_bytes)
        self.ip6_src = outer_src_addr

        outer_dst, outer_src = str(outer_dst_addr), str(outer_src_addr)

        built = []
        for inner_dst, inner_src in inner:
            outer_hdr = IPv6(dst=outer_dst, src=outer_src)
            srh = IPv6ExtHdrSegmentRouting(
                segleft=1, lastentry=0, tag=0, addresses=["a1::1"]
            )
            inner_hdr = IPv6(dst=inner_dst, src=inner_src)
            frame = Ether() / outer_hdr / srh / inner_hdr / UDP(sport=1000, dport=23)
            self.logger.info(frame.show2(dump=True))
            built.append(frame)

        return built

    def test_srv6_mobile(self):
        """test_srv6_mobile

        Installs an ``end.m.gtp6.e`` local SID and a route for ``a1::/64``,
        replays the SRv6 stream on pg0 and verifies the IPv6 addresses and
        TEID of every packet captured on pg1.
        """
        stream = self.create_packets([("A::1", "B::1"), ("C::1", "D::1")])
        outer_sid = stream[0]["IPv6"].dst

        # Equivalent CLI:
        # "sr localsid prefix <outer_sid>/64 behavior end.m.gtp6.e fib-table 0"
        gtp6e_sid = VppSRv6MobileLocalSID(
            self,
            localsid_prefix=f"{outer_sid}/64",
            behavior="end.m.gtp6.e",
            fib_table=0,
        )
        gtp6e_sid.add_vpp_config()

        # Equivalent CLI: "ip route add a1::/64 via <self.ip6_nhop>"
        next_hop = VppRoutePath(self.ip6_nhop, self.pg1.sw_if_index)
        a1_route = VppIpRoute(self, "a1::", 64, [next_hop])
        a1_route.add_vpp_config()
        self.logger.info(self.vapi.cli("show sr localsid"))

        self.vapi.cli("clear errors")

        self.pg0.add_stream(stream)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        self.logger.info(self.vapi.cli("show errors"))
        self.logger.info(self.vapi.cli("show int address"))

        received = self.pg1.get_capture(len(stream))

        # "a1::1" is the single SRH segment and 0xBBBBBBBB the TEID field
        # that create_packets() placed in the injected packets.
        expected_src = str(self.ip6_src)
        for rx in received:
            self.logger.info(rx.show2(dump=True))
            self.assertEqual(rx[IPv6].dst, "a1::1")
            self.assertEqual(rx[IPv6].src, expected_src)
            self.assertEqual(rx[GTP_U_Header].teid, 0xBBBBBBBB)
307
308
class TestSRv6EndMGTP6D(VppTestCase):
    """SRv6 End.M.GTP6.D

    Sends IPv6/GTP-U packets on pg0 towards an ``end.m.gtp6.d`` local SID
    and checks the first two segments of the segment routing header on the
    packets captured on pg1.
    """

    @classmethod
    def setUpClass(cls):
        """Create two pg interfaces, both configured for IPv6.

        If anything fails, the base-class teardown is run before the
        exception is re-raised.
        """
        super().setUpClass()
        try:
            cls.create_pg_interfaces(range(2))
            cls.pg_if_i, cls.pg_if_o = cls.pg_interfaces[0], cls.pg_interfaces[1]

            cls.pg_if_i.config_ip6()
            cls.pg_if_o.config_ip6()

            # IPv6 next hop: the peer of the second interface.
            cls.ip6_nhop = cls.pg_if_o.remote_ip6

            # Outer IPv6 addresses of the GTP-U packets that will be sent.
            cls.ip6_dst = "2001::1"
            cls.ip6_src = "2002::1"

            for intf in cls.pg_interfaces:
                intf.admin_up()
                intf.resolve_ndp(timeout=5)

        except Exception:
            super().tearDownClass()
            raise

    def create_packets(self, inner):
        """Build one GTP-U packet per ``(dst, src)`` pair in ``inner``.

        :param inner: iterable of ``(dst, src)`` IPv6 address strings used
            for the IPv6 header carried inside GTP-U.
        :returns: list of scapy packets
            ``Ether / IPv6 / UDP(2152) / GTP-U(teid=200) / IPv6 / UDP``.
        """
        v6_dst = IPv6Address(str(self.ip6_dst))
        v6_src = IPv6Address(str(self.ip6_src))

        self.logger.info(f"ip6 dst: {v6_dst}")
        self.logger.info(f"ip6 src: {v6_src}")

        outer_dst, outer_src = str(v6_dst), str(v6_src)

        built = []
        for inner_dst, inner_src in inner:
            outer_hdr = IPv6(dst=outer_dst, src=outer_src)
            inner_hdr = IPv6(dst=inner_dst, src=inner_src)
            frame = (
                Ether()
                / outer_hdr
                / UDP(sport=2152, dport=2152)
                / GTP_U_Header(gtp_type="g_pdu", teid=200)
                / inner_hdr
                / UDP(sport=1000, dport=23)
            )
            self.logger.info(frame.show2(dump=True))
            built.append(frame)

        return built

    def test_srv6_mobile(self):
        """test_srv6_mobile

        Configures the SR encapsulation source, an SR policy, the
        ``end.m.gtp6.d`` local SID and an IPv6 route, then replays the GTP-U
        stream and checks the first two SRH segments of each captured packet.
        """
        stream = self.create_packets([("A::1", "B::1"), ("C::1", "D::1")])

        self.vapi.cli("set sr encaps source addr A1::1")
        self.vapi.cli("sr policy add bsid D4:: next D2:: next D3::")

        # Roughly equivalent CLI (prefixes taken from the arguments below):
        # "sr localsid prefix 2001::/64 behavior end.m.gtp6.d D4::/64
        #  fib-table 0 drop-in"
        gtp6d_sid = VppSRv6MobileLocalSID(
            self,
            localsid_prefix="2001::/64",
            behavior="end.m.gtp6.d",
            fib_table=0,
            drop_in=1,
            sr_prefix="D4::/64",
        )
        gtp6d_sid.add_vpp_config()

        # Equivalent CLI: "ip route add D2::/64 via <self.ip6_nhop>"
        next_hop = VppRoutePath(self.ip6_nhop, self.pg1.sw_if_index)
        d2_route = VppIpRoute(self, "D2::", 64, [next_hop])
        d2_route.add_vpp_config()

        for show_cmd in ("show sr policies", "show sr localsid"):
            self.logger.info(self.vapi.cli(show_cmd))

        self.vapi.cli("clear errors")

        self.pg0.add_stream(stream)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        self.logger.info(self.vapi.cli("show errors"))
        self.logger.info(self.vapi.cli("show int address"))

        received = self.pg1.get_capture(len(stream))

        # NOTE(review): "2001::1" equals the outer destination of the sent
        # packets; "d4::c800:0" looks like the D4::/64 prefix with the TEID
        # 200 (0xc8) embedded - confirm the bit layout against the plugin.
        for rx in received:
            self.logger.info(rx.show2(dump=True))
            segments = rx[IPv6ExtHdrSegmentRouting].addresses
            sid0 = str(segments[0])
            self.logger.info(f"GTP6.D SID0={sid0}")
            sid1 = str(segments[1])
            self.logger.info(f"GTP6.D SID1={sid1}")
            self.assertEqual(sid0, "2001::1")
            self.assertEqual(sid1, "d4::c800:0")