tests: replace pycodestyle with black
[vpp.git] / test / test_srv6_mobile.py
1 #!/usr/bin/env python3
2
3 from framework import VppTestCase
4 from ipaddress import IPv4Address
5 from ipaddress import IPv6Address
6 from scapy.contrib.gtp import *
7 from scapy.all import *
8
9
10 class TestSRv6EndMGTP4E(VppTestCase):
11     """SRv6 End.M.GTP4.E (SRv6 -> GTP-U)"""
12
13     @classmethod
14     def setUpClass(cls):
15         super(TestSRv6EndMGTP4E, cls).setUpClass()
16         try:
17             cls.create_pg_interfaces(range(2))
18             cls.pg_if_i = cls.pg_interfaces[0]
19             cls.pg_if_o = cls.pg_interfaces[1]
20
21             cls.pg_if_i.config_ip6()
22             cls.pg_if_o.config_ip4()
23
24             cls.ip4_dst = cls.pg_if_o.remote_ip4
25             # cls.ip4_src = cls.pg_if_o.local_ip4
26             cls.ip4_src = "192.168.192.10"
27
28             for pg_if in cls.pg_interfaces:
29                 pg_if.admin_up()
30                 pg_if.resolve_arp()
31
32         except Exception:
33             super(TestSRv6EndMGTP4E, cls).tearDownClass()
34             raise
35
36     def create_packets(self, inner):
37
38         ip4_dst = IPv4Address(str(self.ip4_dst))
39         # 32bit prefix + 32bit IPv4 DA + 8bit + 32bit TEID + 24bit
40         dst = b"\xaa" * 4 + ip4_dst.packed + b"\x11" + b"\xbb" * 4 + b"\x11" * 3
41         ip6_dst = IPv6Address(dst)
42
43         ip4_src = IPv4Address(str(self.ip4_src))
44         # 64bit prefix + 32bit IPv4 SA + 16 bit port + 16bit
45         src = b"\xcc" * 8 + ip4_src.packed + b"\xdd" * 2 + b"\x11" * 2
46         ip6_src = IPv6Address(src)
47
48         self.logger.info("ip4 dst: {}".format(ip4_dst))
49         self.logger.info("ip4 src: {}".format(ip4_src))
50         self.logger.info("ip6 dst (remote srgw): {}".format(ip6_dst))
51         self.logger.info("ip6 src (local  srgw): {}".format(ip6_src))
52
53         pkts = list()
54         for d, s in inner:
55             pkt = (
56                 Ether()
57                 / IPv6(dst=str(ip6_dst), src=str(ip6_src))
58                 / IPv6ExtHdrSegmentRouting()
59                 / IPv6(dst=d, src=s)
60                 / UDP(sport=1000, dport=23)
61             )
62             self.logger.info(pkt.show2(dump=True))
63             pkts.append(pkt)
64
65         return pkts
66
67     def test_srv6_mobile(self):
68         """test_srv6_mobile"""
69         pkts = self.create_packets([("A::1", "B::1"), ("C::1", "D::1")])
70
71         self.vapi.cli(
72             "sr localsid address {} behavior end.m.gtp4.e ".format(pkts[0]["IPv6"].dst)
73             + "v4src_position 64 fib-table 0"
74         )
75         self.logger.info(self.vapi.cli("show sr localsid"))
76
77         self.vapi.cli("clear errors")
78
79         self.pg0.add_stream(pkts)
80         self.pg_enable_capture(self.pg_interfaces)
81         self.pg_start()
82
83         self.logger.info(self.vapi.cli("show errors"))
84         self.logger.info(self.vapi.cli("show int address"))
85
86         capture = self.pg1.get_capture(len(pkts))
87
88         for pkt in capture:
89             self.logger.info(pkt.show2(dump=True))
90             self.assertEqual(pkt[IP].dst, self.ip4_dst)
91             self.assertEqual(pkt[IP].src, self.ip4_src)
92             self.assertEqual(pkt[GTP_U_Header].teid, 0xBBBBBBBB)
93
94
95 class TestSRv6TMGTP4D(VppTestCase):
96     """SRv6 T.M.GTP4.D (GTP-U -> SRv6)"""
97
98     @classmethod
99     def setUpClass(cls):
100         super(TestSRv6TMGTP4D, cls).setUpClass()
101         try:
102             cls.create_pg_interfaces(range(2))
103             cls.pg_if_i = cls.pg_interfaces[0]
104             cls.pg_if_o = cls.pg_interfaces[1]
105
106             cls.pg_if_i.config_ip4()
107             cls.pg_if_i.config_ip6()
108             cls.pg_if_o.config_ip4()
109             cls.pg_if_o.config_ip6()
110
111             cls.ip4_dst = "1.1.1.1"
112             cls.ip4_src = "2.2.2.2"
113
114             cls.ip6_dst = cls.pg_if_o.remote_ip6
115
116             for pg_if in cls.pg_interfaces:
117                 pg_if.admin_up()
118                 pg_if.resolve_arp()
119                 pg_if.resolve_ndp(timeout=5)
120
121         except Exception:
122             super(TestSRv6TMGTP4D, cls).tearDownClass()
123             raise
124
125     def create_packets(self, inner):
126
127         ip4_dst = IPv4Address(str(self.ip4_dst))
128
129         ip4_src = IPv4Address(str(self.ip4_src))
130
131         self.logger.info("ip4 dst: {}".format(ip4_dst))
132         self.logger.info("ip4 src: {}".format(ip4_src))
133
134         pkts = list()
135         for d, s in inner:
136             pkt = (
137                 Ether()
138                 / IP(dst=str(ip4_dst), src=str(ip4_src))
139                 / UDP(sport=2152, dport=2152)
140                 / GTP_U_Header(gtp_type="g_pdu", teid=200)
141                 / IPv6(dst=d, src=s)
142                 / UDP(sport=1000, dport=23)
143             )
144             self.logger.info(pkt.show2(dump=True))
145             pkts.append(pkt)
146
147         return pkts
148
149     def test_srv6_mobile(self):
150         """test_srv6_mobile"""
151         pkts = self.create_packets([("A::1", "B::1"), ("C::1", "D::1")])
152
153         self.vapi.cli("set sr encaps source addr A1::1")
154         self.vapi.cli("sr policy add bsid D4:: next D2:: next D3::")
155         self.vapi.cli(
156             "sr policy add bsid D5:: behavior t.m.gtp4.d D4::/32 "
157             + "v6src_prefix C1::/64 nhtype ipv6 fib-table 0 drop-in"
158         )
159         self.vapi.cli("sr steer l3 {}/32 via bsid D5::".format(self.ip4_dst))
160         self.vapi.cli("ip route add D2::/32 via {}".format(self.ip6_dst))
161
162         self.logger.info(self.vapi.cli("show sr steer"))
163         self.logger.info(self.vapi.cli("show sr policies"))
164
165         self.vapi.cli("clear errors")
166
167         self.pg0.add_stream(pkts)
168         self.pg_enable_capture(self.pg_interfaces)
169         self.pg_start()
170
171         self.logger.info(self.vapi.cli("show errors"))
172         self.logger.info(self.vapi.cli("show int address"))
173
174         capture = self.pg1.get_capture(len(pkts))
175
176         for pkt in capture:
177             self.logger.info(pkt.show2(dump=True))
178             self.logger.info(
179                 "GTP4.D Address={}".format(
180                     str(pkt[IPv6ExtHdrSegmentRouting].addresses[0])
181                 )
182             )
183             self.assertEqual(
184                 str(pkt[IPv6ExtHdrSegmentRouting].addresses[0]), "d4:0:101:101::c800:0"
185             )
186
187
188 class TestSRv6EndMGTP6E(VppTestCase):
189     """SRv6 End.M.GTP6.E"""
190
191     @classmethod
192     def setUpClass(cls):
193         super(TestSRv6EndMGTP6E, cls).setUpClass()
194         try:
195             cls.create_pg_interfaces(range(2))
196             cls.pg_if_i = cls.pg_interfaces[0]
197             cls.pg_if_o = cls.pg_interfaces[1]
198
199             cls.pg_if_i.config_ip6()
200             cls.pg_if_o.config_ip6()
201
202             cls.ip6_nhop = cls.pg_if_o.remote_ip6
203
204             for pg_if in cls.pg_interfaces:
205                 pg_if.admin_up()
206                 pg_if.resolve_ndp(timeout=5)
207
208         except Exception:
209             super(TestSRv6EndMGTP6E, cls).tearDownClass()
210             raise
211
212     def create_packets(self, inner):
213         # 64bit prefix + 8bit QFI + 32bit TEID + 24bit
214         dst = b"\xaa" * 8 + b"\x00" + b"\xbb" * 4 + b"\x00" * 3
215         ip6_dst = IPv6Address(dst)
216
217         self.ip6_dst = ip6_dst
218
219         src = b"\xcc" * 8 + b"\xdd" * 4 + b"\x11" * 4
220         ip6_src = IPv6Address(src)
221
222         self.ip6_src = ip6_src
223
224         pkts = list()
225         for d, s in inner:
226             pkt = (
227                 Ether()
228                 / IPv6(dst=str(ip6_dst), src=str(ip6_src))
229                 / IPv6ExtHdrSegmentRouting(
230                     segleft=1, lastentry=0, tag=0, addresses=["a1::1"]
231                 )
232                 / IPv6(dst=d, src=s)
233                 / UDP(sport=1000, dport=23)
234             )
235             self.logger.info(pkt.show2(dump=True))
236             pkts.append(pkt)
237
238         return pkts
239
240     def test_srv6_mobile(self):
241         """test_srv6_mobile"""
242         pkts = self.create_packets([("A::1", "B::1"), ("C::1", "D::1")])
243
244         self.vapi.cli(
245             "sr localsid prefix {}/64 behavior end.m.gtp6.e fib-table 0".format(
246                 pkts[0]["IPv6"].dst
247             )
248         )
249         self.vapi.cli("ip route add a1::/64 via {}".format(self.ip6_nhop))
250         self.logger.info(self.vapi.cli("show sr localsid"))
251
252         self.vapi.cli("clear errors")
253
254         self.pg0.add_stream(pkts)
255         self.pg_enable_capture(self.pg_interfaces)
256         self.pg_start()
257
258         self.logger.info(self.vapi.cli("show errors"))
259         self.logger.info(self.vapi.cli("show int address"))
260
261         capture = self.pg1.get_capture(len(pkts))
262
263         for pkt in capture:
264             self.logger.info(pkt.show2(dump=True))
265             self.assertEqual(pkt[IPv6].dst, "a1::1")
266             self.assertEqual(pkt[IPv6].src, str(self.ip6_src))
267             self.assertEqual(pkt[GTP_U_Header].teid, 0xBBBBBBBB)
268
269
270 class TestSRv6EndMGTP6D(VppTestCase):
271     """SRv6 End.M.GTP6.D"""
272
273     @classmethod
274     def setUpClass(cls):
275         super(TestSRv6EndMGTP6D, cls).setUpClass()
276         try:
277             cls.create_pg_interfaces(range(2))
278             cls.pg_if_i = cls.pg_interfaces[0]
279             cls.pg_if_o = cls.pg_interfaces[1]
280
281             cls.pg_if_i.config_ip6()
282             cls.pg_if_o.config_ip6()
283
284             cls.ip6_nhop = cls.pg_if_o.remote_ip6
285
286             cls.ip6_dst = "2001::1"
287             cls.ip6_src = "2002::1"
288
289             for pg_if in cls.pg_interfaces:
290                 pg_if.admin_up()
291                 pg_if.resolve_ndp(timeout=5)
292
293         except Exception:
294             super(TestSRv6EndMGTP6D, cls).tearDownClass()
295             raise
296
297     def create_packets(self, inner):
298
299         ip6_dst = IPv6Address(str(self.ip6_dst))
300
301         ip6_src = IPv6Address(str(self.ip6_src))
302
303         self.logger.info("ip6 dst: {}".format(ip6_dst))
304         self.logger.info("ip6 src: {}".format(ip6_src))
305
306         pkts = list()
307         for d, s in inner:
308             pkt = (
309                 Ether()
310                 / IPv6(dst=str(ip6_dst), src=str(ip6_src))
311                 / UDP(sport=2152, dport=2152)
312                 / GTP_U_Header(gtp_type="g_pdu", teid=200)
313                 / IPv6(dst=d, src=s)
314                 / UDP(sport=1000, dport=23)
315             )
316             self.logger.info(pkt.show2(dump=True))
317             pkts.append(pkt)
318
319         return pkts
320
321     def test_srv6_mobile(self):
322         """test_srv6_mobile"""
323         pkts = self.create_packets([("A::1", "B::1"), ("C::1", "D::1")])
324
325         self.vapi.cli("set sr encaps source addr A1::1")
326         self.vapi.cli("sr policy add bsid D4:: next D2:: next D3::")
327         self.vapi.cli(
328             "sr localsid prefix 2001::/64 behavior end.m.gtp6.d "
329             + "D4::/64 fib-table 0 drop-in"
330         )
331         self.vapi.cli("ip route add D2::/64 via {}".format(self.ip6_nhop))
332
333         self.logger.info(self.vapi.cli("show sr policies"))
334         self.logger.info(self.vapi.cli("show sr localsid"))
335
336         self.vapi.cli("clear errors")
337
338         self.pg0.add_stream(pkts)
339         self.pg_enable_capture(self.pg_interfaces)
340         self.pg_start()
341
342         self.logger.info(self.vapi.cli("show errors"))
343         self.logger.info(self.vapi.cli("show int address"))
344
345         capture = self.pg1.get_capture(len(pkts))
346
347         for pkt in capture:
348             self.logger.info(pkt.show2(dump=True))
349             self.logger.info(
350                 "GTP6.D SID0={}".format(str(pkt[IPv6ExtHdrSegmentRouting].addresses[0]))
351             )
352             self.logger.info(
353                 "GTP6.D SID1={}".format(str(pkt[IPv6ExtHdrSegmentRouting].addresses[1]))
354             )
355             self.assertEqual(str(pkt[IPv6ExtHdrSegmentRouting].addresses[0]), "2001::1")
356             self.assertEqual(
357                 str(pkt[IPv6ExtHdrSegmentRouting].addresses[1]), "d4::c800:0"
358             )