srv6-mobile
[vpp.git] / src / plugins / srv6-mobile / test / test_srv6_end.py
diff --git a/src/plugins/srv6-mobile/test/test_srv6_end.py b/src/plugins/srv6-mobile/test/test_srv6_end.py
new file mode 100644 (file)
index 0000000..4e2a576
--- /dev/null
@@ -0,0 +1,161 @@
+#!/usr/bin/env python
+
+from framework import VppTestCase
+from ipaddress import IPv4Address
+from ipaddress import IPv6Address
+from scapy.contrib.gtp import *
+from scapy.all import *
+
+
+class TestSRv6EndMGTP4E(VppTestCase):
+    """ SRv6 End.M.GTP4.E (SRv6 -> GTP-U) """
+
+    @classmethod
+    def setUpClass(cls):
+        super(TestSRv6EndMGTP4E, cls).setUpClass()
+        try:
+            cls.create_pg_interfaces(range(2))
+            cls.pg_if_i = cls.pg_interfaces[0]
+            cls.pg_if_o = cls.pg_interfaces[1]
+
+            cls.pg_if_i.config_ip6()
+            cls.pg_if_o.config_ip4()
+
+            cls.ip4_dst = cls.pg_if_o.remote_ip4
+            # cls.ip4_src = cls.pg_if_o.local_ip4
+            cls.ip4_src = "192.168.192.10"
+
+            for pg_if in cls.pg_interfaces:
+                pg_if.admin_up()
+                pg_if.resolve_arp()
+
+        except Exception:
+            super(TestSRv6EndMGTP4E, cls).tearDownClass()
+            raise
+
+    def create_packets(self, inner):
+
+        ip4_dst = IPv4Address(str(self.ip4_dst))
+        # 32bit prefix + 32bit IPv4 DA + 8bit + 32bit TEID + 24bit
+        dst = b'\xaa' * 4 + ip4_dst.packed + \
+            b'\x11' + b'\xbb' * 4 + b'\x11' * 3
+        ip6_dst = IPv6Address(dst)
+
+        ip4_src = IPv4Address(str(self.ip4_src))
+        # 64bit prefix + 32bit IPv4 SA + 16 bit port + 16bit
+        src = b'\xcc' * 8 + ip4_src.packed + \
+            b'\xdd' * 2 + b'\x11' * 2
+        ip6_src = IPv6Address(src)
+
+        self.logger.info("ip4 dst: {}".format(ip4_dst))
+        self.logger.info("ip4 src: {}".format(ip4_src))
+        self.logger.info("ip6 dst (remote srgw): {}".format(ip6_dst))
+        self.logger.info("ip6 src (local  srgw): {}".format(ip6_src))
+
+        pkts = list()
+        for d, s in inner:
+            pkt = (Ether() /
+                   IPv6(dst=str(ip6_dst), src=str(ip6_src)) /
+                   IPv6ExtHdrSegmentRouting() /
+                   IPv6(dst=d, src=s) /
+                   UDP(sport=1000, dport=23))
+
+            self.logger.info(pkt.show2())
+            pkts.append(pkt)
+
+        return pkts
+
+    def test_srv6_end(self):
+        """ test_srv6_end """
+        pkts = self.create_packets([("A::1", "B::1"), ("C::1", "D::1")])
+
+        self.vapi.cli(
+            "sr localsid address {} behavior end.m.gtp4.e v4src_position 64"
+            .format(pkts[0]['IPv6'].dst))
+        self.logger.info(self.vapi.cli("show sr localsids"))
+
+        self.vapi.cli("clear errors")
+
+        self.pg0.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+
+        self.logger.info(self.vapi.cli("show errors"))
+        self.logger.info(self.vapi.cli("show int address"))
+
+        capture = self.pg1.get_capture(len(pkts))
+
+        for pkt in capture:
+            self.logger.info(pkt.show2())
+            self.assertEqual(pkt[IP].dst, self.ip4_dst)
+            self.assertEqual(pkt[IP].src, self.ip4_src)
+            self.assertEqual(pkt[GTP_U_Header].teid, 0xbbbbbbbb)
+
+
+class TestSRv6TMTmap(VppTestCase):
+    """ SRv6 T.M.Tmap (GTP-U -> SRv6) """
+
+    @classmethod
+    def setUpClass(cls):
+        super(TestSRv6TMTmap, cls).setUpClass()
+        try:
+            cls.create_pg_interfaces(range(2))
+            cls.pg_if_i = cls.pg_interfaces[0]
+            cls.pg_if_o = cls.pg_interfaces[1]
+
+            cls.pg_if_i.config_ip4()
+            cls.pg_if_o.config_ip6()
+
+            for pg_if in cls.pg_interfaces:
+                pg_if.admin_up()
+                pg_if.resolve_arp()
+
+        except Exception:
+            super(TestSRv6TMTmap, cls).tearDownClass()
+            raise
+
+
+class TestSRv6EndMGTP6E(VppTestCase):
+    """ SRv6 End.M.GTP6.E """
+
+    @classmethod
+    def setUpClass(cls):
+        super(TestSRv6EndMGTP6E, cls).setUpClass()
+        try:
+            cls.create_pg_interfaces(range(2))
+            cls.pg_if_i = cls.pg_interfaces[0]
+            cls.pg_if_o = cls.pg_interfaces[1]
+
+            cls.pg_if_i.config_ip4()
+            cls.pg_if_o.config_ip6()
+
+            for pg_if in cls.pg_interfaces:
+                pg_if.admin_up()
+                pg_if.resolve_arp()
+
+        except Exception:
+            super(TestSRv6EndMGTP6E, cls).tearDownClass()
+            raise
+
+
+class TestSRv6EndMGTP6D(VppTestCase):
+    """ SRv6 End.M.GTP6.D """
+
+    @classmethod
+    def setUpClass(cls):
+        super(TestSRv6EndMGTP6D, cls).setUpClass()
+        try:
+            cls.create_pg_interfaces(range(2))
+            cls.pg_if_i = cls.pg_interfaces[0]
+            cls.pg_if_o = cls.pg_interfaces[1]
+
+            cls.pg_if_i.config_ip4()
+            cls.pg_if_o.config_ip6()
+
+            for pg_if in cls.pg_interfaces:
+                pg_if.admin_up()
+                pg_if.resolve_arp()
+
+        except Exception:
+            super(TestSRv6EndMGTP6D, cls).tearDownClass()
+            raise