X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftest_vrrp.py;h=8575016c326b91db049081e63bd5637c493517ad;hb=99c317a6066f825af002ca0c18caaef337ef0bcb;hp=1f28cf3d18d282999fffadc6529aa356f50a6e7e;hpb=b23ffd7ef216463c35b75c831e6a27e58971f4ec;p=vpp.git diff --git a/test/test_vrrp.py b/test/test_vrrp.py index 1f28cf3d18d..8575016c326 100644 --- a/test/test_vrrp.py +++ b/test/test_vrrp.py @@ -12,19 +12,28 @@ import socket from socket import inet_pton, inet_ntop from vpp_object import VppObject -from vpp_papi import VppEnum from scapy.packet import raw from scapy.layers.l2 import Ether, ARP from scapy.layers.inet import IP, ICMP, icmptypes -from scapy.layers.inet6 import IPv6, ipv6nh, IPv6ExtHdrHopByHop, \ - ICMPv6MLReport2, ICMPv6ND_NA, ICMPv6ND_NS, ICMPv6NDOptDstLLAddr, \ - ICMPv6NDOptSrcLLAddr, ICMPv6EchoRequest, ICMPv6EchoReply -from scapy.contrib.igmpv3 import IGMPv3, IGMPv3mr, IGMPv3gr +from scapy.layers.inet6 import ( + IPv6, + ipv6nh, + IPv6ExtHdrHopByHop, + ICMPv6MLReport2, + ICMPv6ND_NA, + ICMPv6ND_NS, + ICMPv6NDOptDstLLAddr, + ICMPv6NDOptSrcLLAddr, + ICMPv6EchoRequest, + ICMPv6EchoReply, +) +from scapy.contrib.igmpv3 import IGMPv3, IGMPv3mr from scapy.layers.vrrp import IPPROTO_VRRP, VRRPv3 from scapy.utils6 import in6_getnsma, in6_getnsmac from config import config -from framework import VppTestCase, VppTestRunner +from framework import VppTestCase +from asfframework import VppTestRunner from util import ip6_normalize VRRP_VR_FLAG_PREEMPT = 1 @@ -37,9 +46,11 @@ VRRP_VR_STATE_BACKUP = 1 VRRP_VR_STATE_MASTER = 2 VRRP_VR_STATE_INTF_DOWN = 3 +VRRP_INDEX_INVALID = 0xFFFFFFFF + def is_non_arp(p): - """ Want to filter out advertisements, igmp, etc""" + """Want to filter out advertisements, igmp, etc""" if p.haslayer(ARP): return False @@ -47,7 +58,7 @@ def is_non_arp(p): def is_not_adv(p): - """ Filter out everything but advertisements. E.g. multicast RD/ND """ + """Filter out everything but advertisements. E.g. multicast RD/ND""" if p.haslayer(VRRPv3): return False @@ -55,7 +66,7 @@ def is_not_adv(p): def is_not_echo_reply(p): - """ filter out advertisements and other while waiting for echo reply """ + """filter out advertisements and other while waiting for echo reply""" if p.haslayer(IP) and p.haslayer(ICMP): if icmptypes[p[ICMP].type] == "echo-reply": return False @@ -66,15 +77,16 @@ def is_not_echo_reply(p): class VppVRRPVirtualRouter(VppObject): - - def __init__(self, - test, - intf, - vr_id, - prio=100, - intvl=100, - flags=VRRP_VR_FLAG_PREEMPT, - vips=None): + def __init__( + self, + test, + intf, + vr_id, + prio=100, + intvl=100, + flags=VRRP_VR_FLAG_PREEMPT, + vips=None, + ): self._test = test self._intf = intf self._sw_if_index = self._intf.sw_if_index @@ -82,29 +94,48 @@ class VppVRRPVirtualRouter(VppObject): self._prio = prio self._intvl = intvl self._flags = flags - if (flags & VRRP_VR_FLAG_IPV6): + if flags & VRRP_VR_FLAG_IPV6: self._is_ipv6 = 1 self._adv_dest_mac = "33:33:00:00:00:12" self._virtual_mac = "00:00:5e:00:02:%02x" % vr_id self._adv_dest_ip = "ff02::12" - self._vips = ([intf.local_ip6] if vips is None else vips) + self._vips = [intf.local_ip6] if vips is None else vips else: self._is_ipv6 = 0 self._adv_dest_mac = "01:00:5e:00:00:12" self._virtual_mac = "00:00:5e:00:01:%02x" % vr_id self._adv_dest_ip = "224.0.0.18" - self._vips = ([intf.local_ip4] if vips is None else vips) + self._vips = [intf.local_ip4] if vips is None else vips self._tracked_ifs = [] + self._vrrp_index = VRRP_INDEX_INVALID def add_vpp_config(self): - self._test.vapi.vrrp_vr_add_del(is_add=1, - sw_if_index=self._intf.sw_if_index, - vr_id=self._vr_id, - priority=self._prio, - interval=self._intvl, - flags=self._flags, - n_addrs=len(self._vips), - addrs=self._vips) + self._test.vapi.vrrp_vr_add_del( + is_add=1, + sw_if_index=self._intf.sw_if_index, + vr_id=self._vr_id, + priority=self._prio, + interval=self._intvl, + flags=self._flags, + n_addrs=len(self._vips), + addrs=self._vips, + ) + + def update_vpp_config(self): + r = self._test.vapi.vrrp_vr_update( + vrrp_index=self._vrrp_index, + sw_if_index=self._intf.sw_if_index, + vr_id=self._vr_id, + priority=self._prio, + interval=self._intvl, + flags=self._flags, + n_addrs=len(self._vips), + addrs=self._vips, + ) + self._vrrp_index = r.vrrp_index + + def delete_vpp_config(self): + self._test.vapi.vrrp_vr_del(vrrp_index=self._vrrp_index) def query_vpp_config(self): vrs = self._test.vapi.vrrp_vr_dump(sw_if_index=self._intf.sw_if_index) @@ -112,7 +143,7 @@ class VppVRRPVirtualRouter(VppObject): if vr.config.vr_id != self._vr_id: continue - is_ipv6 = (1 if (vr.config.flags & VRRP_VR_FLAG_IPV6) else 0) + is_ipv6 = 1 if (vr.config.flags & VRRP_VR_FLAG_IPV6) else 0 if is_ipv6 != self._is_ipv6: continue @@ -121,41 +152,45 @@ class VppVRRPVirtualRouter(VppObject): return None def remove_vpp_config(self): - self._test.vapi.vrrp_vr_add_del(is_add=0, - sw_if_index=self._intf.sw_if_index, - vr_id=self._vr_id, - priority=self._prio, - interval=self._intvl, - flags=self._flags, - n_addrs=len(self._vips), - addrs=self._vips) + self._test.vapi.vrrp_vr_add_del( + is_add=0, + sw_if_index=self._intf.sw_if_index, + vr_id=self._vr_id, + priority=self._prio, + interval=self._intvl, + flags=self._flags, + n_addrs=len(self._vips), + addrs=self._vips, + ) def start_stop(self, is_start): - self._test.vapi.vrrp_vr_start_stop(is_start=is_start, - sw_if_index=self._intf.sw_if_index, - vr_id=self._vr_id, - is_ipv6=self._is_ipv6) - self._start_time = (time.time() if is_start else None) + self._test.vapi.vrrp_vr_start_stop( + is_start=is_start, + sw_if_index=self._intf.sw_if_index, + vr_id=self._vr_id, + is_ipv6=self._is_ipv6, + ) + self._start_time = time.time() if is_start else None def add_del_tracked_interface(self, is_add, sw_if_index, prio): args = { - 'sw_if_index': self._intf.sw_if_index, - 'is_ipv6': self._is_ipv6, - 'vr_id': self._vr_id, - 'is_add': is_add, - 'n_ifs': 1, - 'ifs': [{'sw_if_index': sw_if_index, 'priority': prio}] + "sw_if_index": self._intf.sw_if_index, + "is_ipv6": self._is_ipv6, + "vr_id": self._vr_id, + "is_add": is_add, + "n_ifs": 1, + "ifs": [{"sw_if_index": sw_if_index, "priority": prio}], } self._test.vapi.vrrp_vr_track_if_add_del(**args) - self._tracked_ifs.append(args['ifs'][0]) + self._tracked_ifs.append(args["ifs"][0]) def set_unicast_peers(self, addrs): args = { - 'sw_if_index': self._intf.sw_if_index, - 'is_ipv6': self._is_ipv6, - 'vr_id': self._vr_id, - 'n_addrs': len(addrs), - 'addrs': addrs + "sw_if_index": self._intf.sw_if_index, + "is_ipv6": self._is_ipv6, + "vr_id": self._vr_id, + "n_addrs": len(addrs), + "addrs": addrs, } self._test.vapi.vrrp_vr_set_peers(**args) self._unicast_peers = addrs @@ -193,21 +228,22 @@ class VppVRRPVirtualRouter(VppObject): def master_down_seconds(self): vr_details = self.query_vpp_config() - return (vr_details.runtime.master_down_int * 0.01) + return vr_details.runtime.master_down_int * 0.01 def vrrp_adv_packet(self, prio=None, src_ip=None): dst_ip = self._adv_dest_ip if prio is None: prio = self._prio eth = Ether(dst=self._adv_dest_mac, src=self._virtual_mac) - vrrp = VRRPv3(vrid=self._vr_id, priority=prio, - ipcount=len(self._vips), adv=self._intvl) + vrrp = VRRPv3( + vrid=self._vr_id, priority=prio, ipcount=len(self._vips), adv=self._intvl + ) if self._is_ipv6: - src_ip = (self._intf.local_ip6_ll if src_ip is None else src_ip) + src_ip = self._intf.local_ip6_ll if src_ip is None else src_ip ip = IPv6(src=src_ip, dst=dst_ip, nh=IPPROTO_VRRP, hlim=255) vrrp.addrlist = self._vips else: - src_ip = (self._intf.local_ip4 if src_ip is None else src_ip) + src_ip = self._intf.local_ip4 if src_ip is None else src_ip ip = IP(src=src_ip, dst=dst_ip, proto=IPPROTO_VRRP, ttl=255, id=0) vrrp.addrlist = self._vips @@ -216,9 +252,8 @@ class VppVRRPVirtualRouter(VppObject): return pkt -@unittest.skipUnless(config.extended, "part of extended tests") class TestVRRP4(VppTestCase): - """ IPv4 VRRP Test Case """ + """IPv4 VRRP Test Case""" @classmethod def setUpClass(cls): @@ -268,8 +303,7 @@ class TestVRRP4(VppTestCase): self.assertEqual(ip.proto, 2) igmp = pkt[IGMPv3] - self.assertEqual(IGMPv3.igmpv3types[igmp.type], - "Version 3 Membership Report") + self.assertEqual(IGMPv3.igmpv3types[igmp.type], "Version 3 Membership Report") igmpmr = pkt[IGMPv3mr] self.assertEqual(igmpmr.numgrp, 1) @@ -312,16 +346,17 @@ class TestVRRP4(VppTestCase): # VR with priority 255 owns the virtual address and should # become master and start advertising immediately. + @unittest.skipUnless(config.extended, "part of extended tests") def test_vrrp4_master_adv(self): - """ IPv4 Master VR advertises """ + """IPv4 Master VR advertises""" self.pg_enable_capture(self.pg_interfaces) self.pg_start() prio = 255 intvl = self._default_adv - vr = VppVRRPVirtualRouter(self, self.pg0, 100, - prio=prio, intvl=intvl, - flags=self._default_flags) + vr = VppVRRPVirtualRouter( + self, self.pg0, 100, prio=prio, intvl=intvl, flags=self._default_flags + ) vr.add_vpp_config() vr.start_stop(is_start=1) @@ -341,10 +376,67 @@ class TestVRRP4(VppTestCase): vr.remove_vpp_config() self._vrs = [] + # Same as above but with the update API, and add a change + # of parameters to test that too + @unittest.skipUnless(config.extended, "part of extended tests") + def test_vrrp4_master_adv_update(self): + """IPv4 Master VR adv + Update to Backup""" + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + + prio = 255 + intvl = self._default_adv + vr = VppVRRPVirtualRouter( + self, self.pg0, 100, prio=prio, intvl=intvl, flags=self._default_flags + ) + + vr.update_vpp_config() + vr.start_stop(is_start=1) + self.logger.info(self.vapi.cli("show vrrp vr")) + # Update VR with lower prio and larger interval + # we need to keep old VR for the adv checks + upd_vr = VppVRRPVirtualRouter( + self, + self.pg0, + 100, + prio=100, + intvl=2 * intvl, + flags=self._default_flags, + vips=[self.pg0.remote_ip4], + ) + upd_vr._vrrp_index = vr._vrrp_index + upd_vr.update_vpp_config() + start_time = time.time() + self.logger.info(self.vapi.cli("show vrrp vr")) + upd_vr.assert_state_equals(VRRP_VR_STATE_BACKUP) + self._vrs = [upd_vr] + + pkts = self.pg0.get_capture(5) + # Init -> Master: IGMP Join, VRRP adv, gratuitous ARP are sent + self.verify_vrrp4_igmp(pkts[0]) + self.verify_vrrp4_adv(pkts[1], vr, prio=prio) + self.verify_vrrp4_garp(pkts[2], vr.virtual_ips()[0], vr.virtual_mac()) + # Master -> Init: Adv with priority 0 sent to force an election + self.verify_vrrp4_adv(pkts[3], vr, prio=0) + # Init -> Backup: An IGMP join should be sent + self.verify_vrrp4_igmp(pkts[4]) + + # send higher prio advertisements, should not receive any + end_time = start_time + 2 * upd_vr.master_down_seconds() + src_ip = self.pg0.remote_ip4 + pkts = [upd_vr.vrrp_adv_packet(prio=110, src_ip=src_ip)] + while time.time() < end_time: + self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl * 0.01) + self.logger.info(self.vapi.cli("show trace")) + + upd_vr.start_stop(is_start=0) + self.logger.info(self.vapi.cli("show vrrp vr")) + # VR with priority < 255 enters backup state and does not advertise as # long as it receives higher priority advertisements + @unittest.skipUnless(config.extended, "part of extended tests") def test_vrrp4_backup_noadv(self): - """ IPv4 Backup VR does not advertise """ + """IPv4 Backup VR does not advertise""" self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -352,10 +444,15 @@ class TestVRRP4(VppTestCase): prio = 100 intvl = self._default_adv intvl_s = intvl * 0.01 - vr = VppVRRPVirtualRouter(self, self.pg0, vr_id, - prio=prio, intvl=intvl, - flags=self._default_flags, - vips=[self.pg0.remote_ip4]) + vr = VppVRRPVirtualRouter( + self, + self.pg0, + vr_id, + prio=prio, + intvl=intvl, + flags=self._default_flags, + vips=[self.pg0.remote_ip4], + ) self._vrs.append(vr) vr.add_vpp_config() @@ -371,7 +468,7 @@ class TestVRRP4(VppTestCase): # send higher prio advertisements, should not receive any src_ip = self.pg0.remote_ip4 - pkts = [vr.vrrp_adv_packet(prio=prio+10, src_ip=src_ip)] + pkts = [vr.vrrp_adv_packet(prio=prio + 10, src_ip=src_ip)] while time.time() < end_time: self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl_s) self.logger.info(self.vapi.cli("show trace")) @@ -382,16 +479,16 @@ class TestVRRP4(VppTestCase): self._vrs = [] def test_vrrp4_master_arp(self): - """ IPv4 Master VR replies to ARP """ + """IPv4 Master VR replies to ARP""" self.pg_start() # VR virtual IP is the default, which is the pg local IP vr_id = 100 prio = 255 intvl = self._default_adv - vr = VppVRRPVirtualRouter(self, self.pg0, 100, - prio=prio, intvl=intvl, - flags=self._default_flags) + vr = VppVRRPVirtualRouter( + self, self.pg0, 100, prio=prio, intvl=intvl, flags=self._default_flags + ) self._vrs.append(vr) vr.add_vpp_config() @@ -413,8 +510,9 @@ class TestVRRP4(VppTestCase): vr.remove_vpp_config() self._vrs = [] + @unittest.skipUnless(config.extended, "part of extended tests") def test_vrrp4_backup_noarp(self): - """ IPv4 Backup VR ignores ARP """ + """IPv4 Backup VR ignores ARP""" # We need an address for a virtual IP that is not the IP that # ARP requests will originate from @@ -422,16 +520,24 @@ class TestVRRP4(VppTestCase): prio = 100 intvl = self._default_adv vip = self.pg0.remote_hosts[1].ip4 - vr = VppVRRPVirtualRouter(self, self.pg0, vr_id, - prio=prio, intvl=intvl, - flags=self._default_flags, - vips=[vip]) + vr = VppVRRPVirtualRouter( + self, + self.pg0, + vr_id, + prio=prio, + intvl=intvl, + flags=self._default_flags, + vips=[vip], + ) self._vrs.append(vr) vr.add_vpp_config() - arp_req = (Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg0.remote_mac) / - ARP(op=ARP.who_has, pdst=vip, - psrc=self.pg0.remote_ip4, hwsrc=self.pg0.remote_mac)) + arp_req = Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg0.remote_mac) / ARP( + op=ARP.who_has, + pdst=vip, + psrc=self.pg0.remote_ip4, + hwsrc=self.pg0.remote_mac, + ) # Before the VR is started make sure no reply to request for VIP self.pg_start() @@ -440,7 +546,7 @@ class TestVRRP4(VppTestCase): # VR should start in backup state and still should not reply to ARP # send a higher priority adv to make sure it does not become master - adv = vr.vrrp_adv_packet(prio=prio+10, src_ip=self.pg0.remote_ip4) + adv = vr.vrrp_adv_packet(prio=prio + 10, src_ip=self.pg0.remote_ip4) vr.start_stop(is_start=1) self.send_and_assert_no_replies(self.pg0, [adv, arp_req], timeout=1) @@ -448,18 +554,24 @@ class TestVRRP4(VppTestCase): vr.remove_vpp_config() self._vrs = [] + @unittest.skipUnless(config.extended, "part of extended tests") def test_vrrp4_election(self): - """ IPv4 Backup VR becomes master if no advertisements received """ + """IPv4 Backup VR becomes master if no advertisements received""" vr_id = 100 prio = 100 intvl = self._default_adv intvl_s = intvl * 0.01 vip = self.pg0.remote_ip4 - vr = VppVRRPVirtualRouter(self, self.pg0, vr_id, - prio=prio, intvl=intvl, - flags=self._default_flags, - vips=[vip]) + vr = VppVRRPVirtualRouter( + self, + self.pg0, + vr_id, + prio=prio, + intvl=intvl, + flags=self._default_flags, + vips=[vip], + ) self._vrs.append(vr) vr.add_vpp_config() @@ -484,18 +596,24 @@ class TestVRRP4(VppTestCase): self.pg0.wait_for_packet(intvl_s, is_not_adv) vr.assert_state_equals(VRRP_VR_STATE_MASTER) + @unittest.skipUnless(config.extended, "part of extended tests") def test_vrrp4_backup_preempts(self): - """ IPv4 Backup VR preempts lower priority master """ + """IPv4 Backup VR preempts lower priority master""" vr_id = 100 prio = 100 intvl = self._default_adv intvl_s = intvl * 0.01 vip = self.pg0.remote_ip4 - vr = VppVRRPVirtualRouter(self, self.pg0, vr_id, - prio=prio, intvl=intvl, - flags=self._default_flags, - vips=[vip]) + vr = VppVRRPVirtualRouter( + self, + self.pg0, + vr_id, + prio=prio, + intvl=intvl, + flags=self._default_flags, + vips=[vip], + ) self._vrs.append(vr) vr.add_vpp_config() @@ -511,7 +629,7 @@ class TestVRRP4(VppTestCase): # send lower prio advertisements until timer expires src_ip = self.pg0.remote_ip4 - pkts = [vr.vrrp_adv_packet(prio=prio-10, src_ip=src_ip)] + pkts = [vr.vrrp_adv_packet(prio=prio - 10, src_ip=src_ip)] while time.time() + intvl_s < end_time: self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl_s) self.logger.info(self.vapi.cli("show trace")) @@ -521,8 +639,9 @@ class TestVRRP4(VppTestCase): self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv) vr.assert_state_equals(VRRP_VR_STATE_MASTER) + @unittest.skipUnless(config.extended, "part of extended tests") def test_vrrp4_master_preempted(self): - """ IPv4 Master VR preempted by higher priority backup """ + """IPv4 Master VR preempted by higher priority backup""" # A prio 255 VR cannot be preempted so the prio has to be lower and # we have to wait for it to take over @@ -530,10 +649,15 @@ class TestVRRP4(VppTestCase): prio = 100 intvl = self._default_adv vip = self.pg0.remote_ip4 - vr = VppVRRPVirtualRouter(self, self.pg0, vr_id, - prio=prio, intvl=intvl, - flags=self._default_flags, - vips=[vip]) + vr = VppVRRPVirtualRouter( + self, + self.pg0, + vr_id, + prio=prio, + intvl=intvl, + flags=self._default_flags, + vips=[vip], + ) self._vrs.append(vr) vr.add_vpp_config() @@ -557,8 +681,9 @@ class TestVRRP4(VppTestCase): # VR should be in backup state again vr.assert_state_equals(VRRP_VR_STATE_BACKUP) + @unittest.skipUnless(config.extended, "part of extended tests") def test_vrrp4_accept_mode_disabled(self): - """ IPv4 Master VR does not reply for VIP w/ accept mode off """ + """IPv4 Master VR does not reply for VIP w/ accept mode off""" # accept mode only matters when prio < 255, so it will have to # come up as a backup and take over as master after the timeout @@ -566,10 +691,15 @@ class TestVRRP4(VppTestCase): prio = 100 intvl = self._default_adv vip = self.pg0.remote_hosts[4].ip4 - vr = VppVRRPVirtualRouter(self, self.pg0, vr_id, - prio=prio, intvl=intvl, - flags=self._default_flags, - vips=[vip]) + vr = VppVRRPVirtualRouter( + self, + self.pg0, + vr_id, + prio=prio, + intvl=intvl, + flags=self._default_flags, + vips=[vip], + ) self._vrs.append(vr) vr.add_vpp_config() @@ -587,17 +717,20 @@ class TestVRRP4(VppTestCase): vr.assert_state_equals(VRRP_VR_STATE_MASTER) # send an ICMP echo to the VR virtual IP address - echo = (Ether(dst=vr.virtual_mac(), src=self.pg0.remote_mac) / - IP(dst=vip, src=self.pg0.remote_ip4) / - ICMP(seq=1, id=self.pg0.sw_if_index, type='echo-request')) + echo = ( + Ether(dst=vr.virtual_mac(), src=self.pg0.remote_mac) + / IP(dst=vip, src=self.pg0.remote_ip4) + / ICMP(seq=1, id=self.pg0.sw_if_index, type="echo-request") + ) self.pg_send(self.pg0, [echo]) # wait for an echo reply. none should be received time.sleep(1) self.pg0.assert_nothing_captured(filter_out_fn=is_not_echo_reply) + @unittest.skipUnless(config.extended, "part of extended tests") def test_vrrp4_accept_mode_enabled(self): - """ IPv4 Master VR replies for VIP w/ accept mode on """ + """IPv4 Master VR replies for VIP w/ accept mode on""" # A prio 255 VR cannot be preempted so the prio has to be lower and # we have to wait for it to take over @@ -605,11 +738,10 @@ class TestVRRP4(VppTestCase): prio = 100 intvl = self._default_adv vip = self.pg0.remote_hosts[4].ip4 - flags = (VRRP_VR_FLAG_PREEMPT | VRRP_VR_FLAG_ACCEPT) - vr = VppVRRPVirtualRouter(self, self.pg0, vr_id, - prio=prio, intvl=intvl, - flags=flags, - vips=[vip]) + flags = VRRP_VR_FLAG_PREEMPT | VRRP_VR_FLAG_ACCEPT + vr = VppVRRPVirtualRouter( + self, self.pg0, vr_id, prio=prio, intvl=intvl, flags=flags, vips=[vip] + ) self._vrs.append(vr) vr.add_vpp_config() @@ -627,15 +759,18 @@ class TestVRRP4(VppTestCase): vr.assert_state_equals(VRRP_VR_STATE_MASTER) # send an ICMP echo to the VR virtual IP address - echo = (Ether(dst=vr.virtual_mac(), src=self.pg0.remote_mac) / - IP(dst=vip, src=self.pg0.remote_ip4) / - ICMP(seq=1, id=self.pg0.sw_if_index, type='echo-request')) + echo = ( + Ether(dst=vr.virtual_mac(), src=self.pg0.remote_mac) + / IP(dst=vip, src=self.pg0.remote_ip4) + / ICMP(seq=1, id=self.pg0.sw_if_index, type="echo-request") + ) self.pg_send(self.pg0, [echo]) # wait for an echo reply. time.sleep(1) - rx_pkts = self.pg0.get_capture(expected_count=1, timeout=1, - filter_out_fn=is_not_echo_reply) + rx_pkts = self.pg0.get_capture( + expected_count=1, timeout=1, filter_out_fn=is_not_echo_reply + ) self.assertEqual(rx_pkts[0][IP].src, vip) self.assertEqual(rx_pkts[0][IP].dst, self.pg0.remote_ip4) @@ -643,18 +778,24 @@ class TestVRRP4(VppTestCase): self.assertEqual(rx_pkts[0][ICMP].seq, 1) self.assertEqual(rx_pkts[0][ICMP].id, self.pg0.sw_if_index) + @unittest.skipUnless(config.extended, "part of extended tests") def test_vrrp4_intf_tracking(self): - """ IPv4 Master VR adjusts priority based on tracked interface """ + """IPv4 Master VR adjusts priority based on tracked interface""" vr_id = 100 prio = 255 intvl = self._default_adv intvl_s = intvl * 0.01 vip = self.pg0.local_ip4 - vr = VppVRRPVirtualRouter(self, self.pg0, vr_id, - prio=prio, intvl=intvl, - flags=self._default_flags, - vips=[vip]) + vr = VppVRRPVirtualRouter( + self, + self.pg0, + vr_id, + prio=prio, + intvl=intvl, + flags=self._default_flags, + vips=[vip], + ) self._vrs.append(vr) vr.add_vpp_config() @@ -664,9 +805,9 @@ class TestVRRP4(VppTestCase): # add pg1 as a tracked interface and start the VR adjustment = 50 adjusted_prio = prio - adjustment - vr.add_del_tracked_interface(is_add=1, - sw_if_index=self.pg1.sw_if_index, - prio=adjustment) + vr.add_del_tracked_interface( + is_add=1, sw_if_index=self.pg1.sw_if_index, prio=adjustment + ) vr.start_stop(is_start=1) vr.assert_state_equals(VRRP_VR_STATE_MASTER) @@ -675,52 +816,47 @@ class TestVRRP4(VppTestCase): # tracked intf is up -> advertised priority == configured priority self.pg0.enable_capture() - rx = self.pg0.wait_for_packet(timeout=intvl_s, - filter_out_fn=is_not_adv) + rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv) self.assertEqual(rx, adv_configured) # take down pg1, verify priority is now being adjusted self.pg1.admin_down() self.pg0.enable_capture() - rx = self.pg0.wait_for_packet(timeout=intvl_s, - filter_out_fn=is_not_adv) + rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv) self.assertEqual(rx, adv_adjusted) # bring up pg1, verify priority now matches configured value self.pg1.admin_up() self.pg0.enable_capture() - rx = self.pg0.wait_for_packet(timeout=intvl_s, - filter_out_fn=is_not_adv) + rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv) self.assertEqual(rx, adv_configured) # remove IP address from pg1, verify priority now being adjusted self.pg1.unconfig_ip4() self.pg0.enable_capture() - rx = self.pg0.wait_for_packet(timeout=intvl_s, - filter_out_fn=is_not_adv) + rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv) self.assertEqual(rx, adv_adjusted) # add IP address to pg1, verify priority now matches configured value self.pg1.config_ip4() self.pg0.enable_capture() - rx = self.pg0.wait_for_packet(timeout=intvl_s, - filter_out_fn=is_not_adv) + rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv) self.assertEqual(rx, adv_configured) + @unittest.skipUnless(config.extended, "part of extended tests") def test_vrrp4_master_adv_unicast(self): - """ IPv4 Master VR advertises (unicast) """ + """IPv4 Master VR advertises (unicast)""" vr_id = 100 prio = 255 intvl = self._default_adv intvl_s = intvl * 0.01 vip = self.pg0.local_ip4 - flags = (self._default_flags | VRRP_VR_FLAG_UNICAST) + flags = self._default_flags | VRRP_VR_FLAG_UNICAST unicast_peer = self.pg0.remote_hosts[4] - vr = VppVRRPVirtualRouter(self, self.pg0, vr_id, - prio=prio, intvl=intvl, - flags=flags, - vips=[vip]) + vr = VppVRRPVirtualRouter( + self, self.pg0, vr_id, prio=prio, intvl=intvl, flags=flags, vips=[vip] + ) self._vrs.append(vr) vr.add_vpp_config() vr.set_unicast_peers([unicast_peer.ip4]) @@ -733,8 +869,7 @@ class TestVRRP4(VppTestCase): vr.assert_state_equals(VRRP_VR_STATE_MASTER) self.pg0.enable_capture() - rx = self.pg0.wait_for_packet(timeout=intvl_s, - filter_out_fn=is_not_adv) + rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv) self.assertTrue(rx.haslayer(Ether)) self.assertTrue(rx.haslayer(IP)) @@ -749,9 +884,8 @@ class TestVRRP4(VppTestCase): self.assertEqual(rx[VRRPv3].addrlist, [vip]) -@unittest.skipUnless(config.extended, "part of extended tests") class TestVRRP6(VppTestCase): - """ IPv6 VRRP Test Case """ + """IPv6 VRRP Test Case""" @classmethod def setUpClass(cls): @@ -773,7 +907,7 @@ class TestVRRP6(VppTestCase): i.configure_ipv6_neighbors() self._vrs = [] - self._default_flags = (VRRP_VR_FLAG_IPV6 | VRRP_VR_FLAG_PREEMPT) + self._default_flags = VRRP_VR_FLAG_IPV6 | VRRP_VR_FLAG_PREEMPT self._default_adv = 100 def tearDown(self): @@ -844,16 +978,17 @@ class TestVRRP6(VppTestCase): # VR with priority 255 owns the virtual address and should # become master and start advertising immediately. + @unittest.skipUnless(config.extended, "part of extended tests") def test_vrrp6_master_adv(self): - """ IPv6 Master VR advertises """ + """IPv6 Master VR advertises""" self.pg_enable_capture(self.pg_interfaces) self.pg_start() prio = 255 intvl = self._default_adv - vr = VppVRRPVirtualRouter(self, self.pg0, 100, - prio=prio, intvl=intvl, - flags=self._default_flags) + vr = VppVRRPVirtualRouter( + self, self.pg0, 100, prio=prio, intvl=intvl, flags=self._default_flags + ) self._vrs.append(vr) vr.add_vpp_config() @@ -875,10 +1010,71 @@ class TestVRRP6(VppTestCase): vr.remove_vpp_config() self._vrs = [] + # Same as above but with the update API, and add a change + # of parameters to test that too + @unittest.skipUnless(config.extended, "part of extended tests") + def test_vrrp6_master_adv_update(self): + """IPv6 Master VR adv + Update to Backup""" + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + + prio = 255 + intvl = self._default_adv + vr = VppVRRPVirtualRouter( + self, self.pg0, 100, prio=prio, intvl=intvl, flags=self._default_flags + ) + + vr.update_vpp_config() + vr.start_stop(is_start=1) + self.logger.info(self.vapi.cli("show vrrp vr")) + # Update VR with lower prio and larger interval + # we need to keep old VR for the adv checks + upd_vr = VppVRRPVirtualRouter( + self, + self.pg0, + 100, + prio=100, + intvl=2 * intvl, + flags=self._default_flags, + vips=[self.pg0.remote_ip6], + ) + upd_vr._vrrp_index = vr._vrrp_index + upd_vr.update_vpp_config() + start_time = time.time() + self.logger.info(self.vapi.cli("show vrrp vr")) + upd_vr.assert_state_equals(VRRP_VR_STATE_BACKUP) + self._vrs = [upd_vr] + + pkts = self.pg0.get_capture(5, filter_out_fn=None) + + # Init -> Master: Multicast group Join, VRRP adv, gratuitous NAs sent + self.verify_vrrp6_mlr(pkts[0], vr) + self.verify_vrrp6_adv(pkts[1], vr, prio=prio) + self.verify_vrrp6_gna(pkts[2], vr) + # Master -> Init: Adv with priority 0 sent to force an election + self.verify_vrrp6_adv(pkts[3], vr, prio=0) + # Init -> Backup: A multicast listener report should be sent + # not actually verified in the test below, where I took this from + + # send higher prio advertisements, should not see VPP send any + src_ip = self.pg0.remote_ip6_ll + pkts = [upd_vr.vrrp_adv_packet(prio=110, src_ip=src_ip)] + self.logger.info(self.vapi.cli("show vlib graph")) + end_time = start_time + 2 * upd_vr.master_down_seconds() + while time.time() < end_time: + self.send_and_assert_no_replies( + self.pg0, pkts, timeout=0.01 * upd_vr._intvl + ) + self.logger.info(self.vapi.cli("show trace")) + + vr.start_stop(is_start=0) + self.logger.info(self.vapi.cli("show vrrp vr")) + # VR with priority < 255 enters backup state and does not advertise as # long as it receives higher priority advertisements + @unittest.skipUnless(config.extended, "part of extended tests") def test_vrrp6_backup_noadv(self): - """ IPv6 Backup VR does not advertise """ + """IPv6 Backup VR does not advertise""" self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -886,10 +1082,15 @@ class TestVRRP6(VppTestCase): prio = 100 intvl = self._default_adv intvl_s = intvl * 0.01 - vr = VppVRRPVirtualRouter(self, self.pg0, vr_id, - prio=prio, intvl=intvl, - flags=self._default_flags, - vips=[self.pg0.remote_ip6]) + vr = VppVRRPVirtualRouter( + self, + self.pg0, + vr_id, + prio=prio, + intvl=intvl, + flags=self._default_flags, + vips=[self.pg0.remote_ip6], + ) vr.add_vpp_config() self._vrs.append(vr) @@ -905,7 +1106,7 @@ class TestVRRP6(VppTestCase): # send higher prio advertisements, should not see VPP send any src_ip = self.pg0.remote_ip6_ll num_advs = 5 - pkts = [vr.vrrp_adv_packet(prio=prio+10, src_ip=src_ip)] + pkts = [vr.vrrp_adv_packet(prio=prio + 10, src_ip=src_ip)] self.logger.info(self.vapi.cli("show vlib graph")) while time.time() < end_time: self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl_s) @@ -918,16 +1119,16 @@ class TestVRRP6(VppTestCase): self._vrs = [] def test_vrrp6_master_nd(self): - """ IPv6 Master VR replies to NDP """ + """IPv6 Master VR replies to NDP""" self.pg_start() # VR virtual IP is the default, which is the pg local IP vr_id = 100 prio = 255 intvl = self._default_adv - vr = VppVRRPVirtualRouter(self, self.pg0, 100, - prio=prio, intvl=intvl, - flags=self._default_flags) + vr = VppVRRPVirtualRouter( + self, self.pg0, 100, prio=prio, intvl=intvl, flags=self._default_flags + ) vr.add_vpp_config() self._vrs.append(vr) @@ -948,8 +1149,9 @@ class TestVRRP6(VppTestCase): vr.remove_vpp_config() self._vrs = [] + @unittest.skipUnless(config.extended, "part of extended tests") def test_vrrp6_backup_nond(self): - """ IPv6 Backup VR ignores NDP """ + """IPv6 Backup VR ignores NDP""" # We need an address for a virtual IP that is not the IP that # ARP requests will originate from @@ -958,10 +1160,15 @@ class TestVRRP6(VppTestCase): intvl = self._default_adv intvl_s = intvl * 0.01 vip = self.pg0.remote_hosts[1].ip6 - vr = VppVRRPVirtualRouter(self, self.pg0, vr_id, - prio=prio, intvl=intvl, - flags=self._default_flags, - vips=[vip]) + vr = VppVRRPVirtualRouter( + self, + self.pg0, + vr_id, + prio=prio, + intvl=intvl, + flags=self._default_flags, + vips=[vip], + ) vr.add_vpp_config() self._vrs.append(vr) @@ -969,35 +1176,43 @@ class TestVRRP6(VppTestCase): dmac = in6_getnsmac(nsma) dst_ip = inet_ntop(socket.AF_INET6, nsma) - ndp_req = (Ether(dst=dmac, src=self.pg0.remote_mac) / - IPv6(dst=dst_ip, src=self.pg0.remote_ip6) / - ICMPv6ND_NS(tgt=vip) / - ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac)) + ndp_req = ( + Ether(dst=dmac, src=self.pg0.remote_mac) + / IPv6(dst=dst_ip, src=self.pg0.remote_ip6) + / ICMPv6ND_NS(tgt=vip) + / ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac) + ) # Before the VR is started make sure no reply to request for VIP self.send_and_assert_no_replies(self.pg0, [ndp_req], timeout=1) # VR should start in backup state and still should not reply to NDP # send a higher priority adv to make sure it does not become master - adv = vr.vrrp_adv_packet(prio=prio+10, src_ip=self.pg0.remote_ip6) + adv = vr.vrrp_adv_packet(prio=prio + 10, src_ip=self.pg0.remote_ip6) pkts = [adv, ndp_req] vr.start_stop(is_start=1) - self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl_s) + self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl_s) vr.start_stop(is_start=0) + @unittest.skipUnless(config.extended, "part of extended tests") def test_vrrp6_election(self): - """ IPv6 Backup VR becomes master if no advertisements received """ + """IPv6 Backup VR becomes master if no advertisements received""" vr_id = 100 prio = 100 intvl = self._default_adv intvl_s = intvl * 0.01 vip = self.pg0.remote_ip6 - vr = VppVRRPVirtualRouter(self, self.pg0, vr_id, - prio=prio, intvl=intvl, - flags=self._default_flags, - vips=[vip]) + vr = VppVRRPVirtualRouter( + self, + self.pg0, + vr_id, + prio=prio, + intvl=intvl, + flags=self._default_flags, + vips=[vip], + ) self._vrs.append(vr) vr.add_vpp_config() @@ -1022,18 +1237,24 @@ class TestVRRP6(VppTestCase): self.pg0.wait_for_packet(intvl_s, is_not_adv) vr.assert_state_equals(VRRP_VR_STATE_MASTER) + @unittest.skipUnless(config.extended, "part of extended tests") def test_vrrp6_backup_preempts(self): - """ IPv6 Backup VR preempts lower priority master """ + """IPv6 Backup VR preempts lower priority master""" vr_id = 100 prio = 100 intvl = self._default_adv intvl_s = intvl * 0.01 vip = self.pg0.remote_ip6 - vr = VppVRRPVirtualRouter(self, self.pg0, vr_id, - prio=prio, intvl=intvl, - flags=self._default_flags, - vips=[vip]) + vr = VppVRRPVirtualRouter( + self, + self.pg0, + vr_id, + prio=prio, + intvl=intvl, + flags=self._default_flags, + vips=[vip], + ) self._vrs.append(vr) vr.add_vpp_config() @@ -1049,7 +1270,7 @@ class TestVRRP6(VppTestCase): # send lower prio advertisements until timer expires src_ip = self.pg0.remote_ip6 - pkts = [vr.vrrp_adv_packet(prio=prio-10, src_ip=src_ip)] + pkts = [vr.vrrp_adv_packet(prio=prio - 10, src_ip=src_ip)] while (time.time() + intvl_s) < end_time: self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl_s) self.logger.info(self.vapi.cli("show trace")) @@ -1059,8 +1280,9 @@ class TestVRRP6(VppTestCase): self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv) vr.assert_state_equals(VRRP_VR_STATE_MASTER) + @unittest.skipUnless(config.extended, "part of extended tests") def test_vrrp6_master_preempted(self): - """ IPv6 Master VR preempted by higher priority backup """ + """IPv6 Master VR preempted by higher priority backup""" # A prio 255 VR cannot be preempted so the prio has to be lower and # we have to wait for it to take over @@ -1068,10 +1290,15 @@ class TestVRRP6(VppTestCase): prio = 100 intvl = self._default_adv vip = self.pg0.remote_ip6 - vr = VppVRRPVirtualRouter(self, self.pg0, vr_id, - prio=prio, intvl=intvl, - flags=self._default_flags, - vips=[vip]) + vr = VppVRRPVirtualRouter( + self, + self.pg0, + vr_id, + prio=prio, + intvl=intvl, + flags=self._default_flags, + vips=[vip], + ) self._vrs.append(vr) vr.add_vpp_config() @@ -1095,8 +1322,9 @@ class TestVRRP6(VppTestCase): # VR should be in backup state again vr.assert_state_equals(VRRP_VR_STATE_BACKUP) + @unittest.skipUnless(config.extended, "part of extended tests") def test_vrrp6_accept_mode_disabled(self): - """ IPv6 Master VR does not reply for VIP w/ accept mode off """ + """IPv6 Master VR does not reply for VIP w/ accept mode off""" # accept mode only matters when prio < 255, so it will have to # come up as a backup and take over as master after the timeout @@ -1104,10 +1332,15 @@ class TestVRRP6(VppTestCase): prio = 100 intvl = self._default_adv vip = self.pg0.remote_hosts[4].ip6 - vr = VppVRRPVirtualRouter(self, self.pg0, vr_id, - prio=prio, intvl=intvl, - flags=self._default_flags, - vips=[vip]) + vr = VppVRRPVirtualRouter( + self, + self.pg0, + vr_id, + prio=prio, + intvl=intvl, + flags=self._default_flags, + vips=[vip], + ) self._vrs.append(vr) vr.add_vpp_config() @@ -1125,17 +1358,20 @@ class TestVRRP6(VppTestCase): vr.assert_state_equals(VRRP_VR_STATE_MASTER) # send an ICMPv6 echo to the VR virtual IP address - echo = (Ether(dst=vr.virtual_mac(), src=self.pg0.remote_mac) / - IPv6(dst=vip, src=self.pg0.remote_ip6) / - ICMPv6EchoRequest(seq=1, id=self.pg0.sw_if_index)) + echo = ( + Ether(dst=vr.virtual_mac(), src=self.pg0.remote_mac) + / IPv6(dst=vip, src=self.pg0.remote_ip6) + / ICMPv6EchoRequest(seq=1, id=self.pg0.sw_if_index) + ) self.pg_send(self.pg0, [echo]) # wait for an echo reply. none should be received time.sleep(1) self.pg0.assert_nothing_captured(filter_out_fn=is_not_echo_reply) + @unittest.skipUnless(config.extended, "part of extended tests") def test_vrrp6_accept_mode_enabled(self): - """ IPv6 Master VR replies for VIP w/ accept mode on """ + """IPv6 Master VR replies for VIP w/ accept mode on""" # A prio 255 VR cannot be preempted so the prio has to be lower and # we have to wait for it to take over @@ -1143,11 +1379,10 @@ class TestVRRP6(VppTestCase): prio = 100 intvl = self._default_adv vip = self.pg0.remote_hosts[4].ip6 - flags = (self._default_flags | VRRP_VR_FLAG_ACCEPT) - vr = VppVRRPVirtualRouter(self, self.pg0, vr_id, - prio=prio, intvl=intvl, - flags=flags, - vips=[vip]) + flags = self._default_flags | VRRP_VR_FLAG_ACCEPT + vr = VppVRRPVirtualRouter( + self, self.pg0, vr_id, prio=prio, intvl=intvl, flags=flags, vips=[vip] + ) self._vrs.append(vr) vr.add_vpp_config() @@ -1165,33 +1400,42 @@ class TestVRRP6(VppTestCase): vr.assert_state_equals(VRRP_VR_STATE_MASTER) # send an ICMP echo to the VR virtual IP address - echo = (Ether(dst=vr.virtual_mac(), src=self.pg0.remote_mac) / - IPv6(dst=vip, src=self.pg0.remote_ip6) / - ICMPv6EchoRequest(seq=1, id=self.pg0.sw_if_index)) + echo = ( + Ether(dst=vr.virtual_mac(), src=self.pg0.remote_mac) + / IPv6(dst=vip, src=self.pg0.remote_ip6) + / ICMPv6EchoRequest(seq=1, id=self.pg0.sw_if_index) + ) self.pg_send(self.pg0, [echo]) # wait for an echo reply. time.sleep(1) - rx_pkts = self.pg0.get_capture(expected_count=1, timeout=1, - filter_out_fn=is_not_echo_reply) + rx_pkts = self.pg0.get_capture( + expected_count=1, timeout=1, filter_out_fn=is_not_echo_reply + ) self.assertEqual(rx_pkts[0][IPv6].src, vip) self.assertEqual(rx_pkts[0][IPv6].dst, self.pg0.remote_ip6) self.assertEqual(rx_pkts[0][ICMPv6EchoReply].seq, 1) self.assertEqual(rx_pkts[0][ICMPv6EchoReply].id, self.pg0.sw_if_index) + @unittest.skipUnless(config.extended, "part of extended tests") def test_vrrp6_intf_tracking(self): - """ IPv6 Master VR adjusts priority based on tracked interface """ + """IPv6 Master VR adjusts priority based on tracked interface""" vr_id = 100 prio = 255 intvl = self._default_adv intvl_s = intvl * 0.01 vip = self.pg0.local_ip6 - vr = VppVRRPVirtualRouter(self, self.pg0, vr_id, - prio=prio, intvl=intvl, - flags=self._default_flags, - vips=[vip]) + vr = VppVRRPVirtualRouter( + self, + self.pg0, + vr_id, + prio=prio, + intvl=intvl, + flags=self._default_flags, + vips=[vip], + ) self._vrs.append(vr) vr.add_vpp_config() @@ -1201,9 +1445,9 @@ class TestVRRP6(VppTestCase): # add pg1 as a tracked interface and start the VR adjustment = 50 adjusted_prio = prio - adjustment - vr.add_del_tracked_interface(is_add=1, - sw_if_index=self.pg1.sw_if_index, - prio=adjustment) + vr.add_del_tracked_interface( + is_add=1, sw_if_index=self.pg1.sw_if_index, prio=adjustment + ) vr.start_stop(is_start=1) vr.assert_state_equals(VRRP_VR_STATE_MASTER) @@ -1212,52 +1456,47 @@ class TestVRRP6(VppTestCase): # tracked intf is up -> advertised priority == configured priority self.pg0.enable_capture() - rx = self.pg0.wait_for_packet(timeout=intvl_s, - filter_out_fn=is_not_adv) + rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv) self.assertEqual(rx, adv_configured) # take down pg1, verify priority is now being adjusted self.pg1.admin_down() self.pg0.enable_capture() - rx = self.pg0.wait_for_packet(timeout=intvl_s, - filter_out_fn=is_not_adv) + rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv) self.assertEqual(rx, adv_adjusted) # bring up pg1, verify priority now matches configured value self.pg1.admin_up() self.pg0.enable_capture() - rx = self.pg0.wait_for_packet(timeout=intvl_s, - filter_out_fn=is_not_adv) + rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv) self.assertEqual(rx, adv_configured) # remove IP address from pg1, verify priority now being adjusted self.pg1.unconfig_ip6() self.pg0.enable_capture() - rx = self.pg0.wait_for_packet(timeout=intvl_s, - filter_out_fn=is_not_adv) + rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv) self.assertEqual(rx, adv_adjusted) # add IP address to pg1, verify priority now matches configured value self.pg1.config_ip6() self.pg0.enable_capture() - rx = self.pg0.wait_for_packet(timeout=intvl_s, - filter_out_fn=is_not_adv) + rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv) self.assertEqual(rx, adv_configured) + @unittest.skipUnless(config.extended, "part of extended tests") def test_vrrp6_master_adv_unicast(self): - """ IPv6 Master VR advertises (unicast) """ + """IPv6 Master VR advertises (unicast)""" vr_id = 100 prio = 255 intvl = self._default_adv intvl_s = intvl * 0.01 vip = self.pg0.local_ip6 - flags = (self._default_flags | VRRP_VR_FLAG_UNICAST) + flags = self._default_flags | VRRP_VR_FLAG_UNICAST unicast_peer = self.pg0.remote_hosts[4] - vr = VppVRRPVirtualRouter(self, self.pg0, vr_id, - prio=prio, intvl=intvl, - flags=flags, - vips=[vip]) + vr = VppVRRPVirtualRouter( + self, self.pg0, vr_id, prio=prio, intvl=intvl, flags=flags, vips=[vip] + ) self._vrs.append(vr) vr.add_vpp_config() vr.set_unicast_peers([unicast_peer.ip6]) @@ -1270,23 +1509,22 @@ class TestVRRP6(VppTestCase): vr.assert_state_equals(VRRP_VR_STATE_MASTER) self.pg0.enable_capture() - rx = self.pg0.wait_for_packet(timeout=intvl_s, - filter_out_fn=is_not_adv) + rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv) self.assertTrue(rx.haslayer(Ether)) self.assertTrue(rx.haslayer(IPv6)) self.assertTrue(rx.haslayer(VRRPv3)) self.assertEqual(rx[Ether].src, self.pg0.local_mac) self.assertEqual(rx[Ether].dst, unicast_peer.mac) - self.assertEqual(ip6_normalize(rx[IPv6].src), - ip6_normalize(self.pg0.local_ip6_ll)) - self.assertEqual(ip6_normalize(rx[IPv6].dst), - ip6_normalize(unicast_peer.ip6)) + self.assertEqual( + ip6_normalize(rx[IPv6].src), ip6_normalize(self.pg0.local_ip6_ll) + ) + self.assertEqual(ip6_normalize(rx[IPv6].dst), ip6_normalize(unicast_peer.ip6)) self.assertEqual(rx[VRRPv3].vrid, vr_id) self.assertEqual(rx[VRRPv3].priority, prio) self.assertEqual(rx[VRRPv3].ipcount, 1) self.assertEqual(rx[VRRPv3].addrlist, [vip]) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main(testRunner=VppTestRunner)