tests: replace pycodestyle with black
[vpp.git] / test / test_ipsec_spd_flow_cache_output.py
index 54571c6..9852b37 100644 (file)
@@ -11,16 +11,14 @@ class SpdFlowCacheOutbound(SpdFlowCacheTemplate):
     @classmethod
     def setUpConstants(cls):
         super(SpdFlowCacheOutbound, cls).setUpConstants()
-        cls.vpp_cmdline.extend(["ipsec", "{",
-                                "ipv4-outbound-spd-flow-cache on",
-                                "}"])
-        cls.logger.info("VPP modified cmdline is %s" % " "
-                        .join(cls.vpp_cmdline))
+        cls.vpp_cmdline.extend(["ipsec", "{", "ipv4-outbound-spd-flow-cache on", "}"])
+        cls.logger.info("VPP modified cmdline is %s" % " ".join(cls.vpp_cmdline))
 
 
 class IPSec4SpdTestCaseAdd(SpdFlowCacheOutbound):
     """ IPSec/IPv4 outbound: Policy mode test case with flow cache \
         (add rule)"""
+
     def test_ipsec_spd_outbound_add(self):
         # In this test case, packets in IPv4 FWD path are configured
         # to go through IPSec outbound SPD policy lookup.
@@ -33,11 +31,23 @@ class IPSec4SpdTestCaseAdd(SpdFlowCacheOutbound):
         pkt_count = 5
         self.spd_create_and_intf_add(1, [self.pg1])
         policy_0 = self.spd_add_rem_policy(  # outbound, priority 10
-            1, self.pg0, self.pg1, socket.IPPROTO_UDP,
-            is_out=1, priority=10, policy_type="bypass")
+            1,
+            self.pg0,
+            self.pg1,
+            socket.IPPROTO_UDP,
+            is_out=1,
+            priority=10,
+            policy_type="bypass",
+        )
         policy_1 = self.spd_add_rem_policy(  # outbound, priority 5
-            1, self.pg0, self.pg1, socket.IPPROTO_UDP,
-            is_out=1, priority=5, policy_type="discard")
+            1,
+            self.pg0,
+            self.pg1,
+            socket.IPPROTO_UDP,
+            is_out=1,
+            priority=5,
+            policy_type="discard",
+        )
 
         # check flow cache is empty before sending traffic
         self.verify_num_outbound_flow_cache_entries(0)
@@ -75,6 +85,7 @@ class IPSec4SpdTestCaseAdd(SpdFlowCacheOutbound):
 class IPSec4SpdTestCaseRemove(SpdFlowCacheOutbound):
     """ IPSec/IPv4 outbound: Policy mode test case with flow cache \
         (remove rule)"""
+
     def test_ipsec_spd_outbound_remove(self):
         # In this test case, packets in IPv4 FWD path are configured
         # to go through IPSec outbound SPD policy lookup.
@@ -88,11 +99,23 @@ class IPSec4SpdTestCaseRemove(SpdFlowCacheOutbound):
         pkt_count = 5
         self.spd_create_and_intf_add(1, [self.pg1])
         policy_0 = self.spd_add_rem_policy(  # outbound, priority 10
-            1, self.pg0, self.pg1, socket.IPPROTO_UDP,
-            is_out=1, priority=10, policy_type="bypass")
+            1,
+            self.pg0,
+            self.pg1,
+            socket.IPPROTO_UDP,
+            is_out=1,
+            priority=10,
+            policy_type="bypass",
+        )
         policy_1 = self.spd_add_rem_policy(  # outbound, priority 5
-            1, self.pg0, self.pg1, socket.IPPROTO_UDP,
-            is_out=1, priority=5, policy_type="discard")
+            1,
+            self.pg0,
+            self.pg1,
+            socket.IPPROTO_UDP,
+            is_out=1,
+            priority=5,
+            policy_type="discard",
+        )
 
         # check flow cache is empty before sending traffic
         self.verify_num_outbound_flow_cache_entries(0)
@@ -128,9 +151,15 @@ class IPSec4SpdTestCaseRemove(SpdFlowCacheOutbound):
 
         # now remove the bypass rule
         self.spd_add_rem_policy(  # outbound, priority 10
-            1, self.pg0, self.pg1, socket.IPPROTO_UDP,
-            is_out=1, priority=10, policy_type="bypass",
-            remove=True)
+            1,
+            self.pg0,
+            self.pg1,
+            socket.IPPROTO_UDP,
+            is_out=1,
+            priority=10,
+            policy_type="bypass",
+            remove=True,
+        )
         # verify flow cache counter has been reset by rule removal
         self.verify_num_outbound_flow_cache_entries(0)
 
@@ -154,6 +183,7 @@ class IPSec4SpdTestCaseRemove(SpdFlowCacheOutbound):
 class IPSec4SpdTestCaseReadd(SpdFlowCacheOutbound):
     """ IPSec/IPv4 outbound: Policy mode test case with flow cache \
         (add, remove, re-add)"""
+
     def test_ipsec_spd_outbound_readd(self):
         # In this test case, packets in IPv4 FWD path are configured
         # to go through IPSec outbound SPD policy lookup.
@@ -172,11 +202,23 @@ class IPSec4SpdTestCaseReadd(SpdFlowCacheOutbound):
         pkt_count = 5
         self.spd_create_and_intf_add(1, [self.pg1])
         policy_0 = self.spd_add_rem_policy(  # outbound, priority 10
-            1, self.pg0, self.pg1, socket.IPPROTO_UDP,
-            is_out=1, priority=10, policy_type="bypass")
+            1,
+            self.pg0,
+            self.pg1,
+            socket.IPPROTO_UDP,
+            is_out=1,
+            priority=10,
+            policy_type="bypass",
+        )
         policy_1 = self.spd_add_rem_policy(  # outbound, priority 5
-            1, self.pg0, self.pg1, socket.IPPROTO_UDP,
-            is_out=1, priority=5, policy_type="discard")
+            1,
+            self.pg0,
+            self.pg1,
+            socket.IPPROTO_UDP,
+            is_out=1,
+            priority=5,
+            policy_type="discard",
+        )
 
         # check flow cache is empty before sending traffic
         self.verify_num_outbound_flow_cache_entries(0)
@@ -212,9 +254,15 @@ class IPSec4SpdTestCaseReadd(SpdFlowCacheOutbound):
 
         # now remove the bypass rule, leaving only the discard rule
         self.spd_add_rem_policy(  # outbound, priority 10
-            1, self.pg0, self.pg1, socket.IPPROTO_UDP,
-            is_out=1, priority=10, policy_type="bypass",
-            remove=True)
+            1,
+            self.pg0,
+            self.pg1,
+            socket.IPPROTO_UDP,
+            is_out=1,
+            priority=10,
+            policy_type="bypass",
+            remove=True,
+        )
         # verify flow cache counter has been reset by rule removal
         self.verify_num_outbound_flow_cache_entries(0)
 
@@ -236,8 +284,14 @@ class IPSec4SpdTestCaseReadd(SpdFlowCacheOutbound):
 
         # now readd the bypass rule
         policy_0 = self.spd_add_rem_policy(  # outbound, priority 10
-            1, self.pg0, self.pg1, socket.IPPROTO_UDP,
-            is_out=1, priority=10, policy_type="bypass")
+            1,
+            self.pg0,
+            self.pg1,
+            socket.IPPROTO_UDP,
+            is_out=1,
+            priority=10,
+            policy_type="bypass",
+        )
         # verify flow cache counter has been reset by rule addition
         self.verify_num_outbound_flow_cache_entries(0)
 
@@ -271,6 +325,7 @@ class IPSec4SpdTestCaseReadd(SpdFlowCacheOutbound):
 class IPSec4SpdTestCaseMultiple(SpdFlowCacheOutbound):
     """ IPSec/IPv4 outbound: Policy mode test case with flow cache \
         (multiple interfaces, multiple rules)"""
+
     def test_ipsec_spd_outbound_multiple(self):
         # In this test case, packets in IPv4 FWD path are configured to go
         # through IPSec outbound SPD policy lookup.
@@ -286,32 +341,75 @@ class IPSec4SpdTestCaseMultiple(SpdFlowCacheOutbound):
         self.spd_create_and_intf_add(1, self.pg_interfaces)
         # add rules on all interfaces
         policy_01 = self.spd_add_rem_policy(  # outbound, priority 10
-            1, self.pg0, self.pg1, socket.IPPROTO_UDP,
-            is_out=1, priority=10, policy_type="bypass")
+            1,
+            self.pg0,
+            self.pg1,
+            socket.IPPROTO_UDP,
+            is_out=1,
+            priority=10,
+            policy_type="bypass",
+        )
         policy_02 = self.spd_add_rem_policy(  # outbound, priority 5
-            1, self.pg0, self.pg1, socket.IPPROTO_UDP,
-            is_out=1, priority=5, policy_type="discard")
+            1,
+            self.pg0,
+            self.pg1,
+            socket.IPPROTO_UDP,
+            is_out=1,
+            priority=5,
+            policy_type="discard",
+        )
 
         policy_11 = self.spd_add_rem_policy(  # outbound, priority 10
-            1, self.pg1, self.pg2, socket.IPPROTO_UDP,
-            is_out=1, priority=10, policy_type="bypass")
+            1,
+            self.pg1,
+            self.pg2,
+            socket.IPPROTO_UDP,
+            is_out=1,
+            priority=10,
+            policy_type="bypass",
+        )
         policy_12 = self.spd_add_rem_policy(  # outbound, priority 5
-            1, self.pg1, self.pg2, socket.IPPROTO_UDP,
-            is_out=1, priority=5, policy_type="discard")
+            1,
+            self.pg1,
+            self.pg2,
+            socket.IPPROTO_UDP,
+            is_out=1,
+            priority=5,
+            policy_type="discard",
+        )
 
         policy_21 = self.spd_add_rem_policy(  # outbound, priority 5
-            1, self.pg2, self.pg0, socket.IPPROTO_UDP,
-            is_out=1, priority=5, policy_type="bypass")
+            1,
+            self.pg2,
+            self.pg0,
+            socket.IPPROTO_UDP,
+            is_out=1,
+            priority=5,
+            policy_type="bypass",
+        )
         policy_22 = self.spd_add_rem_policy(  # outbound, priority 10
-            1, self.pg2, self.pg0, socket.IPPROTO_UDP,
-            is_out=1, priority=10, policy_type="discard")
+            1,
+            self.pg2,
+            self.pg0,
+            socket.IPPROTO_UDP,
+            is_out=1,
+            priority=10,
+            policy_type="discard",
+        )
 
         # interfaces bound to an SPD, will by default drop inbound
         # traffic with no matching policies. add catch-all inbound
         # bypass rule to SPD:
         self.spd_add_rem_policy(  # inbound, all interfaces
-            1, None, None, socket.IPPROTO_UDP, is_out=0, priority=10,
-            policy_type="bypass", all_ips=True)
+            1,
+            None,
+            None,
+            socket.IPPROTO_UDP,
+            is_out=0,
+            priority=10,
+            policy_type="bypass",
+            all_ips=True,
+        )
 
         # check flow cache is empty (0 active elements) before sending traffic
         self.verify_num_outbound_flow_cache_entries(0)
@@ -338,8 +436,7 @@ class IPSec4SpdTestCaseMultiple(SpdFlowCacheOutbound):
                 try:
                     self.logger.debug(ppp("SPD - Got packet:", packet))
                 except Exception:
-                    self.logger.error(
-                        ppp("Unexpected or invalid packet:", packet))
+                    self.logger.error(ppp("Unexpected or invalid packet:", packet))
                     raise
         self.logger.debug("SPD: Num packets: %s", len(if_caps[0].res))
         self.logger.debug("SPD: Num packets: %s", len(if_caps[1].res))
@@ -366,6 +463,7 @@ class IPSec4SpdTestCaseMultiple(SpdFlowCacheOutbound):
 class IPSec4SpdTestCaseOverwriteStale(SpdFlowCacheOutbound):
     """ IPSec/IPv4 outbound: Policy mode test case with flow cache \
         (overwrite stale entries)"""
+
     def test_ipsec_spd_outbound_overwrite(self):
         # The operation of the flow cache is setup so that the entire cache
         # is invalidated when adding or removing an SPD policy rule.
@@ -386,23 +484,48 @@ class IPSec4SpdTestCaseOverwriteStale(SpdFlowCacheOutbound):
         # add output rules on all interfaces
         # pg0 -> pg1
         policy_0 = self.spd_add_rem_policy(  # outbound
-            1, self.pg0, self.pg1, socket.IPPROTO_UDP,
-            is_out=1, priority=10, policy_type="bypass")
+            1,
+            self.pg0,
+            self.pg1,
+            socket.IPPROTO_UDP,
+            is_out=1,
+            priority=10,
+            policy_type="bypass",
+        )
         # pg1 -> pg2
         policy_1 = self.spd_add_rem_policy(  # outbound
-            1, self.pg1, self.pg2, socket.IPPROTO_UDP,
-            is_out=1, priority=10, policy_type="bypass")
+            1,
+            self.pg1,
+            self.pg2,
+            socket.IPPROTO_UDP,
+            is_out=1,
+            priority=10,
+            policy_type="bypass",
+        )
         # pg2 -> pg0
         policy_2 = self.spd_add_rem_policy(  # outbound
-            1, self.pg2, self.pg0, socket.IPPROTO_UDP,
-            is_out=1, priority=10, policy_type="discard")
+            1,
+            self.pg2,
+            self.pg0,
+            socket.IPPROTO_UDP,
+            is_out=1,
+            priority=10,
+            policy_type="discard",
+        )
 
         # interfaces bound to an SPD, will by default drop inbound
         # traffic with no matching policies. add catch-all inbound
         # bypass rule to SPD:
         self.spd_add_rem_policy(  # inbound, all interfaces
-            1, None, None, socket.IPPROTO_UDP, is_out=0, priority=10,
-            policy_type="bypass", all_ips=True)
+            1,
+            None,
+            None,
+            socket.IPPROTO_UDP,
+            is_out=0,
+            priority=10,
+            policy_type="bypass",
+            all_ips=True,
+        )
 
         # check flow cache is empty (0 active elements) before sending traffic
         self.verify_num_outbound_flow_cache_entries(0)
@@ -429,8 +552,7 @@ class IPSec4SpdTestCaseOverwriteStale(SpdFlowCacheOutbound):
                 try:
                     self.logger.debug(ppp("SPD Add - Got packet:", packet))
                 except Exception:
-                    self.logger.error(
-                        ppp("Unexpected or invalid packet:", packet))
+                    self.logger.error(ppp("Unexpected or invalid packet:", packet))
                     raise
 
         # verify captures that matched BYPASS rules
@@ -447,21 +569,39 @@ class IPSec4SpdTestCaseOverwriteStale(SpdFlowCacheOutbound):
 
         # adding an inbound policy should not invalidate output flow cache
         self.spd_add_rem_policy(  # inbound
-            1, self.pg0, self.pg1, socket.IPPROTO_UDP,
-            is_out=0, priority=10, policy_type="bypass")
+            1,
+            self.pg0,
+            self.pg1,
+            socket.IPPROTO_UDP,
+            is_out=0,
+            priority=10,
+            policy_type="bypass",
+        )
         # check flow cache counter has not been reset
         self.verify_num_outbound_flow_cache_entries(3)
 
         # remove a bypass policy - flow cache counter will be reset, and
         # there will be 3x stale entries in flow cache
         self.spd_add_rem_policy(  # outbound
-            1, self.pg0, self.pg1, socket.IPPROTO_UDP,
-            is_out=1, priority=10, policy_type="bypass",
-            remove=True)
+            1,
+            self.pg0,
+            self.pg1,
+            socket.IPPROTO_UDP,
+            is_out=1,
+            priority=10,
+            policy_type="bypass",
+            remove=True,
+        )
         # readd policy
         policy_0 = self.spd_add_rem_policy(  # outbound
-            1, self.pg0, self.pg1, socket.IPPROTO_UDP,
-            is_out=1, priority=10, policy_type="bypass")
+            1,
+            self.pg0,
+            self.pg1,
+            socket.IPPROTO_UDP,
+            is_out=1,
+            priority=10,
+            policy_type="bypass",
+        )
         # check counter was reset with flow cache invalidation
         self.verify_num_outbound_flow_cache_entries(0)
 
@@ -481,8 +621,7 @@ class IPSec4SpdTestCaseOverwriteStale(SpdFlowCacheOutbound):
                 try:
                     self.logger.debug(ppp("SPD Add - Got packet:", packet))
                 except Exception:
-                    self.logger.error(
-                        ppp("Unexpected or invalid packet:", packet))
+                    self.logger.error(ppp("Unexpected or invalid packet:", packet))
                     raise
 
         # verify captures that matched BYPASS rules
@@ -492,8 +631,8 @@ class IPSec4SpdTestCaseOverwriteStale(SpdFlowCacheOutbound):
         self.pg0.assert_nothing_captured()
         # verify all policies matched the expected number of times
         self.verify_policy_match(pkt_count, policy_0)
-        self.verify_policy_match(pkt_count*2, policy_1)
-        self.verify_policy_match(pkt_count*2, policy_2)
+        self.verify_policy_match(pkt_count * 2, policy_1)
+        self.verify_policy_match(pkt_count * 2, policy_2)
         # we are overwriting 3x stale entries - check flow cache counter
         # is correct
         self.verify_num_outbound_flow_cache_entries(3)
@@ -502,18 +641,23 @@ class IPSec4SpdTestCaseOverwriteStale(SpdFlowCacheOutbound):
 class IPSec4SpdTestCaseCollision(SpdFlowCacheOutbound):
     """ IPSec/IPv4 outbound: Policy mode test case with flow cache \
         (hash collision)"""
+
     # Override class setup to restrict vector size to 16 elements.
     # This forces using only the lower 4 bits of the hash as a key,
     # making hash collisions easy to find.
     @classmethod
     def setUpConstants(cls):
         super(SpdFlowCacheOutbound, cls).setUpConstants()
-        cls.vpp_cmdline.extend(["ipsec", "{",
-                                "ipv4-outbound-spd-flow-cache on",
-                                "ipv4-outbound-spd-hash-buckets 16",
-                                "}"])
-        cls.logger.info("VPP modified cmdline is %s" % " "
-                        .join(cls.vpp_cmdline))
+        cls.vpp_cmdline.extend(
+            [
+                "ipsec",
+                "{",
+                "ipv4-outbound-spd-flow-cache on",
+                "ipv4-outbound-spd-hash-buckets 16",
+                "}",
+            ]
+        )
+        cls.logger.info("VPP modified cmdline is %s" % " ".join(cls.vpp_cmdline))
 
     def test_ipsec_spd_outbound_collision(self):
         # The flow cache operation is setup to overwrite an entry
@@ -535,18 +679,37 @@ class IPSec4SpdTestCaseCollision(SpdFlowCacheOutbound):
         self.spd_create_and_intf_add(1, self.pg_interfaces)
         # add rules
         policy_0 = self.spd_add_rem_policy(  # outbound, priority 10
-            1, self.pg1, self.pg2, socket.IPPROTO_UDP,
-            is_out=1, priority=10, policy_type="bypass")
+            1,
+            self.pg1,
+            self.pg2,
+            socket.IPPROTO_UDP,
+            is_out=1,
+            priority=10,
+            policy_type="bypass",
+        )
         policy_1 = self.spd_add_rem_policy(  # outbound, priority 10
-            1, self.pg2, self.pg0, socket.IPPROTO_UDP,
-            is_out=1, priority=10, policy_type="bypass")
+            1,
+            self.pg2,
+            self.pg0,
+            socket.IPPROTO_UDP,
+            is_out=1,
+            priority=10,
+            policy_type="bypass",
+        )
 
         # interfaces bound to an SPD, will by default drop inbound
         # traffic with no matching policies. add catch-all inbound
         # bypass rule to SPD:
         self.spd_add_rem_policy(  # inbound, all interfaces
-            1, None, None, socket.IPPROTO_UDP, is_out=0, priority=10,
-            policy_type="bypass", all_ips=True)
+            1,
+            None,
+            None,
+            socket.IPPROTO_UDP,
+            is_out=0,
+            priority=10,
+            policy_type="bypass",
+            all_ips=True,
+        )
 
         # check flow cache is empty (0 active elements) before sending traffic
         self.verify_num_outbound_flow_cache_entries(0)
@@ -580,11 +743,9 @@ class IPSec4SpdTestCaseCollision(SpdFlowCacheOutbound):
             if_caps.append(pg.get_capture())
             for packet in if_caps[-1]:
                 try:
-                    self.logger.debug(ppp(
-                        "SPD - Got packet:", packet))
+                    self.logger.debug(ppp("SPD - Got packet:", packet))
                 except Exception:
-                    self.logger.error(ppp(
-                        "Unexpected or invalid packet:", packet))
+                    self.logger.error(ppp("Unexpected or invalid packet:", packet))
                     raise
         self.logger.debug("SPD: Num packets: %s", len(if_caps[0].res))
         self.logger.debug("SPD: Num packets: %s", len(if_caps[1].res))
@@ -600,5 +761,5 @@ class IPSec4SpdTestCaseCollision(SpdFlowCacheOutbound):
         self.verify_num_outbound_flow_cache_entries(1)
 
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     unittest.main(testRunner=VppTestRunner)