VRRP_VR_STATE_MASTER = 2
VRRP_VR_STATE_INTF_DOWN = 3
+VRRP_INDEX_INVALID = 0xffffffff
+
def is_non_arp(p):
""" Want to filter out advertisements, igmp, etc"""
self._adv_dest_ip = "224.0.0.18"
self._vips = ([intf.local_ip4] if vips is None else vips)
self._tracked_ifs = []
+ self._vrrp_index = VRRP_INDEX_INVALID
def add_vpp_config(self):
self._test.vapi.vrrp_vr_add_del(is_add=1,
n_addrs=len(self._vips),
addrs=self._vips)
+ def update_vpp_config(self):
+ r = self._test.vapi.vrrp_vr_update(vrrp_index=self._vrrp_index,
+ sw_if_index=self._intf.sw_if_index,
+ vr_id=self._vr_id,
+ priority=self._prio,
+ interval=self._intvl,
+ flags=self._flags,
+ n_addrs=len(self._vips),
+ addrs=self._vips)
+ self._vrrp_index = r.vrrp_index
+
+ def delete_vpp_config(self):
+ self._test.vapi.vrrp_vr_del(vrrp_index=self._vrrp_index)
+
def query_vpp_config(self):
vrs = self._test.vapi.vrrp_vr_dump(sw_if_index=self._intf.sw_if_index)
for vr in vrs:
return pkt
-@unittest.skipUnless(config.extended, "part of extended tests")
class TestVRRP4(VppTestCase):
""" IPv4 VRRP Test Case """
# VR with priority 255 owns the virtual address and should
# become master and start advertising immediately.
+ @unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp4_master_adv(self):
""" IPv4 Master VR advertises """
self.pg_enable_capture(self.pg_interfaces)
vr.remove_vpp_config()
self._vrs = []
+ # Same as above but with the update API, and add a change
+ # of parameters to test that too
+ @unittest.skipUnless(config.extended, "part of extended tests")
+ def test_vrrp4_master_adv_update(self):
+ """ IPv4 Master VR adv + Update to Backup """
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ prio = 255
+ intvl = self._default_adv
+ vr = VppVRRPVirtualRouter(self, self.pg0, 100,
+ prio=prio, intvl=intvl,
+ flags=self._default_flags)
+
+ vr.update_vpp_config()
+ vr.start_stop(is_start=1)
+ self.logger.info(self.vapi.cli("show vrrp vr"))
+ # Update VR with lower prio and larger interval
+ # we need to keep old VR for the adv checks
+ upd_vr = VppVRRPVirtualRouter(self, self.pg0, 100,
+ prio=100, intvl=2*intvl,
+ flags=self._default_flags,
+ vips=[self.pg0.remote_ip4])
+ upd_vr._vrrp_index = vr._vrrp_index
+ upd_vr.update_vpp_config()
+ start_time = time.time()
+ self.logger.info(self.vapi.cli("show vrrp vr"))
+ upd_vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
+ self._vrs = [upd_vr]
+
+ pkts = self.pg0.get_capture(5)
+ # Init -> Master: IGMP Join, VRRP adv, gratuitous ARP are sent
+ self.verify_vrrp4_igmp(pkts[0])
+ self.verify_vrrp4_adv(pkts[1], vr, prio=prio)
+ self.verify_vrrp4_garp(pkts[2], vr.virtual_ips()[0], vr.virtual_mac())
+ # Master -> Init: Adv with priority 0 sent to force an election
+ self.verify_vrrp4_adv(pkts[3], vr, prio=0)
+ # Init -> Backup: An IGMP join should be sent
+ self.verify_vrrp4_igmp(pkts[4])
+
+ # send higher prio advertisements, should not receive any
+ end_time = start_time + 2 * upd_vr.master_down_seconds()
+ src_ip = self.pg0.remote_ip4
+ pkts = [upd_vr.vrrp_adv_packet(prio=110, src_ip=src_ip)]
+ while time.time() < end_time:
+ self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl*0.01)
+ self.logger.info(self.vapi.cli("show trace"))
+
+ upd_vr.start_stop(is_start=0)
+ self.logger.info(self.vapi.cli("show vrrp vr"))
+
# VR with priority < 255 enters backup state and does not advertise as
# long as it receives higher priority advertisements
+ @unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp4_backup_noadv(self):
""" IPv4 Backup VR does not advertise """
self.pg_enable_capture(self.pg_interfaces)
vr.remove_vpp_config()
self._vrs = []
+ @unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp4_backup_noarp(self):
""" IPv4 Backup VR ignores ARP """
# We need an address for a virtual IP that is not the IP that
vr.remove_vpp_config()
self._vrs = []
+ @unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp4_election(self):
""" IPv4 Backup VR becomes master if no advertisements received """
self.pg0.wait_for_packet(intvl_s, is_not_adv)
vr.assert_state_equals(VRRP_VR_STATE_MASTER)
+ @unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp4_backup_preempts(self):
""" IPv4 Backup VR preempts lower priority master """
self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
vr.assert_state_equals(VRRP_VR_STATE_MASTER)
+ @unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp4_master_preempted(self):
""" IPv4 Master VR preempted by higher priority backup """
# VR should be in backup state again
vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
+ @unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp4_accept_mode_disabled(self):
""" IPv4 Master VR does not reply for VIP w/ accept mode off """
time.sleep(1)
self.pg0.assert_nothing_captured(filter_out_fn=is_not_echo_reply)
+ @unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp4_accept_mode_enabled(self):
""" IPv4 Master VR replies for VIP w/ accept mode on """
self.assertEqual(rx_pkts[0][ICMP].seq, 1)
self.assertEqual(rx_pkts[0][ICMP].id, self.pg0.sw_if_index)
+ @unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp4_intf_tracking(self):
""" IPv4 Master VR adjusts priority based on tracked interface """
filter_out_fn=is_not_adv)
self.assertEqual(rx, adv_configured)
+ @unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp4_master_adv_unicast(self):
""" IPv4 Master VR advertises (unicast) """
self.assertEqual(rx[VRRPv3].addrlist, [vip])
-@unittest.skipUnless(config.extended, "part of extended tests")
class TestVRRP6(VppTestCase):
""" IPv6 VRRP Test Case """
# VR with priority 255 owns the virtual address and should
# become master and start advertising immediately.
+ @unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp6_master_adv(self):
""" IPv6 Master VR advertises """
self.pg_enable_capture(self.pg_interfaces)
vr.remove_vpp_config()
self._vrs = []
+ # Same as above but with the update API, and add a change
+ # of parameters to test that too
+ @unittest.skipUnless(config.extended, "part of extended tests")
+ def test_vrrp6_master_adv_update(self):
+ """ IPv6 Master VR adv + Update to Backup """
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ prio = 255
+ intvl = self._default_adv
+ vr = VppVRRPVirtualRouter(self, self.pg0, 100,
+ prio=prio, intvl=intvl,
+ flags=self._default_flags)
+
+ vr.update_vpp_config()
+ vr.start_stop(is_start=1)
+ self.logger.info(self.vapi.cli("show vrrp vr"))
+ # Update VR with lower prio and larger interval
+ # we need to keep old VR for the adv checks
+ upd_vr = VppVRRPVirtualRouter(self, self.pg0, 100,
+ prio=100, intvl=2*intvl,
+ flags=self._default_flags,
+ vips=[self.pg0.remote_ip6])
+ upd_vr._vrrp_index = vr._vrrp_index
+ upd_vr.update_vpp_config()
+ start_time = time.time()
+ self.logger.info(self.vapi.cli("show vrrp vr"))
+ upd_vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
+ self._vrs = [upd_vr]
+
+ pkts = self.pg0.get_capture(5, filter_out_fn=None)
+
+ # Init -> Master: Multicast group Join, VRRP adv, gratuitous NAs sent
+ self.verify_vrrp6_mlr(pkts[0], vr)
+ self.verify_vrrp6_adv(pkts[1], vr, prio=prio)
+ self.verify_vrrp6_gna(pkts[2], vr)
+ # Master -> Init: Adv with priority 0 sent to force an election
+ self.verify_vrrp6_adv(pkts[3], vr, prio=0)
+ # Init -> Backup: A multicast listener report should be sent
+ # not actually verified in the test below, where I took this from
+
+ # send higher prio advertisements, should not see VPP send any
+ src_ip = self.pg0.remote_ip6_ll
+ pkts = [upd_vr.vrrp_adv_packet(prio=110, src_ip=src_ip)]
+ self.logger.info(self.vapi.cli("show vlib graph"))
+ end_time = start_time + 2 * upd_vr.master_down_seconds()
+ while time.time() < end_time:
+ self.send_and_assert_no_replies(
+ self.pg0, pkts, timeout=0.01*upd_vr._intvl)
+ self.logger.info(self.vapi.cli("show trace"))
+
+ vr.start_stop(is_start=0)
+ self.logger.info(self.vapi.cli("show vrrp vr"))
+
# VR with priority < 255 enters backup state and does not advertise as
# long as it receives higher priority advertisements
+ @unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp6_backup_noadv(self):
""" IPv6 Backup VR does not advertise """
self.pg_enable_capture(self.pg_interfaces)
vr.remove_vpp_config()
self._vrs = []
+ @unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp6_backup_nond(self):
""" IPv6 Backup VR ignores NDP """
# We need an address for a virtual IP that is not the IP that
vr.start_stop(is_start=0)
+ @unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp6_election(self):
""" IPv6 Backup VR becomes master if no advertisements received """
self.pg0.wait_for_packet(intvl_s, is_not_adv)
vr.assert_state_equals(VRRP_VR_STATE_MASTER)
+ @unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp6_backup_preempts(self):
""" IPv6 Backup VR preempts lower priority master """
self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
vr.assert_state_equals(VRRP_VR_STATE_MASTER)
+ @unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp6_master_preempted(self):
""" IPv6 Master VR preempted by higher priority backup """
# VR should be in backup state again
vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
+ @unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp6_accept_mode_disabled(self):
""" IPv6 Master VR does not reply for VIP w/ accept mode off """
time.sleep(1)
self.pg0.assert_nothing_captured(filter_out_fn=is_not_echo_reply)
+ @unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp6_accept_mode_enabled(self):
""" IPv6 Master VR replies for VIP w/ accept mode on """
self.assertEqual(rx_pkts[0][ICMPv6EchoReply].seq, 1)
self.assertEqual(rx_pkts[0][ICMPv6EchoReply].id, self.pg0.sw_if_index)
+ @unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp6_intf_tracking(self):
""" IPv6 Master VR adjusts priority based on tracked interface """
filter_out_fn=is_not_adv)
self.assertEqual(rx, adv_configured)
+ @unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp6_master_adv_unicast(self):
""" IPv6 Master VR advertises (unicast) """