tests: replace pycodestyle with black
[vpp.git] / test / test_ip_ecmp.py
index 1d3e872..d5347db 100644 (file)
@@ -26,7 +26,7 @@ N_PKTS_IN_STREAM = 300
 
 
 class TestECMP(VppTestCase):
-    """ Equal-cost multi-path routing Test Case """
+    """Equal-cost multi-path routing Test Case"""
 
     @classmethod
     def setUpClass(cls):
@@ -92,11 +92,11 @@ class TestECMP(VppTestCase):
             ip_addr = IPv6Address(text_type(ip_addr_start))
             ip_max_len = 128
 
-        return str(ip_addr +
-                   random.randint(0, 2 ** (ip_max_len - ip_prefix_len) - 2))
+        return str(ip_addr + random.randint(0, 2 ** (ip_max_len - ip_prefix_len) - 2))
 
-    def create_stream(self, src_if, src_ip_start, dst_ip_start,
-                      ip_prefix_len, packet_sizes, ip_l=IP):
+    def create_stream(
+        self, src_if, src_ip_start, dst_ip_start, ip_prefix_len, packet_sizes, ip_l=IP
+    ):
         """Create input packet stream for defined interfaces.
 
         :param VppInterface src_if: Source Interface for packet stream.
@@ -112,10 +112,12 @@ class TestECMP(VppTestCase):
             payload = self.info_to_payload(info)
             src_ip = self.get_ip_address(src_ip_start, ip_prefix_len)
             dst_ip = self.get_ip_address(dst_ip_start, ip_prefix_len)
-            p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
-                 ip_l(src=src_ip, dst=dst_ip) /
-                 UDP(sport=1234, dport=1234) /
-                 Raw(payload))
+            p = (
+                Ether(dst=src_if.local_mac, src=src_if.remote_mac)
+                / ip_l(src=src_ip, dst=dst_ip)
+                / UDP(sport=1234, dport=1234)
+                / Raw(payload)
+            )
             info.data = p.copy()
             size = random.choice(packet_sizes)
             self.extend_packet(p, size)
@@ -142,12 +144,17 @@ class TestECMP(VppTestCase):
                 payload_info = self.payload_to_info(packet[Raw])
                 packet_index = payload_info.index
                 ip_sent = self._packet_infos[packet_index].data[ip_l]
-                self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
-                                  (rx_if.name, payload_info.src, packet_index))
+                self.logger.debug(
+                    "Got packet on port %s: src=%u (id=%u)"
+                    % (rx_if.name, payload_info.src, packet_index)
+                )
                 # Check standard fields
-                self.assertIn(packet.dst, rx_if._hosts_by_mac,
-                              "Destination MAC address %s shouldn't be routed "
-                              "via interface %s" % (packet.dst, rx_if.name))
+                self.assertIn(
+                    packet.dst,
+                    rx_if._hosts_by_mac,
+                    "Destination MAC address %s shouldn't be routed "
+                    "via interface %s" % (packet.dst, rx_if.name),
+                )
                 self.assertEqual(packet.src, rx_if.local_mac)
                 self.assertEqual(ip_received.src, ip_sent.src)
                 self.assertEqual(ip_received.dst, ip_sent.dst)
@@ -161,13 +168,15 @@ class TestECMP(VppTestCase):
         # We expect packet routed via all host of pg interface
         for host_mac in host_counters:
             nr = host_counters[host_mac]
-            self.assertNotEqual(
-                nr, 0, "No packet routed via host %s" % host_mac)
-            self.logger.info("%u packets routed via host %s of %s interface" %
-                             (nr, host_mac, rx_if.name))
+            self.assertNotEqual(nr, 0, "No packet routed via host %s" % host_mac)
+            self.logger.info(
+                "%u packets routed via host %s of %s interface"
+                % (nr, host_mac, rx_if.name)
+            )
             count += nr
-        self.logger.info("Total amount of %u packets routed via %s interface" %
-                         (count, rx_if.name))
+        self.logger.info(
+            "Total amount of %u packets routed via %s interface" % (count, rx_if.name)
+        )
 
         return count
 
@@ -184,28 +193,27 @@ class TestECMP(VppTestCase):
         for pg_if in self.pg_interfaces[1:]:
             for nh_host in pg_if.remote_hosts:
                 nh_host_ip = nh_host.ip4 if is_ipv6 == 0 else nh_host.ip6
-                paths.append(VppRoutePath(nh_host_ip,
-                                          pg_if.sw_if_index))
+                paths.append(VppRoutePath(nh_host_ip, pg_if.sw_if_index))
 
         rip = VppIpRoute(self, dst_ip_net, dst_prefix_len, paths)
         rip.add_vpp_config()
-        self.logger.info("Route via %s on %s created" %
-                         (nh_host_ip, pg_if.name))
+        self.logger.info("Route via %s on %s created" % (nh_host_ip, pg_if.name))
 
         self.logger.debug(self.vapi.ppcli("show ip fib"))
         self.logger.debug(self.vapi.ppcli("show ip6 fib"))
 
     def test_ip_ecmp(self):
-        """ IP equal-cost multi-path routing test """
+        """IP equal-cost multi-path routing test"""
 
-        src_ip_net = '16.0.0.1'
-        dst_ip_net = '32.0.0.1'
+        src_ip_net = "16.0.0.1"
+        dst_ip_net = "32.0.0.1"
         ip_prefix_len = 24
 
         self.create_ip_routes(dst_ip_net, ip_prefix_len)
 
-        pkts = self.create_stream(self.pg0, src_ip_net, dst_ip_net,
-                                  ip_prefix_len, self.pg_if_packet_sizes)
+        pkts = self.create_stream(
+            self.pg0, src_ip_net, dst_ip_net, ip_prefix_len, self.pg_if_packet_sizes
+        )
         self.pg0.add_stream(pkts)
 
         self.pg_enable_capture(self.pg_interfaces)
@@ -216,7 +224,8 @@ class TestECMP(VppTestCase):
         for pg_if in self.pg_interfaces[1:]:
             capture = pg_if._get_capture(timeout=1)
             self.assertNotEqual(
-                len(capture), 0, msg="No packets captured on %s" % pg_if.name)
+                len(capture), 0, msg="No packets captured on %s" % pg_if.name
+            )
             rx_count += self.verify_capture(pg_if, capture)
         self.pg0.assert_nothing_captured(remark="IP packets forwarded on pg0")
 
@@ -224,17 +233,22 @@ class TestECMP(VppTestCase):
         self.assertEqual(rx_count, len(pkts))
 
     def test_ip6_ecmp(self):
-        """ IPv6 equal-cost multi-path routing test """
+        """IPv6 equal-cost multi-path routing test"""
 
-        src_ip_net = '3ffe:51::1'
-        dst_ip_net = '3ffe:71::1'
+        src_ip_net = "3ffe:51::1"
+        dst_ip_net = "3ffe:71::1"
         ip_prefix_len = 64
 
         self.create_ip_routes(dst_ip_net, ip_prefix_len, is_ipv6=1)
 
         pkts = self.create_stream(
-            self.pg0, src_ip_net, dst_ip_net,
-            ip_prefix_len, self.pg_if_packet_sizes, ip_l=IPv6)
+            self.pg0,
+            src_ip_net,
+            dst_ip_net,
+            ip_prefix_len,
+            self.pg_if_packet_sizes,
+            ip_l=IPv6,
+        )
         self.pg0.add_stream(pkts)
 
         self.pg_enable_capture(self.pg_interfaces)
@@ -245,7 +259,8 @@ class TestECMP(VppTestCase):
         for pg_if in self.pg_interfaces[1:]:
             capture = pg_if._get_capture(timeout=1)
             self.assertNotEqual(
-                len(capture), 0, msg="No packets captured on %s" % pg_if.name)
+                len(capture), 0, msg="No packets captured on %s" % pg_if.name
+            )
             rx_count += self.verify_capture(pg_if, capture, ip_l=IPv6)
         self.pg0.assert_nothing_captured(remark="IP packets forwarded on pg0")
 
@@ -253,5 +268,5 @@ class TestECMP(VppTestCase):
         self.assertEqual(rx_count, len(pkts))
 
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     unittest.main(testRunner=VppTestRunner)