tests: replace pycodestyle with black
[vpp.git] / test / test_ipsec_spd_flow_cache_input.py
index 2d70d15..02ecb56 100644 (file)
@@ -12,16 +12,14 @@ class SpdFlowCacheInbound(SpdFlowCacheTemplate):
     @classmethod
     def setUpConstants(cls):
         super(SpdFlowCacheInbound, cls).setUpConstants()
-        cls.vpp_cmdline.extend(["ipsec", "{",
-                                "ipv4-inbound-spd-flow-cache on",
-                                "}"])
-        cls.logger.info("VPP modified cmdline is %s" % " "
-                        .join(cls.vpp_cmdline))
+        cls.vpp_cmdline.extend(["ipsec", "{", "ipv4-inbound-spd-flow-cache on", "}"])
+        cls.logger.info("VPP modified cmdline is %s" % " ".join(cls.vpp_cmdline))
 
 
 class IPSec4SpdTestCaseBypass(SpdFlowCacheInbound):
     """ IPSec/IPv4 inbound: Policy mode test case with flow cache \
         (add bypass)"""
+
     def test_ipsec_spd_inbound_bypass(self):
         # In this test case, packets in IPv4 FWD path are configured
         # to go through IPSec inbound SPD policy lookup.
@@ -42,16 +40,34 @@ class IPSec4SpdTestCaseBypass(SpdFlowCacheInbound):
         # bypass rule should take precedence over discard rule,
         # even though it's lower priority
         policy_0 = self.spd_add_rem_policy(  # inbound, priority 10
-            1, self.pg1, self.pg0, socket.IPPROTO_UDP,
-            is_out=0, priority=10, policy_type="bypass")
+            1,
+            self.pg1,
+            self.pg0,
+            socket.IPPROTO_UDP,
+            is_out=0,
+            priority=10,
+            policy_type="bypass",
+        )
         policy_1 = self.spd_add_rem_policy(  # inbound, priority 15
-            1, self.pg1, self.pg0, socket.IPPROTO_UDP,
-            is_out=0, priority=15, policy_type="discard")
+            1,
+            self.pg1,
+            self.pg0,
+            socket.IPPROTO_UDP,
+            is_out=0,
+            priority=15,
+            policy_type="discard",
+        )
 
         # create output rule so we can capture forwarded packets
         policy_2 = self.spd_add_rem_policy(  # outbound, priority 10
-            1, self.pg0, self.pg1, socket.IPPROTO_UDP,
-            is_out=1, priority=10, policy_type="bypass")
+            1,
+            self.pg0,
+            self.pg1,
+            socket.IPPROTO_UDP,
+            is_out=1,
+            priority=10,
+            policy_type="bypass",
+        )
 
         # check flow cache is empty before sending traffic
         self.verify_num_inbound_flow_cache_entries(0)
@@ -85,6 +101,7 @@ class IPSec4SpdTestCaseBypass(SpdFlowCacheInbound):
 class IPSec4SpdTestCaseDiscard(SpdFlowCacheInbound):
     """ IPSec/IPv4 inbound: Policy mode test case with flow cache \
         (add discard)"""
+
     def test_ipsec_spd_inbound_discard(self):
         # In this test case, packets in IPv4 FWD path are configured
         # to go through IPSec inbound SPD policy lookup.
@@ -96,13 +113,25 @@ class IPSec4SpdTestCaseDiscard(SpdFlowCacheInbound):
 
         # create input rule
         policy_0 = self.spd_add_rem_policy(  # inbound, priority 10
-            1, self.pg1, self.pg0, socket.IPPROTO_UDP,
-            is_out=0, priority=10, policy_type="discard")
+            1,
+            self.pg1,
+            self.pg0,
+            socket.IPPROTO_UDP,
+            is_out=0,
+            priority=10,
+            policy_type="discard",
+        )
 
         # create output rule so we can capture forwarded packets
         policy_1 = self.spd_add_rem_policy(  # outbound, priority 10
-            1, self.pg0, self.pg1, socket.IPPROTO_UDP,
-            is_out=1, priority=10, policy_type="bypass")
+            1,
+            self.pg0,
+            self.pg1,
+            socket.IPPROTO_UDP,
+            is_out=1,
+            priority=10,
+            policy_type="bypass",
+        )
 
         # check flow cache is empty before sending traffic
         self.verify_num_inbound_flow_cache_entries(0)
@@ -124,6 +153,7 @@ class IPSec4SpdTestCaseDiscard(SpdFlowCacheInbound):
 class IPSec4SpdTestCaseRemove(SpdFlowCacheInbound):
     """ IPSec/IPv4 inbound: Policy mode test case with flow cache \
         (remove bypass)"""
+
     def test_ipsec_spd_inbound_remove(self):
         # In this test case, packets in IPv4 FWD path are configured
         # to go through IPSec inbound SPD policy lookup.
@@ -147,16 +177,34 @@ class IPSec4SpdTestCaseRemove(SpdFlowCacheInbound):
         # bypass rule should take precedence over discard rule,
         # even though it's lower priority
         policy_0 = self.spd_add_rem_policy(  # inbound, priority 10
-            1, self.pg1, self.pg0, socket.IPPROTO_UDP,
-            is_out=0, priority=10, policy_type="bypass")
+            1,
+            self.pg1,
+            self.pg0,
+            socket.IPPROTO_UDP,
+            is_out=0,
+            priority=10,
+            policy_type="bypass",
+        )
         policy_1 = self.spd_add_rem_policy(  # inbound, priority 15
-            1, self.pg1, self.pg0, socket.IPPROTO_UDP,
-            is_out=0, priority=15, policy_type="discard")
+            1,
+            self.pg1,
+            self.pg0,
+            socket.IPPROTO_UDP,
+            is_out=0,
+            priority=15,
+            policy_type="discard",
+        )
 
         # create output rule so we can capture forwarded packets
         policy_2 = self.spd_add_rem_policy(  # outbound, priority 10
-            1, self.pg0, self.pg1, socket.IPPROTO_UDP,
-            is_out=1, priority=10, policy_type="bypass")
+            1,
+            self.pg0,
+            self.pg1,
+            socket.IPPROTO_UDP,
+            is_out=1,
+            priority=10,
+            policy_type="bypass",
+        )
 
         # check flow cache is empty before sending traffic
         self.verify_num_inbound_flow_cache_entries(0)
@@ -188,9 +236,15 @@ class IPSec4SpdTestCaseRemove(SpdFlowCacheInbound):
 
         # remove the input bypass rule
         self.spd_add_rem_policy(  # inbound, priority 10
-            1, self.pg1, self.pg0, socket.IPPROTO_UDP,
-            is_out=0, priority=10, policy_type="bypass",
-            remove=True)
+            1,
+            self.pg1,
+            self.pg0,
+            socket.IPPROTO_UDP,
+            is_out=0,
+            priority=10,
+            policy_type="bypass",
+            remove=True,
+        )
         # verify flow cache counter has been reset by rule removal
         self.verify_num_inbound_flow_cache_entries(0)
 
@@ -213,6 +267,7 @@ class IPSec4SpdTestCaseRemove(SpdFlowCacheInbound):
 class IPSec4SpdTestCaseReadd(SpdFlowCacheInbound):
     """ IPSec/IPv4 inbound: Policy mode test case with flow cache \
         (add, remove, re-add bypass)"""
+
     def test_ipsec_spd_inbound_readd(self):
         # In this test case, packets in IPv4 FWD path are configured
         # to go through IPSec inbound SPD policy lookup.
@@ -239,16 +294,34 @@ class IPSec4SpdTestCaseReadd(SpdFlowCacheInbound):
         # bypass rule should take precedence over discard rule,
         # even though it's lower priority
         policy_0 = self.spd_add_rem_policy(  # inbound, priority 10
-            1, self.pg1, self.pg0, socket.IPPROTO_UDP,
-            is_out=0, priority=10, policy_type="bypass")
+            1,
+            self.pg1,
+            self.pg0,
+            socket.IPPROTO_UDP,
+            is_out=0,
+            priority=10,
+            policy_type="bypass",
+        )
         policy_1 = self.spd_add_rem_policy(  # inbound, priority 15
-            1, self.pg1, self.pg0, socket.IPPROTO_UDP,
-            is_out=0, priority=15, policy_type="discard")
+            1,
+            self.pg1,
+            self.pg0,
+            socket.IPPROTO_UDP,
+            is_out=0,
+            priority=15,
+            policy_type="discard",
+        )
 
         # create output rule so we can capture forwarded packets
         policy_2 = self.spd_add_rem_policy(  # outbound, priority 10
-            1, self.pg0, self.pg1, socket.IPPROTO_UDP,
-            is_out=1, priority=10, policy_type="bypass")
+            1,
+            self.pg0,
+            self.pg1,
+            socket.IPPROTO_UDP,
+            is_out=1,
+            priority=10,
+            policy_type="bypass",
+        )
 
         # check flow cache is empty before sending traffic
         self.verify_num_inbound_flow_cache_entries(0)
@@ -280,9 +353,15 @@ class IPSec4SpdTestCaseReadd(SpdFlowCacheInbound):
 
         # remove the input bypass rule
         self.spd_add_rem_policy(  # inbound, priority 10
-            1, self.pg1, self.pg0, socket.IPPROTO_UDP,
-            is_out=0, priority=10, policy_type="bypass",
-            remove=True)
+            1,
+            self.pg1,
+            self.pg0,
+            socket.IPPROTO_UDP,
+            is_out=0,
+            priority=10,
+            policy_type="bypass",
+            remove=True,
+        )
         # verify flow cache counter has been reset by rule removal
         self.verify_num_inbound_flow_cache_entries(0)
 
@@ -303,8 +382,14 @@ class IPSec4SpdTestCaseReadd(SpdFlowCacheInbound):
 
         # readd the input bypass rule
         policy_0 = self.spd_add_rem_policy(  # inbound, priority 10
-            1, self.pg1, self.pg0, socket.IPPROTO_UDP,
-            is_out=0, priority=10, policy_type="bypass")
+            1,
+            self.pg1,
+            self.pg0,
+            socket.IPPROTO_UDP,
+            is_out=0,
+            priority=10,
+            policy_type="bypass",
+        )
         # verify flow cache counter has been reset by rule addition
         self.verify_num_inbound_flow_cache_entries(0)
 
@@ -327,7 +412,7 @@ class IPSec4SpdTestCaseReadd(SpdFlowCacheInbound):
         # verify all policies matched the expected number of times
         self.verify_policy_match(pkt_count, policy_0)
         self.verify_policy_match(pkt_count, policy_1)
-        self.verify_policy_match(pkt_count*2, policy_2)
+        self.verify_policy_match(pkt_count * 2, policy_2)
         # by readding the bypass rule, we reset the flow cache
         # we only expect the bypass rule to now be in the flow cache
         self.verify_num_inbound_flow_cache_entries(1)
@@ -336,6 +421,7 @@ class IPSec4SpdTestCaseReadd(SpdFlowCacheInbound):
 class IPSec4SpdTestCaseMultiple(SpdFlowCacheInbound):
     """ IPSec/IPv4 inbound: Policy mode test case with flow cache \
         (multiple interfaces, multiple rules)"""
+
     def test_ipsec_spd_inbound_multiple(self):
         # In this test case, packets in IPv4 FWD path are configured to go
         # through IPSec outbound SPD policy lookup.
@@ -353,23 +439,47 @@ class IPSec4SpdTestCaseMultiple(SpdFlowCacheInbound):
         # add input rules on all interfaces
         # pg0 -> pg1
         policy_0 = self.spd_add_rem_policy(  # inbound, priority 10
-            1, self.pg1, self.pg0, socket.IPPROTO_UDP,
-            is_out=0, priority=10, policy_type="bypass")
+            1,
+            self.pg1,
+            self.pg0,
+            socket.IPPROTO_UDP,
+            is_out=0,
+            priority=10,
+            policy_type="bypass",
+        )
         # pg1 -> pg2
         policy_1 = self.spd_add_rem_policy(  # inbound, priority 10
-            1, self.pg2, self.pg1, socket.IPPROTO_UDP,
-            is_out=0, priority=10, policy_type="bypass")
+            1,
+            self.pg2,
+            self.pg1,
+            socket.IPPROTO_UDP,
+            is_out=0,
+            priority=10,
+            policy_type="bypass",
+        )
         # pg2 -> pg0
         policy_2 = self.spd_add_rem_policy(  # inbound, priority 10
-            1, self.pg0, self.pg2, socket.IPPROTO_UDP,
-            is_out=0, priority=10, policy_type="discard")
+            1,
+            self.pg0,
+            self.pg2,
+            socket.IPPROTO_UDP,
+            is_out=0,
+            priority=10,
+            policy_type="discard",
+        )
 
         # create output rules covering the the full ip range
         # 0.0.0.0 -> 255.255.255.255, so we can capture forwarded packets
         policy_3 = self.spd_add_rem_policy(  # outbound, priority 10
-            1, self.pg0, self.pg0, socket.IPPROTO_UDP,
-            is_out=1, priority=10, policy_type="bypass",
-            all_ips=True)
+            1,
+            self.pg0,
+            self.pg0,
+            socket.IPPROTO_UDP,
+            is_out=1,
+            priority=10,
+            policy_type="bypass",
+            all_ips=True,
+        )
 
         # check flow cache is empty (0 active elements) before sending traffic
         self.verify_num_inbound_flow_cache_entries(0)
@@ -396,8 +506,7 @@ class IPSec4SpdTestCaseMultiple(SpdFlowCacheInbound):
                 try:
                     self.logger.debug(ppp("SPD Add - Got packet:", packet))
                 except Exception:
-                    self.logger.error(
-                        ppp("Unexpected or invalid packet:", packet))
+                    self.logger.error(ppp("Unexpected or invalid packet:", packet))
                     raise
 
         # verify captures that matched BYPASS rules
@@ -416,6 +525,7 @@ class IPSec4SpdTestCaseMultiple(SpdFlowCacheInbound):
 class IPSec4SpdTestCaseOverwriteStale(SpdFlowCacheInbound):
     """ IPSec/IPv4 inbound: Policy mode test case with flow cache \
         (overwrite stale entries)"""
+
     def test_ipsec_spd_inbound_overwrite(self):
         # The operation of the flow cache is setup so that the entire cache
         # is invalidated when adding or removing an SPD policy rule.
@@ -436,23 +546,47 @@ class IPSec4SpdTestCaseOverwriteStale(SpdFlowCacheInbound):
         # add input rules on all interfaces
         # pg0 -> pg1
         policy_0 = self.spd_add_rem_policy(  # inbound
-            1, self.pg1, self.pg0, socket.IPPROTO_UDP,
-            is_out=0, priority=10, policy_type="bypass")
+            1,
+            self.pg1,
+            self.pg0,
+            socket.IPPROTO_UDP,
+            is_out=0,
+            priority=10,
+            policy_type="bypass",
+        )
         # pg1 -> pg2
         policy_1 = self.spd_add_rem_policy(  # inbound
-            1, self.pg2, self.pg1, socket.IPPROTO_UDP,
-            is_out=0, priority=10, policy_type="bypass")
+            1,
+            self.pg2,
+            self.pg1,
+            socket.IPPROTO_UDP,
+            is_out=0,
+            priority=10,
+            policy_type="bypass",
+        )
         # pg2 -> pg0
         policy_2 = self.spd_add_rem_policy(  # inbound
-            1, self.pg0, self.pg2, socket.IPPROTO_UDP,
-            is_out=0, priority=10, policy_type="discard")
+            1,
+            self.pg0,
+            self.pg2,
+            socket.IPPROTO_UDP,
+            is_out=0,
+            priority=10,
+            policy_type="discard",
+        )
 
         # create output rules covering the the full ip range
         # 0.0.0.0 -> 255.255.255.255, so we can capture forwarded packets
         policy_3 = self.spd_add_rem_policy(  # outbound
-            1, self.pg0, self.pg0, socket.IPPROTO_UDP,
-            is_out=1, priority=10, policy_type="bypass",
-            all_ips=True)
+            1,
+            self.pg0,
+            self.pg0,
+            socket.IPPROTO_UDP,
+            is_out=1,
+            priority=10,
+            policy_type="bypass",
+            all_ips=True,
+        )
 
         # check flow cache is empty (0 active elements) before sending traffic
         self.verify_num_inbound_flow_cache_entries(0)
@@ -479,8 +613,7 @@ class IPSec4SpdTestCaseOverwriteStale(SpdFlowCacheInbound):
                 try:
                     self.logger.debug(ppp("SPD Add - Got packet:", packet))
                 except Exception:
-                    self.logger.error(
-                        ppp("Unexpected or invalid packet:", packet))
+                    self.logger.error(ppp("Unexpected or invalid packet:", packet))
                     raise
 
         # verify captures that matched BYPASS rules
@@ -497,22 +630,40 @@ class IPSec4SpdTestCaseOverwriteStale(SpdFlowCacheInbound):
 
         # adding an outbound policy should not invalidate output flow cache
         self.spd_add_rem_policy(  # outbound
-            1, self.pg0, self.pg0, socket.IPPROTO_UDP,
-            is_out=1, priority=1, policy_type="bypass",
-            all_ips=True)
+            1,
+            self.pg0,
+            self.pg0,
+            socket.IPPROTO_UDP,
+            is_out=1,
+            priority=1,
+            policy_type="bypass",
+            all_ips=True,
+        )
         # check inbound flow cache counter has not been reset
         self.verify_num_inbound_flow_cache_entries(3)
 
         # remove + readd bypass policy - flow cache counter will be reset,
         # and there will be 3x stale entries in flow cache
         self.spd_add_rem_policy(  # inbound, priority 10
-            1, self.pg1, self.pg0, socket.IPPROTO_UDP,
-            is_out=0, priority=10, policy_type="bypass",
-            remove=True)
+            1,
+            self.pg1,
+            self.pg0,
+            socket.IPPROTO_UDP,
+            is_out=0,
+            priority=10,
+            policy_type="bypass",
+            remove=True,
+        )
         # readd policy
         policy_0 = self.spd_add_rem_policy(  # inbound, priority 10
-            1, self.pg1, self.pg0, socket.IPPROTO_UDP,
-            is_out=0, priority=10, policy_type="bypass")
+            1,
+            self.pg1,
+            self.pg0,
+            socket.IPPROTO_UDP,
+            is_out=0,
+            priority=10,
+            policy_type="bypass",
+        )
         # check counter was reset
         self.verify_num_inbound_flow_cache_entries(0)
 
@@ -532,8 +683,7 @@ class IPSec4SpdTestCaseOverwriteStale(SpdFlowCacheInbound):
                 try:
                     self.logger.debug(ppp("SPD Add - Got packet:", packet))
                 except Exception:
-                    self.logger.error(
-                        ppp("Unexpected or invalid packet:", packet))
+                    self.logger.error(ppp("Unexpected or invalid packet:", packet))
                     raise
 
         # verify captures that matched BYPASS rules
@@ -543,8 +693,8 @@ class IPSec4SpdTestCaseOverwriteStale(SpdFlowCacheInbound):
         self.pg0.assert_nothing_captured()
         # verify all policies matched the expected number of times
         self.verify_policy_match(pkt_count, policy_0)
-        self.verify_policy_match(pkt_count*2, policy_1)
-        self.verify_policy_match(pkt_count*2, policy_2)
+        self.verify_policy_match(pkt_count * 2, policy_1)
+        self.verify_policy_match(pkt_count * 2, policy_2)
         # we are overwriting 3x stale entries - check flow cache counter
         # is correct
         self.verify_num_inbound_flow_cache_entries(3)
@@ -553,18 +703,23 @@ class IPSec4SpdTestCaseOverwriteStale(SpdFlowCacheInbound):
 class IPSec4SpdTestCaseCollision(SpdFlowCacheInbound):
     """ IPSec/IPv4 inbound: Policy mode test case with flow cache \
         (hash collision)"""
+
     # Override class setup to restrict hash table size to 16 buckets.
     # This forces using only the lower 4 bits of the hash as a key,
     # making hash collisions easy to find.
     @classmethod
     def setUpConstants(cls):
         super(SpdFlowCacheInbound, cls).setUpConstants()
-        cls.vpp_cmdline.extend(["ipsec", "{",
-                                "ipv4-inbound-spd-flow-cache on",
-                                "ipv4-inbound-spd-hash-buckets 16",
-                                "}"])
-        cls.logger.info("VPP modified cmdline is %s" % " "
-                        .join(cls.vpp_cmdline))
+        cls.vpp_cmdline.extend(
+            [
+                "ipsec",
+                "{",
+                "ipv4-inbound-spd-flow-cache on",
+                "ipv4-inbound-spd-hash-buckets 16",
+                "}",
+            ]
+        )
+        cls.logger.info("VPP modified cmdline is %s" % " ".join(cls.vpp_cmdline))
 
     def test_ipsec_spd_inbound_collision(self):
         # The flow cache operation is setup to overwrite an entry
@@ -588,20 +743,38 @@ class IPSec4SpdTestCaseCollision(SpdFlowCacheInbound):
         # create output rules covering the the full ip range
         # 0.0.0.0 -> 255.255.255.255, so we can capture forwarded packets
         policy_0 = self.spd_add_rem_policy(  # outbound
-            1, self.pg0, self.pg0, socket.IPPROTO_UDP,
-            is_out=1, priority=10, policy_type="bypass",
-            all_ips=True)
+            1,
+            self.pg0,
+            self.pg0,
+            socket.IPPROTO_UDP,
+            is_out=1,
+            priority=10,
+            policy_type="bypass",
+            all_ips=True,
+        )
 
         capture_intfs = []
         if self.crc32_supported():  # create crc32 collision on last 4 bits
             hashed_with_crc32 = True
             # add matching rules
             policy_1 = self.spd_add_rem_policy(  # inbound, priority 10
-                1, self.pg1, self.pg2, socket.IPPROTO_UDP,
-                is_out=0, priority=10, policy_type="bypass")
+                1,
+                self.pg1,
+                self.pg2,
+                socket.IPPROTO_UDP,
+                is_out=0,
+                priority=10,
+                policy_type="bypass",
+            )
             policy_2 = self.spd_add_rem_policy(  # inbound, priority 10
-                1, self.pg3, self.pg0, socket.IPPROTO_UDP,
-                is_out=0, priority=10, policy_type="bypass")
+                1,
+                self.pg3,
+                self.pg0,
+                socket.IPPROTO_UDP,
+                is_out=0,
+                priority=10,
+                policy_type="bypass",
+            )
 
             # we expect to get captures on pg1 + pg3
             capture_intfs.append(self.pg1)
@@ -623,11 +796,23 @@ class IPSec4SpdTestCaseCollision(SpdFlowCacheInbound):
             hashed_with_crc32 = False
             # add matching rules
             policy_1 = self.spd_add_rem_policy(  # inbound, priority 10
-                1, self.pg1, self.pg2, socket.IPPROTO_UDP,
-                is_out=0, priority=10, policy_type="bypass")
+                1,
+                self.pg1,
+                self.pg2,
+                socket.IPPROTO_UDP,
+                is_out=0,
+                priority=10,
+                policy_type="bypass",
+            )
             policy_2 = self.spd_add_rem_policy(  # inbound, priority 10
-                1, self.pg2, self.pg3, socket.IPPROTO_UDP,
-                is_out=0, priority=10, policy_type="bypass")
+                1,
+                self.pg2,
+                self.pg3,
+                socket.IPPROTO_UDP,
+                is_out=0,
+                priority=10,
+                policy_type="bypass",
+            )
 
             capture_intfs.append(self.pg1)
             capture_intfs.append(self.pg2)
@@ -655,15 +840,13 @@ class IPSec4SpdTestCaseCollision(SpdFlowCacheInbound):
             if_caps.append(pg.get_capture())
             for packet in if_caps[-1]:
                 try:
-                    self.logger.debug(ppp(
-                        "SPD Add - Got packet:", packet))
+                    self.logger.debug(ppp("SPD Add - Got packet:", packet))
                 except Exception:
-                    self.logger.error(ppp(
-                        "Unexpected or invalid packet:", packet))
+                    self.logger.error(ppp("Unexpected or invalid packet:", packet))
                     raise
 
         # verify captures that matched BYPASS rule
-        if(hashed_with_crc32):
+        if hashed_with_crc32:
             self.verify_capture(self.pg2, self.pg1, if_caps[0])
             self.verify_capture(self.pg0, self.pg3, if_caps[1])
         else:  # hashed with xxhash
@@ -673,11 +856,11 @@ class IPSec4SpdTestCaseCollision(SpdFlowCacheInbound):
         # verify all policies matched the expected number of times
         self.verify_policy_match(pkt_count, policy_1)
         self.verify_policy_match(pkt_count, policy_2)
-        self.verify_policy_match(pkt_count*2, policy_0)  # output policy
+        self.verify_policy_match(pkt_count * 2, policy_0)  # output policy
         # we have matched 2 policies, but due to the hash collision
         # one active entry is expected
         self.verify_num_inbound_flow_cache_entries(1)
 
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     unittest.main(testRunner=VppTestRunner)