tests docs: update python3 venv packages
[vpp.git] / test / test_srv6_mobile.py
1 #!/usr/bin/env python3
2
3 from framework import VppTestCase
4 from ipaddress import IPv4Address
5 from ipaddress import IPv6Address
6 from scapy.contrib.gtp import *
7 from scapy.all import *
8
9
class TestSRv6EndMGTP4E(VppTestCase):
    """SRv6 End.M.GTP4.E (SRv6 -> GTP-U)"""

    @classmethod
    def setUpClass(cls):
        """Create two pg interfaces: IPv6 on the first, IPv4 on the second."""
        super().setUpClass()
        try:
            cls.create_pg_interfaces(range(2))
            ingress = cls.pg_interfaces[0]
            egress = cls.pg_interfaces[1]
            cls.pg_if_i = ingress
            cls.pg_if_o = egress

            ingress.config_ip6()
            egress.config_ip4()

            cls.ip4_dst = egress.remote_ip4
            # A fixed source address is used here rather than the egress
            # interface's local IPv4 address.
            cls.ip4_src = "192.168.192.10"

            for intf in cls.pg_interfaces:
                intf.admin_up()
                intf.resolve_arp()

        except Exception:
            # Run the base-class teardown before re-raising the failure.
            super().tearDownClass()
            raise

    def create_packets(self, inner):
        """Build one SRv6 packet per (dst, src) pair in ``inner``.

        The outer IPv6 destination embeds ``self.ip4_dst`` and the TEID
        bytes ``bb bb bb bb``; the outer IPv6 source embeds ``self.ip4_src``.
        Each inner pair becomes the IPv6 header carried behind the segment
        routing header, followed by a UDP header.

        :param inner: iterable of (destination, source) IPv6 address pairs
        :returns: list of the packets built, in the order of ``inner``
        """
        v4_dst = IPv4Address(str(self.ip4_dst))
        # 32bit prefix + 32bit IPv4 DA + 8bit + 32bit TEID + 24bit
        outer_dst = IPv6Address(
            b"".join((b"\xaa" * 4, v4_dst.packed, b"\x11", b"\xbb" * 4, b"\x11" * 3))
        )

        v4_src = IPv4Address(str(self.ip4_src))
        # 64bit prefix + 32bit IPv4 SA + 16 bit port + 16bit
        outer_src = IPv6Address(
            b"".join((b"\xcc" * 8, v4_src.packed, b"\xdd" * 2, b"\x11" * 2))
        )

        for template, value in (
            ("ip4 dst: {}", v4_dst),
            ("ip4 src: {}", v4_src),
            ("ip6 dst (remote srgw): {}", outer_dst),
            ("ip6 src (local  srgw): {}", outer_src),
        ):
            self.logger.info(template.format(value))

        outer_dst_text = str(outer_dst)
        outer_src_text = str(outer_src)
        pkts = [
            Ether()
            / IPv6(dst=outer_dst_text, src=outer_src_text)
            / IPv6ExtHdrSegmentRouting()
            / IPv6(dst=inner_dst, src=inner_src)
            / UDP(sport=1000, dport=23)
            for inner_dst, inner_src in inner
        ]
        for built in pkts:
            self.logger.info(built.show2(dump=True))

        return pkts

    def test_srv6_mobile(self):
        """test_srv6_mobile"""
        pkts = self.create_packets([("A::1", "B::1"), ("C::1", "D::1")])

        # Install the outer destination of the generated packets as an
        # End.M.GTP4.E localsid.
        localsid_cmd = (
            "sr localsid address {} behavior end.m.gtp4.e ".format(pkts[0]["IPv6"].dst)
            + "v4src_position 64 fib-table 0"
        )
        self.vapi.cli(localsid_cmd)
        self.logger.info(self.vapi.cli("show sr localsid"))

        self.vapi.cli("clear errors")

        # Send on pg0 and collect what comes out of pg1.
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        for show_cmd in ("show errors", "show int address"):
            self.logger.info(self.vapi.cli(show_cmd))

        capture = self.pg1.get_capture(len(pkts))

        for rx in capture:
            self.logger.info(rx.show2(dump=True))
            outer_ip4 = rx[IP]
            self.assertEqual(outer_ip4.dst, self.ip4_dst)
            self.assertEqual(outer_ip4.src, self.ip4_src)
            # The TEID must equal the four 0xbb bytes placed in the SID.
            self.assertEqual(rx[GTP_U_Header].teid, 0xBBBBBBBB)
92
93
class TestSRv6TMGTP4D(VppTestCase):
    """SRv6 T.M.GTP4.D (GTP-U -> SRv6)"""

    @classmethod
    def setUpClass(cls):
        """Create two pg interfaces, each configured for IPv4 and IPv6."""
        super().setUpClass()
        try:
            cls.create_pg_interfaces(range(2))
            ingress = cls.pg_interfaces[0]
            egress = cls.pg_interfaces[1]
            cls.pg_if_i = ingress
            cls.pg_if_o = egress

            ingress.config_ip4()
            ingress.config_ip6()
            egress.config_ip4()
            egress.config_ip6()

            cls.ip4_dst = "1.1.1.1"
            cls.ip4_src = "2.2.2.2"

            cls.ip6_dst = egress.remote_ip6

            for intf in cls.pg_interfaces:
                intf.admin_up()
                intf.resolve_arp()
                intf.resolve_ndp(timeout=5)

        except Exception:
            # Run the base-class teardown before re-raising the failure.
            super().tearDownClass()
            raise

    def create_packets(self, inner):
        """Build one GTP-U packet per (dst, src) pair in ``inner``.

        Each packet is IPv4/UDP(2152)/GTP-U (G-PDU, TEID 200) between
        ``self.ip4_src`` and ``self.ip4_dst``, carrying an inner IPv6/UDP
        payload addressed with the given pair.

        :param inner: iterable of (destination, source) IPv6 address pairs
        :returns: list of the packets built, in the order of ``inner``
        """
        v4_dst = IPv4Address(str(self.ip4_dst))
        v4_src = IPv4Address(str(self.ip4_src))

        for template, value in (("ip4 dst: {}", v4_dst), ("ip4 src: {}", v4_src)):
            self.logger.info(template.format(value))

        outer_dst_text = str(v4_dst)
        outer_src_text = str(v4_src)
        pkts = [
            Ether()
            / IP(dst=outer_dst_text, src=outer_src_text)
            / UDP(sport=2152, dport=2152)
            / GTP_U_Header(gtp_type="g_pdu", teid=200)
            / IPv6(dst=inner_dst, src=inner_src)
            / UDP(sport=1000, dport=23)
            for inner_dst, inner_src in inner
        ]
        for built in pkts:
            self.logger.info(built.show2(dump=True))

        return pkts

    def test_srv6_mobile(self):
        """test_srv6_mobile"""
        pkts = self.create_packets([("A::1", "B::1"), ("C::1", "D::1")])

        # SR configuration, applied in this exact order: encap source, the
        # plain policy D4::, the t.m.gtp4.d policy D5::, steering of the
        # IPv4 destination into D5::, and a route for D2::/32 towards pg1.
        setup_cmds = (
            "set sr encaps source addr A1::1",
            "sr policy add bsid D4:: next D2:: next D3::",
            "sr policy add bsid D5:: behavior t.m.gtp4.d D4::/32 "
            + "v6src_prefix C1::/64 nhtype ipv6 fib-table 0 drop-in",
            "sr steer l3 {}/32 via bsid D5::".format(self.ip4_dst),
            "ip route add D2::/32 via {}".format(self.ip6_dst),
        )
        for cmd in setup_cmds:
            self.vapi.cli(cmd)

        for show_cmd in ("show sr steer", "show sr policies"):
            self.logger.info(self.vapi.cli(show_cmd))

        self.vapi.cli("clear errors")

        # Send on pg0 and collect what comes out of pg1.
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        for show_cmd in ("show errors", "show int address"):
            self.logger.info(self.vapi.cli(show_cmd))

        capture = self.pg1.get_capture(len(pkts))

        for rx in capture:
            self.logger.info(rx.show2(dump=True))
            first_sid = str(rx[IPv6ExtHdrSegmentRouting].addresses[0])
            self.logger.info("GTP4.D Address={}".format(first_sid))
            # Expected SID contains 101:101 (1.1.1.1) and c8 (TEID 200).
            self.assertEqual(first_sid, "d4:0:101:101::c800:0")
184
185
class TestSRv6EndMGTP6E(VppTestCase):
    """SRv6 End.M.GTP6.E"""

    @classmethod
    def setUpClass(cls):
        """Create two pg interfaces, both configured for IPv6."""
        super().setUpClass()
        try:
            cls.create_pg_interfaces(range(2))
            ingress = cls.pg_interfaces[0]
            egress = cls.pg_interfaces[1]
            cls.pg_if_i = ingress
            cls.pg_if_o = egress

            ingress.config_ip6()
            egress.config_ip6()

            cls.ip6_nhop = egress.remote_ip6

            for intf in cls.pg_interfaces:
                intf.admin_up()
                intf.resolve_ndp(timeout=5)

        except Exception:
            # Run the base-class teardown before re-raising the failure.
            super().tearDownClass()
            raise

    def create_packets(self, inner):
        """Build one SRv6 packet per (dst, src) pair in ``inner``.

        Side effect: stores the outer addresses on the instance as
        ``self.ip6_dst`` and ``self.ip6_src`` (``IPv6Address`` objects).
        The segment routing header has ``segleft=1`` and the single segment
        ``a1::1``.

        :param inner: iterable of (destination, source) IPv6 address pairs
        :returns: list of the packets built, in the order of ``inner``
        """
        # 64bit prefix + 8bit QFI + 32bit TEID + 24bit
        outer_dst = IPv6Address(
            b"".join((b"\xaa" * 8, b"\x00", b"\xbb" * 4, b"\x00" * 3))
        )
        self.ip6_dst = outer_dst

        outer_src = IPv6Address(b"".join((b"\xcc" * 8, b"\xdd" * 4, b"\x11" * 4)))
        self.ip6_src = outer_src

        outer_dst_text = str(outer_dst)
        outer_src_text = str(outer_src)
        pkts = [
            Ether()
            / IPv6(dst=outer_dst_text, src=outer_src_text)
            / IPv6ExtHdrSegmentRouting(
                segleft=1, lastentry=0, tag=0, addresses=["a1::1"]
            )
            / IPv6(dst=inner_dst, src=inner_src)
            / UDP(sport=1000, dport=23)
            for inner_dst, inner_src in inner
        ]
        for built in pkts:
            self.logger.info(built.show2(dump=True))

        return pkts

    def test_srv6_mobile(self):
        """test_srv6_mobile"""
        pkts = self.create_packets([("A::1", "B::1"), ("C::1", "D::1")])

        # Install the outer destination (as a /64) as an End.M.GTP6.E
        # localsid, and route the a1::/64 segment towards pg1's peer.
        localsid_cmd = "sr localsid prefix {}/64 behavior end.m.gtp6.e fib-table 0".format(
            pkts[0]["IPv6"].dst
        )
        self.vapi.cli(localsid_cmd)
        self.vapi.cli("ip route add a1::/64 via {}".format(self.ip6_nhop))
        self.logger.info(self.vapi.cli("show sr localsid"))

        self.vapi.cli("clear errors")

        # Send on pg0 and collect what comes out of pg1.
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        for show_cmd in ("show errors", "show int address"):
            self.logger.info(self.vapi.cli(show_cmd))

        capture = self.pg1.get_capture(len(pkts))

        expected_src = str(self.ip6_src)
        for rx in capture:
            self.logger.info(rx.show2(dump=True))
            outer_ip6 = rx[IPv6]
            self.assertEqual(outer_ip6.dst, "a1::1")
            self.assertEqual(outer_ip6.src, expected_src)
            # The TEID must equal the four 0xbb bytes placed in the SID.
            self.assertEqual(rx[GTP_U_Header].teid, 0xBBBBBBBB)
266
267
class TestSRv6EndMGTP6D(VppTestCase):
    """SRv6 End.M.GTP6.D"""

    @classmethod
    def setUpClass(cls):
        """Create two pg interfaces, both configured for IPv6."""
        super().setUpClass()
        try:
            cls.create_pg_interfaces(range(2))
            ingress = cls.pg_interfaces[0]
            egress = cls.pg_interfaces[1]
            cls.pg_if_i = ingress
            cls.pg_if_o = egress

            ingress.config_ip6()
            egress.config_ip6()

            cls.ip6_nhop = egress.remote_ip6

            cls.ip6_dst = "2001::1"
            cls.ip6_src = "2002::1"

            for intf in cls.pg_interfaces:
                intf.admin_up()
                intf.resolve_ndp(timeout=5)

        except Exception:
            # Run the base-class teardown before re-raising the failure.
            super().tearDownClass()
            raise

    def create_packets(self, inner):
        """Build one GTP-U-over-IPv6 packet per (dst, src) pair in ``inner``.

        Each packet is IPv6/UDP(2152)/GTP-U (G-PDU, TEID 200) between
        ``self.ip6_src`` and ``self.ip6_dst``, carrying an inner IPv6/UDP
        payload addressed with the given pair.

        :param inner: iterable of (destination, source) IPv6 address pairs
        :returns: list of the packets built, in the order of ``inner``
        """
        outer_dst = IPv6Address(str(self.ip6_dst))
        outer_src = IPv6Address(str(self.ip6_src))

        for template, value in (
            ("ip6 dst: {}", outer_dst),
            ("ip6 src: {}", outer_src),
        ):
            self.logger.info(template.format(value))

        outer_dst_text = str(outer_dst)
        outer_src_text = str(outer_src)
        pkts = [
            Ether()
            / IPv6(dst=outer_dst_text, src=outer_src_text)
            / UDP(sport=2152, dport=2152)
            / GTP_U_Header(gtp_type="g_pdu", teid=200)
            / IPv6(dst=inner_dst, src=inner_src)
            / UDP(sport=1000, dport=23)
            for inner_dst, inner_src in inner
        ]
        for built in pkts:
            self.logger.info(built.show2(dump=True))

        return pkts

    def test_srv6_mobile(self):
        """test_srv6_mobile"""
        pkts = self.create_packets([("A::1", "B::1"), ("C::1", "D::1")])

        # SR configuration, applied in this exact order: encap source, the
        # policy D4::, the end.m.gtp6.d localsid covering 2001::/64, and a
        # route for D2::/64 towards pg1's peer.
        setup_cmds = (
            "set sr encaps source addr A1::1",
            "sr policy add bsid D4:: next D2:: next D3::",
            "sr localsid prefix 2001::/64 behavior end.m.gtp6.d "
            + "D4::/64 fib-table 0 drop-in",
            "ip route add D2::/64 via {}".format(self.ip6_nhop),
        )
        for cmd in setup_cmds:
            self.vapi.cli(cmd)

        for show_cmd in ("show sr policies", "show sr localsid"):
            self.logger.info(self.vapi.cli(show_cmd))

        self.vapi.cli("clear errors")

        # Send on pg0 and collect what comes out of pg1.
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        for show_cmd in ("show errors", "show int address"):
            self.logger.info(self.vapi.cli(show_cmd))

        capture = self.pg1.get_capture(len(pkts))

        for rx in capture:
            self.logger.info(rx.show2(dump=True))
            srh = rx[IPv6ExtHdrSegmentRouting]
            # Each SID is read and logged before the next one is accessed.
            sid0 = str(srh.addresses[0])
            self.logger.info("GTP6.D SID0={}".format(sid0))
            sid1 = str(srh.addresses[1])
            self.logger.info("GTP6.D SID1={}".format(sid1))
            # SID0 is the original outer destination; SID1 ends in c800:0,
            # where 0xc8 is the TEID (200) used in create_packets.
            self.assertEqual(sid0, "2001::1")
            self.assertEqual(sid1, "d4::c800:0")