tests: replace pycodestyle with black
[vpp.git] / test / bfd.py
index bbfa594..4189983 100644 (file)
@@ -5,15 +5,22 @@ from socket import AF_INET, AF_INET6, inet_pton
 from scapy.all import bind_layers
 from scapy.layers.inet import UDP
 from scapy.packet import Packet
-from scapy.fields import BitField, BitEnumField, XByteField, FlagsField,\
-    ConditionalField, StrField
+from scapy.fields import (
+    BitField,
+    BitEnumField,
+    XByteField,
+    FlagsField,
+    ConditionalField,
+    StrField,
+)
 from vpp_object import VppObject
 from util import NumericConstant
 from vpp_papi import VppEnum
 
 
 class BFDDiagCode(NumericConstant):
-    """ BFD Diagnostic Code """
+    """BFD Diagnostic Code"""
+
     no_diagnostic = 0
     control_detection_time_expired = 1
     echo_function_failed = 2
@@ -38,7 +45,8 @@ class BFDDiagCode(NumericConstant):
 
 
 class BFDState(NumericConstant):
-    """ BFD State """
+    """BFD State"""
+
     admin_down = 0
     down = 1
     init = 2
@@ -53,7 +61,8 @@ class BFDState(NumericConstant):
 
 
 class BFDAuthType(NumericConstant):
-    """ BFD Authentication Type """
+    """BFD Authentication Type"""
+
     no_auth = 0
     simple_pwd = 1
     keyed_md5 = 2
@@ -72,34 +81,38 @@ class BFDAuthType(NumericConstant):
 
 
 def bfd_is_auth_used(pkt):
-    """ is packet authenticated? """
+    """is packet authenticated?"""
     return "A" in pkt.sprintf("%BFD.flags%")
 
 
 def bfd_is_simple_pwd_used(pkt):
-    """ is simple password authentication used? """
+    """is simple password authentication used?"""
     return bfd_is_auth_used(pkt) and pkt.auth_type == BFDAuthType.simple_pwd
 
 
 def bfd_is_sha1_used(pkt):
-    """ is sha1 authentication used? """
-    return bfd_is_auth_used(pkt) and pkt.auth_type in \
-        (BFDAuthType.keyed_sha1, BFDAuthType.meticulous_keyed_sha1)
+    """is sha1 authentication used?"""
+    return bfd_is_auth_used(pkt) and pkt.auth_type in (
+        BFDAuthType.keyed_sha1,
+        BFDAuthType.meticulous_keyed_sha1,
+    )
 
 
 def bfd_is_md5_used(pkt):
-    """ is md5 authentication used? """
-    return bfd_is_auth_used(pkt) and pkt.auth_type in \
-        (BFDAuthType.keyed_md5, BFDAuthType.meticulous_keyed_md5)
+    """is md5 authentication used?"""
+    return bfd_is_auth_used(pkt) and pkt.auth_type in (
+        BFDAuthType.keyed_md5,
+        BFDAuthType.meticulous_keyed_md5,
+    )
 
 
 def bfd_is_md5_or_sha1_used(pkt):
-    """ is md5 or sha1 used? """
+    """is md5 or sha1 used?"""
     return bfd_is_md5_used(pkt) or bfd_is_sha1_used(pkt)
 
 
 class BFD(Packet):
-    """ BFD protocol layer for scapy """
+    """BFD protocol layer for scapy"""
 
     udp_dport = 3784  #: BFD destination port per RFC 5881
     udp_dport_echo = 3785  # : BFD destination port for ECHO per RFC 5881
@@ -114,7 +127,7 @@ class BFD(Packet):
         BitField("version", 1, 3),
         BitEnumField("diag", 0, 5, BFDDiagCode.desc_dict),
         BitEnumField("state", 0, 2, BFDState.desc_dict),
-        FlagsField("flags", 0, 6, ['M', 'D', 'A', 'C', 'F', 'P']),
+        FlagsField("flags", 0, 6, ["M", "D", "A", "C", "F", "P"]),
         XByteField("detect_mult", 0),
         BitField("length", bfd_pkt_len, 8),
         BitField("my_discriminator", 0, 32),
@@ -123,22 +136,20 @@ class BFD(Packet):
         BitField("required_min_rx_interval", 0, 32),
         BitField("required_min_echo_rx_interval", 0, 32),
         ConditionalField(
-            BitEnumField("auth_type", 0, 8, BFDAuthType.desc_dict),
-            bfd_is_auth_used),
+            BitEnumField("auth_type", 0, 8, BFDAuthType.desc_dict), bfd_is_auth_used
+        ),
         ConditionalField(BitField("auth_len", 0, 8), bfd_is_auth_used),
         ConditionalField(BitField("auth_key_id", 0, 8), bfd_is_auth_used),
-        ConditionalField(BitField("auth_reserved", 0, 8),
-                         bfd_is_md5_or_sha1_used),
-        ConditionalField(
-            BitField("auth_seq_num", 0, 32), bfd_is_md5_or_sha1_used),
+        ConditionalField(BitField("auth_reserved", 0, 8), bfd_is_md5_or_sha1_used),
+        ConditionalField(BitField("auth_seq_num", 0, 32), bfd_is_md5_or_sha1_used),
         ConditionalField(StrField("auth_key_hash", "0" * 16), bfd_is_md5_used),
-        ConditionalField(
-            StrField("auth_key_hash", "0" * 20), bfd_is_sha1_used),
+        ConditionalField(StrField("auth_key_hash", "0" * 20), bfd_is_sha1_used),
     ]
 
     def mysummary(self):
-        return self.sprintf("BFD(my_disc=%BFD.my_discriminator%,"
-                            "your_disc=%BFD.your_discriminator%)")
+        return self.sprintf(
+            "BFD(my_disc=%BFD.my_discriminator%, your_disc=%BFD.your_discriminator%)"
+        )
 
 
 # glue the BFD packet class to scapy parser
@@ -146,7 +157,7 @@ bind_layers(UDP, BFD, dport=BFD.udp_dport)
 
 
 class BFD_vpp_echo(Packet):
-    """ BFD echo packet as used by VPP (non-rfc, as rfc doesn't define one) """
+    """BFD echo packet as used by VPP (non-rfc, as rfc doesn't define one)"""
 
     udp_dport = 3785  #: BFD echo destination port per RFC 5881
     name = "BFD_VPP_ECHO"
@@ -154,13 +165,14 @@ class BFD_vpp_echo(Packet):
     fields_desc = [
         BitField("discriminator", 0, 32),
         BitField("expire_time_clocks", 0, 64),
-        BitField("checksum", 0, 64)
+        BitField("checksum", 0, 64),
     ]
 
     def mysummary(self):
         return self.sprintf(
             "BFD_VPP_ECHO(disc=%BFD_VPP_ECHO.discriminator%,"
-            "expire_time_clocks=%BFD_VPP_ECHO.expire_time_clocks%)")
+            "expire_time_clocks=%BFD_VPP_ECHO.expire_time_clocks%)"
+        )
 
 
 # glue the BFD echo packet class to scapy parser
@@ -168,7 +180,7 @@ bind_layers(UDP, BFD_vpp_echo, dport=BFD_vpp_echo.udp_dport)
 
 
 class VppBFDAuthKey(VppObject):
-    """ Represents BFD authentication key in VPP """
+    """Represents BFD authentication key in VPP"""
 
     def __init__(self, test, conf_key_id, auth_type, key):
         self._test = test
@@ -179,17 +191,17 @@ class VppBFDAuthKey(VppObject):
 
     @property
     def test(self):
-        """ Test which created this key """
+        """Test which created this key"""
         return self._test
 
     @property
     def auth_type(self):
-        """ Authentication type for this key """
+        """Authentication type for this key"""
         return self._auth_type
 
     @property
     def key(self):
-        """ key data """
+        """key data"""
         return self._key
 
     @key.setter
@@ -198,17 +210,20 @@ class VppBFDAuthKey(VppObject):
 
     @property
     def conf_key_id(self):
-        """ configuration key ID """
+        """configuration key ID"""
         return self._conf_key_id
 
     def add_vpp_config(self):
         self.test.vapi.bfd_auth_set_key(
-            conf_key_id=self._conf_key_id, auth_type=self._auth_type,
-            key=self._key, key_len=len(self._key))
+            conf_key_id=self._conf_key_id,
+            auth_type=self._auth_type,
+            key=self._key,
+            key_len=len(self._key),
+        )
         self._test.registry.register(self, self.test.logger)
 
     def get_bfd_auth_keys_dump_entry(self):
-        """ get the entry in the auth keys dump corresponding to this key """
+        """get the entry in the auth keys dump corresponding to this key"""
         result = self.test.vapi.bfd_auth_keys_dump()
         for k in result:
             if k.conf_key_id == self._conf_key_id:
@@ -226,11 +241,22 @@ class VppBFDAuthKey(VppObject):
 
 
 class VppBFDUDPSession(VppObject):
-    """ Represents BFD UDP session in VPP """
-
-    def __init__(self, test, interface, peer_addr, local_addr=None, af=AF_INET,
-                 desired_min_tx=300000, required_min_rx=300000, detect_mult=3,
-                 sha1_key=None, bfd_key_id=None, is_tunnel=False):
+    """Represents BFD UDP session in VPP"""
+
+    def __init__(
+        self,
+        test,
+        interface,
+        peer_addr,
+        local_addr=None,
+        af=AF_INET,
+        desired_min_tx=300000,
+        required_min_rx=300000,
+        detect_mult=3,
+        sha1_key=None,
+        bfd_key_id=None,
+        is_tunnel=False,
+    ):
         self._test = test
         self._interface = interface
         self._af = af
@@ -251,22 +277,22 @@ class VppBFDUDPSession(VppObject):
 
     @property
     def test(self):
-        """ Test which created this session """
+        """Test which created this session"""
         return self._test
 
     @property
     def interface(self):
-        """ Interface on which this session lives """
+        """Interface on which this session lives"""
         return self._interface
 
     @property
     def af(self):
-        """ Address family - AF_INET or AF_INET6 """
+        """Address family - AF_INET or AF_INET6"""
         return self._af
 
     @property
     def local_addr(self):
-        """ BFD session local address (VPP address) """
+        """BFD session local address (VPP address)"""
         if self._local_addr is None:
             if self.af == AF_INET:
                 return self._interface.local_ip4
@@ -278,28 +304,32 @@ class VppBFDUDPSession(VppObject):
 
     @property
     def peer_addr(self):
-        """ BFD session peer address """
+        """BFD session peer address"""
         return self._peer_addr
 
     def get_bfd_udp_session_dump_entry(self):
-        """ get the namedtuple entry from bfd udp session dump """
+        """get the namedtuple entry from bfd udp session dump"""
         result = self.test.vapi.bfd_udp_session_dump()
         for s in result:
             self.test.logger.debug("session entry: %s" % str(s))
             if s.sw_if_index == self.interface.sw_if_index:
-                if self.af == AF_INET \
-                        and self.interface.local_ip4 == str(s.local_addr) \
-                        and self.interface.remote_ip4 == str(s.peer_addr):
+                if (
+                    self.af == AF_INET
+                    and self.interface.local_ip4 == str(s.local_addr)
+                    and self.interface.remote_ip4 == str(s.peer_addr)
+                ):
                     return s
-                if self.af == AF_INET6 \
-                        and self.interface.local_ip6 == str(s.local_addr) \
-                        and self.interface.remote_ip6 == str(s.peer_addr):
+                if (
+                    self.af == AF_INET6
+                    and self.interface.local_ip6 == str(s.local_addr)
+                    and self.interface.remote_ip6 == str(s.peer_addr)
+                ):
                     return s
         return None
 
     @property
     def state(self):
-        """ BFD session state """
+        """BFD session state"""
         session = self.get_bfd_udp_session_dump_entry()
         if session is None:
             raise Exception("Could not find BFD session in VPP response")
@@ -307,27 +337,27 @@ class VppBFDUDPSession(VppObject):
 
     @property
     def desired_min_tx(self):
-        """ desired minimum tx interval """
+        """desired minimum tx interval"""
         return self._desired_min_tx
 
     @property
     def required_min_rx(self):
-        """ required minimum rx interval """
+        """required minimum rx interval"""
         return self._required_min_rx
 
     @property
     def detect_mult(self):
-        """ detect multiplier """
+        """detect multiplier"""
         return self._detect_mult
 
     @property
     def sha1_key(self):
-        """ sha1 key """
+        """sha1 key"""
         return self._sha1_key
 
     @property
     def bfd_key_id(self):
-        """ bfd key id in use """
+        """bfd key id in use"""
         return self._bfd_key_id
 
     @property
@@ -335,7 +365,7 @@ class VppBFDUDPSession(VppObject):
         return self._is_tunnel
 
     def activate_auth(self, key, bfd_key_id=None, delayed=False):
-        """ activate authentication for this session """
+        """activate authentication for this session"""
         self._bfd_key_id = bfd_key_id if bfd_key_id else randint(0, 255)
         self._sha1_key = key
         conf_key_id = self._sha1_key.conf_key_id
@@ -346,10 +376,11 @@ class VppBFDUDPSession(VppObject):
             peer_addr=self.peer_addr,
             bfd_key_id=self._bfd_key_id,
             conf_key_id=conf_key_id,
-            is_delayed=is_delayed)
+            is_delayed=is_delayed,
+        )
 
     def deactivate_auth(self, delayed=False):
-        """ deactivate authentication """
+        """deactivate authentication"""
         self._bfd_key_id = None
         self._sha1_key = None
         is_delayed = 1 if delayed else 0
@@ -357,45 +388,48 @@ class VppBFDUDPSession(VppObject):
             sw_if_index=self._interface.sw_if_index,
             local_addr=self.local_addr,
             peer_addr=self.peer_addr,
-            is_delayed=is_delayed)
+            is_delayed=is_delayed,
+        )
 
-    def modify_parameters(self,
-                          detect_mult=None,
-                          desired_min_tx=None,
-                          required_min_rx=None):
-        """ modify session parameters """
+    def modify_parameters(
+        self, detect_mult=None, desired_min_tx=None, required_min_rx=None
+    ):
+        """modify session parameters"""
         if detect_mult:
             self._detect_mult = detect_mult
         if desired_min_tx:
             self._desired_min_tx = desired_min_tx
         if required_min_rx:
             self._required_min_rx = required_min_rx
-        self.test.vapi.bfd_udp_mod(sw_if_index=self._interface.sw_if_index,
-                                   desired_min_tx=self.desired_min_tx,
-                                   required_min_rx=self.required_min_rx,
-                                   detect_mult=self.detect_mult,
-                                   local_addr=self.local_addr,
-                                   peer_addr=self.peer_addr)
+        self.test.vapi.bfd_udp_mod(
+            sw_if_index=self._interface.sw_if_index,
+            desired_min_tx=self.desired_min_tx,
+            required_min_rx=self.required_min_rx,
+            detect_mult=self.detect_mult,
+            local_addr=self.local_addr,
+            peer_addr=self.peer_addr,
+        )
 
     def add_vpp_config(self):
         bfd_key_id = self._bfd_key_id if self._sha1_key else None
         conf_key_id = self._sha1_key.conf_key_id if self._sha1_key else None
         is_authenticated = True if self._sha1_key else False
-        self.test.vapi.bfd_udp_add(sw_if_index=self._interface.sw_if_index,
-                                   desired_min_tx=self.desired_min_tx,
-                                   required_min_rx=self.required_min_rx,
-                                   detect_mult=self.detect_mult,
-                                   local_addr=self.local_addr,
-                                   peer_addr=self.peer_addr,
-                                   bfd_key_id=bfd_key_id,
-                                   conf_key_id=conf_key_id,
-                                   is_authenticated=is_authenticated)
+        self.test.vapi.bfd_udp_add(
+            sw_if_index=self._interface.sw_if_index,
+            desired_min_tx=self.desired_min_tx,
+            required_min_rx=self.required_min_rx,
+            detect_mult=self.detect_mult,
+            local_addr=self.local_addr,
+            peer_addr=self.peer_addr,
+            bfd_key_id=bfd_key_id,
+            conf_key_id=conf_key_id,
+            is_authenticated=is_authenticated,
+        )
         self._test.registry.register(self, self.test.logger)
 
-    def upd_vpp_config(self,
-                       detect_mult=None,
-                       desired_min_tx=None,
-                       required_min_rx=None):
+    def upd_vpp_config(
+        self, detect_mult=None, desired_min_tx=None, required_min_rx=None
+    ):
         if desired_min_tx:
             self._desired_min_tx = desired_min_tx
         if required_min_rx:
@@ -405,15 +439,17 @@ class VppBFDUDPSession(VppObject):
         bfd_key_id = self._bfd_key_id if self._sha1_key else None
         conf_key_id = self._sha1_key.conf_key_id if self._sha1_key else None
         is_authenticated = True if self._sha1_key else False
-        self.test.vapi.bfd_udp_upd(sw_if_index=self._interface.sw_if_index,
-                                   desired_min_tx=self.desired_min_tx,
-                                   required_min_rx=self.required_min_rx,
-                                   detect_mult=self.detect_mult,
-                                   local_addr=self.local_addr,
-                                   peer_addr=self.peer_addr,
-                                   bfd_key_id=bfd_key_id,
-                                   conf_key_id=conf_key_id,
-                                   is_authenticated=is_authenticated)
+        self.test.vapi.bfd_udp_upd(
+            sw_if_index=self._interface.sw_if_index,
+            desired_min_tx=self.desired_min_tx,
+            required_min_rx=self.required_min_rx,
+            detect_mult=self.detect_mult,
+            local_addr=self.local_addr,
+            peer_addr=self.peer_addr,
+            bfd_key_id=bfd_key_id,
+            conf_key_id=conf_key_id,
+            is_authenticated=is_authenticated,
+        )
         self._test.registry.register(self, self.test.logger)
 
     def query_vpp_config(self):
@@ -421,27 +457,34 @@ class VppBFDUDPSession(VppObject):
         return session is not None
 
     def remove_vpp_config(self):
-        self.test.vapi.bfd_udp_del(self._interface.sw_if_index,
-                                   local_addr=self.local_addr,
-                                   peer_addr=self.peer_addr)
+        self.test.vapi.bfd_udp_del(
+            self._interface.sw_if_index,
+            local_addr=self.local_addr,
+            peer_addr=self.peer_addr,
+        )
 
     def object_id(self):
-        return "bfd-udp-%s-%s-%s-%s" % (self._interface.sw_if_index,
-                                        self.local_addr,
-                                        self.peer_addr,
-                                        self.af)
+        return "bfd-udp-%s-%s-%s-%s" % (
+            self._interface.sw_if_index,
+            self.local_addr,
+            self.peer_addr,
+            self.af,
+        )
 
     def admin_up(self):
-        """ set bfd session admin-up """
+        """set bfd session admin-up"""
         self.test.vapi.bfd_udp_session_set_flags(
             flags=VppEnum.vl_api_if_status_flags_t.IF_STATUS_API_FLAG_ADMIN_UP,
             sw_if_index=self._interface.sw_if_index,
             local_addr=self.local_addr,
-            peer_addr=self.peer_addr)
+            peer_addr=self.peer_addr,
+        )
 
     def admin_down(self):
-        """ set bfd session admin-down """
+        """set bfd session admin-down"""
         self.test.vapi.bfd_udp_session_set_flags(
-            flags=0, sw_if_index=self._interface.sw_if_index,
+            flags=0,
+            sw_if_index=self._interface.sw_if_index,
             local_addr=self.local_addr,
-            peer_addr=self.peer_addr)
+            peer_addr=self.peer_addr,
+        )