tests: replace pycodestyle with black
[vpp.git] / test / test_vrrp.py
index 6a62a88..9319b0f 100644 (file)
@@ -17,9 +17,18 @@ from vpp_papi import VppEnum
 from scapy.packet import raw
 from scapy.layers.l2 import Ether, ARP
 from scapy.layers.inet import IP, ICMP, icmptypes
-from scapy.layers.inet6 import IPv6, ipv6nh, IPv6ExtHdrHopByHop, \
-    ICMPv6MLReport2, ICMPv6ND_NA, ICMPv6ND_NS, ICMPv6NDOptDstLLAddr, \
-    ICMPv6NDOptSrcLLAddr, ICMPv6EchoRequest, ICMPv6EchoReply
+from scapy.layers.inet6 import (
+    IPv6,
+    ipv6nh,
+    IPv6ExtHdrHopByHop,
+    ICMPv6MLReport2,
+    ICMPv6ND_NA,
+    ICMPv6ND_NS,
+    ICMPv6NDOptDstLLAddr,
+    ICMPv6NDOptSrcLLAddr,
+    ICMPv6EchoRequest,
+    ICMPv6EchoReply,
+)
 from scapy.contrib.igmpv3 import IGMPv3, IGMPv3mr, IGMPv3gr
 from scapy.layers.vrrp import IPPROTO_VRRP, VRRPv3
 from scapy.utils6 import in6_getnsma, in6_getnsmac
@@ -37,11 +46,11 @@ VRRP_VR_STATE_BACKUP = 1
 VRRP_VR_STATE_MASTER = 2
 VRRP_VR_STATE_INTF_DOWN = 3
 
-VRRP_INDEX_INVALID = 0xffffffff
+VRRP_INDEX_INVALID = 0xFFFFFFFF
 
 
 def is_non_arp(p):
-    """ Want to filter out advertisements, igmp, etc"""
+    """Want to filter out advertisements, igmp, etc"""
     if p.haslayer(ARP):
         return False
 
@@ -49,7 +58,7 @@ def is_non_arp(p):
 
 
 def is_not_adv(p):
-    """ Filter out everything but advertisements. E.g. multicast RD/ND """
+    """Filter out everything but advertisements. E.g. multicast RD/ND"""
     if p.haslayer(VRRPv3):
         return False
 
@@ -57,7 +66,7 @@ def is_not_adv(p):
 
 
 def is_not_echo_reply(p):
-    """ filter out advertisements and other while waiting for echo reply """
+    """filter out advertisements and other while waiting for echo reply"""
     if p.haslayer(IP) and p.haslayer(ICMP):
         if icmptypes[p[ICMP].type] == "echo-reply":
             return False
@@ -68,15 +77,16 @@ def is_not_echo_reply(p):
 
 
 class VppVRRPVirtualRouter(VppObject):
-
-    def __init__(self,
-                 test,
-                 intf,
-                 vr_id,
-                 prio=100,
-                 intvl=100,
-                 flags=VRRP_VR_FLAG_PREEMPT,
-                 vips=None):
+    def __init__(
+        self,
+        test,
+        intf,
+        vr_id,
+        prio=100,
+        intvl=100,
+        flags=VRRP_VR_FLAG_PREEMPT,
+        vips=None,
+    ):
         self._test = test
         self._intf = intf
         self._sw_if_index = self._intf.sw_if_index
@@ -84,40 +94,44 @@ class VppVRRPVirtualRouter(VppObject):
         self._prio = prio
         self._intvl = intvl
         self._flags = flags
-        if (flags & VRRP_VR_FLAG_IPV6):
+        if flags & VRRP_VR_FLAG_IPV6:
             self._is_ipv6 = 1
             self._adv_dest_mac = "33:33:00:00:00:12"
             self._virtual_mac = "00:00:5e:00:02:%02x" % vr_id
             self._adv_dest_ip = "ff02::12"
-            self._vips = ([intf.local_ip6] if vips is None else vips)
+            self._vips = [intf.local_ip6] if vips is None else vips
         else:
             self._is_ipv6 = 0
             self._adv_dest_mac = "01:00:5e:00:00:12"
             self._virtual_mac = "00:00:5e:00:01:%02x" % vr_id
             self._adv_dest_ip = "224.0.0.18"
-            self._vips = ([intf.local_ip4] if vips is None else vips)
+            self._vips = [intf.local_ip4] if vips is None else vips
         self._tracked_ifs = []
         self._vrrp_index = VRRP_INDEX_INVALID
 
     def add_vpp_config(self):
-        self._test.vapi.vrrp_vr_add_del(is_add=1,
-                                        sw_if_index=self._intf.sw_if_index,
-                                        vr_id=self._vr_id,
-                                        priority=self._prio,
-                                        interval=self._intvl,
-                                        flags=self._flags,
-                                        n_addrs=len(self._vips),
-                                        addrs=self._vips)
+        self._test.vapi.vrrp_vr_add_del(
+            is_add=1,
+            sw_if_index=self._intf.sw_if_index,
+            vr_id=self._vr_id,
+            priority=self._prio,
+            interval=self._intvl,
+            flags=self._flags,
+            n_addrs=len(self._vips),
+            addrs=self._vips,
+        )
 
     def update_vpp_config(self):
-        r = self._test.vapi.vrrp_vr_update(vrrp_index=self._vrrp_index,
-                                           sw_if_index=self._intf.sw_if_index,
-                                           vr_id=self._vr_id,
-                                           priority=self._prio,
-                                           interval=self._intvl,
-                                           flags=self._flags,
-                                           n_addrs=len(self._vips),
-                                           addrs=self._vips)
+        r = self._test.vapi.vrrp_vr_update(
+            vrrp_index=self._vrrp_index,
+            sw_if_index=self._intf.sw_if_index,
+            vr_id=self._vr_id,
+            priority=self._prio,
+            interval=self._intvl,
+            flags=self._flags,
+            n_addrs=len(self._vips),
+            addrs=self._vips,
+        )
         self._vrrp_index = r.vrrp_index
 
     def delete_vpp_config(self):
@@ -129,7 +143,7 @@ class VppVRRPVirtualRouter(VppObject):
             if vr.config.vr_id != self._vr_id:
                 continue
 
-            is_ipv6 = (1 if (vr.config.flags & VRRP_VR_FLAG_IPV6) else 0)
+            is_ipv6 = 1 if (vr.config.flags & VRRP_VR_FLAG_IPV6) else 0
             if is_ipv6 != self._is_ipv6:
                 continue
 
@@ -138,41 +152,45 @@ class VppVRRPVirtualRouter(VppObject):
         return None
 
     def remove_vpp_config(self):
-        self._test.vapi.vrrp_vr_add_del(is_add=0,
-                                        sw_if_index=self._intf.sw_if_index,
-                                        vr_id=self._vr_id,
-                                        priority=self._prio,
-                                        interval=self._intvl,
-                                        flags=self._flags,
-                                        n_addrs=len(self._vips),
-                                        addrs=self._vips)
+        self._test.vapi.vrrp_vr_add_del(
+            is_add=0,
+            sw_if_index=self._intf.sw_if_index,
+            vr_id=self._vr_id,
+            priority=self._prio,
+            interval=self._intvl,
+            flags=self._flags,
+            n_addrs=len(self._vips),
+            addrs=self._vips,
+        )
 
     def start_stop(self, is_start):
-        self._test.vapi.vrrp_vr_start_stop(is_start=is_start,
-                                           sw_if_index=self._intf.sw_if_index,
-                                           vr_id=self._vr_id,
-                                           is_ipv6=self._is_ipv6)
-        self._start_time = (time.time() if is_start else None)
+        self._test.vapi.vrrp_vr_start_stop(
+            is_start=is_start,
+            sw_if_index=self._intf.sw_if_index,
+            vr_id=self._vr_id,
+            is_ipv6=self._is_ipv6,
+        )
+        self._start_time = time.time() if is_start else None
 
     def add_del_tracked_interface(self, is_add, sw_if_index, prio):
         args = {
-            'sw_if_index': self._intf.sw_if_index,
-            'is_ipv6': self._is_ipv6,
-            'vr_id': self._vr_id,
-            'is_add': is_add,
-            'n_ifs': 1,
-            'ifs': [{'sw_if_index': sw_if_index, 'priority': prio}]
+            "sw_if_index": self._intf.sw_if_index,
+            "is_ipv6": self._is_ipv6,
+            "vr_id": self._vr_id,
+            "is_add": is_add,
+            "n_ifs": 1,
+            "ifs": [{"sw_if_index": sw_if_index, "priority": prio}],
         }
         self._test.vapi.vrrp_vr_track_if_add_del(**args)
-        self._tracked_ifs.append(args['ifs'][0])
+        self._tracked_ifs.append(args["ifs"][0])
 
     def set_unicast_peers(self, addrs):
         args = {
-            'sw_if_index': self._intf.sw_if_index,
-            'is_ipv6': self._is_ipv6,
-            'vr_id': self._vr_id,
-            'n_addrs': len(addrs),
-            'addrs': addrs
+            "sw_if_index": self._intf.sw_if_index,
+            "is_ipv6": self._is_ipv6,
+            "vr_id": self._vr_id,
+            "n_addrs": len(addrs),
+            "addrs": addrs,
         }
         self._test.vapi.vrrp_vr_set_peers(**args)
         self._unicast_peers = addrs
@@ -210,21 +228,22 @@ class VppVRRPVirtualRouter(VppObject):
 
     def master_down_seconds(self):
         vr_details = self.query_vpp_config()
-        return (vr_details.runtime.master_down_int * 0.01)
+        return vr_details.runtime.master_down_int * 0.01
 
     def vrrp_adv_packet(self, prio=None, src_ip=None):
         dst_ip = self._adv_dest_ip
         if prio is None:
             prio = self._prio
         eth = Ether(dst=self._adv_dest_mac, src=self._virtual_mac)
-        vrrp = VRRPv3(vrid=self._vr_id, priority=prio,
-                      ipcount=len(self._vips), adv=self._intvl)
+        vrrp = VRRPv3(
+            vrid=self._vr_id, priority=prio, ipcount=len(self._vips), adv=self._intvl
+        )
         if self._is_ipv6:
-            src_ip = (self._intf.local_ip6_ll if src_ip is None else src_ip)
+            src_ip = self._intf.local_ip6_ll if src_ip is None else src_ip
             ip = IPv6(src=src_ip, dst=dst_ip, nh=IPPROTO_VRRP, hlim=255)
             vrrp.addrlist = self._vips
         else:
-            src_ip = (self._intf.local_ip4 if src_ip is None else src_ip)
+            src_ip = self._intf.local_ip4 if src_ip is None else src_ip
             ip = IP(src=src_ip, dst=dst_ip, proto=IPPROTO_VRRP, ttl=255, id=0)
             vrrp.addrlist = self._vips
 
@@ -234,7 +253,7 @@ class VppVRRPVirtualRouter(VppObject):
 
 
 class TestVRRP4(VppTestCase):
-    """ IPv4 VRRP Test Case """
+    """IPv4 VRRP Test Case"""
 
     @classmethod
     def setUpClass(cls):
@@ -284,8 +303,7 @@ class TestVRRP4(VppTestCase):
         self.assertEqual(ip.proto, 2)
 
         igmp = pkt[IGMPv3]
-        self.assertEqual(IGMPv3.igmpv3types[igmp.type],
-                         "Version 3 Membership Report")
+        self.assertEqual(IGMPv3.igmpv3types[igmp.type], "Version 3 Membership Report")
 
         igmpmr = pkt[IGMPv3mr]
         self.assertEqual(igmpmr.numgrp, 1)
@@ -330,15 +348,15 @@ class TestVRRP4(VppTestCase):
     # become master and start advertising immediately.
     @unittest.skipUnless(config.extended, "part of extended tests")
     def test_vrrp4_master_adv(self):
-        """ IPv4 Master VR advertises """
+        """IPv4 Master VR advertises"""
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
 
         prio = 255
         intvl = self._default_adv
-        vr = VppVRRPVirtualRouter(self, self.pg0, 100,
-                                  prio=prio, intvl=intvl,
-                                  flags=self._default_flags)
+        vr = VppVRRPVirtualRouter(
+            self, self.pg0, 100, prio=prio, intvl=intvl, flags=self._default_flags
+        )
 
         vr.add_vpp_config()
         vr.start_stop(is_start=1)
@@ -362,25 +380,30 @@ class TestVRRP4(VppTestCase):
     # of parameters to test that too
     @unittest.skipUnless(config.extended, "part of extended tests")
     def test_vrrp4_master_adv_update(self):
-        """ IPv4 Master VR adv + Update to Backup """
+        """IPv4 Master VR adv + Update to Backup"""
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
 
         prio = 255
         intvl = self._default_adv
-        vr = VppVRRPVirtualRouter(self, self.pg0, 100,
-                                  prio=prio, intvl=intvl,
-                                  flags=self._default_flags)
+        vr = VppVRRPVirtualRouter(
+            self, self.pg0, 100, prio=prio, intvl=intvl, flags=self._default_flags
+        )
 
         vr.update_vpp_config()
         vr.start_stop(is_start=1)
         self.logger.info(self.vapi.cli("show vrrp vr"))
         # Update VR with lower prio and larger interval
         # we need to keep old VR for the adv checks
-        upd_vr = VppVRRPVirtualRouter(self, self.pg0, 100,
-                                      prio=100, intvl=2*intvl,
-                                      flags=self._default_flags,
-                                      vips=[self.pg0.remote_ip4])
+        upd_vr = VppVRRPVirtualRouter(
+            self,
+            self.pg0,
+            100,
+            prio=100,
+            intvl=2 * intvl,
+            flags=self._default_flags,
+            vips=[self.pg0.remote_ip4],
+        )
         upd_vr._vrrp_index = vr._vrrp_index
         upd_vr.update_vpp_config()
         start_time = time.time()
@@ -403,7 +426,7 @@ class TestVRRP4(VppTestCase):
         src_ip = self.pg0.remote_ip4
         pkts = [upd_vr.vrrp_adv_packet(prio=110, src_ip=src_ip)]
         while time.time() < end_time:
-            self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl*0.01)
+            self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl * 0.01)
             self.logger.info(self.vapi.cli("show trace"))
 
         upd_vr.start_stop(is_start=0)
@@ -413,7 +436,7 @@ class TestVRRP4(VppTestCase):
     # long as it receives higher priority advertisements
     @unittest.skipUnless(config.extended, "part of extended tests")
     def test_vrrp4_backup_noadv(self):
-        """ IPv4 Backup VR does not advertise """
+        """IPv4 Backup VR does not advertise"""
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
 
@@ -421,10 +444,15 @@ class TestVRRP4(VppTestCase):
         prio = 100
         intvl = self._default_adv
         intvl_s = intvl * 0.01
-        vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
-                                  prio=prio, intvl=intvl,
-                                  flags=self._default_flags,
-                                  vips=[self.pg0.remote_ip4])
+        vr = VppVRRPVirtualRouter(
+            self,
+            self.pg0,
+            vr_id,
+            prio=prio,
+            intvl=intvl,
+            flags=self._default_flags,
+            vips=[self.pg0.remote_ip4],
+        )
         self._vrs.append(vr)
         vr.add_vpp_config()
 
@@ -440,7 +468,7 @@ class TestVRRP4(VppTestCase):
 
         # send higher prio advertisements, should not receive any
         src_ip = self.pg0.remote_ip4
-        pkts = [vr.vrrp_adv_packet(prio=prio+10, src_ip=src_ip)]
+        pkts = [vr.vrrp_adv_packet(prio=prio + 10, src_ip=src_ip)]
         while time.time() < end_time:
             self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl_s)
             self.logger.info(self.vapi.cli("show trace"))
@@ -451,16 +479,16 @@ class TestVRRP4(VppTestCase):
         self._vrs = []
 
     def test_vrrp4_master_arp(self):
-        """ IPv4 Master VR replies to ARP """
+        """IPv4 Master VR replies to ARP"""
         self.pg_start()
 
         # VR virtual IP is the default, which is the pg local IP
         vr_id = 100
         prio = 255
         intvl = self._default_adv
-        vr = VppVRRPVirtualRouter(self, self.pg0, 100,
-                                  prio=prio, intvl=intvl,
-                                  flags=self._default_flags)
+        vr = VppVRRPVirtualRouter(
+            self, self.pg0, 100, prio=prio, intvl=intvl, flags=self._default_flags
+        )
         self._vrs.append(vr)
 
         vr.add_vpp_config()
@@ -484,7 +512,7 @@ class TestVRRP4(VppTestCase):
 
     @unittest.skipUnless(config.extended, "part of extended tests")
     def test_vrrp4_backup_noarp(self):
-        """ IPv4 Backup VR ignores ARP """
+        """IPv4 Backup VR ignores ARP"""
         # We need an address for a virtual IP that is not the IP that
         # ARP requests will originate from
 
@@ -492,16 +520,24 @@ class TestVRRP4(VppTestCase):
         prio = 100
         intvl = self._default_adv
         vip = self.pg0.remote_hosts[1].ip4
-        vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
-                                  prio=prio, intvl=intvl,
-                                  flags=self._default_flags,
-                                  vips=[vip])
+        vr = VppVRRPVirtualRouter(
+            self,
+            self.pg0,
+            vr_id,
+            prio=prio,
+            intvl=intvl,
+            flags=self._default_flags,
+            vips=[vip],
+        )
         self._vrs.append(vr)
         vr.add_vpp_config()
 
-        arp_req = (Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg0.remote_mac) /
-                   ARP(op=ARP.who_has, pdst=vip,
-                   psrc=self.pg0.remote_ip4, hwsrc=self.pg0.remote_mac))
+        arp_req = Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg0.remote_mac) / ARP(
+            op=ARP.who_has,
+            pdst=vip,
+            psrc=self.pg0.remote_ip4,
+            hwsrc=self.pg0.remote_mac,
+        )
 
         # Before the VR is started make sure no reply to request for VIP
         self.pg_start()
@@ -510,7 +546,7 @@ class TestVRRP4(VppTestCase):
 
         # VR should start in backup state and still should not reply to ARP
         # send a higher priority adv to make sure it does not become master
-        adv = vr.vrrp_adv_packet(prio=prio+10, src_ip=self.pg0.remote_ip4)
+        adv = vr.vrrp_adv_packet(prio=prio + 10, src_ip=self.pg0.remote_ip4)
         vr.start_stop(is_start=1)
         self.send_and_assert_no_replies(self.pg0, [adv, arp_req], timeout=1)
 
@@ -520,17 +556,22 @@ class TestVRRP4(VppTestCase):
 
     @unittest.skipUnless(config.extended, "part of extended tests")
     def test_vrrp4_election(self):
-        """ IPv4 Backup VR becomes master if no advertisements received """
+        """IPv4 Backup VR becomes master if no advertisements received"""
 
         vr_id = 100
         prio = 100
         intvl = self._default_adv
         intvl_s = intvl * 0.01
         vip = self.pg0.remote_ip4
-        vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
-                                  prio=prio, intvl=intvl,
-                                  flags=self._default_flags,
-                                  vips=[vip])
+        vr = VppVRRPVirtualRouter(
+            self,
+            self.pg0,
+            vr_id,
+            prio=prio,
+            intvl=intvl,
+            flags=self._default_flags,
+            vips=[vip],
+        )
         self._vrs.append(vr)
         vr.add_vpp_config()
 
@@ -557,17 +598,22 @@ class TestVRRP4(VppTestCase):
 
     @unittest.skipUnless(config.extended, "part of extended tests")
     def test_vrrp4_backup_preempts(self):
-        """ IPv4 Backup VR preempts lower priority master """
+        """IPv4 Backup VR preempts lower priority master"""
 
         vr_id = 100
         prio = 100
         intvl = self._default_adv
         intvl_s = intvl * 0.01
         vip = self.pg0.remote_ip4
-        vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
-                                  prio=prio, intvl=intvl,
-                                  flags=self._default_flags,
-                                  vips=[vip])
+        vr = VppVRRPVirtualRouter(
+            self,
+            self.pg0,
+            vr_id,
+            prio=prio,
+            intvl=intvl,
+            flags=self._default_flags,
+            vips=[vip],
+        )
         self._vrs.append(vr)
         vr.add_vpp_config()
 
@@ -583,7 +629,7 @@ class TestVRRP4(VppTestCase):
 
         # send lower prio advertisements until timer expires
         src_ip = self.pg0.remote_ip4
-        pkts = [vr.vrrp_adv_packet(prio=prio-10, src_ip=src_ip)]
+        pkts = [vr.vrrp_adv_packet(prio=prio - 10, src_ip=src_ip)]
         while time.time() + intvl_s < end_time:
             self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl_s)
             self.logger.info(self.vapi.cli("show trace"))
@@ -595,7 +641,7 @@ class TestVRRP4(VppTestCase):
 
     @unittest.skipUnless(config.extended, "part of extended tests")
     def test_vrrp4_master_preempted(self):
-        """ IPv4 Master VR preempted by higher priority backup """
+        """IPv4 Master VR preempted by higher priority backup"""
 
         # A prio 255 VR cannot be preempted so the prio has to be lower and
         # we have to wait for it to take over
@@ -603,10 +649,15 @@ class TestVRRP4(VppTestCase):
         prio = 100
         intvl = self._default_adv
         vip = self.pg0.remote_ip4
-        vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
-                                  prio=prio, intvl=intvl,
-                                  flags=self._default_flags,
-                                  vips=[vip])
+        vr = VppVRRPVirtualRouter(
+            self,
+            self.pg0,
+            vr_id,
+            prio=prio,
+            intvl=intvl,
+            flags=self._default_flags,
+            vips=[vip],
+        )
         self._vrs.append(vr)
         vr.add_vpp_config()
 
@@ -632,7 +683,7 @@ class TestVRRP4(VppTestCase):
 
     @unittest.skipUnless(config.extended, "part of extended tests")
     def test_vrrp4_accept_mode_disabled(self):
-        """ IPv4 Master VR does not reply for VIP w/ accept mode off """
+        """IPv4 Master VR does not reply for VIP w/ accept mode off"""
 
         # accept mode only matters when prio < 255, so it will have to
         # come up as a backup and take over as master after the timeout
@@ -640,10 +691,15 @@ class TestVRRP4(VppTestCase):
         prio = 100
         intvl = self._default_adv
         vip = self.pg0.remote_hosts[4].ip4
-        vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
-                                  prio=prio, intvl=intvl,
-                                  flags=self._default_flags,
-                                  vips=[vip])
+        vr = VppVRRPVirtualRouter(
+            self,
+            self.pg0,
+            vr_id,
+            prio=prio,
+            intvl=intvl,
+            flags=self._default_flags,
+            vips=[vip],
+        )
         self._vrs.append(vr)
         vr.add_vpp_config()
 
@@ -661,9 +717,11 @@ class TestVRRP4(VppTestCase):
         vr.assert_state_equals(VRRP_VR_STATE_MASTER)
 
         # send an ICMP echo to the VR virtual IP address
-        echo = (Ether(dst=vr.virtual_mac(), src=self.pg0.remote_mac) /
-                IP(dst=vip, src=self.pg0.remote_ip4) /
-                ICMP(seq=1, id=self.pg0.sw_if_index, type='echo-request'))
+        echo = (
+            Ether(dst=vr.virtual_mac(), src=self.pg0.remote_mac)
+            / IP(dst=vip, src=self.pg0.remote_ip4)
+            / ICMP(seq=1, id=self.pg0.sw_if_index, type="echo-request")
+        )
         self.pg_send(self.pg0, [echo])
 
         # wait for an echo reply. none should be received
@@ -672,7 +730,7 @@ class TestVRRP4(VppTestCase):
 
     @unittest.skipUnless(config.extended, "part of extended tests")
     def test_vrrp4_accept_mode_enabled(self):
-        """ IPv4 Master VR replies for VIP w/ accept mode on """
+        """IPv4 Master VR replies for VIP w/ accept mode on"""
 
         # A prio 255 VR cannot be preempted so the prio has to be lower and
         # we have to wait for it to take over
@@ -680,11 +738,10 @@ class TestVRRP4(VppTestCase):
         prio = 100
         intvl = self._default_adv
         vip = self.pg0.remote_hosts[4].ip4
-        flags = (VRRP_VR_FLAG_PREEMPT | VRRP_VR_FLAG_ACCEPT)
-        vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
-                                  prio=prio, intvl=intvl,
-                                  flags=flags,
-                                  vips=[vip])
+        flags = VRRP_VR_FLAG_PREEMPT | VRRP_VR_FLAG_ACCEPT
+        vr = VppVRRPVirtualRouter(
+            self, self.pg0, vr_id, prio=prio, intvl=intvl, flags=flags, vips=[vip]
+        )
         self._vrs.append(vr)
         vr.add_vpp_config()
 
@@ -702,15 +759,18 @@ class TestVRRP4(VppTestCase):
         vr.assert_state_equals(VRRP_VR_STATE_MASTER)
 
         # send an ICMP echo to the VR virtual IP address
-        echo = (Ether(dst=vr.virtual_mac(), src=self.pg0.remote_mac) /
-                IP(dst=vip, src=self.pg0.remote_ip4) /
-                ICMP(seq=1, id=self.pg0.sw_if_index, type='echo-request'))
+        echo = (
+            Ether(dst=vr.virtual_mac(), src=self.pg0.remote_mac)
+            / IP(dst=vip, src=self.pg0.remote_ip4)
+            / ICMP(seq=1, id=self.pg0.sw_if_index, type="echo-request")
+        )
         self.pg_send(self.pg0, [echo])
 
         # wait for an echo reply.
         time.sleep(1)
-        rx_pkts = self.pg0.get_capture(expected_count=1, timeout=1,
-                                       filter_out_fn=is_not_echo_reply)
+        rx_pkts = self.pg0.get_capture(
+            expected_count=1, timeout=1, filter_out_fn=is_not_echo_reply
+        )
 
         self.assertEqual(rx_pkts[0][IP].src, vip)
         self.assertEqual(rx_pkts[0][IP].dst, self.pg0.remote_ip4)
@@ -720,17 +780,22 @@ class TestVRRP4(VppTestCase):
 
     @unittest.skipUnless(config.extended, "part of extended tests")
     def test_vrrp4_intf_tracking(self):
-        """ IPv4 Master VR adjusts priority based on tracked interface """
+        """IPv4 Master VR adjusts priority based on tracked interface"""
 
         vr_id = 100
         prio = 255
         intvl = self._default_adv
         intvl_s = intvl * 0.01
         vip = self.pg0.local_ip4
-        vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
-                                  prio=prio, intvl=intvl,
-                                  flags=self._default_flags,
-                                  vips=[vip])
+        vr = VppVRRPVirtualRouter(
+            self,
+            self.pg0,
+            vr_id,
+            prio=prio,
+            intvl=intvl,
+            flags=self._default_flags,
+            vips=[vip],
+        )
         self._vrs.append(vr)
         vr.add_vpp_config()
 
@@ -740,9 +805,9 @@ class TestVRRP4(VppTestCase):
         # add pg1 as a tracked interface and start the VR
         adjustment = 50
         adjusted_prio = prio - adjustment
-        vr.add_del_tracked_interface(is_add=1,
-                                     sw_if_index=self.pg1.sw_if_index,
-                                     prio=adjustment)
+        vr.add_del_tracked_interface(
+            is_add=1, sw_if_index=self.pg1.sw_if_index, prio=adjustment
+        )
         vr.start_stop(is_start=1)
         vr.assert_state_equals(VRRP_VR_STATE_MASTER)
 
@@ -751,53 +816,47 @@ class TestVRRP4(VppTestCase):
 
         # tracked intf is up ->  advertised priority == configured priority
         self.pg0.enable_capture()
-        rx = self.pg0.wait_for_packet(timeout=intvl_s,
-                                      filter_out_fn=is_not_adv)
+        rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
         self.assertEqual(rx, adv_configured)
 
         # take down pg1, verify priority is now being adjusted
         self.pg1.admin_down()
         self.pg0.enable_capture()
-        rx = self.pg0.wait_for_packet(timeout=intvl_s,
-                                      filter_out_fn=is_not_adv)
+        rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
         self.assertEqual(rx, adv_adjusted)
 
         # bring up pg1, verify priority now matches configured value
         self.pg1.admin_up()
         self.pg0.enable_capture()
-        rx = self.pg0.wait_for_packet(timeout=intvl_s,
-                                      filter_out_fn=is_not_adv)
+        rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
         self.assertEqual(rx, adv_configured)
 
         # remove IP address from pg1, verify priority now being adjusted
         self.pg1.unconfig_ip4()
         self.pg0.enable_capture()
-        rx = self.pg0.wait_for_packet(timeout=intvl_s,
-                                      filter_out_fn=is_not_adv)
+        rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
         self.assertEqual(rx, adv_adjusted)
 
         # add IP address to pg1, verify priority now matches configured value
         self.pg1.config_ip4()
         self.pg0.enable_capture()
-        rx = self.pg0.wait_for_packet(timeout=intvl_s,
-                                      filter_out_fn=is_not_adv)
+        rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
         self.assertEqual(rx, adv_configured)
 
     @unittest.skipUnless(config.extended, "part of extended tests")
     def test_vrrp4_master_adv_unicast(self):
-        """ IPv4 Master VR advertises (unicast) """
+        """IPv4 Master VR advertises (unicast)"""
 
         vr_id = 100
         prio = 255
         intvl = self._default_adv
         intvl_s = intvl * 0.01
         vip = self.pg0.local_ip4
-        flags = (self._default_flags | VRRP_VR_FLAG_UNICAST)
+        flags = self._default_flags | VRRP_VR_FLAG_UNICAST
         unicast_peer = self.pg0.remote_hosts[4]
-        vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
-                                  prio=prio, intvl=intvl,
-                                  flags=flags,
-                                  vips=[vip])
+        vr = VppVRRPVirtualRouter(
+            self, self.pg0, vr_id, prio=prio, intvl=intvl, flags=flags, vips=[vip]
+        )
         self._vrs.append(vr)
         vr.add_vpp_config()
         vr.set_unicast_peers([unicast_peer.ip4])
@@ -810,8 +869,7 @@ class TestVRRP4(VppTestCase):
         vr.assert_state_equals(VRRP_VR_STATE_MASTER)
 
         self.pg0.enable_capture()
-        rx = self.pg0.wait_for_packet(timeout=intvl_s,
-                                      filter_out_fn=is_not_adv)
+        rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
 
         self.assertTrue(rx.haslayer(Ether))
         self.assertTrue(rx.haslayer(IP))
@@ -827,7 +885,7 @@ class TestVRRP4(VppTestCase):
 
 
 class TestVRRP6(VppTestCase):
-    """ IPv6 VRRP Test Case """
+    """IPv6 VRRP Test Case"""
 
     @classmethod
     def setUpClass(cls):
@@ -849,7 +907,7 @@ class TestVRRP6(VppTestCase):
             i.configure_ipv6_neighbors()
 
         self._vrs = []
-        self._default_flags = (VRRP_VR_FLAG_IPV6 | VRRP_VR_FLAG_PREEMPT)
+        self._default_flags = VRRP_VR_FLAG_IPV6 | VRRP_VR_FLAG_PREEMPT
         self._default_adv = 100
 
     def tearDown(self):
@@ -922,15 +980,15 @@ class TestVRRP6(VppTestCase):
     # become master and start advertising immediately.
     @unittest.skipUnless(config.extended, "part of extended tests")
     def test_vrrp6_master_adv(self):
-        """ IPv6 Master VR advertises """
+        """IPv6 Master VR advertises"""
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
 
         prio = 255
         intvl = self._default_adv
-        vr = VppVRRPVirtualRouter(self, self.pg0, 100,
-                                  prio=prio, intvl=intvl,
-                                  flags=self._default_flags)
+        vr = VppVRRPVirtualRouter(
+            self, self.pg0, 100, prio=prio, intvl=intvl, flags=self._default_flags
+        )
         self._vrs.append(vr)
 
         vr.add_vpp_config()
@@ -956,25 +1014,30 @@ class TestVRRP6(VppTestCase):
     # of parameters to test that too
     @unittest.skipUnless(config.extended, "part of extended tests")
     def test_vrrp6_master_adv_update(self):
-        """ IPv6 Master VR adv + Update to Backup """
+        """IPv6 Master VR adv + Update to Backup"""
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
 
         prio = 255
         intvl = self._default_adv
-        vr = VppVRRPVirtualRouter(self, self.pg0, 100,
-                                  prio=prio, intvl=intvl,
-                                  flags=self._default_flags)
+        vr = VppVRRPVirtualRouter(
+            self, self.pg0, 100, prio=prio, intvl=intvl, flags=self._default_flags
+        )
 
         vr.update_vpp_config()
         vr.start_stop(is_start=1)
         self.logger.info(self.vapi.cli("show vrrp vr"))
         # Update VR with lower prio and larger interval
         # we need to keep old VR for the adv checks
-        upd_vr = VppVRRPVirtualRouter(self, self.pg0, 100,
-                                      prio=100, intvl=2*intvl,
-                                      flags=self._default_flags,
-                                      vips=[self.pg0.remote_ip6])
+        upd_vr = VppVRRPVirtualRouter(
+            self,
+            self.pg0,
+            100,
+            prio=100,
+            intvl=2 * intvl,
+            flags=self._default_flags,
+            vips=[self.pg0.remote_ip6],
+        )
         upd_vr._vrrp_index = vr._vrrp_index
         upd_vr.update_vpp_config()
         start_time = time.time()
@@ -1000,7 +1063,8 @@ class TestVRRP6(VppTestCase):
         end_time = start_time + 2 * upd_vr.master_down_seconds()
         while time.time() < end_time:
             self.send_and_assert_no_replies(
-                self.pg0, pkts, timeout=0.01*upd_vr._intvl)
+                self.pg0, pkts, timeout=0.01 * upd_vr._intvl
+            )
             self.logger.info(self.vapi.cli("show trace"))
 
         vr.start_stop(is_start=0)
@@ -1010,7 +1074,7 @@ class TestVRRP6(VppTestCase):
     # long as it receives higher priority advertisements
     @unittest.skipUnless(config.extended, "part of extended tests")
     def test_vrrp6_backup_noadv(self):
-        """ IPv6 Backup VR does not advertise """
+        """IPv6 Backup VR does not advertise"""
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
 
@@ -1018,10 +1082,15 @@ class TestVRRP6(VppTestCase):
         prio = 100
         intvl = self._default_adv
         intvl_s = intvl * 0.01
-        vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
-                                  prio=prio, intvl=intvl,
-                                  flags=self._default_flags,
-                                  vips=[self.pg0.remote_ip6])
+        vr = VppVRRPVirtualRouter(
+            self,
+            self.pg0,
+            vr_id,
+            prio=prio,
+            intvl=intvl,
+            flags=self._default_flags,
+            vips=[self.pg0.remote_ip6],
+        )
         vr.add_vpp_config()
         self._vrs.append(vr)
 
@@ -1037,7 +1106,7 @@ class TestVRRP6(VppTestCase):
         # send higher prio advertisements, should not see VPP send any
         src_ip = self.pg0.remote_ip6_ll
         num_advs = 5
-        pkts = [vr.vrrp_adv_packet(prio=prio+10, src_ip=src_ip)]
+        pkts = [vr.vrrp_adv_packet(prio=prio + 10, src_ip=src_ip)]
         self.logger.info(self.vapi.cli("show vlib graph"))
         while time.time() < end_time:
             self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl_s)
@@ -1050,16 +1119,16 @@ class TestVRRP6(VppTestCase):
         self._vrs = []
 
     def test_vrrp6_master_nd(self):
-        """ IPv6 Master VR replies to NDP """
+        """IPv6 Master VR replies to NDP"""
         self.pg_start()
 
         # VR virtual IP is the default, which is the pg local IP
         vr_id = 100
         prio = 255
         intvl = self._default_adv
-        vr = VppVRRPVirtualRouter(self, self.pg0, 100,
-                                  prio=prio, intvl=intvl,
-                                  flags=self._default_flags)
+        vr = VppVRRPVirtualRouter(
+            self, self.pg0, 100, prio=prio, intvl=intvl, flags=self._default_flags
+        )
         vr.add_vpp_config()
         self._vrs.append(vr)
 
@@ -1082,7 +1151,7 @@ class TestVRRP6(VppTestCase):
 
     @unittest.skipUnless(config.extended, "part of extended tests")
     def test_vrrp6_backup_nond(self):
-        """ IPv6 Backup VR ignores NDP """
+        """IPv6 Backup VR ignores NDP"""
         # We need an address for a virtual IP that is not the IP that
         # ARP requests will originate from
 
@@ -1091,10 +1160,15 @@ class TestVRRP6(VppTestCase):
         intvl = self._default_adv
         intvl_s = intvl * 0.01
         vip = self.pg0.remote_hosts[1].ip6
-        vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
-                                  prio=prio, intvl=intvl,
-                                  flags=self._default_flags,
-                                  vips=[vip])
+        vr = VppVRRPVirtualRouter(
+            self,
+            self.pg0,
+            vr_id,
+            prio=prio,
+            intvl=intvl,
+            flags=self._default_flags,
+            vips=[vip],
+        )
         vr.add_vpp_config()
         self._vrs.append(vr)
 
@@ -1102,36 +1176,43 @@ class TestVRRP6(VppTestCase):
         dmac = in6_getnsmac(nsma)
         dst_ip = inet_ntop(socket.AF_INET6, nsma)
 
-        ndp_req = (Ether(dst=dmac, src=self.pg0.remote_mac) /
-                   IPv6(dst=dst_ip, src=self.pg0.remote_ip6) /
-                   ICMPv6ND_NS(tgt=vip) /
-                   ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac))
+        ndp_req = (
+            Ether(dst=dmac, src=self.pg0.remote_mac)
+            / IPv6(dst=dst_ip, src=self.pg0.remote_ip6)
+            / ICMPv6ND_NS(tgt=vip)
+            / ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac)
+        )
 
         # Before the VR is started make sure no reply to request for VIP
         self.send_and_assert_no_replies(self.pg0, [ndp_req], timeout=1)
 
         # VR should start in backup state and still should not reply to NDP
         # send a higher priority adv to make sure it does not become master
-        adv = vr.vrrp_adv_packet(prio=prio+10, src_ip=self.pg0.remote_ip6)
+        adv = vr.vrrp_adv_packet(prio=prio + 10, src_ip=self.pg0.remote_ip6)
         pkts = [adv, ndp_req]
         vr.start_stop(is_start=1)
-        self.send_and_assert_no_replies(self.pg0, pkts,  timeout=intvl_s)
+        self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl_s)
 
         vr.start_stop(is_start=0)
 
     @unittest.skipUnless(config.extended, "part of extended tests")
     def test_vrrp6_election(self):
-        """ IPv6 Backup VR becomes master if no advertisements received """
+        """IPv6 Backup VR becomes master if no advertisements received"""
 
         vr_id = 100
         prio = 100
         intvl = self._default_adv
         intvl_s = intvl * 0.01
         vip = self.pg0.remote_ip6
-        vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
-                                  prio=prio, intvl=intvl,
-                                  flags=self._default_flags,
-                                  vips=[vip])
+        vr = VppVRRPVirtualRouter(
+            self,
+            self.pg0,
+            vr_id,
+            prio=prio,
+            intvl=intvl,
+            flags=self._default_flags,
+            vips=[vip],
+        )
         self._vrs.append(vr)
         vr.add_vpp_config()
 
@@ -1158,17 +1239,22 @@ class TestVRRP6(VppTestCase):
 
     @unittest.skipUnless(config.extended, "part of extended tests")
     def test_vrrp6_backup_preempts(self):
-        """ IPv6 Backup VR preempts lower priority master """
+        """IPv6 Backup VR preempts lower priority master"""
 
         vr_id = 100
         prio = 100
         intvl = self._default_adv
         intvl_s = intvl * 0.01
         vip = self.pg0.remote_ip6
-        vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
-                                  prio=prio, intvl=intvl,
-                                  flags=self._default_flags,
-                                  vips=[vip])
+        vr = VppVRRPVirtualRouter(
+            self,
+            self.pg0,
+            vr_id,
+            prio=prio,
+            intvl=intvl,
+            flags=self._default_flags,
+            vips=[vip],
+        )
         self._vrs.append(vr)
         vr.add_vpp_config()
 
@@ -1184,7 +1270,7 @@ class TestVRRP6(VppTestCase):
 
         # send lower prio advertisements until timer expires
         src_ip = self.pg0.remote_ip6
-        pkts = [vr.vrrp_adv_packet(prio=prio-10, src_ip=src_ip)]
+        pkts = [vr.vrrp_adv_packet(prio=prio - 10, src_ip=src_ip)]
         while (time.time() + intvl_s) < end_time:
             self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl_s)
             self.logger.info(self.vapi.cli("show trace"))
@@ -1196,7 +1282,7 @@ class TestVRRP6(VppTestCase):
 
     @unittest.skipUnless(config.extended, "part of extended tests")
     def test_vrrp6_master_preempted(self):
-        """ IPv6 Master VR preempted by higher priority backup """
+        """IPv6 Master VR preempted by higher priority backup"""
 
         # A prio 255 VR cannot be preempted so the prio has to be lower and
         # we have to wait for it to take over
@@ -1204,10 +1290,15 @@ class TestVRRP6(VppTestCase):
         prio = 100
         intvl = self._default_adv
         vip = self.pg0.remote_ip6
-        vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
-                                  prio=prio, intvl=intvl,
-                                  flags=self._default_flags,
-                                  vips=[vip])
+        vr = VppVRRPVirtualRouter(
+            self,
+            self.pg0,
+            vr_id,
+            prio=prio,
+            intvl=intvl,
+            flags=self._default_flags,
+            vips=[vip],
+        )
         self._vrs.append(vr)
         vr.add_vpp_config()
 
@@ -1233,7 +1324,7 @@ class TestVRRP6(VppTestCase):
 
     @unittest.skipUnless(config.extended, "part of extended tests")
     def test_vrrp6_accept_mode_disabled(self):
-        """ IPv6 Master VR does not reply for VIP w/ accept mode off """
+        """IPv6 Master VR does not reply for VIP w/ accept mode off"""
 
         # accept mode only matters when prio < 255, so it will have to
         # come up as a backup and take over as master after the timeout
@@ -1241,10 +1332,15 @@ class TestVRRP6(VppTestCase):
         prio = 100
         intvl = self._default_adv
         vip = self.pg0.remote_hosts[4].ip6
-        vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
-                                  prio=prio, intvl=intvl,
-                                  flags=self._default_flags,
-                                  vips=[vip])
+        vr = VppVRRPVirtualRouter(
+            self,
+            self.pg0,
+            vr_id,
+            prio=prio,
+            intvl=intvl,
+            flags=self._default_flags,
+            vips=[vip],
+        )
         self._vrs.append(vr)
         vr.add_vpp_config()
 
@@ -1262,9 +1358,11 @@ class TestVRRP6(VppTestCase):
         vr.assert_state_equals(VRRP_VR_STATE_MASTER)
 
         # send an ICMPv6 echo to the VR virtual IP address
-        echo = (Ether(dst=vr.virtual_mac(), src=self.pg0.remote_mac) /
-                IPv6(dst=vip, src=self.pg0.remote_ip6) /
-                ICMPv6EchoRequest(seq=1, id=self.pg0.sw_if_index))
+        echo = (
+            Ether(dst=vr.virtual_mac(), src=self.pg0.remote_mac)
+            / IPv6(dst=vip, src=self.pg0.remote_ip6)
+            / ICMPv6EchoRequest(seq=1, id=self.pg0.sw_if_index)
+        )
         self.pg_send(self.pg0, [echo])
 
         # wait for an echo reply. none should be received
@@ -1273,7 +1371,7 @@ class TestVRRP6(VppTestCase):
 
     @unittest.skipUnless(config.extended, "part of extended tests")
     def test_vrrp6_accept_mode_enabled(self):
-        """ IPv6 Master VR replies for VIP w/ accept mode on """
+        """IPv6 Master VR replies for VIP w/ accept mode on"""
 
         # A prio 255 VR cannot be preempted so the prio has to be lower and
         # we have to wait for it to take over
@@ -1281,11 +1379,10 @@ class TestVRRP6(VppTestCase):
         prio = 100
         intvl = self._default_adv
         vip = self.pg0.remote_hosts[4].ip6
-        flags = (self._default_flags | VRRP_VR_FLAG_ACCEPT)
-        vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
-                                  prio=prio, intvl=intvl,
-                                  flags=flags,
-                                  vips=[vip])
+        flags = self._default_flags | VRRP_VR_FLAG_ACCEPT
+        vr = VppVRRPVirtualRouter(
+            self, self.pg0, vr_id, prio=prio, intvl=intvl, flags=flags, vips=[vip]
+        )
         self._vrs.append(vr)
         vr.add_vpp_config()
 
@@ -1303,15 +1400,18 @@ class TestVRRP6(VppTestCase):
         vr.assert_state_equals(VRRP_VR_STATE_MASTER)
 
         # send an ICMP echo to the VR virtual IP address
-        echo = (Ether(dst=vr.virtual_mac(), src=self.pg0.remote_mac) /
-                IPv6(dst=vip, src=self.pg0.remote_ip6) /
-                ICMPv6EchoRequest(seq=1, id=self.pg0.sw_if_index))
+        echo = (
+            Ether(dst=vr.virtual_mac(), src=self.pg0.remote_mac)
+            / IPv6(dst=vip, src=self.pg0.remote_ip6)
+            / ICMPv6EchoRequest(seq=1, id=self.pg0.sw_if_index)
+        )
         self.pg_send(self.pg0, [echo])
 
         # wait for an echo reply.
         time.sleep(1)
-        rx_pkts = self.pg0.get_capture(expected_count=1, timeout=1,
-                                       filter_out_fn=is_not_echo_reply)
+        rx_pkts = self.pg0.get_capture(
+            expected_count=1, timeout=1, filter_out_fn=is_not_echo_reply
+        )
 
         self.assertEqual(rx_pkts[0][IPv6].src, vip)
         self.assertEqual(rx_pkts[0][IPv6].dst, self.pg0.remote_ip6)
@@ -1320,17 +1420,22 @@ class TestVRRP6(VppTestCase):
 
     @unittest.skipUnless(config.extended, "part of extended tests")
     def test_vrrp6_intf_tracking(self):
-        """ IPv6 Master VR adjusts priority based on tracked interface """
+        """IPv6 Master VR adjusts priority based on tracked interface"""
 
         vr_id = 100
         prio = 255
         intvl = self._default_adv
         intvl_s = intvl * 0.01
         vip = self.pg0.local_ip6
-        vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
-                                  prio=prio, intvl=intvl,
-                                  flags=self._default_flags,
-                                  vips=[vip])
+        vr = VppVRRPVirtualRouter(
+            self,
+            self.pg0,
+            vr_id,
+            prio=prio,
+            intvl=intvl,
+            flags=self._default_flags,
+            vips=[vip],
+        )
         self._vrs.append(vr)
         vr.add_vpp_config()
 
@@ -1340,9 +1445,9 @@ class TestVRRP6(VppTestCase):
         # add pg1 as a tracked interface and start the VR
         adjustment = 50
         adjusted_prio = prio - adjustment
-        vr.add_del_tracked_interface(is_add=1,
-                                     sw_if_index=self.pg1.sw_if_index,
-                                     prio=adjustment)
+        vr.add_del_tracked_interface(
+            is_add=1, sw_if_index=self.pg1.sw_if_index, prio=adjustment
+        )
         vr.start_stop(is_start=1)
         vr.assert_state_equals(VRRP_VR_STATE_MASTER)
 
@@ -1351,53 +1456,47 @@ class TestVRRP6(VppTestCase):
 
         # tracked intf is up ->  advertised priority == configured priority
         self.pg0.enable_capture()
-        rx = self.pg0.wait_for_packet(timeout=intvl_s,
-                                      filter_out_fn=is_not_adv)
+        rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
         self.assertEqual(rx, adv_configured)
 
         # take down pg1, verify priority is now being adjusted
         self.pg1.admin_down()
         self.pg0.enable_capture()
-        rx = self.pg0.wait_for_packet(timeout=intvl_s,
-                                      filter_out_fn=is_not_adv)
+        rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
         self.assertEqual(rx, adv_adjusted)
 
         # bring up pg1, verify priority now matches configured value
         self.pg1.admin_up()
         self.pg0.enable_capture()
-        rx = self.pg0.wait_for_packet(timeout=intvl_s,
-                                      filter_out_fn=is_not_adv)
+        rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
         self.assertEqual(rx, adv_configured)
 
         # remove IP address from pg1, verify priority now being adjusted
         self.pg1.unconfig_ip6()
         self.pg0.enable_capture()
-        rx = self.pg0.wait_for_packet(timeout=intvl_s,
-                                      filter_out_fn=is_not_adv)
+        rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
         self.assertEqual(rx, adv_adjusted)
 
         # add IP address to pg1, verify priority now matches configured value
         self.pg1.config_ip6()
         self.pg0.enable_capture()
-        rx = self.pg0.wait_for_packet(timeout=intvl_s,
-                                      filter_out_fn=is_not_adv)
+        rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
         self.assertEqual(rx, adv_configured)
 
     @unittest.skipUnless(config.extended, "part of extended tests")
     def test_vrrp6_master_adv_unicast(self):
-        """ IPv6 Master VR advertises (unicast) """
+        """IPv6 Master VR advertises (unicast)"""
 
         vr_id = 100
         prio = 255
         intvl = self._default_adv
         intvl_s = intvl * 0.01
         vip = self.pg0.local_ip6
-        flags = (self._default_flags | VRRP_VR_FLAG_UNICAST)
+        flags = self._default_flags | VRRP_VR_FLAG_UNICAST
         unicast_peer = self.pg0.remote_hosts[4]
-        vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
-                                  prio=prio, intvl=intvl,
-                                  flags=flags,
-                                  vips=[vip])
+        vr = VppVRRPVirtualRouter(
+            self, self.pg0, vr_id, prio=prio, intvl=intvl, flags=flags, vips=[vip]
+        )
         self._vrs.append(vr)
         vr.add_vpp_config()
         vr.set_unicast_peers([unicast_peer.ip6])
@@ -1410,23 +1509,22 @@ class TestVRRP6(VppTestCase):
         vr.assert_state_equals(VRRP_VR_STATE_MASTER)
 
         self.pg0.enable_capture()
-        rx = self.pg0.wait_for_packet(timeout=intvl_s,
-                                      filter_out_fn=is_not_adv)
+        rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
 
         self.assertTrue(rx.haslayer(Ether))
         self.assertTrue(rx.haslayer(IPv6))
         self.assertTrue(rx.haslayer(VRRPv3))
         self.assertEqual(rx[Ether].src, self.pg0.local_mac)
         self.assertEqual(rx[Ether].dst, unicast_peer.mac)
-        self.assertEqual(ip6_normalize(rx[IPv6].src),
-                         ip6_normalize(self.pg0.local_ip6_ll))
-        self.assertEqual(ip6_normalize(rx[IPv6].dst),
-                         ip6_normalize(unicast_peer.ip6))
+        self.assertEqual(
+            ip6_normalize(rx[IPv6].src), ip6_normalize(self.pg0.local_ip6_ll)
+        )
+        self.assertEqual(ip6_normalize(rx[IPv6].dst), ip6_normalize(unicast_peer.ip6))
         self.assertEqual(rx[VRRPv3].vrid, vr_id)
         self.assertEqual(rx[VRRPv3].priority, prio)
         self.assertEqual(rx[VRRPv3].ipcount, 1)
         self.assertEqual(rx[VRRPv3].addrlist, [vip])
 
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     unittest.main(testRunner=VppTestRunner)