tests: Fix the FIB UT
[vpp.git] / test / test_srv6_mobile.py
1 #!/usr/bin/env python3
2
3 from framework import VppTestCase
4 from ipaddress import IPv4Address
5 from ipaddress import IPv6Address
6 from scapy.contrib.gtp import *
7 from scapy.all import *
8
9
10 class TestSRv6EndMGTP4E(VppTestCase):
11     """ SRv6 End.M.GTP4.E (SRv6 -> GTP-U) """
12
13     @classmethod
14     def setUpClass(cls):
15         super(TestSRv6EndMGTP4E, cls).setUpClass()
16         try:
17             cls.create_pg_interfaces(range(2))
18             cls.pg_if_i = cls.pg_interfaces[0]
19             cls.pg_if_o = cls.pg_interfaces[1]
20
21             cls.pg_if_i.config_ip6()
22             cls.pg_if_o.config_ip4()
23
24             cls.ip4_dst = cls.pg_if_o.remote_ip4
25             # cls.ip4_src = cls.pg_if_o.local_ip4
26             cls.ip4_src = "192.168.192.10"
27
28             for pg_if in cls.pg_interfaces:
29                 pg_if.admin_up()
30                 pg_if.resolve_arp()
31
32         except Exception:
33             super(TestSRv6EndMGTP4E, cls).tearDownClass()
34             raise
35
36     def create_packets(self, inner):
37
38         ip4_dst = IPv4Address(str(self.ip4_dst))
39         # 32bit prefix + 32bit IPv4 DA + 8bit + 32bit TEID + 24bit
40         dst = b'\xaa' * 4 + ip4_dst.packed + \
41             b'\x11' + b'\xbb' * 4 + b'\x11' * 3
42         ip6_dst = IPv6Address(dst)
43
44         ip4_src = IPv4Address(str(self.ip4_src))
45         # 64bit prefix + 32bit IPv4 SA + 16 bit port + 16bit
46         src = b'\xcc' * 8 + ip4_src.packed + \
47             b'\xdd' * 2 + b'\x11' * 2
48         ip6_src = IPv6Address(src)
49
50         self.logger.info("ip4 dst: {}".format(ip4_dst))
51         self.logger.info("ip4 src: {}".format(ip4_src))
52         self.logger.info("ip6 dst (remote srgw): {}".format(ip6_dst))
53         self.logger.info("ip6 src (local  srgw): {}".format(ip6_src))
54
55         pkts = list()
56         for d, s in inner:
57             pkt = (Ether() /
58                    IPv6(dst=str(ip6_dst), src=str(ip6_src)) /
59                    IPv6ExtHdrSegmentRouting() /
60                    IPv6(dst=d, src=s) /
61                    UDP(sport=1000, dport=23))
62             self.logger.info(pkt.show2(dump=True))
63             pkts.append(pkt)
64
65         return pkts
66
67     def test_srv6_mobile(self):
68         """ test_srv6_mobile """
69         pkts = self.create_packets([("A::1", "B::1"), ("C::1", "D::1")])
70
71         self.vapi.cli(
72             "sr localsid address {} behavior end.m.gtp4.e "
73             .format(pkts[0]['IPv6'].dst) +
74             "v4src_position 64 fib-table 0")
75         self.logger.info(self.vapi.cli("show sr localsid"))
76
77         self.vapi.cli("clear errors")
78
79         self.pg0.add_stream(pkts)
80         self.pg_enable_capture(self.pg_interfaces)
81         self.pg_start()
82
83         self.logger.info(self.vapi.cli("show errors"))
84         self.logger.info(self.vapi.cli("show int address"))
85
86         capture = self.pg1.get_capture(len(pkts))
87
88         for pkt in capture:
89             self.logger.info(pkt.show2(dump=True))
90             self.assertEqual(pkt[IP].dst, self.ip4_dst)
91             self.assertEqual(pkt[IP].src, self.ip4_src)
92             self.assertEqual(pkt[GTP_U_Header].teid, 0xbbbbbbbb)
93
94
95 class TestSRv6TMGTP4D(VppTestCase):
96     """ SRv6 T.M.GTP4.D (GTP-U -> SRv6) """
97
98     @classmethod
99     def setUpClass(cls):
100         super(TestSRv6TMGTP4D, cls).setUpClass()
101         try:
102             cls.create_pg_interfaces(range(2))
103             cls.pg_if_i = cls.pg_interfaces[0]
104             cls.pg_if_o = cls.pg_interfaces[1]
105
106             cls.pg_if_i.config_ip4()
107             cls.pg_if_i.config_ip6()
108             cls.pg_if_o.config_ip4()
109             cls.pg_if_o.config_ip6()
110
111             cls.ip4_dst = "1.1.1.1"
112             cls.ip4_src = "2.2.2.2"
113
114             cls.ip6_dst = cls.pg_if_o.remote_ip6
115
116             for pg_if in cls.pg_interfaces:
117                 pg_if.admin_up()
118                 pg_if.resolve_arp()
119                 pg_if.resolve_ndp(timeout=5)
120
121         except Exception:
122             super(TestSRv6TMGTP4D, cls).tearDownClass()
123             raise
124
125     def create_packets(self, inner):
126
127         ip4_dst = IPv4Address(str(self.ip4_dst))
128
129         ip4_src = IPv4Address(str(self.ip4_src))
130
131         self.logger.info("ip4 dst: {}".format(ip4_dst))
132         self.logger.info("ip4 src: {}".format(ip4_src))
133
134         pkts = list()
135         for d, s in inner:
136             pkt = (Ether() /
137                    IP(dst=str(ip4_dst), src=str(ip4_src)) /
138                    UDP(sport=2152, dport=2152) /
139                    GTP_U_Header(gtp_type="g_pdu", teid=200) /
140                    IPv6(dst=d, src=s) /
141                    UDP(sport=1000, dport=23))
142             self.logger.info(pkt.show2(dump=True))
143             pkts.append(pkt)
144
145         return pkts
146
147     def test_srv6_mobile(self):
148         """ test_srv6_mobile """
149         pkts = self.create_packets([("A::1", "B::1"), ("C::1", "D::1")])
150
151         self.vapi.cli("set sr encaps source addr A1::1")
152         self.vapi.cli("sr policy add bsid D4:: next D2:: next D3::")
153         self.vapi.cli(
154             "sr policy add bsid D5:: behavior t.m.gtp4.d D4::/32 " +
155             "v6src_prefix C1::/64 nhtype ipv6 fib-table 0 drop-in")
156         self.vapi.cli("sr steer l3 {}/32 via bsid D5::".format(self.ip4_dst))
157         self.vapi.cli("ip route add D2::/32 via {}".format(self.ip6_dst))
158
159         self.logger.info(self.vapi.cli("show sr steer"))
160         self.logger.info(self.vapi.cli("show sr policies"))
161
162         self.vapi.cli("clear errors")
163
164         self.pg0.add_stream(pkts)
165         self.pg_enable_capture(self.pg_interfaces)
166         self.pg_start()
167
168         self.logger.info(self.vapi.cli("show errors"))
169         self.logger.info(self.vapi.cli("show int address"))
170
171         capture = self.pg1.get_capture(len(pkts))
172
173         for pkt in capture:
174             self.logger.info(pkt.show2(dump=True))
175             self.logger.info("GTP4.D Address={}".format(
176                 str(pkt[IPv6ExtHdrSegmentRouting].addresses[0])))
177             self.assertEqual(
178                 str(pkt[IPv6ExtHdrSegmentRouting].addresses[0]),
179                 "d4:0:101:101::c800:0")
180
181
182 class TestSRv6EndMGTP6E(VppTestCase):
183     """ SRv6 End.M.GTP6.E """
184
185     @classmethod
186     def setUpClass(cls):
187         super(TestSRv6EndMGTP6E, cls).setUpClass()
188         try:
189             cls.create_pg_interfaces(range(2))
190             cls.pg_if_i = cls.pg_interfaces[0]
191             cls.pg_if_o = cls.pg_interfaces[1]
192
193             cls.pg_if_i.config_ip6()
194             cls.pg_if_o.config_ip6()
195
196             cls.ip6_nhop = cls.pg_if_o.remote_ip6
197
198             for pg_if in cls.pg_interfaces:
199                 pg_if.admin_up()
200                 pg_if.resolve_ndp(timeout=5)
201
202         except Exception:
203             super(TestSRv6EndMGTP6E, cls).tearDownClass()
204             raise
205
206     def create_packets(self, inner):
207         # 64bit prefix + 8bit QFI + 32bit TEID + 24bit
208         dst = b'\xaa' * 8 + b'\x00' + \
209             b'\xbb' * 4 + b'\x00' * 3
210         ip6_dst = IPv6Address(dst)
211
212         self.ip6_dst = ip6_dst
213
214         src = b'\xcc' * 8 + \
215             b'\xdd' * 4 + b'\x11' * 4
216         ip6_src = IPv6Address(src)
217
218         self.ip6_src = ip6_src
219
220         pkts = list()
221         for d, s in inner:
222             pkt = (Ether() /
223                    IPv6(dst=str(ip6_dst),
224                         src=str(ip6_src)) /
225                    IPv6ExtHdrSegmentRouting(segleft=1,
226                                             lastentry=0,
227                                             tag=0,
228                                             addresses=["a1::1"]) /
229                    IPv6(dst=d, src=s) / UDP(sport=1000, dport=23))
230             self.logger.info(pkt.show2(dump=True))
231             pkts.append(pkt)
232
233         return pkts
234
235     def test_srv6_mobile(self):
236         """ test_srv6_mobile """
237         pkts = self.create_packets([("A::1", "B::1"), ("C::1", "D::1")])
238
239         self.vapi.cli(
240             "sr localsid prefix {}/64 behavior end.m.gtp6.e fib-table 0"
241             .format(pkts[0]['IPv6'].dst))
242         self.vapi.cli(
243             "ip route add a1::/64 via {}".format(self.ip6_nhop))
244         self.logger.info(self.vapi.cli("show sr localsid"))
245
246         self.vapi.cli("clear errors")
247
248         self.pg0.add_stream(pkts)
249         self.pg_enable_capture(self.pg_interfaces)
250         self.pg_start()
251
252         self.logger.info(self.vapi.cli("show errors"))
253         self.logger.info(self.vapi.cli("show int address"))
254
255         capture = self.pg1.get_capture(len(pkts))
256
257         for pkt in capture:
258             self.logger.info(pkt.show2(dump=True))
259             self.assertEqual(pkt[IPv6].dst, "a1::1")
260             self.assertEqual(pkt[IPv6].src, str(self.ip6_src))
261             self.assertEqual(pkt[GTP_U_Header].teid, 0xbbbbbbbb)
262
263
264 class TestSRv6EndMGTP6D(VppTestCase):
265     """ SRv6 End.M.GTP6.D """
266
267     @classmethod
268     def setUpClass(cls):
269         super(TestSRv6EndMGTP6D, cls).setUpClass()
270         try:
271             cls.create_pg_interfaces(range(2))
272             cls.pg_if_i = cls.pg_interfaces[0]
273             cls.pg_if_o = cls.pg_interfaces[1]
274
275             cls.pg_if_i.config_ip6()
276             cls.pg_if_o.config_ip6()
277
278             cls.ip6_nhop = cls.pg_if_o.remote_ip6
279
280             cls.ip6_dst = "2001::1"
281             cls.ip6_src = "2002::1"
282
283             for pg_if in cls.pg_interfaces:
284                 pg_if.admin_up()
285                 pg_if.resolve_ndp(timeout=5)
286
287         except Exception:
288             super(TestSRv6EndMGTP6D, cls).tearDownClass()
289             raise
290
291     def create_packets(self, inner):
292
293         ip6_dst = IPv6Address(str(self.ip6_dst))
294
295         ip6_src = IPv6Address(str(self.ip6_src))
296
297         self.logger.info("ip6 dst: {}".format(ip6_dst))
298         self.logger.info("ip6 src: {}".format(ip6_src))
299
300         pkts = list()
301         for d, s in inner:
302             pkt = (Ether() /
303                    IPv6(dst=str(ip6_dst), src=str(ip6_src)) /
304                    UDP(sport=2152, dport=2152) /
305                    GTP_U_Header(gtp_type="g_pdu", teid=200) /
306                    IPv6(dst=d, src=s) /
307                    UDP(sport=1000, dport=23))
308             self.logger.info(pkt.show2(dump=True))
309             pkts.append(pkt)
310
311         return pkts
312
313     def test_srv6_mobile(self):
314         """ test_srv6_mobile """
315         pkts = self.create_packets([("A::1", "B::1"), ("C::1", "D::1")])
316
317         self.vapi.cli("set sr encaps source addr A1::1")
318         self.vapi.cli("sr policy add bsid D4:: next D2:: next D3::")
319         self.vapi.cli(
320             "sr localsid prefix 2001::/64 behavior end.m.gtp6.d " +
321             "D4::/64 fib-table 0 drop-in")
322         self.vapi.cli("ip route add D2::/64 via {}".format(self.ip6_nhop))
323
324         self.logger.info(self.vapi.cli("show sr policies"))
325         self.logger.info(self.vapi.cli("show sr localsid"))
326
327         self.vapi.cli("clear errors")
328
329         self.pg0.add_stream(pkts)
330         self.pg_enable_capture(self.pg_interfaces)
331         self.pg_start()
332
333         self.logger.info(self.vapi.cli("show errors"))
334         self.logger.info(self.vapi.cli("show int address"))
335
336         capture = self.pg1.get_capture(len(pkts))
337
338         for pkt in capture:
339             self.logger.info(pkt.show2(dump=True))
340             self.logger.info("GTP6.D SID0={}".format(
341                 str(pkt[IPv6ExtHdrSegmentRouting].addresses[0])))
342             self.logger.info("GTP6.D SID1={}".format(
343                 str(pkt[IPv6ExtHdrSegmentRouting].addresses[1])))
344             self.assertEqual(
345                 str(pkt[IPv6ExtHdrSegmentRouting].addresses[0]), "2001::1")
346             self.assertEqual(
347                 str(pkt[IPv6ExtHdrSegmentRouting].addresses[1]), "d4::c800:0")