3 from framework import VppTestCase
4 from ipaddress import IPv4Address
5 from ipaddress import IPv6Address
6 from scapy.contrib.gtp import *
7 from scapy.all import *
10 class TestSRv6EndMGTP4E(VppTestCase):
11 """SRv6 End.M.GTP4.E (SRv6 -> GTP-U)"""
# NOTE(review): each line of this excerpt carries a leading source line
# number, indentation has been lost, and the numbering has gaps (e.g. 12-14,
# 29-32, 52-56), so method headers, loop bodies and some call wrappers are
# not visible. The comments below describe only the visible statements.
#
# Class-level setup: chain to the parent setUpClass, then create two
# packet-generator interfaces.
15 super(TestSRv6EndMGTP4E, cls).setUpClass()
17 cls.create_pg_interfaces(range(2))
# Interface 0 is stored as pg_if_i and interface 1 as pg_if_o
# (presumably "input" and "output" -- confirm against the traffic flow).
18 cls.pg_if_i = cls.pg_interfaces[0]
19 cls.pg_if_o = cls.pg_interfaces[1]
# IPv6 is configured on pg_if_i, IPv4 on pg_if_o.
21 cls.pg_if_i.config_ip6()
22 cls.pg_if_o.config_ip4()
# IPv4 addresses that create_packets embeds into the IPv6 addresses and that
# the test later compares against the captured IPv4 header. The destination
# is the remote IPv4 address of pg_if_o; the source is a fixed literal.
24 cls.ip4_dst = cls.pg_if_o.remote_ip4
25 # cls.ip4_src = cls.pg_if_o.local_ip4
26 cls.ip4_src = "192.168.192.10"
28 for pg_if in cls.pg_interfaces:
# The body of the loop above and the statements up to source line 33 are not
# visible here; the parent tearDownClass is invoked on the next visible line.
33 super(TestSRv6EndMGTP4E, cls).tearDownClass()
36 def create_packets(self, inner):
# Build the packets sent by the test. The caller passes `inner` as a list of
# 2-tuples of IPv6 address strings; how each tuple is consumed is not
# visible in this excerpt.
#
# IPv6 destination: 16 raw bytes assembled from a constant prefix, the
# packed IPv4 destination, and four 0xbb bytes in the TEID position, then
# parsed as an IPv6 address.
38 ip4_dst = IPv4Address(str(self.ip4_dst))
39 # 32bit prefix + 32bit IPv4 DA + 8bit + 32bit TEID + 24bit
40 dst = b"\xaa" * 4 + ip4_dst.packed + b"\x11" + b"\xbb" * 4 + b"\x11" * 3
41 ip6_dst = IPv6Address(dst)
# IPv6 source: 16 raw bytes carrying the packed IPv4 source after an 8-byte
# constant prefix, followed by two 0xdd bytes and two 0x11 bytes.
43 ip4_src = IPv4Address(str(self.ip4_src))
44 # 64bit prefix + 32bit IPv4 SA + 16 bit port + 16bit
45 src = b"\xcc" * 8 + ip4_src.packed + b"\xdd" * 2 + b"\x11" * 2
46 ip6_src = IPv6Address(src)
# Log all four addresses for debugging.
48 self.logger.info("ip4 dst: {}".format(ip4_dst))
49 self.logger.info("ip4 src: {}".format(ip4_src))
50 self.logger.info("ip6 dst (remote srgw): {}".format(ip6_dst))
51 self.logger.info("ip6 src (local srgw): {}".format(ip6_src))
# Visible layers of the packet expression: an IPv6 header using the
# addresses built above, a segment routing extension header with default
# fields, and a UDP header. The layers on source lines 56 and 59 are not
# visible in this excerpt.
57 / IPv6(dst=str(ip6_dst), src=str(ip6_src))
58 / IPv6ExtHdrSegmentRouting()
60 / UDP(sport=1000, dport=23)
# Log the fully built packet.
62 self.logger.info(pkt.show2(dump=True))
67 def test_srv6_mobile(self):
68 """test_srv6_mobile"""
69 pkts = self.create_packets([("A::1", "B::1"), ("C::1", "D::1")])
# CLI command text: a localsid for the IPv6 destination of the first packet
# with the end.m.gtp4.e behavior. The call wrapping this string is not
# visible. "v4src_position 64" presumably corresponds to the 64-bit prefix
# placed in front of the IPv4 source in create_packets -- confirm.
72 "sr localsid address {} behavior end.m.gtp4.e ".format(pkts[0]["IPv6"].dst)
73 + "v4src_position 64 fib-table 0"
75 self.logger.info(self.vapi.cli("show sr localsid"))
# Reset the error counters so the "show errors" output logged below is not
# mixed with earlier activity.
77 self.vapi.cli("clear errors")
# Queue the packets on pg0 and enable capture on all interfaces. The
# statement that starts the traffic (source line 81) is not visible.
79 self.pg0.add_stream(pkts)
80 self.pg_enable_capture(self.pg_interfaces)
83 self.logger.info(self.vapi.cli("show errors"))
84 self.logger.info(self.vapi.cli("show int address"))
# Fetch the capture from pg1, passing the number of packets that were sent.
86 capture = self.pg1.get_capture(len(pkts))
# Checks on a captured packet (the loop header is not visible): the IPv4
# destination and source equal the class-level addresses, and the GTP-U TEID
# equals the four 0xbb bytes placed in the IPv6 destination.
89 self.logger.info(pkt.show2(dump=True))
90 self.assertEqual(pkt[IP].dst, self.ip4_dst)
91 self.assertEqual(pkt[IP].src, self.ip4_src)
92 self.assertEqual(pkt[GTP_U_Header].teid, 0xBBBBBBBB)
95 class TestSRv6TMGTP4D(VppTestCase):
96 """SRv6 T.M.GTP4.D (GTP-U -> SRv6)"""
# NOTE(review): each line of this excerpt carries a leading source line
# number, indentation has been lost, and the numbering has gaps, so method
# headers, loop bodies and some call wrappers are not visible. The comments
# below describe only the visible statements.
#
# Class-level setup: chain to the parent setUpClass, then create two
# packet-generator interfaces.
100 super(TestSRv6TMGTP4D, cls).setUpClass()
102 cls.create_pg_interfaces(range(2))
# Interface 0 is stored as pg_if_i and interface 1 as pg_if_o
# (presumably "input" and "output" -- confirm against the traffic flow).
103 cls.pg_if_i = cls.pg_interfaces[0]
104 cls.pg_if_o = cls.pg_interfaces[1]
# Both interfaces get IPv4 and IPv6 configuration.
106 cls.pg_if_i.config_ip4()
107 cls.pg_if_i.config_ip6()
108 cls.pg_if_o.config_ip4()
109 cls.pg_if_o.config_ip6()
# Fixed IPv4 addresses used for the IPv4 header built in create_packets.
111 cls.ip4_dst = "1.1.1.1"
112 cls.ip4_src = "2.2.2.2"
# Remote IPv6 address of pg_if_o; the test uses it as the route next hop.
114 cls.ip6_dst = cls.pg_if_o.remote_ip6
116 for pg_if in cls.pg_interfaces:
# Only the NDP resolution step of the loop body is visible (5 s timeout).
119 pg_if.resolve_ndp(timeout=5)
# The parent tearDownClass is invoked here; the surrounding statements are
# not visible in this excerpt.
122 super(TestSRv6TMGTP4D, cls).tearDownClass()
125 def create_packets(self, inner):
# Build the GTP-U packets sent by the test. The caller passes `inner` as a
# list of 2-tuples of IPv6 address strings; how each tuple is consumed is
# not visible in this excerpt.
127 ip4_dst = IPv4Address(str(self.ip4_dst))
129 ip4_src = IPv4Address(str(self.ip4_src))
131 self.logger.info("ip4 dst: {}".format(ip4_dst))
132 self.logger.info("ip4 src: {}".format(ip4_src))
# Visible layers of the packet expression: IPv4 header, UDP with both ports
# set to 2152, a GTP-U G-PDU header with TEID 200, and a UDP header. The
# layers on source lines 137 and 141 are not visible.
138 / IP(dst=str(ip4_dst), src=str(ip4_src))
139 / UDP(sport=2152, dport=2152)
140 / GTP_U_Header(gtp_type="g_pdu", teid=200)
142 / UDP(sport=1000, dport=23)
# Log the fully built packet.
144 self.logger.info(pkt.show2(dump=True))
149 def test_srv6_mobile(self):
150 """test_srv6_mobile"""
151 pkts = self.create_packets([("A::1", "B::1"), ("C::1", "D::1")])
# SR configuration through the CLI: encapsulation source address, then a
# policy with binding SID D4:: listing D2:: and D3::.
153 self.vapi.cli("set sr encaps source addr A1::1")
154 self.vapi.cli("sr policy add bsid D4:: next D2:: next D3::")
# CLI command text for a second policy (binding SID D5::) using the
# t.m.gtp4.d behavior; the call wrapping this string is not visible.
156 "sr policy add bsid D5:: behavior t.m.gtp4.d D4::/32 "
157 + "v6src_prefix C1::/64 nhtype ipv6 fib-table 0 drop-in"
# Steer traffic for the IPv4 destination into the D5:: policy and add a
# route for D2::/32 via the remote IPv6 address of pg_if_o.
159 self.vapi.cli("sr steer l3 {}/32 via bsid D5::".format(self.ip4_dst))
160 self.vapi.cli("ip route add D2::/32 via {}".format(self.ip6_dst))
162 self.logger.info(self.vapi.cli("show sr steer"))
163 self.logger.info(self.vapi.cli("show sr policies"))
# Reset the error counters so the "show errors" output logged below is not
# mixed with earlier activity.
165 self.vapi.cli("clear errors")
# Queue the packets on pg0 and enable capture on all interfaces. The
# statement that starts the traffic (source line 169) is not visible.
167 self.pg0.add_stream(pkts)
168 self.pg_enable_capture(self.pg_interfaces)
171 self.logger.info(self.vapi.cli("show errors"))
172 self.logger.info(self.vapi.cli("show int address"))
# Fetch the capture from pg1, passing the number of packets that were sent.
174 capture = self.pg1.get_capture(len(pkts))
177 self.logger.info(pkt.show2(dump=True))
# Message text reporting the first address of the segment routing header;
# the call that receives it is not visible.
179 "GTP4.D Address={}".format(
180 str(pkt[IPv6ExtHdrSegmentRouting].addresses[0])
# Arguments of a comparison whose call line is not visible: the first SRH
# address against a fixed value. NOTE(review): that value appears to hold
# the D4::/32 prefix, the IPv4 destination 1.1.1.1 (0x0101 0x0101) and the
# TEID 200 (0xc8) -- confirm the exact bit layout.
184 str(pkt[IPv6ExtHdrSegmentRouting].addresses[0]), "d4:0:101:101::c800:0"
188 class TestSRv6EndMGTP6E(VppTestCase):
189 """SRv6 End.M.GTP6.E"""
# NOTE(review): each line of this excerpt carries a leading source line
# number, indentation has been lost, and the numbering has gaps, so method
# headers, loop bodies and some call wrappers are not visible. The comments
# below describe only the visible statements.
#
# Class-level setup: chain to the parent setUpClass, then create two
# packet-generator interfaces.
193 super(TestSRv6EndMGTP6E, cls).setUpClass()
195 cls.create_pg_interfaces(range(2))
# Interface 0 is stored as pg_if_i and interface 1 as pg_if_o
# (presumably "input" and "output" -- confirm against the traffic flow).
196 cls.pg_if_i = cls.pg_interfaces[0]
197 cls.pg_if_o = cls.pg_interfaces[1]
# Both interfaces are configured for IPv6.
199 cls.pg_if_i.config_ip6()
200 cls.pg_if_o.config_ip6()
# Remote IPv6 address of pg_if_o; the test uses it as the route next hop.
202 cls.ip6_nhop = cls.pg_if_o.remote_ip6
204 for pg_if in cls.pg_interfaces:
# Only the NDP resolution step of the loop body is visible (5 s timeout).
206 pg_if.resolve_ndp(timeout=5)
# The parent tearDownClass is invoked here; the surrounding statements are
# not visible in this excerpt.
209 super(TestSRv6EndMGTP6E, cls).tearDownClass()
212 def create_packets(self, inner):
# Build the SRv6 packets sent by the test. The caller passes `inner` as a
# list of 2-tuples of IPv6 address strings; how each tuple is consumed is
# not visible in this excerpt.
#
# IPv6 destination: 16 raw bytes with four 0xbb bytes in the TEID position,
# parsed as an IPv6 address.
213 # 64bit prefix + 8bit QFI + 32bit TEID + 24bit
214 dst = b"\xaa" * 8 + b"\x00" + b"\xbb" * 4 + b"\x00" * 3
215 ip6_dst = IPv6Address(dst)
# Stored on the instance for use outside this method.
217 self.ip6_dst = ip6_dst
# IPv6 source: 16 raw bytes (8 x 0xcc, 4 x 0xdd, 4 x 0x11).
219 src = b"\xcc" * 8 + b"\xdd" * 4 + b"\x11" * 4
220 ip6_src = IPv6Address(src)
# Stored on the instance; the test compares the captured source against it.
222 self.ip6_src = ip6_src
# Visible layers of the packet expression: an IPv6 header with the addresses
# built above, a segment routing header with one segment (a1::1) and
# segleft=1, and a UDP header. The lines closing the SRH call and the other
# layers are not visible.
228 / IPv6(dst=str(ip6_dst), src=str(ip6_src))
229 / IPv6ExtHdrSegmentRouting(
230 segleft=1, lastentry=0, tag=0, addresses=["a1::1"]
233 / UDP(sport=1000, dport=23)
# Log the fully built packet.
235 self.logger.info(pkt.show2(dump=True))
240 def test_srv6_mobile(self):
241 """test_srv6_mobile"""
242 pkts = self.create_packets([("A::1", "B::1"), ("C::1", "D::1")])
# CLI command text: a /64 localsid prefix with the end.m.gtp6.e behavior.
# The format argument and the wrapping call are not visible in this excerpt.
245 "sr localsid prefix {}/64 behavior end.m.gtp6.e fib-table 0".format(
# Route a1::/64 (covering the a1::1 segment used in create_packets) via the
# remote IPv6 address of pg_if_o.
249 self.vapi.cli("ip route add a1::/64 via {}".format(self.ip6_nhop))
250 self.logger.info(self.vapi.cli("show sr localsid"))
# Reset the error counters so the "show errors" output logged below is not
# mixed with earlier activity.
252 self.vapi.cli("clear errors")
# Queue the packets on pg0 and enable capture on all interfaces. The
# statement that starts the traffic (source line 256) is not visible.
254 self.pg0.add_stream(pkts)
255 self.pg_enable_capture(self.pg_interfaces)
258 self.logger.info(self.vapi.cli("show errors"))
259 self.logger.info(self.vapi.cli("show int address"))
# Fetch the capture from pg1, passing the number of packets that were sent.
261 capture = self.pg1.get_capture(len(pkts))
# Checks on a captured packet (the loop header is not visible): the IPv6
# destination is the a1::1 segment, the IPv6 source is the address built in
# create_packets, and the GTP-U TEID equals the four 0xbb bytes placed in
# the original IPv6 destination.
264 self.logger.info(pkt.show2(dump=True))
265 self.assertEqual(pkt[IPv6].dst, "a1::1")
266 self.assertEqual(pkt[IPv6].src, str(self.ip6_src))
267 self.assertEqual(pkt[GTP_U_Header].teid, 0xBBBBBBBB)
270 class TestSRv6EndMGTP6D(VppTestCase):
271 """SRv6 End.M.GTP6.D"""
# NOTE(review): each line of this excerpt carries a leading source line
# number, indentation has been lost, and the numbering has gaps, so method
# headers, loop bodies and some call wrappers are not visible. The excerpt
# also ends inside the final assertion. The comments below describe only the
# visible statements.
#
# Class-level setup: chain to the parent setUpClass, then create two
# packet-generator interfaces.
275 super(TestSRv6EndMGTP6D, cls).setUpClass()
277 cls.create_pg_interfaces(range(2))
# Interface 0 is stored as pg_if_i and interface 1 as pg_if_o
# (presumably "input" and "output" -- confirm against the traffic flow).
278 cls.pg_if_i = cls.pg_interfaces[0]
279 cls.pg_if_o = cls.pg_interfaces[1]
# Both interfaces are configured for IPv6.
281 cls.pg_if_i.config_ip6()
282 cls.pg_if_o.config_ip6()
# Remote IPv6 address of pg_if_o; the test uses it as the route next hop.
284 cls.ip6_nhop = cls.pg_if_o.remote_ip6
# Fixed IPv6 addresses used for the IPv6 header built in create_packets.
286 cls.ip6_dst = "2001::1"
287 cls.ip6_src = "2002::1"
289 for pg_if in cls.pg_interfaces:
# Only the NDP resolution step of the loop body is visible (5 s timeout).
291 pg_if.resolve_ndp(timeout=5)
# The parent tearDownClass is invoked here; the surrounding statements are
# not visible in this excerpt.
294 super(TestSRv6EndMGTP6D, cls).tearDownClass()
297 def create_packets(self, inner):
# Build the GTP-U over IPv6 packets sent by the test. The caller passes
# `inner` as a list of 2-tuples of IPv6 address strings; how each tuple is
# consumed is not visible in this excerpt.
299 ip6_dst = IPv6Address(str(self.ip6_dst))
301 ip6_src = IPv6Address(str(self.ip6_src))
303 self.logger.info("ip6 dst: {}".format(ip6_dst))
304 self.logger.info("ip6 src: {}".format(ip6_src))
# Visible layers of the packet expression: IPv6 header, UDP with both ports
# set to 2152, a GTP-U G-PDU header with TEID 200, and a UDP header. The
# layers on source lines 309 and 313 are not visible.
310 / IPv6(dst=str(ip6_dst), src=str(ip6_src))
311 / UDP(sport=2152, dport=2152)
312 / GTP_U_Header(gtp_type="g_pdu", teid=200)
314 / UDP(sport=1000, dport=23)
# Log the fully built packet.
316 self.logger.info(pkt.show2(dump=True))
321 def test_srv6_mobile(self):
322 """test_srv6_mobile"""
323 pkts = self.create_packets([("A::1", "B::1"), ("C::1", "D::1")])
# SR configuration through the CLI: encapsulation source address, then a
# policy with binding SID D4:: listing D2:: and D3::.
325 self.vapi.cli("set sr encaps source addr A1::1")
326 self.vapi.cli("sr policy add bsid D4:: next D2:: next D3::")
# CLI command text: a localsid for 2001::/64 (which covers the class-level
# ip6_dst 2001::1) with the end.m.gtp6.d behavior; the wrapping call is not
# visible.
328 "sr localsid prefix 2001::/64 behavior end.m.gtp6.d "
329 + "D4::/64 fib-table 0 drop-in"
# Route D2::/64 via the remote IPv6 address of pg_if_o.
331 self.vapi.cli("ip route add D2::/64 via {}".format(self.ip6_nhop))
333 self.logger.info(self.vapi.cli("show sr policies"))
334 self.logger.info(self.vapi.cli("show sr localsid"))
# Reset the error counters so the "show errors" output logged below is not
# mixed with earlier activity.
336 self.vapi.cli("clear errors")
# Queue the packets on pg0 and enable capture on all interfaces. The
# statement that starts the traffic (source line 340) is not visible.
338 self.pg0.add_stream(pkts)
339 self.pg_enable_capture(self.pg_interfaces)
342 self.logger.info(self.vapi.cli("show errors"))
343 self.logger.info(self.vapi.cli("show int address"))
# Fetch the capture from pg1, passing the number of packets that were sent.
345 capture = self.pg1.get_capture(len(pkts))
348 self.logger.info(pkt.show2(dump=True))
# Message texts reporting the first two addresses of the segment routing
# header; the calls that receive them are not visible.
350 "GTP6.D SID0={}".format(str(pkt[IPv6ExtHdrSegmentRouting].addresses[0]))
353 "GTP6.D SID1={}".format(str(pkt[IPv6ExtHdrSegmentRouting].addresses[1]))
# The first SRH address must equal the original IPv6 destination 2001::1.
355 self.assertEqual(str(pkt[IPv6ExtHdrSegmentRouting].addresses[0]), "2001::1")
# Arguments of a comparison whose call line is not visible: the second SRH
# address against a fixed value. NOTE(review): that value appears to hold
# the D4::/64 prefix and the TEID 200 (0xc8) -- confirm the exact bit
# layout.
357 str(pkt[IPv6ExtHdrSegmentRouting].addresses[1]), "d4::c800:0"