tests: move plugin tests to src/plugins/*/test
[vpp.git] / src / plugins / abf / test / test_abf.py
diff --git a/src/plugins/abf/test/test_abf.py b/src/plugins/abf/test/test_abf.py
new file mode 100644 (file)
index 0000000..221a793
--- /dev/null
@@ -0,0 +1,344 @@
+#!/usr/bin/env python
+
+from socket import inet_pton, inet_ntop, AF_INET, AF_INET6
+import unittest
+
+from framework import VppTestCase, VppTestRunner
+from vpp_ip import DpoProto
+from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsLabel, \
+    VppIpTable, FibPathProto
+
+from scapy.packet import Raw
+from scapy.layers.l2 import Ether
+from scapy.layers.inet import IP, UDP
+from scapy.layers.inet6 import IPv6
+
+from vpp_object import VppObject
+
+NUM_PKTS = 67
+
+
+def find_abf_policy(test, id):
+    policies = test.vapi.abf_policy_dump()
+    for p in policies:
+        if id == p.policy.policy_id:
+            return True
+    return False
+
+
+def find_abf_itf_attach(test, id, sw_if_index):
+    attachs = test.vapi.abf_itf_attach_dump()
+    for a in attachs:
+        if id == a.attach.policy_id and \
+           sw_if_index == a.attach.sw_if_index:
+            return True
+    return False
+
+
+class VppAbfPolicy(VppObject):
+
+    def __init__(self,
+                 test,
+                 policy_id,
+                 acl,
+                 paths):
+        self._test = test
+        self.policy_id = policy_id
+        self.acl = acl
+        self.paths = paths
+        self.encoded_paths = []
+        for path in self.paths:
+            self.encoded_paths.append(path.encode())
+
+    def add_vpp_config(self):
+        self._test.vapi.abf_policy_add_del(
+            1,
+            {'policy_id': self.policy_id,
+             'acl_index': self.acl.acl_index,
+             'n_paths': len(self.paths),
+             'paths': self.encoded_paths})
+        self._test.registry.register(self, self._test.logger)
+
+    def remove_vpp_config(self):
+        self._test.vapi.abf_policy_add_del(
+            0,
+            {'policy_id': self.policy_id,
+             'acl_index': self.acl.acl_index,
+             'n_paths': len(self.paths),
+             'paths': self.encoded_paths})
+
+    def query_vpp_config(self):
+        return find_abf_policy(self._test, self.policy_id)
+
+    def object_id(self):
+        return ("abf-policy-%d" % self.policy_id)
+
+
+class VppAbfAttach(VppObject):
+
+    def __init__(self,
+                 test,
+                 policy_id,
+                 sw_if_index,
+                 priority,
+                 is_ipv6=0):
+        self._test = test
+        self.policy_id = policy_id
+        self.sw_if_index = sw_if_index
+        self.priority = priority
+        self.is_ipv6 = is_ipv6
+
+    def add_vpp_config(self):
+        self._test.vapi.abf_itf_attach_add_del(
+            1,
+            {'policy_id': self.policy_id,
+             'sw_if_index': self.sw_if_index,
+             'priority': self.priority,
+             'is_ipv6': self.is_ipv6})
+        self._test.registry.register(self, self._test.logger)
+
+    def remove_vpp_config(self):
+        self._test.vapi.abf_itf_attach_add_del(
+            0,
+            {'policy_id': self.policy_id,
+             'sw_if_index': self.sw_if_index,
+             'priority': self.priority,
+             'is_ipv6': self.is_ipv6})
+
+    def query_vpp_config(self):
+        return find_abf_itf_attach(self._test,
+                                   self.policy_id,
+                                   self.sw_if_index)
+
+    def object_id(self):
+        return ("abf-attach-%d-%d" % (self.policy_id, self.sw_if_index))
+
+
+class TestAbf(VppTestCase):
+    """ ABF Test Case """
+
+    @classmethod
+    def setUpClass(cls):
+        super(TestAbf, cls).setUpClass()
+
+    @classmethod
+    def tearDownClass(cls):
+        super(TestAbf, cls).tearDownClass()
+
+    def setUp(self):
+        super(TestAbf, self).setUp()
+
+        self.create_pg_interfaces(range(5))
+
+        for i in self.pg_interfaces[:4]:
+            i.admin_up()
+            i.config_ip4()
+            i.resolve_arp()
+            i.config_ip6()
+            i.resolve_ndp()
+
+    def tearDown(self):
+        for i in self.pg_interfaces:
+            i.unconfig_ip4()
+            i.unconfig_ip6()
+            i.ip6_disable()
+            i.admin_down()
+        super(TestAbf, self).tearDown()
+
+    def test_abf4(self):
+        """ IPv4 ACL Based Forwarding
+        """
+
+        #
+        # We are not testing the various matching capabilities
+        # of ACLs, that's done elsewhere. Here ware are testing
+        # the application of ACLs to a forwarding path to achieve
+        # ABF
+        # So we construct just a few ACLs to ensure the ABF policies
+        # are correctly constructed and used. And a few path types
+        # to test the API path decoding.
+        #
+
+        #
+        # Rule 1
+        #
+        rule_1 = ({'is_permit': 1,
+                   'is_ipv6': 0,
+                   'proto': 17,
+                   'srcport_or_icmptype_first': 1234,
+                   'srcport_or_icmptype_last': 1234,
+                   'src_ip_prefix_len': 32,
+                   'src_ip_addr': inet_pton(AF_INET, "1.1.1.1"),
+                   'dstport_or_icmpcode_first': 1234,
+                   'dstport_or_icmpcode_last': 1234,
+                   'dst_ip_prefix_len': 32,
+                   'dst_ip_addr': inet_pton(AF_INET, "1.1.1.2")})
+        acl_1 = self.vapi.acl_add_replace(acl_index=4294967295, r=[rule_1])
+
+        #
+        # ABF policy for ACL 1 - path via interface 1
+        #
+        abf_1 = VppAbfPolicy(self, 10, acl_1,
+                             [VppRoutePath(self.pg1.remote_ip4,
+                                           self.pg1.sw_if_index)])
+        abf_1.add_vpp_config()
+
+        #
+        # Attach the policy to input interface Pg0
+        #
+        attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index, 50)
+        attach_1.add_vpp_config()
+
+        #
+        # fire in packet matching the ACL src,dst. If it's forwarded
+        # then the ABF was successful, since default routing will drop it
+        #
+        p_1 = (Ether(src=self.pg0.remote_mac,
+                     dst=self.pg0.local_mac) /
+               IP(src="1.1.1.1", dst="1.1.1.2") /
+               UDP(sport=1234, dport=1234) /
+               Raw('\xa5' * 100))
+        self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg1)
+
+        #
+        # Attach a 'better' priority policy to the same interface
+        #
+        abf_2 = VppAbfPolicy(self, 11, acl_1,
+                             [VppRoutePath(self.pg2.remote_ip4,
+                                           self.pg2.sw_if_index)])
+        abf_2.add_vpp_config()
+        attach_2 = VppAbfAttach(self, 11, self.pg0.sw_if_index, 40)
+        attach_2.add_vpp_config()
+
+        self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg2)
+
+        #
+        # Attach a policy with priority in the middle
+        #
+        abf_3 = VppAbfPolicy(self, 12, acl_1,
+                             [VppRoutePath(self.pg3.remote_ip4,
+                                           self.pg3.sw_if_index)])
+        abf_3.add_vpp_config()
+        attach_3 = VppAbfAttach(self, 12, self.pg0.sw_if_index, 45)
+        attach_3.add_vpp_config()
+
+        self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg2)
+
+        #
+        # remove the best priority
+        #
+        attach_2.remove_vpp_config()
+        self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg3)
+
+        #
+        # Attach one of the same policies to Pg1
+        #
+        attach_4 = VppAbfAttach(self, 12, self.pg1.sw_if_index, 45)
+        attach_4.add_vpp_config()
+
+        p_2 = (Ether(src=self.pg1.remote_mac,
+                     dst=self.pg1.local_mac) /
+               IP(src="1.1.1.1", dst="1.1.1.2") /
+               UDP(sport=1234, dport=1234) /
+               Raw('\xa5' * 100))
+        self.send_and_expect(self.pg1, p_2 * NUM_PKTS, self.pg3)
+
+        #
+        # detach the policy from PG1, now expect traffic to be dropped
+        #
+        attach_4.remove_vpp_config()
+
+        self.send_and_assert_no_replies(self.pg1, p_2 * NUM_PKTS, "Detached")
+
+        #
+        # Swap to route via a next-hop in the non-default table
+        #
+        table_20 = VppIpTable(self, 20)
+        table_20.add_vpp_config()
+
+        self.pg4.set_table_ip4(table_20.table_id)
+        self.pg4.admin_up()
+        self.pg4.config_ip4()
+        self.pg4.resolve_arp()
+
+        abf_13 = VppAbfPolicy(self, 13, acl_1,
+                              [VppRoutePath(self.pg4.remote_ip4,
+                                            0xffffffff,
+                                            nh_table_id=table_20.table_id)])
+        abf_13.add_vpp_config()
+        attach_5 = VppAbfAttach(self, 13, self.pg0.sw_if_index, 30)
+        attach_5.add_vpp_config()
+
+        self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg4)
+
+        self.pg4.unconfig_ip4()
+        self.pg4.set_table_ip4(0)
+
+    def test_abf6(self):
+        """ IPv6 ACL Based Forwarding
+        """
+
+        #
+        # Simple test for matching IPv6 packets
+        #
+
+        #
+        # Rule 1
+        #
+        rule_1 = ({'is_permit': 1,
+                   'is_ipv6': 1,
+                   'proto': 17,
+                   'srcport_or_icmptype_first': 1234,
+                   'srcport_or_icmptype_last': 1234,
+                   'src_ip_prefix_len': 128,
+                   'src_ip_addr': inet_pton(AF_INET6, "2001::2"),
+                   'dstport_or_icmpcode_first': 1234,
+                   'dstport_or_icmpcode_last': 1234,
+                   'dst_ip_prefix_len': 128,
+                   'dst_ip_addr': inet_pton(AF_INET6, "2001::1")})
+        acl_1 = self.vapi.acl_add_replace(acl_index=4294967295,
+                                          r=[rule_1])
+
+        #
+        # ABF policy for ACL 1 - path via interface 1
+        #
+        abf_1 = VppAbfPolicy(self, 10, acl_1,
+                             [VppRoutePath("3001::1",
+                                           0xffffffff)])
+        abf_1.add_vpp_config()
+
+        attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index,
+                                45, is_ipv6=True)
+        attach_1.add_vpp_config()
+
+        #
+        # a packet matching the rule
+        #
+        p = (Ether(src=self.pg0.remote_mac,
+                   dst=self.pg0.local_mac) /
+             IPv6(src="2001::2", dst="2001::1") /
+             UDP(sport=1234, dport=1234) /
+             Raw('\xa5' * 100))
+
+        #
+        # packets are dropped because there is no route to the policy's
+        # next hop
+        #
+        self.send_and_assert_no_replies(self.pg1, p * NUM_PKTS, "no route")
+
+        #
+        # add a route resolving the next-hop
+        #
+        route = VppIpRoute(self, "3001::1", 32,
+                           [VppRoutePath(self.pg1.remote_ip6,
+                                         self.pg1.sw_if_index)])
+        route.add_vpp_config()
+
+        #
+        # now expect packets forwarded.
+        #
+        self.send_and_expect(self.pg0, p * NUM_PKTS, self.pg1)
+
+
+if __name__ == '__main__':
+    unittest.main(testRunner=VppTestRunner)