gtpu: unit-test check udp ports 77/19777/4
authorJakub Grajciar <jgrajcia@cisco.com>
Thu, 23 May 2019 11:01:41 +0000 (13:01 +0200)
committerOle Trøan <otroan@employees.org>
Wed, 29 May 2019 18:30:18 +0000 (18:30 +0000)
Change-Id: I3efbbb1aefb43ecdf7016541b97343f9ca41f842
Signed-off-by: Jakub Grajciar <jgrajcia@cisco.com>
test/test_gtpu.py

index 199bd7d..9f3c090 100644 (file)
@@ -8,10 +8,97 @@ from template_bd import BridgeDomain
 
 from scapy.layers.l2 import Ether, Raw
 from scapy.layers.inet import IP, UDP
+from scapy.layers.inet6 import IPv6
 from scapy.contrib.gtp import GTP_U_Header
 from scapy.utils import atol
 
 
+class TestGtpuUDP(VppTestCase):
+    """ GTPU UDP ports Test Case """
+
+    def setUp(self):
+        super(TestGtpuUDP, self).tearDown()
+
+        self.dport = 2152
+
+        self.ip4_err = 0
+        self.ip6_err = 0
+
+        self.create_pg_interfaces(range(1))
+        for pg in self.pg_interfaces:
+            pg.admin_up()
+        self.pg0.config_ip4()
+        self.pg0.config_ip6()
+
+    def _check_udp_port_ip4(self, enabled=True):
+
+        pkt = (Ether(src=self.pg0.local_mac, dst=self.pg0.remote_mac) /
+               IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) /
+               UDP(sport=self.dport, dport=self.dport, chksum=0))
+
+        self.pg0.add_stream(pkt)
+        self.pg_start()
+
+        err = self.statistics.get_counter(
+            '/err/ip4-udp-lookup/no listener for dst port')[0]
+
+        if enabled:
+            self.assertEqual(err, self.ip4_err)
+        else:
+            self.assertEqual(err, self.ip4_err + 1)
+
+        self.ip4_err = err
+
+    def _check_udp_port_ip6(self, enabled=True):
+
+        pkt = (Ether(src=self.pg0.local_mac, dst=self.pg0.remote_mac) /
+               IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6) /
+               UDP(sport=self.dport, dport=self.dport, chksum=0))
+
+        self.pg0.add_stream(pkt)
+        self.pg_start()
+
+        err = self.statistics.get_counter(
+            '/err/ip6-udp-lookup/no listener for dst port')[0]
+
+        if enabled:
+            self.assertEqual(err, self.ip6_err)
+        else:
+            self.assertEqual(err, self.ip6_err + 1)
+
+        self.ip6_err = err
+
+    def test_udp_port(self):
+        """ test UDP ports
+        Check if there are no udp listeners before gtpu is enabled
+        """
+
+        # UDP ports should be disabled unless a tunnel is configured
+        self._check_udp_port_ip4(False)
+        self._check_udp_port_ip6(False)
+
+        r = self.vapi.gtpu_add_del_tunnel(src_addr=self.pg0.local_ip4n,
+                                          dst_addr=self.pg0.remote_ip4n)
+
+        # UDP port 2152 enabled for ip4
+        self._check_udp_port_ip4()
+
+        r = self.vapi.gtpu_add_del_tunnel(is_ipv6=1,
+                                          src_addr=self.pg0.local_ip6n,
+                                          dst_addr=self.pg0.remote_ip6n)
+
+        # UDP port 2152 enabled for ip6
+        self._check_udp_port_ip6()
+
+        r = self.vapi.gtpu_add_del_tunnel(is_add=0,
+                                          src_addr=self.pg0.local_ip4n,
+                                          dst_addr=self.pg0.remote_ip4n)
+
+        r = self.vapi.gtpu_add_del_tunnel(is_add=0, is_ipv6=1,
+                                          src_addr=self.pg0.local_ip6n,
+                                          dst_addr=self.pg0.remote_ip6n)
+
+
 class TestGtpu(BridgeDomain, VppTestCase):
     """ GTPU Test Case """