3 from framework import VppTestCase
4 from ipaddress import IPv4Address
5 from ipaddress import IPv6Address
6 from vpp_ip_route import VppIpRoute, VppRoutePath, FibPathProto, VppIpTable
8 from vpp_srv6_mobile import (
11 VppSRv6MobileLocalSID,
14 from scapy.contrib.gtp import *
15 from scapy.all import *
18 class TestSRv6EndMGTP4E(VppTestCase):
19 """SRv6 End.M.GTP4.E (SRv6 -> GTP-U)"""
23 super(TestSRv6EndMGTP4E, cls).setUpClass()
25 cls.create_pg_interfaces(range(2))
26 cls.pg_if_i = cls.pg_interfaces[0]
27 cls.pg_if_o = cls.pg_interfaces[1]
29 cls.pg_if_i.config_ip6()
30 cls.pg_if_o.config_ip4()
32 cls.ip4_dst = cls.pg_if_o.remote_ip4
33 # cls.ip4_src = cls.pg_if_o.local_ip4
34 cls.ip4_src = "192.168.192.10"
36 for pg_if in cls.pg_interfaces:
41 super(TestSRv6EndMGTP4E, cls).tearDownClass()
44 def create_packets(self, inner):
45 ip4_dst = IPv4Address(str(self.ip4_dst))
46 # 32bit prefix + 32bit IPv4 DA + 8bit + 32bit TEID + 24bit
47 dst = b"\xaa" * 4 + ip4_dst.packed + b"\x11" + b"\xbb" * 4 + b"\x11" * 3
48 ip6_dst = IPv6Address(dst)
50 ip4_src = IPv4Address(str(self.ip4_src))
51 # 64bit prefix + 32bit IPv4 SA + 16 bit port + 16bit
52 src = b"\xcc" * 8 + ip4_src.packed + b"\xdd" * 2 + b"\x11" * 2
53 ip6_src = IPv6Address(src)
55 self.logger.info("ip4 dst: {}".format(ip4_dst))
56 self.logger.info("ip4 src: {}".format(ip4_src))
57 self.logger.info("ip6 dst (remote srgw): {}".format(ip6_dst))
58 self.logger.info("ip6 src (local srgw): {}".format(ip6_src))
64 / IPv6(dst=str(ip6_dst), src=str(ip6_src))
65 / IPv6ExtHdrSegmentRouting()
67 / UDP(sport=1000, dport=23)
69 self.logger.info(pkt.show2(dump=True))
74 def test_srv6_mobile(self):
75 """test_srv6_mobile"""
76 pkts = self.create_packets([("A::1", "B::1"), ("C::1", "D::1")])
78 # "sr localsid address {} behavior end.m.gtp4.e v4src_position 64 fib-table 0"
79 # .format(pkts[0]["IPv6"].dst)
80 localsid = VppSRv6MobileLocalSID(
82 # in the address-params case the prefix length is 0
83 localsid_prefix="{}/{}".format(pkts[0]["IPv6"].dst, 0),
84 behavior="end.m.gtp4.e",
88 localsid.add_vpp_config()
91 self.logger.info(self.vapi.cli("show sr localsid"))
93 self.vapi.cli("clear errors")
95 self.pg0.add_stream(pkts)
96 self.pg_enable_capture(self.pg_interfaces)
99 self.logger.info(self.vapi.cli("show errors"))
100 self.logger.info(self.vapi.cli("show int address"))
102 capture = self.pg1.get_capture(len(pkts))
105 self.logger.info(pkt.show2(dump=True))
106 self.assertEqual(pkt[IP].dst, self.ip4_dst)
107 self.assertEqual(pkt[IP].src, self.ip4_src)
108 self.assertEqual(pkt[GTP_U_Header].teid, 0xBBBBBBBB)
111 class TestSRv6TMGTP4D(VppTestCase):
112 """SRv6 T.M.GTP4.D (GTP-U -> SRv6)"""
116 super(TestSRv6TMGTP4D, cls).setUpClass()
118 cls.create_pg_interfaces(range(2))
119 cls.pg_if_i = cls.pg_interfaces[0]
120 cls.pg_if_o = cls.pg_interfaces[1]
122 cls.pg_if_i.config_ip4()
123 cls.pg_if_i.config_ip6()
124 cls.pg_if_o.config_ip4()
125 cls.pg_if_o.config_ip6()
127 cls.ip4_dst = "1.1.1.1"
128 cls.ip4_src = "2.2.2.2"
130 cls.ip6_dst = cls.pg_if_o.remote_ip6
132 for pg_if in cls.pg_interfaces:
135 pg_if.resolve_ndp(timeout=5)
138 super(TestSRv6TMGTP4D, cls).tearDownClass()
141 def create_packets(self, inner):
142 ip4_dst = IPv4Address(str(self.ip4_dst))
144 ip4_src = IPv4Address(str(self.ip4_src))
146 self.logger.info("ip4 dst: {}".format(ip4_dst))
147 self.logger.info("ip4 src: {}".format(ip4_src))
153 / IP(dst=str(ip4_dst), src=str(ip4_src))
154 / UDP(sport=2152, dport=2152)
155 / GTP_U_Header(gtp_type="g_pdu", teid=200)
157 / UDP(sport=1000, dport=23)
159 self.logger.info(pkt.show2(dump=True))
164 def test_srv6_mobile(self):
165 """test_srv6_mobile"""
166 pkts = self.create_packets([("A::1", "B::1"), ("C::1", "D::1")])
168 self.vapi.cli("set sr encaps source addr A1::1")
169 self.vapi.cli("sr policy add bsid D4:: next D2:: next D3::")
171 # sr policy add bsid D5:: behavior t.m.gtp4.d D4::/32 v6src_prefix C1::/64 nhtype ipv6 fib-table 0 drop-in
172 policy = VppSRv6MobilePolicy(
175 behavior="t.m.gtp4.d",
176 sr_prefix="{}/{}".format("D4::", 32),
177 v6src_prefix="{}/{}".format("C1::", 64),
178 nhtype=SRv6MobileNhtype.SRV6_NHTYPE_API_IPV6,
182 policy.add_vpp_config()
184 self.vapi.cli("sr steer l3 {}/32 via bsid D5::".format(self.ip4_dst))
186 # "ip route add D2::/32 via {}".format(self.ip6_dst)
188 self, "D2::", 32, [VppRoutePath(self.ip6_dst, self.pg1.sw_if_index)]
190 route.add_vpp_config()
191 self.logger.info(self.vapi.cli("show ip6 fib"))
192 self.logger.info(self.vapi.cli("show sr steer"))
193 self.logger.info(self.vapi.cli("show sr policies"))
195 self.vapi.cli("clear errors")
197 self.pg0.add_stream(pkts)
198 self.pg_enable_capture(self.pg_interfaces)
201 self.logger.info(self.vapi.cli("show errors"))
202 self.logger.info(self.vapi.cli("show int address"))
204 capture = self.pg1.get_capture(len(pkts))
207 self.logger.info(pkt.show2(dump=True))
209 "GTP4.D Address={}".format(
210 str(pkt[IPv6ExtHdrSegmentRouting].addresses[0])
214 str(pkt[IPv6ExtHdrSegmentRouting].addresses[0]), "d4:0:101:101::c800:0"
218 class TestSRv6EndMGTP6E(VppTestCase):
219 """SRv6 End.M.GTP6.E"""
223 super(TestSRv6EndMGTP6E, cls).setUpClass()
225 cls.create_pg_interfaces(range(2))
226 cls.pg_if_i = cls.pg_interfaces[0]
227 cls.pg_if_o = cls.pg_interfaces[1]
229 cls.pg_if_i.config_ip6()
230 cls.pg_if_o.config_ip6()
232 cls.ip6_nhop = cls.pg_if_o.remote_ip6
234 for pg_if in cls.pg_interfaces:
236 pg_if.resolve_ndp(timeout=5)
239 super(TestSRv6EndMGTP6E, cls).tearDownClass()
242 def create_packets(self, inner):
243 # 64bit prefix + 8bit QFI + 32bit TEID + 24bit
244 dst = b"\xaa" * 8 + b"\x00" + b"\xbb" * 4 + b"\x00" * 3
245 ip6_dst = IPv6Address(dst)
247 self.ip6_dst = ip6_dst
249 src = b"\xcc" * 8 + b"\xdd" * 4 + b"\x11" * 4
250 ip6_src = IPv6Address(src)
252 self.ip6_src = ip6_src
258 / IPv6(dst=str(ip6_dst), src=str(ip6_src))
259 / IPv6ExtHdrSegmentRouting(
260 segleft=1, lastentry=0, tag=0, addresses=["a1::1"]
263 / UDP(sport=1000, dport=23)
265 self.logger.info(pkt.show2(dump=True))
270 def test_srv6_mobile(self):
271 """test_srv6_mobile"""
272 pkts = self.create_packets([("A::1", "B::1"), ("C::1", "D::1")])
274 # "sr localsid prefix {}/64 behavior end.m.gtp6.e fib-table 0"
275 # .format(pkts[0]["IPv6"].dst)
276 localsid = VppSRv6MobileLocalSID(
278 localsid_prefix="{}/{}".format(pkts[0]["IPv6"].dst, 64),
279 behavior="end.m.gtp6.e",
282 localsid.add_vpp_config()
284 # "ip route add a1::/64 via {}".format(self.ip6_nhop)
286 self, "a1::", 64, [VppRoutePath(self.ip6_nhop, self.pg1.sw_if_index)]
288 route.add_vpp_config()
289 self.logger.info(self.vapi.cli("show sr localsid"))
291 self.vapi.cli("clear errors")
293 self.pg0.add_stream(pkts)
294 self.pg_enable_capture(self.pg_interfaces)
297 self.logger.info(self.vapi.cli("show errors"))
298 self.logger.info(self.vapi.cli("show int address"))
300 capture = self.pg1.get_capture(len(pkts))
303 self.logger.info(pkt.show2(dump=True))
304 self.assertEqual(pkt[IPv6].dst, "a1::1")
305 self.assertEqual(pkt[IPv6].src, str(self.ip6_src))
306 self.assertEqual(pkt[GTP_U_Header].teid, 0xBBBBBBBB)
309 class TestSRv6EndMGTP6D(VppTestCase):
310 """SRv6 End.M.GTP6.D"""
314 super(TestSRv6EndMGTP6D, cls).setUpClass()
316 cls.create_pg_interfaces(range(2))
317 cls.pg_if_i = cls.pg_interfaces[0]
318 cls.pg_if_o = cls.pg_interfaces[1]
320 cls.pg_if_i.config_ip6()
321 cls.pg_if_o.config_ip6()
323 cls.ip6_nhop = cls.pg_if_o.remote_ip6
325 cls.ip6_dst = "2001::1"
326 cls.ip6_src = "2002::1"
328 for pg_if in cls.pg_interfaces:
330 pg_if.resolve_ndp(timeout=5)
333 super(TestSRv6EndMGTP6D, cls).tearDownClass()
336 def create_packets(self, inner):
337 ip6_dst = IPv6Address(str(self.ip6_dst))
339 ip6_src = IPv6Address(str(self.ip6_src))
341 self.logger.info("ip6 dst: {}".format(ip6_dst))
342 self.logger.info("ip6 src: {}".format(ip6_src))
348 / IPv6(dst=str(ip6_dst), src=str(ip6_src))
349 / UDP(sport=2152, dport=2152)
350 / GTP_U_Header(gtp_type="g_pdu", teid=200)
352 / UDP(sport=1000, dport=23)
354 self.logger.info(pkt.show2(dump=True))
359 def test_srv6_mobile(self):
360 """test_srv6_mobile"""
361 pkts = self.create_packets([("A::1", "B::1"), ("C::1", "D::1")])
363 self.vapi.cli("set sr encaps source addr A1::1")
364 self.vapi.cli("sr policy add bsid D4:: next D2:: next D3::")
366 # "sr localsid prefix 2001::/64 behavior end.m.gtp6.d D4::/64 fib-table 0 drop-in"
367 # .format(self.ip6_nhop)
368 localsid = VppSRv6MobileLocalSID(
370 localsid_prefix="{}/{}".format("2001::", 64),
371 behavior="end.m.gtp6.d",
374 sr_prefix="{}/{}".format("D4::", 64),
376 localsid.add_vpp_config()
378 # "ip route add D2::/64 via {}"
379 # .format(self.ip6_nhop))
381 self, "D2::", 64, [VppRoutePath(self.ip6_nhop, self.pg1.sw_if_index)]
383 route.add_vpp_config()
384 self.logger.info(self.vapi.cli("show sr policies"))
385 self.logger.info(self.vapi.cli("show sr localsid"))
387 self.vapi.cli("clear errors")
389 self.pg0.add_stream(pkts)
390 self.pg_enable_capture(self.pg_interfaces)
393 self.logger.info(self.vapi.cli("show errors"))
394 self.logger.info(self.vapi.cli("show int address"))
396 capture = self.pg1.get_capture(len(pkts))
399 self.logger.info(pkt.show2(dump=True))
401 "GTP6.D SID0={}".format(str(pkt[IPv6ExtHdrSegmentRouting].addresses[0]))
404 "GTP6.D SID1={}".format(str(pkt[IPv6ExtHdrSegmentRouting].addresses[1]))
406 self.assertEqual(str(pkt[IPv6ExtHdrSegmentRouting].addresses[0]), "2001::1")
408 str(pkt[IPv6ExtHdrSegmentRouting].addresses[1]), "d4::c800:0"