vapi: implement vapi_wait() for reads
[vpp.git] / test / test_abf.py
index ecd8c53..856d02a 100644 (file)
@@ -1,19 +1,29 @@
-#!/usr/bin/env python
+#!/usr/bin/env python3
 
 from socket import inet_pton, inet_ntop, AF_INET, AF_INET6
 import unittest
 
 from framework import VppTestCase, VppTestRunner
 from vpp_ip import DpoProto
 
 from socket import inet_pton, inet_ntop, AF_INET, AF_INET6
 import unittest
 
 from framework import VppTestCase, VppTestRunner
 from vpp_ip import DpoProto
-from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsLabel, VppIpTable
+from vpp_ip_route import (
+    VppIpRoute,
+    VppRoutePath,
+    VppMplsLabel,
+    VppIpTable,
+    FibPathProto,
+)
+from vpp_acl import AclRule, VppAcl
 
 from scapy.packet import Raw
 from scapy.layers.l2 import Ether
 from scapy.layers.inet import IP, UDP
 from scapy.layers.inet6 import IPv6
 
 from scapy.packet import Raw
 from scapy.layers.l2 import Ether
 from scapy.layers.inet import IP, UDP
 from scapy.layers.inet6 import IPv6
+from ipaddress import IPv4Network, IPv6Network
 
 from vpp_object import VppObject
 
 
 from vpp_object import VppObject
 
+NUM_PKTS = 67
+
 
 def find_abf_policy(test, id):
     policies = test.vapi.abf_policy_dump()
 
 def find_abf_policy(test, id):
     policies = test.vapi.abf_policy_dump()
@@ -26,83 +36,53 @@ def find_abf_policy(test, id):
 def find_abf_itf_attach(test, id, sw_if_index):
     attachs = test.vapi.abf_itf_attach_dump()
     for a in attachs:
 def find_abf_itf_attach(test, id, sw_if_index):
     attachs = test.vapi.abf_itf_attach_dump()
     for a in attachs:
-        if id == a.attach.policy_id and \
-           sw_if_index == a.attach.sw_if_index:
+        if id == a.attach.policy_id and sw_if_index == a.attach.sw_if_index:
             return True
     return False
 
 
 class VppAbfPolicy(VppObject):
             return True
     return False
 
 
 class VppAbfPolicy(VppObject):
-
-    def __init__(self,
-                 test,
-                 policy_id,
-                 acl,
-                 paths):
+    def __init__(self, test, policy_id, acl, paths):
         self._test = test
         self.policy_id = policy_id
         self.acl = acl
         self.paths = paths
         self._test = test
         self.policy_id = policy_id
         self.acl = acl
         self.paths = paths
-
-    def encode_paths(self):
-        br_paths = []
-        for p in self.paths:
-            lstack = []
-            for l in p.nh_labels:
-                if type(l) == VppMplsLabel:
-                    lstack.append(l.encode())
-                else:
-                    lstack.append({'label': l, 'ttl': 255})
-            n_labels = len(lstack)
-            while (len(lstack) < 16):
-                lstack.append({})
-            br_paths.append({'next_hop': p.nh_addr,
-                             'weight': 1,
-                             'afi': p.proto,
-                             'sw_if_index': 0xffffffff,
-                             'preference': 0,
-                             'table_id': p.nh_table_id,
-                             'next_hop_id': p.next_hop_id,
-                             'is_udp_encap': p.is_udp_encap,
-                             'n_labels': n_labels,
-                             'label_stack': lstack})
-        return br_paths
+        self.encoded_paths = []
+        for path in self.paths:
+            self.encoded_paths.append(path.encode())
 
     def add_vpp_config(self):
         self._test.vapi.abf_policy_add_del(
             1,
 
     def add_vpp_config(self):
         self._test.vapi.abf_policy_add_del(
             1,
-            {'policy_id': self.policy_id,
-             'acl_index': self.acl.acl_index,
-             'n_paths': len(self.paths),
-             'paths': self.encode_paths()})
+            {
+                "policy_id": self.policy_id,
+                "acl_index": self.acl.acl_index,
+                "n_paths": len(self.paths),
+                "paths": self.encoded_paths,
+            },
+        )
         self._test.registry.register(self, self._test.logger)
 
     def remove_vpp_config(self):
         self._test.vapi.abf_policy_add_del(
             0,
         self._test.registry.register(self, self._test.logger)
 
     def remove_vpp_config(self):
         self._test.vapi.abf_policy_add_del(
             0,
-            {'policy_id': self.policy_id,
-             'acl_index': self.acl.acl_index,
-             'n_paths': len(self.paths),
-             'paths': self.encode_paths()})
+            {
+                "policy_id": self.policy_id,
+                "acl_index": self.acl.acl_index,
+                "n_paths": len(self.paths),
+                "paths": self.encoded_paths,
+            },
+        )
 
     def query_vpp_config(self):
         return find_abf_policy(self._test, self.policy_id)
 
 
     def query_vpp_config(self):
         return find_abf_policy(self._test, self.policy_id)
 
-    def __str__(self):
-        return self.object_id()
-
     def object_id(self):
     def object_id(self):
-        return ("abf-policy-%d" % self.policy_id)
+        return "abf-policy-%d" % self.policy_id
 
 
 class VppAbfAttach(VppObject):
 
 
 class VppAbfAttach(VppObject):
-
-    def __init__(self,
-                 test,
-                 policy_id,
-                 sw_if_index,
-                 priority,
-                 is_ipv6=0):
+    def __init__(self, test, policy_id, sw_if_index, priority, is_ipv6=0):
         self._test = test
         self.policy_id = policy_id
         self.sw_if_index = sw_if_index
         self._test = test
         self.policy_id = policy_id
         self.sw_if_index = sw_if_index
@@ -112,34 +92,43 @@ class VppAbfAttach(VppObject):
     def add_vpp_config(self):
         self._test.vapi.abf_itf_attach_add_del(
             1,
     def add_vpp_config(self):
         self._test.vapi.abf_itf_attach_add_del(
             1,
-            {'policy_id': self.policy_id,
-             'sw_if_index': self.sw_if_index,
-             'priority': self.priority,
-             'is_ipv6': self.is_ipv6})
+            {
+                "policy_id": self.policy_id,
+                "sw_if_index": self.sw_if_index,
+                "priority": self.priority,
+                "is_ipv6": self.is_ipv6,
+            },
+        )
         self._test.registry.register(self, self._test.logger)
 
     def remove_vpp_config(self):
         self._test.vapi.abf_itf_attach_add_del(
             0,
         self._test.registry.register(self, self._test.logger)
 
     def remove_vpp_config(self):
         self._test.vapi.abf_itf_attach_add_del(
             0,
-            {'policy_id': self.policy_id,
-             'sw_if_index': self.sw_if_index,
-             'priority': self.priority,
-             'is_ipv6': self.is_ipv6})
+            {
+                "policy_id": self.policy_id,
+                "sw_if_index": self.sw_if_index,
+                "priority": self.priority,
+                "is_ipv6": self.is_ipv6,
+            },
+        )
 
     def query_vpp_config(self):
 
     def query_vpp_config(self):
-        return find_abf_itf_attach(self._test,
-                                   self.policy_id,
-                                   self.sw_if_index)
-
-    def __str__(self):
-        return self.object_id()
+        return find_abf_itf_attach(self._test, self.policy_id, self.sw_if_index)
 
     def object_id(self):
 
     def object_id(self):
-        return ("abf-attach-%d-%d" % (self.policy_id, self.sw_if_index))
+        return "abf-attach-%d-%d" % (self.policy_id, self.sw_if_index)
 
 
 class TestAbf(VppTestCase):
 
 
 class TestAbf(VppTestCase):
-    """ ABF Test Case """
+    """ABF Test Case"""
+
+    @classmethod
+    def setUpClass(cls):
+        super(TestAbf, cls).setUpClass()
+
+    @classmethod
+    def tearDownClass(cls):
+        super(TestAbf, cls).tearDownClass()
 
     def setUp(self):
         super(TestAbf, self).setUp()
 
     def setUp(self):
         super(TestAbf, self).setUp()
@@ -157,13 +146,11 @@ class TestAbf(VppTestCase):
         for i in self.pg_interfaces:
             i.unconfig_ip4()
             i.unconfig_ip6()
         for i in self.pg_interfaces:
             i.unconfig_ip4()
             i.unconfig_ip6()
-            i.ip6_disable()
             i.admin_down()
         super(TestAbf, self).tearDown()
 
     def test_abf4(self):
             i.admin_down()
         super(TestAbf, self).tearDown()
 
     def test_abf4(self):
-        """ IPv4 ACL Based Forwarding
-        """
+        """IPv4 ACL Based Forwarding"""
 
         #
         # We are not testing the various matching capabilities
 
         #
         # We are not testing the various matching capabilities
@@ -178,25 +165,22 @@ class TestAbf(VppTestCase):
         #
         # Rule 1
         #
         #
         # Rule 1
         #
-        rule_1 = ({'is_permit': 1,
-                   'is_ipv6': 0,
-                   'proto': 17,
-                   'srcport_or_icmptype_first': 1234,
-                   'srcport_or_icmptype_last': 1234,
-                   'src_ip_prefix_len': 32,
-                   'src_ip_addr': inet_pton(AF_INET, "1.1.1.1"),
-                   'dstport_or_icmpcode_first': 1234,
-                   'dstport_or_icmpcode_last': 1234,
-                   'dst_ip_prefix_len': 32,
-                   'dst_ip_addr': inet_pton(AF_INET, "1.1.1.2")})
-        acl_1 = self.vapi.acl_add_replace(acl_index=4294967295, r=[rule_1])
+        rule_1 = AclRule(
+            is_permit=1,
+            proto=17,
+            ports=1234,
+            src_prefix=IPv4Network("1.1.1.1/32"),
+            dst_prefix=IPv4Network("1.1.1.2/32"),
+        )
+        acl_1 = VppAcl(self, rules=[rule_1])
+        acl_1.add_vpp_config()
 
         #
         # ABF policy for ACL 1 - path via interface 1
         #
 
         #
         # ABF policy for ACL 1 - path via interface 1
         #
-        abf_1 = VppAbfPolicy(self, 10, acl_1,
-                             [VppRoutePath(self.pg1.remote_ip4,
-                                           self.pg1.sw_if_index)])
+        abf_1 = VppAbfPolicy(
+            self, 10, acl_1, [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)]
+        )
         abf_1.add_vpp_config()
 
         #
         abf_1.add_vpp_config()
 
         #
@@ -209,42 +193,43 @@ class TestAbf(VppTestCase):
         # fire in packet matching the ACL src,dst. If it's forwarded
         # then the ABF was successful, since default routing will drop it
         #
         # fire in packet matching the ACL src,dst. If it's forwarded
         # then the ABF was successful, since default routing will drop it
         #
-        p_1 = (Ether(src=self.pg0.remote_mac,
-                     dst=self.pg0.local_mac) /
-               IP(src="1.1.1.1", dst="1.1.1.2") /
-               UDP(sport=1234, dport=1234) /
-               Raw('\xa5' * 100))
-        self.send_and_expect(self.pg0, p_1*65, self.pg1)
+        p_1 = (
+            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+            / IP(src="1.1.1.1", dst="1.1.1.2")
+            / UDP(sport=1234, dport=1234)
+            / Raw(b"\xa5" * 100)
+        )
+        self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg1)
 
         #
         # Attach a 'better' priority policy to the same interface
         #
 
         #
         # Attach a 'better' priority policy to the same interface
         #
-        abf_2 = VppAbfPolicy(self, 11, acl_1,
-                             [VppRoutePath(self.pg2.remote_ip4,
-                                           self.pg2.sw_if_index)])
+        abf_2 = VppAbfPolicy(
+            self, 11, acl_1, [VppRoutePath(self.pg2.remote_ip4, self.pg2.sw_if_index)]
+        )
         abf_2.add_vpp_config()
         attach_2 = VppAbfAttach(self, 11, self.pg0.sw_if_index, 40)
         attach_2.add_vpp_config()
 
         abf_2.add_vpp_config()
         attach_2 = VppAbfAttach(self, 11, self.pg0.sw_if_index, 40)
         attach_2.add_vpp_config()
 
-        self.send_and_expect(self.pg0, p_1*65, self.pg2)
+        self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg2)
 
         #
         # Attach a policy with priority in the middle
         #
 
         #
         # Attach a policy with priority in the middle
         #
-        abf_3 = VppAbfPolicy(self, 12, acl_1,
-                             [VppRoutePath(self.pg3.remote_ip4,
-                                           self.pg3.sw_if_index)])
+        abf_3 = VppAbfPolicy(
+            self, 12, acl_1, [VppRoutePath(self.pg3.remote_ip4, self.pg3.sw_if_index)]
+        )
         abf_3.add_vpp_config()
         attach_3 = VppAbfAttach(self, 12, self.pg0.sw_if_index, 45)
         attach_3.add_vpp_config()
 
         abf_3.add_vpp_config()
         attach_3 = VppAbfAttach(self, 12, self.pg0.sw_if_index, 45)
         attach_3.add_vpp_config()
 
-        self.send_and_expect(self.pg0, p_1*65, self.pg2)
+        self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg2)
 
         #
         # remove the best priority
         #
         attach_2.remove_vpp_config()
 
         #
         # remove the best priority
         #
         attach_2.remove_vpp_config()
-        self.send_and_expect(self.pg0, p_1*65, self.pg3)
+        self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg3)
 
         #
         # Attach one of the same policies to Pg1
 
         #
         # Attach one of the same policies to Pg1
@@ -252,19 +237,20 @@ class TestAbf(VppTestCase):
         attach_4 = VppAbfAttach(self, 12, self.pg1.sw_if_index, 45)
         attach_4.add_vpp_config()
 
         attach_4 = VppAbfAttach(self, 12, self.pg1.sw_if_index, 45)
         attach_4.add_vpp_config()
 
-        p_2 = (Ether(src=self.pg1.remote_mac,
-                     dst=self.pg1.local_mac) /
-               IP(src="1.1.1.1", dst="1.1.1.2") /
-               UDP(sport=1234, dport=1234) /
-               Raw('\xa5' * 100))
-        self.send_and_expect(self.pg1, p_2 * 65, self.pg3)
+        p_2 = (
+            Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
+            / IP(src="1.1.1.1", dst="1.1.1.2")
+            / UDP(sport=1234, dport=1234)
+            / Raw(b"\xa5" * 100)
+        )
+        self.send_and_expect(self.pg1, p_2 * NUM_PKTS, self.pg3)
 
         #
         # detach the policy from PG1, now expect traffic to be dropped
         #
         attach_4.remove_vpp_config()
 
 
         #
         # detach the policy from PG1, now expect traffic to be dropped
         #
         attach_4.remove_vpp_config()
 
-        self.send_and_assert_no_replies(self.pg1, p_2 * 65, "Detached")
+        self.send_and_assert_no_replies(self.pg1, p_2 * NUM_PKTS, "Detached")
 
         #
         # Swap to route via a next-hop in the non-default table
 
         #
         # Swap to route via a next-hop in the non-default table
@@ -277,22 +263,27 @@ class TestAbf(VppTestCase):
         self.pg4.config_ip4()
         self.pg4.resolve_arp()
 
         self.pg4.config_ip4()
         self.pg4.resolve_arp()
 
-        abf_13 = VppAbfPolicy(self, 13, acl_1,
-                              [VppRoutePath(self.pg4.remote_ip4,
-                                            0xffffffff,
-                                            nh_table_id=table_20.table_id)])
+        abf_13 = VppAbfPolicy(
+            self,
+            13,
+            acl_1,
+            [
+                VppRoutePath(
+                    self.pg4.remote_ip4, 0xFFFFFFFF, nh_table_id=table_20.table_id
+                )
+            ],
+        )
         abf_13.add_vpp_config()
         attach_5 = VppAbfAttach(self, 13, self.pg0.sw_if_index, 30)
         attach_5.add_vpp_config()
 
         abf_13.add_vpp_config()
         attach_5 = VppAbfAttach(self, 13, self.pg0.sw_if_index, 30)
         attach_5.add_vpp_config()
 
-        self.send_and_expect(self.pg0, p_1*65, self.pg4)
+        self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg4)
 
         self.pg4.unconfig_ip4()
         self.pg4.set_table_ip4(0)
 
     def test_abf6(self):
 
         self.pg4.unconfig_ip4()
         self.pg4.set_table_ip4(0)
 
     def test_abf6(self):
-        """ IPv6 ACL Based Forwarding
-        """
+        """IPv6 ACL Based Forwarding"""
 
         #
         # Simple test for matching IPv6 packets
 
         #
         # Simple test for matching IPv6 packets
@@ -301,63 +292,57 @@ class TestAbf(VppTestCase):
         #
         # Rule 1
         #
         #
         # Rule 1
         #
-        rule_1 = ({'is_permit': 1,
-                   'is_ipv6': 1,
-                   'proto': 17,
-                   'srcport_or_icmptype_first': 1234,
-                   'srcport_or_icmptype_last': 1234,
-                   'src_ip_prefix_len': 128,
-                   'src_ip_addr': inet_pton(AF_INET6, "2001::2"),
-                   'dstport_or_icmpcode_first': 1234,
-                   'dstport_or_icmpcode_last': 1234,
-                   'dst_ip_prefix_len': 128,
-                   'dst_ip_addr': inet_pton(AF_INET6, "2001::1")})
-        acl_1 = self.vapi.acl_add_replace(acl_index=4294967295,
-                                          r=[rule_1])
+        rule_1 = AclRule(
+            is_permit=1,
+            proto=17,
+            ports=1234,
+            src_prefix=IPv6Network("2001::2/128"),
+            dst_prefix=IPv6Network("2001::1/128"),
+        )
+        acl_1 = VppAcl(self, rules=[rule_1])
+        acl_1.add_vpp_config()
 
         #
         # ABF policy for ACL 1 - path via interface 1
         #
 
         #
         # ABF policy for ACL 1 - path via interface 1
         #
-        abf_1 = VppAbfPolicy(self, 10, acl_1,
-                             [VppRoutePath("3001::1",
-                                           0xffffffff,
-                                           proto=DpoProto.DPO_PROTO_IP6)])
+        abf_1 = VppAbfPolicy(self, 10, acl_1, [VppRoutePath("3001::1", 0xFFFFFFFF)])
         abf_1.add_vpp_config()
 
         abf_1.add_vpp_config()
 
-        attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index,
-                                45, is_ipv6=True)
+        attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index, 45, is_ipv6=True)
         attach_1.add_vpp_config()
 
         #
         # a packet matching the rule
         #
         attach_1.add_vpp_config()
 
         #
         # a packet matching the rule
         #
-        p = (Ether(src=self.pg0.remote_mac,
-                   dst=self.pg0.local_mac) /
-             IPv6(src="2001::2", dst="2001::1") /
-             UDP(sport=1234, dport=1234) /
-             Raw('\xa5' * 100))
+        p = (
+            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+            / IPv6(src="2001::2", dst="2001::1")
+            / UDP(sport=1234, dport=1234)
+            / Raw(b"\xa5" * 100)
+        )
 
         #
         # packets are dropped because there is no route to the policy's
         # next hop
         #
 
         #
         # packets are dropped because there is no route to the policy's
         # next hop
         #
-        self.send_and_assert_no_replies(self.pg1, p * 65, "no route")
+        self.send_and_assert_no_replies(self.pg1, p * NUM_PKTS, "no route")
 
         #
         # add a route resolving the next-hop
         #
 
         #
         # add a route resolving the next-hop
         #
-        route = VppIpRoute(self, "3001::1", 32,
-                           [VppRoutePath(self.pg1.remote_ip6,
-                                         self.pg1.sw_if_index,
-                                         proto=DpoProto.DPO_PROTO_IP6)],
-                           is_ip6=1)
+        route = VppIpRoute(
+            self,
+            "3001::1",
+            32,
+            [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
+        )
         route.add_vpp_config()
 
         #
         # now expect packets forwarded.
         #
         route.add_vpp_config()
 
         #
         # now expect packets forwarded.
         #
-        self.send_and_expect(self.pg0, p * 65, self.pg1)
+        self.send_and_expect(self.pg0, p * NUM_PKTS, self.pg1)
 
 
 
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     unittest.main(testRunner=VppTestRunner)
     unittest.main(testRunner=VppTestRunner)