2ac37cefca94ece121d91d2b90455e44b8323788
[vpp.git] / src / plugins / srv6-mobile / test / test_srv6_mobile.py
1 #!/usr/bin/env python
2
3 from framework import VppTestCase
4 from ipaddress import IPv4Address
5 from ipaddress import IPv6Address
6 from scapy.contrib.gtp import *
7 from scapy.all import *
8
9
10 class TestSRv6EndMGTP4E(VppTestCase):
11     """ SRv6 End.M.GTP4.E (SRv6 -> GTP-U) """
12
13     @classmethod
14     def setUpClass(cls):
15         super(TestSRv6EndMGTP4E, cls).setUpClass()
16         try:
17             cls.create_pg_interfaces(range(2))
18             cls.pg_if_i = cls.pg_interfaces[0]
19             cls.pg_if_o = cls.pg_interfaces[1]
20
21             cls.pg_if_i.config_ip6()
22             cls.pg_if_o.config_ip4()
23
24             cls.ip4_dst = cls.pg_if_o.remote_ip4
25             # cls.ip4_src = cls.pg_if_o.local_ip4
26             cls.ip4_src = "192.168.192.10"
27
28             for pg_if in cls.pg_interfaces:
29                 pg_if.admin_up()
30                 pg_if.resolve_arp()
31
32         except Exception:
33             super(TestSRv6EndMGTP4E, cls).tearDownClass()
34             raise
35
36     def create_packets(self, inner):
37
38         ip4_dst = IPv4Address(str(self.ip4_dst))
39         # 32bit prefix + 32bit IPv4 DA + 8bit + 32bit TEID + 24bit
40         dst = b'\xaa' * 4 + ip4_dst.packed + \
41             b'\x11' + b'\xbb' * 4 + b'\x11' * 3
42         ip6_dst = IPv6Address(dst)
43
44         ip4_src = IPv4Address(str(self.ip4_src))
45         # 64bit prefix + 32bit IPv4 SA + 16 bit port + 16bit
46         src = b'\xcc' * 8 + ip4_src.packed + \
47             b'\xdd' * 2 + b'\x11' * 2
48         ip6_src = IPv6Address(src)
49
50         self.logger.info("ip4 dst: {}".format(ip4_dst))
51         self.logger.info("ip4 src: {}".format(ip4_src))
52         self.logger.info("ip6 dst (remote srgw): {}".format(ip6_dst))
53         self.logger.info("ip6 src (local  srgw): {}".format(ip6_src))
54
55         pkts = list()
56         for d, s in inner:
57             pkt = (Ether() /
58                    IPv6(dst=str(ip6_dst), src=str(ip6_src)) /
59                    IPv6ExtHdrSegmentRouting() /
60                    IPv6(dst=d, src=s) /
61                    UDP(sport=1000, dport=23))
62             self.logger.info(pkt.show2(dump=True))
63             pkts.append(pkt)
64
65         return pkts
66
67     def test_srv6_mobile(self):
68         """ test_srv6_mobile """
69         pkts = self.create_packets([("A::1", "B::1"), ("C::1", "D::1")])
70
71         self.vapi.cli(
72             "sr localsid address {} behavior end.m.gtp4.e v4src_position 64"
73             .format(pkts[0]['IPv6'].dst))
74         self.logger.info(self.vapi.cli("show sr localsids"))
75
76         self.vapi.cli("clear errors")
77
78         self.pg0.add_stream(pkts)
79         self.pg_enable_capture(self.pg_interfaces)
80         self.pg_start()
81
82         self.logger.info(self.vapi.cli("show errors"))
83         self.logger.info(self.vapi.cli("show int address"))
84
85         capture = self.pg1.get_capture(len(pkts))
86
87         for pkt in capture:
88             self.logger.info(pkt.show2(dump=True))
89             self.assertEqual(pkt[IP].dst, self.ip4_dst)
90             self.assertEqual(pkt[IP].src, self.ip4_src)
91             self.assertEqual(pkt[GTP_U_Header].teid, 0xbbbbbbbb)
92
93
94 class TestSRv6TMGTP4D(VppTestCase):
95     """ SRv6 T.M.GTP4.D (GTP-U -> SRv6) """
96
97     @classmethod
98     def setUpClass(cls):
99         super(TestSRv6TMGTP4D, cls).setUpClass()
100         try:
101             cls.create_pg_interfaces(range(2))
102             cls.pg_if_i = cls.pg_interfaces[0]
103             cls.pg_if_o = cls.pg_interfaces[1]
104
105             cls.pg_if_i.config_ip4()
106             cls.pg_if_i.config_ip6()
107             cls.pg_if_o.config_ip4()
108             cls.pg_if_o.config_ip6()
109
110             cls.ip4_dst = "1.1.1.1"
111             cls.ip4_src = "2.2.2.2"
112
113             cls.ip6_dst = cls.pg_if_o.remote_ip6
114
115             for pg_if in cls.pg_interfaces:
116                 pg_if.admin_up()
117                 pg_if.resolve_arp()
118                 pg_if.resolve_ndp(timeout=5)
119
120         except Exception:
121             super(TestSRv6TMGTP4D, cls).tearDownClass()
122             raise
123
124     def create_packets(self, inner):
125
126         ip4_dst = IPv4Address(str(self.ip4_dst))
127
128         ip4_src = IPv4Address(str(self.ip4_src))
129
130         self.logger.info("ip4 dst: {}".format(ip4_dst))
131         self.logger.info("ip4 src: {}".format(ip4_src))
132
133         pkts = list()
134         for d, s in inner:
135             pkt = (Ether() /
136                    IP(dst=str(ip4_dst), src=str(ip4_src)) /
137                    UDP(sport=2152, dport=2152) /
138                    GTP_U_Header(gtp_type="g_pdu", teid=200) /
139                    IPv6(dst=d, src=s) /
140                    UDP(sport=1000, dport=23))
141             self.logger.info(pkt.show2(dump=True))
142             pkts.append(pkt)
143
144         return pkts
145
146     def test_srv6_mobile(self):
147         """ test_srv6_mobile """
148         pkts = self.create_packets([("A::1", "B::1"), ("C::1", "D::1")])
149
150         self.vapi.cli("set sr encaps source addr A1::1")
151         self.vapi.cli("sr policy add bsid D4:: next D2:: next D3::")
152         self.vapi.cli(
153             "sr policy add bsid D5:: behavior t.m.gtp4.d"
154             "D4::/32 v6src_prefix C1::/64 nhtype ipv6")
155         self.vapi.cli("sr steer l3 {}/32 via bsid D5::".format(self.ip4_dst))
156         self.vapi.cli("ip route add D2::/32 via {}".format(self.ip6_dst))
157
158         self.logger.info(self.vapi.cli("show sr steer"))
159         self.logger.info(self.vapi.cli("show sr policies"))
160
161         self.vapi.cli("clear errors")
162
163         self.pg0.add_stream(pkts)
164         self.pg_enable_capture(self.pg_interfaces)
165         self.pg_start()
166
167         self.logger.info(self.vapi.cli("show errors"))
168         self.logger.info(self.vapi.cli("show int address"))
169
170         capture = self.pg1.get_capture(len(pkts))
171
172         for pkt in capture:
173             self.logger.info(pkt.show2(dump=True))
174             self.logger.info("GTP4.D Address={}".format(
175                 str(pkt[IPv6ExtHdrSegmentRouting].addresses[0])))
176             self.assertEqual(
177                 str(pkt[IPv6ExtHdrSegmentRouting].addresses[0]),
178                 "d4:0:101:101::c800:0")
179
180
181 class TestSRv6EndMGTP6E(VppTestCase):
182     """ SRv6 End.M.GTP6.E """
183
184     @classmethod
185     def setUpClass(cls):
186         super(TestSRv6EndMGTP6E, cls).setUpClass()
187         try:
188             cls.create_pg_interfaces(range(2))
189             cls.pg_if_i = cls.pg_interfaces[0]
190             cls.pg_if_o = cls.pg_interfaces[1]
191
192             cls.pg_if_i.config_ip6()
193             cls.pg_if_o.config_ip6()
194
195             cls.ip6_nhop = cls.pg_if_o.remote_ip6
196
197             for pg_if in cls.pg_interfaces:
198                 pg_if.admin_up()
199                 pg_if.resolve_ndp(timeout=5)
200
201         except Exception:
202             super(TestSRv6EndMGTP6E, cls).tearDownClass()
203             raise
204
205     def create_packets(self, inner):
206         # 64bit prefix + 32bit TEID + 8bit QFI + 24bit
207         dst = b'\xaa' * 8 + b'\x00' + \
208             b'\xbb' * 4 + b'\x00' * 3
209         ip6_dst = IPv6Address(dst)
210
211         self.ip6_dst = ip6_dst
212
213         src = b'\xcc' * 8 + \
214             b'\xdd' * 4 + b'\x11' * 4
215         ip6_src = IPv6Address(src)
216
217         self.ip6_src = ip6_src
218
219         pkts = list()
220         for d, s in inner:
221             pkt = (Ether() /
222                    IPv6(dst=str(ip6_dst),
223                         src=str(ip6_src)) /
224                    IPv6ExtHdrSegmentRouting(segleft=1,
225                                             lastentry=0,
226                                             tag=0,
227                                             addresses=["a1::1"]) /
228                    IPv6(dst=d, src=s) / UDP(sport=1000, dport=23))
229             self.logger.info(pkt.show2(dump=True))
230             pkts.append(pkt)
231
232         return pkts
233
234     def test_srv6_mobile(self):
235         """ test_srv6_mobile """
236         pkts = self.create_packets([("A::1", "B::1"), ("C::1", "D::1")])
237
238         self.vapi.cli(
239             "sr localsid address {}/64 behavior end.m.gtp6.e"
240             .format(pkts[0]['IPv6'].dst))
241         self.vapi.cli(
242             "ip route add a1::/64 via {}".format(self.ip6_nhop))
243         self.logger.info(self.vapi.cli("show sr localsids"))
244
245         self.vapi.cli("clear errors")
246
247         self.pg0.add_stream(pkts)
248         self.pg_enable_capture(self.pg_interfaces)
249         self.pg_start()
250
251         self.logger.info(self.vapi.cli("show errors"))
252         self.logger.info(self.vapi.cli("show int address"))
253
254         capture = self.pg1.get_capture(len(pkts))
255
256         for pkt in capture:
257             self.logger.info(pkt.show2(dump=True))
258             self.assertEqual(pkt[IPv6].dst, "a1::1")
259             self.assertEqual(pkt[IPv6].src, str(self.ip6_src))
260             self.assertEqual(pkt[GTP_U_Header].teid, 0xbbbbbbbb)
261
262
263 class TestSRv6EndMGTP6D(VppTestCase):
264     """ SRv6 End.M.GTP6.D """
265
266     @classmethod
267     def setUpClass(cls):
268         super(TestSRv6EndMGTP6D, cls).setUpClass()
269         try:
270             cls.create_pg_interfaces(range(2))
271             cls.pg_if_i = cls.pg_interfaces[0]
272             cls.pg_if_o = cls.pg_interfaces[1]
273
274             cls.pg_if_i.config_ip6()
275             cls.pg_if_o.config_ip6()
276
277             cls.ip6_nhop = cls.pg_if_o.remote_ip6
278
279             cls.ip6_dst = "2001::1"
280             cls.ip6_src = "2002::1"
281
282             for pg_if in cls.pg_interfaces:
283                 pg_if.admin_up()
284                 pg_if.resolve_ndp(timeout=5)
285
286         except Exception:
287             super(TestSRv6EndMGTP6D, cls).tearDownClass()
288             raise
289
290     def create_packets(self, inner):
291
292         ip6_dst = IPv6Address(str(self.ip6_dst))
293
294         ip6_src = IPv6Address(str(self.ip6_src))
295
296         self.logger.info("ip6 dst: {}".format(ip6_dst))
297         self.logger.info("ip6 src: {}".format(ip6_src))
298
299         pkts = list()
300         for d, s in inner:
301             pkt = (Ether() /
302                    IPv6(dst=str(ip6_dst), src=str(ip6_src)) /
303                    UDP(sport=2152, dport=2152) /
304                    GTP_U_Header(gtp_type="g_pdu", teid=200) /
305                    IPv6(dst=d, src=s) /
306                    UDP(sport=1000, dport=23))
307             self.logger.info(pkt.show2(dump=True))
308             pkts.append(pkt)
309
310         return pkts
311
312     def test_srv6_mobile(self):
313         """ test_srv6_mobile """
314         pkts = self.create_packets([("A::1", "B::1"), ("C::1", "D::1")])
315
316         self.vapi.cli("set sr encaps source addr A1::1")
317         self.vapi.cli("sr policy add bsid D4:: next D2:: next D3::")
318         self.vapi.cli(
319             "sr localsid address 2001::/64 behavior end.m.gtp6.d D4::/64")
320         self.vapi.cli("ip route add D2::/64 via {}".format(self.ip6_nhop))
321
322         self.logger.info(self.vapi.cli("show sr policies"))
323
324         self.vapi.cli("clear errors")
325
326         self.pg0.add_stream(pkts)
327         self.pg_enable_capture(self.pg_interfaces)
328         self.pg_start()
329
330         self.logger.info(self.vapi.cli("show errors"))
331         self.logger.info(self.vapi.cli("show int address"))
332
333         capture = self.pg1.get_capture(len(pkts))
334
335         for pkt in capture:
336             self.logger.info(pkt.show2(dump=True))
337             self.logger.info("GTP6.D Address={}".format(
338                 str(pkt[IPv6ExtHdrSegmentRouting].addresses[0])))
339             self.assertEqual(
340                 str(pkt[IPv6ExtHdrSegmentRouting].addresses[0]), "d4::c800:0")