from framework import VppTestCase
from ipaddress import IPv4Address
from ipaddress import IPv6Address
+from vpp_ip_route import VppIpRoute, VppRoutePath, FibPathProto, VppIpTable
+
+from vpp_srv6_mobile import (
+ SRv6MobileNhtype,
+ VppSRv6MobilePolicy,
+ VppSRv6MobileLocalSID,
+)
+
from scapy.contrib.gtp import *
from scapy.all import *
class TestSRv6EndMGTP4E(VppTestCase):
- """ SRv6 End.M.GTP4.E (SRv6 -> GTP-U) """
+ """SRv6 End.M.GTP4.E (SRv6 -> GTP-U)"""
@classmethod
def setUpClass(cls):
raise
def create_packets(self, inner):
-
ip4_dst = IPv4Address(str(self.ip4_dst))
# 32bit prefix + 32bit IPv4 DA + 8bit + 32bit TEID + 24bit
- dst = b'\xaa' * 4 + ip4_dst.packed + \
- b'\x11' + b'\xbb' * 4 + b'\x11' * 3
+ dst = b"\xaa" * 4 + ip4_dst.packed + b"\x11" + b"\xbb" * 4 + b"\x11" * 3
ip6_dst = IPv6Address(dst)
ip4_src = IPv4Address(str(self.ip4_src))
# 64bit prefix + 32bit IPv4 SA + 16 bit port + 16bit
- src = b'\xcc' * 8 + ip4_src.packed + \
- b'\xdd' * 2 + b'\x11' * 2
+ src = b"\xcc" * 8 + ip4_src.packed + b"\xdd" * 2 + b"\x11" * 2
ip6_src = IPv6Address(src)
self.logger.info("ip4 dst: {}".format(ip4_dst))
pkts = list()
for d, s in inner:
- pkt = (Ether() /
- IPv6(dst=str(ip6_dst), src=str(ip6_src)) /
- IPv6ExtHdrSegmentRouting() /
- IPv6(dst=d, src=s) /
- UDP(sport=1000, dport=23))
+ pkt = (
+ Ether()
+ / IPv6(dst=str(ip6_dst), src=str(ip6_src))
+ / IPv6ExtHdrSegmentRouting()
+ / IPv6(dst=d, src=s)
+ / UDP(sport=1000, dport=23)
+ )
self.logger.info(pkt.show2(dump=True))
pkts.append(pkt)
return pkts
def test_srv6_mobile(self):
- """ test_srv6_mobile """
+ """test_srv6_mobile"""
pkts = self.create_packets([("A::1", "B::1"), ("C::1", "D::1")])
- self.vapi.cli(
- "sr localsid address {} behavior end.m.gtp4.e v4src_position 64"
- .format(pkts[0]['IPv6'].dst))
- self.logger.info(self.vapi.cli("show sr localsids"))
+ # "sr localsid address {} behavior end.m.gtp4.e v4src_position 64 fib-table 0"
+ # ".format(pkts[0]["IPv6"].dst)
+ localsid = VppSRv6MobileLocalSID(
+ self,
+ # address params case is length 0
+ localsid_prefix="{}/{}".format(pkts[0]["IPv6"].dst, 0),
+ behavior="end.m.gtp4.e",
+ v4src_position=64,
+ fib_table=0,
+ )
+ localsid.add_vpp_config()
+
+ # log the localsids
+ self.logger.info(self.vapi.cli("show sr localsid"))
self.vapi.cli("clear errors")
self.logger.info(pkt.show2(dump=True))
self.assertEqual(pkt[IP].dst, self.ip4_dst)
self.assertEqual(pkt[IP].src, self.ip4_src)
- self.assertEqual(pkt[GTP_U_Header].teid, 0xbbbbbbbb)
+ self.assertEqual(pkt[GTP_U_Header].teid, 0xBBBBBBBB)
class TestSRv6TMGTP4D(VppTestCase):
- """ SRv6 T.M.GTP4.D (GTP-U -> SRv6) """
+ """SRv6 T.M.GTP4.D (GTP-U -> SRv6)"""
@classmethod
def setUpClass(cls):
raise
def create_packets(self, inner):
-
ip4_dst = IPv4Address(str(self.ip4_dst))
ip4_src = IPv4Address(str(self.ip4_src))
pkts = list()
for d, s in inner:
- pkt = (Ether() /
- IP(dst=str(ip4_dst), src=str(ip4_src)) /
- UDP(sport=2152, dport=2152) /
- GTP_U_Header(gtp_type="g_pdu", teid=200) /
- IPv6(dst=d, src=s) /
- UDP(sport=1000, dport=23))
+ pkt = (
+ Ether()
+ / IP(dst=str(ip4_dst), src=str(ip4_src))
+ / UDP(sport=2152, dport=2152)
+ / GTP_U_Header(gtp_type="g_pdu", teid=200)
+ / IPv6(dst=d, src=s)
+ / UDP(sport=1000, dport=23)
+ )
self.logger.info(pkt.show2(dump=True))
pkts.append(pkt)
return pkts
def test_srv6_mobile(self):
- """ test_srv6_mobile """
+ """test_srv6_mobile"""
pkts = self.create_packets([("A::1", "B::1"), ("C::1", "D::1")])
self.vapi.cli("set sr encaps source addr A1::1")
self.vapi.cli("sr policy add bsid D4:: next D2:: next D3::")
- self.vapi.cli(
- "sr policy add bsid D5:: behavior t.m.gtp4.d"
- "D4::/32 v6src_prefix C1::/64 nhtype ipv6")
+
+ # sr policy add bsid D5:: behavior t.m.gtp4.d D4::/32 v6src_prefix C1::/64 nhtype ipv6 fib-table 0 drop-in
+ policy = VppSRv6MobilePolicy(
+ self,
+ bsid_addr="D5::",
+ behavior="t.m.gtp4.d",
+ sr_prefix="{}/{}".format("D4::", 32),
+ v6src_prefix="{}/{}".format("C1::", 64),
+ nhtype=SRv6MobileNhtype.SRV6_NHTYPE_API_IPV6,
+ fib_table=0,
+ drop_in=1,
+ )
+ policy.add_vpp_config()
+
self.vapi.cli("sr steer l3 {}/32 via bsid D5::".format(self.ip4_dst))
- self.vapi.cli("ip route add D2::/32 via {}".format(self.ip6_dst))
+ # "ip route add D2::/32 via {}".format(self.ip6_dst)
+ route = VppIpRoute(
+ self, "D2::", 32, [VppRoutePath(self.ip6_dst, self.pg1.sw_if_index)]
+ )
+ route.add_vpp_config()
+ self.logger.info(self.vapi.cli("show ip6 fib"))
self.logger.info(self.vapi.cli("show sr steer"))
self.logger.info(self.vapi.cli("show sr policies"))
for pkt in capture:
self.logger.info(pkt.show2(dump=True))
- self.logger.info("GTP4.D Address={}".format(
- str(pkt[IPv6ExtHdrSegmentRouting].addresses[0])))
+ self.logger.info(
+ "GTP4.D Address={}".format(
+ str(pkt[IPv6ExtHdrSegmentRouting].addresses[0])
+ )
+ )
self.assertEqual(
- str(pkt[IPv6ExtHdrSegmentRouting].addresses[0]),
- "d4:0:101:101::c800:0")
+ str(pkt[IPv6ExtHdrSegmentRouting].addresses[0]), "d4:0:101:101::c800:0"
+ )
class TestSRv6EndMGTP6E(VppTestCase):
- """ SRv6 End.M.GTP6.E """
+ """SRv6 End.M.GTP6.E"""
@classmethod
def setUpClass(cls):
def create_packets(self, inner):
# 64bit prefix + 8bit QFI + 32bit TEID + 24bit
- dst = b'\xaa' * 8 + b'\x00' + \
- b'\xbb' * 4 + b'\x00' * 3
+ dst = b"\xaa" * 8 + b"\x00" + b"\xbb" * 4 + b"\x00" * 3
ip6_dst = IPv6Address(dst)
self.ip6_dst = ip6_dst
- src = b'\xcc' * 8 + \
- b'\xdd' * 4 + b'\x11' * 4
+ src = b"\xcc" * 8 + b"\xdd" * 4 + b"\x11" * 4
ip6_src = IPv6Address(src)
self.ip6_src = ip6_src
pkts = list()
for d, s in inner:
- pkt = (Ether() /
- IPv6(dst=str(ip6_dst),
- src=str(ip6_src)) /
- IPv6ExtHdrSegmentRouting(segleft=1,
- lastentry=0,
- tag=0,
- addresses=["a1::1"]) /
- IPv6(dst=d, src=s) / UDP(sport=1000, dport=23))
+ pkt = (
+ Ether()
+ / IPv6(dst=str(ip6_dst), src=str(ip6_src))
+ / IPv6ExtHdrSegmentRouting(
+ segleft=1, lastentry=0, tag=0, addresses=["a1::1"]
+ )
+ / IPv6(dst=d, src=s)
+ / UDP(sport=1000, dport=23)
+ )
self.logger.info(pkt.show2(dump=True))
pkts.append(pkt)
return pkts
def test_srv6_mobile(self):
- """ test_srv6_mobile """
+ """test_srv6_mobile"""
pkts = self.create_packets([("A::1", "B::1"), ("C::1", "D::1")])
- self.vapi.cli(
- "sr localsid prefix {}/64 behavior end.m.gtp6.e"
- .format(pkts[0]['IPv6'].dst))
- self.vapi.cli(
- "ip route add a1::/64 via {}".format(self.ip6_nhop))
- self.logger.info(self.vapi.cli("show sr localsids"))
+ # "sr localsid prefix {}/64 behavior end.m.gtp6.e fib-table 0"
+ # .format(pkts[0]["IPv6"].dst)
+ localsid = VppSRv6MobileLocalSID(
+ self,
+ localsid_prefix="{}/{}".format(pkts[0]["IPv6"].dst, 64),
+ behavior="end.m.gtp6.e",
+ fib_table=0,
+ )
+ localsid.add_vpp_config()
+
+ # "ip route add a1::/64 via {}".format(self.ip6_nhop)
+ route = VppIpRoute(
+ self, "a1::", 64, [VppRoutePath(self.ip6_nhop, self.pg1.sw_if_index)]
+ )
+ route.add_vpp_config()
+ self.logger.info(self.vapi.cli("show sr localsid"))
self.vapi.cli("clear errors")
self.logger.info(pkt.show2(dump=True))
self.assertEqual(pkt[IPv6].dst, "a1::1")
self.assertEqual(pkt[IPv6].src, str(self.ip6_src))
- self.assertEqual(pkt[GTP_U_Header].teid, 0xbbbbbbbb)
+ self.assertEqual(pkt[GTP_U_Header].teid, 0xBBBBBBBB)
class TestSRv6EndMGTP6D(VppTestCase):
- """ SRv6 End.M.GTP6.D """
+ """SRv6 End.M.GTP6.D"""
@classmethod
def setUpClass(cls):
raise
def create_packets(self, inner):
-
ip6_dst = IPv6Address(str(self.ip6_dst))
ip6_src = IPv6Address(str(self.ip6_src))
pkts = list()
for d, s in inner:
- pkt = (Ether() /
- IPv6(dst=str(ip6_dst), src=str(ip6_src)) /
- UDP(sport=2152, dport=2152) /
- GTP_U_Header(gtp_type="g_pdu", teid=200) /
- IPv6(dst=d, src=s) /
- UDP(sport=1000, dport=23))
+ pkt = (
+ Ether()
+ / IPv6(dst=str(ip6_dst), src=str(ip6_src))
+ / UDP(sport=2152, dport=2152)
+ / GTP_U_Header(gtp_type="g_pdu", teid=200)
+ / IPv6(dst=d, src=s)
+ / UDP(sport=1000, dport=23)
+ )
self.logger.info(pkt.show2(dump=True))
pkts.append(pkt)
return pkts
def test_srv6_mobile(self):
- """ test_srv6_mobile """
+ """test_srv6_mobile"""
pkts = self.create_packets([("A::1", "B::1"), ("C::1", "D::1")])
self.vapi.cli("set sr encaps source addr A1::1")
self.vapi.cli("sr policy add bsid D4:: next D2:: next D3::")
- self.vapi.cli(
- "sr localsid prefix 2001::/64 behavior end.m.gtp6.d D4::/64")
- self.vapi.cli("ip route add D2::/64 via {}".format(self.ip6_nhop))
+ # "sr localsid prefix 2001::/64 behavior end.m.gtp6.d 4::/64 fib-table 0 drop-in"
+ # .format(self.ip6_nhop)
+ localsid = VppSRv6MobileLocalSID(
+ self,
+ localsid_prefix="{}/{}".format("2001::", 64),
+ behavior="end.m.gtp6.d",
+ fib_table=0,
+ drop_in=1,
+ sr_prefix="{}/{}".format("D4::", 64),
+ )
+ localsid.add_vpp_config()
+
+ # "ip route add D2::/64 via {}"
+ # .format(self.ip6_nhop))
+ route = VppIpRoute(
+ self, "D2::", 64, [VppRoutePath(self.ip6_nhop, self.pg1.sw_if_index)]
+ )
+ route.add_vpp_config()
self.logger.info(self.vapi.cli("show sr policies"))
+ self.logger.info(self.vapi.cli("show sr localsid"))
self.vapi.cli("clear errors")
for pkt in capture:
self.logger.info(pkt.show2(dump=True))
- self.logger.info("GTP6.D Address={}".format(
- str(pkt[IPv6ExtHdrSegmentRouting].addresses[0])))
+ self.logger.info(
+ "GTP6.D SID0={}".format(str(pkt[IPv6ExtHdrSegmentRouting].addresses[0]))
+ )
+ self.logger.info(
+ "GTP6.D SID1={}".format(str(pkt[IPv6ExtHdrSegmentRouting].addresses[1]))
+ )
+ self.assertEqual(str(pkt[IPv6ExtHdrSegmentRouting].addresses[0]), "2001::1")
self.assertEqual(
- str(pkt[IPv6ExtHdrSegmentRouting].addresses[0]), "d4::c800:0")
+ str(pkt[IPv6ExtHdrSegmentRouting].addresses[1]), "d4::c800:0"
+ )