tests: replace pycodestyle with black
[vpp.git] / test / test_abf.py
index 097476b..856d02a 100644 (file)
@@ -5,8 +5,13 @@ import unittest
 
 from framework import VppTestCase, VppTestRunner
 from vpp_ip import DpoProto
-from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsLabel, \
-    VppIpTable, FibPathProto
+from vpp_ip_route import (
+    VppIpRoute,
+    VppRoutePath,
+    VppMplsLabel,
+    VppIpTable,
+    FibPathProto,
+)
 from vpp_acl import AclRule, VppAcl
 
 from scapy.packet import Raw
@@ -31,19 +36,13 @@ def find_abf_policy(test, id):
 def find_abf_itf_attach(test, id, sw_if_index):
     attachs = test.vapi.abf_itf_attach_dump()
     for a in attachs:
-        if id == a.attach.policy_id and \
-           sw_if_index == a.attach.sw_if_index:
+        if id == a.attach.policy_id and sw_if_index == a.attach.sw_if_index:
             return True
     return False
 
 
 class VppAbfPolicy(VppObject):
-
-    def __init__(self,
-                 test,
-                 policy_id,
-                 acl,
-                 paths):
+    def __init__(self, test, policy_id, acl, paths):
         self._test = test
         self.policy_id = policy_id
         self.acl = acl
@@ -55,35 +54,35 @@ class VppAbfPolicy(VppObject):
     def add_vpp_config(self):
         self._test.vapi.abf_policy_add_del(
             1,
-            {'policy_id': self.policy_id,
-             'acl_index': self.acl.acl_index,
-             'n_paths': len(self.paths),
-             'paths': self.encoded_paths})
+            {
+                "policy_id": self.policy_id,
+                "acl_index": self.acl.acl_index,
+                "n_paths": len(self.paths),
+                "paths": self.encoded_paths,
+            },
+        )
         self._test.registry.register(self, self._test.logger)
 
     def remove_vpp_config(self):
         self._test.vapi.abf_policy_add_del(
             0,
-            {'policy_id': self.policy_id,
-             'acl_index': self.acl.acl_index,
-             'n_paths': len(self.paths),
-             'paths': self.encoded_paths})
+            {
+                "policy_id": self.policy_id,
+                "acl_index": self.acl.acl_index,
+                "n_paths": len(self.paths),
+                "paths": self.encoded_paths,
+            },
+        )
 
     def query_vpp_config(self):
         return find_abf_policy(self._test, self.policy_id)
 
     def object_id(self):
-        return ("abf-policy-%d" % self.policy_id)
+        return "abf-policy-%d" % self.policy_id
 
 
 class VppAbfAttach(VppObject):
-
-    def __init__(self,
-                 test,
-                 policy_id,
-                 sw_if_index,
-                 priority,
-                 is_ipv6=0):
+    def __init__(self, test, policy_id, sw_if_index, priority, is_ipv6=0):
         self._test = test
         self.policy_id = policy_id
         self.sw_if_index = sw_if_index
@@ -93,31 +92,35 @@ class VppAbfAttach(VppObject):
     def add_vpp_config(self):
         self._test.vapi.abf_itf_attach_add_del(
             1,
-            {'policy_id': self.policy_id,
-             'sw_if_index': self.sw_if_index,
-             'priority': self.priority,
-             'is_ipv6': self.is_ipv6})
+            {
+                "policy_id": self.policy_id,
+                "sw_if_index": self.sw_if_index,
+                "priority": self.priority,
+                "is_ipv6": self.is_ipv6,
+            },
+        )
         self._test.registry.register(self, self._test.logger)
 
     def remove_vpp_config(self):
         self._test.vapi.abf_itf_attach_add_del(
             0,
-            {'policy_id': self.policy_id,
-             'sw_if_index': self.sw_if_index,
-             'priority': self.priority,
-             'is_ipv6': self.is_ipv6})
+            {
+                "policy_id": self.policy_id,
+                "sw_if_index": self.sw_if_index,
+                "priority": self.priority,
+                "is_ipv6": self.is_ipv6,
+            },
+        )
 
     def query_vpp_config(self):
-        return find_abf_itf_attach(self._test,
-                                   self.policy_id,
-                                   self.sw_if_index)
+        return find_abf_itf_attach(self._test, self.policy_id, self.sw_if_index)
 
     def object_id(self):
-        return ("abf-attach-%d-%d" % (self.policy_id, self.sw_if_index))
+        return "abf-attach-%d-%d" % (self.policy_id, self.sw_if_index)
 
 
 class TestAbf(VppTestCase):
-    """ ABF Test Case """
+    """ABF Test Case"""
 
     @classmethod
     def setUpClass(cls):
@@ -147,8 +150,7 @@ class TestAbf(VppTestCase):
         super(TestAbf, self).tearDown()
 
     def test_abf4(self):
-        """ IPv4 ACL Based Forwarding
-        """
+        """IPv4 ACL Based Forwarding"""
 
         #
         # We are not testing the various matching capabilities
@@ -163,18 +165,22 @@ class TestAbf(VppTestCase):
         #
         # Rule 1
         #
-        rule_1 = AclRule(is_permit=1, proto=17, ports=1234,
-                         src_prefix=IPv4Network("1.1.1.1/32"),
-                         dst_prefix=IPv4Network("1.1.1.2/32"))
+        rule_1 = AclRule(
+            is_permit=1,
+            proto=17,
+            ports=1234,
+            src_prefix=IPv4Network("1.1.1.1/32"),
+            dst_prefix=IPv4Network("1.1.1.2/32"),
+        )
         acl_1 = VppAcl(self, rules=[rule_1])
         acl_1.add_vpp_config()
 
         #
         # ABF policy for ACL 1 - path via interface 1
         #
-        abf_1 = VppAbfPolicy(self, 10, acl_1,
-                             [VppRoutePath(self.pg1.remote_ip4,
-                                           self.pg1.sw_if_index)])
+        abf_1 = VppAbfPolicy(
+            self, 10, acl_1, [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)]
+        )
         abf_1.add_vpp_config()
 
         #
@@ -187,42 +193,43 @@ class TestAbf(VppTestCase):
         # fire in packet matching the ACL src,dst. If it's forwarded
         # then the ABF was successful, since default routing will drop it
         #
-        p_1 = (Ether(src=self.pg0.remote_mac,
-                     dst=self.pg0.local_mac) /
-               IP(src="1.1.1.1", dst="1.1.1.2") /
-               UDP(sport=1234, dport=1234) /
-               Raw(b'\xa5' * 100))
-        self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg1)
+        p_1 = (
+            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+            / IP(src="1.1.1.1", dst="1.1.1.2")
+            / UDP(sport=1234, dport=1234)
+            / Raw(b"\xa5" * 100)
+        )
+        self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg1)
 
         #
         # Attach a 'better' priority policy to the same interface
         #
-        abf_2 = VppAbfPolicy(self, 11, acl_1,
-                             [VppRoutePath(self.pg2.remote_ip4,
-                                           self.pg2.sw_if_index)])
+        abf_2 = VppAbfPolicy(
+            self, 11, acl_1, [VppRoutePath(self.pg2.remote_ip4, self.pg2.sw_if_index)]
+        )
         abf_2.add_vpp_config()
         attach_2 = VppAbfAttach(self, 11, self.pg0.sw_if_index, 40)
         attach_2.add_vpp_config()
 
-        self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg2)
+        self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg2)
 
         #
         # Attach a policy with priority in the middle
         #
-        abf_3 = VppAbfPolicy(self, 12, acl_1,
-                             [VppRoutePath(self.pg3.remote_ip4,
-                                           self.pg3.sw_if_index)])
+        abf_3 = VppAbfPolicy(
+            self, 12, acl_1, [VppRoutePath(self.pg3.remote_ip4, self.pg3.sw_if_index)]
+        )
         abf_3.add_vpp_config()
         attach_3 = VppAbfAttach(self, 12, self.pg0.sw_if_index, 45)
         attach_3.add_vpp_config()
 
-        self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg2)
+        self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg2)
 
         #
         # remove the best priority
         #
         attach_2.remove_vpp_config()
-        self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg3)
+        self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg3)
 
         #
         # Attach one of the same policies to Pg1
@@ -230,11 +237,12 @@ class TestAbf(VppTestCase):
         attach_4 = VppAbfAttach(self, 12, self.pg1.sw_if_index, 45)
         attach_4.add_vpp_config()
 
-        p_2 = (Ether(src=self.pg1.remote_mac,
-                     dst=self.pg1.local_mac) /
-               IP(src="1.1.1.1", dst="1.1.1.2") /
-               UDP(sport=1234, dport=1234) /
-               Raw(b'\xa5' * 100))
+        p_2 = (
+            Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
+            / IP(src="1.1.1.1", dst="1.1.1.2")
+            / UDP(sport=1234, dport=1234)
+            / Raw(b"\xa5" * 100)
+        )
         self.send_and_expect(self.pg1, p_2 * NUM_PKTS, self.pg3)
 
         #
@@ -255,22 +263,27 @@ class TestAbf(VppTestCase):
         self.pg4.config_ip4()
         self.pg4.resolve_arp()
 
-        abf_13 = VppAbfPolicy(self, 13, acl_1,
-                              [VppRoutePath(self.pg4.remote_ip4,
-                                            0xffffffff,
-                                            nh_table_id=table_20.table_id)])
+        abf_13 = VppAbfPolicy(
+            self,
+            13,
+            acl_1,
+            [
+                VppRoutePath(
+                    self.pg4.remote_ip4, 0xFFFFFFFF, nh_table_id=table_20.table_id
+                )
+            ],
+        )
         abf_13.add_vpp_config()
         attach_5 = VppAbfAttach(self, 13, self.pg0.sw_if_index, 30)
         attach_5.add_vpp_config()
 
-        self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg4)
+        self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg4)
 
         self.pg4.unconfig_ip4()
         self.pg4.set_table_ip4(0)
 
     def test_abf6(self):
-        """ IPv6 ACL Based Forwarding
-        """
+        """IPv6 ACL Based Forwarding"""
 
         #
         # Simple test for matching IPv6 packets
@@ -279,32 +292,34 @@ class TestAbf(VppTestCase):
         #
         # Rule 1
         #
-        rule_1 = AclRule(is_permit=1, proto=17, ports=1234,
-                         src_prefix=IPv6Network("2001::2/128"),
-                         dst_prefix=IPv6Network("2001::1/128"))
+        rule_1 = AclRule(
+            is_permit=1,
+            proto=17,
+            ports=1234,
+            src_prefix=IPv6Network("2001::2/128"),
+            dst_prefix=IPv6Network("2001::1/128"),
+        )
         acl_1 = VppAcl(self, rules=[rule_1])
         acl_1.add_vpp_config()
 
         #
         # ABF policy for ACL 1 - path via interface 1
         #
-        abf_1 = VppAbfPolicy(self, 10, acl_1,
-                             [VppRoutePath("3001::1",
-                                           0xffffffff)])
+        abf_1 = VppAbfPolicy(self, 10, acl_1, [VppRoutePath("3001::1", 0xFFFFFFFF)])
         abf_1.add_vpp_config()
 
-        attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index,
-                                45, is_ipv6=True)
+        attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index, 45, is_ipv6=True)
         attach_1.add_vpp_config()
 
         #
         # a packet matching the rule
         #
-        p = (Ether(src=self.pg0.remote_mac,
-                   dst=self.pg0.local_mac) /
-             IPv6(src="2001::2", dst="2001::1") /
-             UDP(sport=1234, dport=1234) /
-             Raw(b'\xa5' * 100))
+        p = (
+            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+            / IPv6(src="2001::2", dst="2001::1")
+            / UDP(sport=1234, dport=1234)
+            / Raw(b"\xa5" * 100)
+        )
 
         #
         # packets are dropped because there is no route to the policy's
@@ -315,9 +330,12 @@ class TestAbf(VppTestCase):
         #
         # add a route resolving the next-hop
         #
-        route = VppIpRoute(self, "3001::1", 32,
-                           [VppRoutePath(self.pg1.remote_ip6,
-                                         self.pg1.sw_if_index)])
+        route = VppIpRoute(
+            self,
+            "3001::1",
+            32,
+            [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
+        )
         route.add_vpp_config()
 
         #
@@ -326,5 +344,5 @@ class TestAbf(VppTestCase):
         self.send_and_expect(self.pg0, p * NUM_PKTS, self.pg1)
 
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     unittest.main(testRunner=VppTestRunner)