4 # Copyright 2019-2020 Rubicon Communications, LLC (Netgate)
6 # SPDX-License-Identifier: Apache-2.0
12 from socket import inet_pton, inet_ntop
14 from vpp_object import VppObject
15 from vpp_papi import VppEnum
17 from scapy.packet import raw
18 from scapy.layers.l2 import Ether, ARP
19 from scapy.layers.inet import IP, ICMP, icmptypes
20 from scapy.layers.inet6 import (
32 from scapy.contrib.igmpv3 import IGMPv3, IGMPv3mr, IGMPv3gr
33 from scapy.layers.vrrp import IPPROTO_VRRP, VRRPv3
34 from scapy.utils6 import in6_getnsma, in6_getnsmac
35 from config import config
36 from framework import VppTestCase, VppTestRunner
37 from util import ip6_normalize
39 VRRP_VR_FLAG_PREEMPT = 1
40 VRRP_VR_FLAG_ACCEPT = 2
41 VRRP_VR_FLAG_UNICAST = 4
44 VRRP_VR_STATE_INIT = 0
45 VRRP_VR_STATE_BACKUP = 1
46 VRRP_VR_STATE_MASTER = 2
47 VRRP_VR_STATE_INTF_DOWN = 3
49 VRRP_INDEX_INVALID = 0xFFFFFFFF
53 """Want to filter out advertisements, igmp, etc"""
61 """Filter out everything but advertisements. E.g. multicast RD/ND"""
62 if p.haslayer(VRRPv3):
68 def is_not_echo_reply(p):
69 """filter out advertisements and other while waiting for echo reply"""
70 if p.haslayer(IP) and p.haslayer(ICMP):
71 if icmptypes[p[ICMP].type] == "echo-reply":
73 elif p.haslayer(IPv6) and p.haslayer(ICMPv6EchoReply):
79 class VppVRRPVirtualRouter(VppObject):
87 flags=VRRP_VR_FLAG_PREEMPT,
92 self._sw_if_index = self._intf.sw_if_index
97 if flags & VRRP_VR_FLAG_IPV6:
99 self._adv_dest_mac = "33:33:00:00:00:12"
100 self._virtual_mac = "00:00:5e:00:02:%02x" % vr_id
101 self._adv_dest_ip = "ff02::12"
102 self._vips = [intf.local_ip6] if vips is None else vips
105 self._adv_dest_mac = "01:00:5e:00:00:12"
106 self._virtual_mac = "00:00:5e:00:01:%02x" % vr_id
107 self._adv_dest_ip = "224.0.0.18"
108 self._vips = [intf.local_ip4] if vips is None else vips
109 self._tracked_ifs = []
110 self._vrrp_index = VRRP_INDEX_INVALID
112 def add_vpp_config(self):
113 self._test.vapi.vrrp_vr_add_del(
115 sw_if_index=self._intf.sw_if_index,
118 interval=self._intvl,
120 n_addrs=len(self._vips),
124 def update_vpp_config(self):
125 r = self._test.vapi.vrrp_vr_update(
126 vrrp_index=self._vrrp_index,
127 sw_if_index=self._intf.sw_if_index,
130 interval=self._intvl,
132 n_addrs=len(self._vips),
135 self._vrrp_index = r.vrrp_index
137 def delete_vpp_config(self):
138 self._test.vapi.vrrp_vr_del(vrrp_index=self._vrrp_index)
140 def query_vpp_config(self):
141 vrs = self._test.vapi.vrrp_vr_dump(sw_if_index=self._intf.sw_if_index)
143 if vr.config.vr_id != self._vr_id:
146 is_ipv6 = 1 if (vr.config.flags & VRRP_VR_FLAG_IPV6) else 0
147 if is_ipv6 != self._is_ipv6:
154 def remove_vpp_config(self):
155 self._test.vapi.vrrp_vr_add_del(
157 sw_if_index=self._intf.sw_if_index,
160 interval=self._intvl,
162 n_addrs=len(self._vips),
166 def start_stop(self, is_start):
167 self._test.vapi.vrrp_vr_start_stop(
169 sw_if_index=self._intf.sw_if_index,
171 is_ipv6=self._is_ipv6,
173 self._start_time = time.time() if is_start else None
175 def add_del_tracked_interface(self, is_add, sw_if_index, prio):
177 "sw_if_index": self._intf.sw_if_index,
178 "is_ipv6": self._is_ipv6,
179 "vr_id": self._vr_id,
182 "ifs": [{"sw_if_index": sw_if_index, "priority": prio}],
184 self._test.vapi.vrrp_vr_track_if_add_del(**args)
185 self._tracked_ifs.append(args["ifs"][0])
187 def set_unicast_peers(self, addrs):
189 "sw_if_index": self._intf.sw_if_index,
190 "is_ipv6": self._is_ipv6,
191 "vr_id": self._vr_id,
192 "n_addrs": len(addrs),
195 self._test.vapi.vrrp_vr_set_peers(**args)
196 self._unicast_peers = addrs
198 def start_time(self):
199 return self._start_time
201 def virtual_mac(self):
202 return self._virtual_mac
204 def virtual_ips(self):
207 def adv_dest_mac(self):
208 return self._adv_dest_mac
210 def adv_dest_ip(self):
211 return self._adv_dest_ip
219 def adv_interval(self):
225 def assert_state_equals(self, state):
226 vr_details = self.query_vpp_config()
227 self._test.assertEqual(vr_details.runtime.state, state)
229 def master_down_seconds(self):
230 vr_details = self.query_vpp_config()
231 return vr_details.runtime.master_down_int * 0.01
233 def vrrp_adv_packet(self, prio=None, src_ip=None):
234 dst_ip = self._adv_dest_ip
237 eth = Ether(dst=self._adv_dest_mac, src=self._virtual_mac)
239 vrid=self._vr_id, priority=prio, ipcount=len(self._vips), adv=self._intvl
242 src_ip = self._intf.local_ip6_ll if src_ip is None else src_ip
243 ip = IPv6(src=src_ip, dst=dst_ip, nh=IPPROTO_VRRP, hlim=255)
244 vrrp.addrlist = self._vips
246 src_ip = self._intf.local_ip4 if src_ip is None else src_ip
247 ip = IP(src=src_ip, dst=dst_ip, proto=IPPROTO_VRRP, ttl=255, id=0)
248 vrrp.addrlist = self._vips
250 # Fill in default values & checksums
251 pkt = Ether(raw(eth / ip / vrrp))
255 class TestVRRP4(VppTestCase):
256 """IPv4 VRRP Test Case"""
260 super(TestVRRP4, cls).setUpClass()
263 def tearDownClass(cls):
264 super(TestVRRP4, cls).tearDownClass()
267 super(TestVRRP4, self).setUp()
269 self.create_pg_interfaces(range(2))
271 for i in self.pg_interfaces:
274 i.generate_remote_hosts(5)
275 i.configure_ipv4_neighbors()
278 self._default_flags = VRRP_VR_FLAG_PREEMPT
279 self._default_adv = 100
284 vr_api = vr.query_vpp_config()
285 if vr_api.runtime.state != VRRP_VR_STATE_INIT:
286 vr.start_stop(is_start=0)
287 vr.remove_vpp_config()
289 self.logger.error("Error cleaning up")
291 for i in self.pg_interfaces:
298 super(TestVRRP4, self).tearDown()
300 def verify_vrrp4_igmp(self, pkt):
302 self.assertEqual(ip.dst, "224.0.0.22")
303 self.assertEqual(ip.proto, 2)
306 self.assertEqual(IGMPv3.igmpv3types[igmp.type], "Version 3 Membership Report")
308 igmpmr = pkt[IGMPv3mr]
309 self.assertEqual(igmpmr.numgrp, 1)
310 self.assertEqual(igmpmr.records[0].maddr, "224.0.0.18")
312 def verify_vrrp4_garp(self, pkt, vip, vmac):
315 # ARP "who-has" op == 1
316 self.assertEqual(arp.op, 1)
317 self.assertEqual(arp.pdst, arp.psrc)
318 self.assertEqual(arp.pdst, vip)
319 self.assertEqual(arp.hwsrc, vmac)
321 def verify_vrrp4_adv(self, rx_pkt, vr, prio=None):
322 vips = vr.virtual_ips()
325 vrrp = rx_pkt[VRRPv3]
327 pkt = vr.vrrp_adv_packet(prio=prio)
329 # Source MAC is virtual MAC, destination is multicast MAC
330 self.assertEqual(eth.src, vr.virtual_mac())
331 self.assertEqual(eth.dst, vr.adv_dest_mac())
333 self.assertEqual(ip.dst, "224.0.0.18")
334 self.assertEqual(ip.ttl, 255)
335 self.assertEqual(ip.proto, IPPROTO_VRRP)
337 self.assertEqual(vrrp.version, 3)
338 self.assertEqual(vrrp.type, 1)
339 self.assertEqual(vrrp.vrid, vr.vr_id())
342 self.assertEqual(vrrp.priority, prio)
343 self.assertEqual(vrrp.ipcount, len(vips))
344 self.assertEqual(vrrp.adv, vr.adv_interval())
345 self.assertListEqual(vrrp.addrlist, vips)
347 # VR with priority 255 owns the virtual address and should
348 # become master and start advertising immediately.
349 @unittest.skipUnless(config.extended, "part of extended tests")
350 def test_vrrp4_master_adv(self):
351 """IPv4 Master VR advertises"""
352 self.pg_enable_capture(self.pg_interfaces)
356 intvl = self._default_adv
357 vr = VppVRRPVirtualRouter(
358 self, self.pg0, 100, prio=prio, intvl=intvl, flags=self._default_flags
362 vr.start_stop(is_start=1)
363 self.logger.info(self.vapi.cli("show vrrp vr"))
364 vr.start_stop(is_start=0)
365 self.logger.info(self.vapi.cli("show vrrp vr"))
367 pkts = self.pg0.get_capture(4)
369 # Init -> Master: IGMP Join, VRRP adv, gratuitous ARP are sent
370 self.verify_vrrp4_igmp(pkts[0])
371 self.verify_vrrp4_adv(pkts[1], vr, prio=prio)
372 self.verify_vrrp4_garp(pkts[2], vr.virtual_ips()[0], vr.virtual_mac())
373 # Master -> Init: Adv with priority 0 sent to force an election
374 self.verify_vrrp4_adv(pkts[3], vr, prio=0)
376 vr.remove_vpp_config()
379 # Same as above but with the update API, and add a change
380 # of parameters to test that too
381 @unittest.skipUnless(config.extended, "part of extended tests")
382 def test_vrrp4_master_adv_update(self):
383 """IPv4 Master VR adv + Update to Backup"""
384 self.pg_enable_capture(self.pg_interfaces)
388 intvl = self._default_adv
389 vr = VppVRRPVirtualRouter(
390 self, self.pg0, 100, prio=prio, intvl=intvl, flags=self._default_flags
393 vr.update_vpp_config()
394 vr.start_stop(is_start=1)
395 self.logger.info(self.vapi.cli("show vrrp vr"))
396 # Update VR with lower prio and larger interval
397 # we need to keep old VR for the adv checks
398 upd_vr = VppVRRPVirtualRouter(
404 flags=self._default_flags,
405 vips=[self.pg0.remote_ip4],
407 upd_vr._vrrp_index = vr._vrrp_index
408 upd_vr.update_vpp_config()
409 start_time = time.time()
410 self.logger.info(self.vapi.cli("show vrrp vr"))
411 upd_vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
414 pkts = self.pg0.get_capture(5)
415 # Init -> Master: IGMP Join, VRRP adv, gratuitous ARP are sent
416 self.verify_vrrp4_igmp(pkts[0])
417 self.verify_vrrp4_adv(pkts[1], vr, prio=prio)
418 self.verify_vrrp4_garp(pkts[2], vr.virtual_ips()[0], vr.virtual_mac())
419 # Master -> Init: Adv with priority 0 sent to force an election
420 self.verify_vrrp4_adv(pkts[3], vr, prio=0)
421 # Init -> Backup: An IGMP join should be sent
422 self.verify_vrrp4_igmp(pkts[4])
424 # send higher prio advertisements, should not receive any
425 end_time = start_time + 2 * upd_vr.master_down_seconds()
426 src_ip = self.pg0.remote_ip4
427 pkts = [upd_vr.vrrp_adv_packet(prio=110, src_ip=src_ip)]
428 while time.time() < end_time:
429 self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl * 0.01)
430 self.logger.info(self.vapi.cli("show trace"))
432 upd_vr.start_stop(is_start=0)
433 self.logger.info(self.vapi.cli("show vrrp vr"))
435 # VR with priority < 255 enters backup state and does not advertise as
436 # long as it receives higher priority advertisements
437 @unittest.skipUnless(config.extended, "part of extended tests")
438 def test_vrrp4_backup_noadv(self):
439 """IPv4 Backup VR does not advertise"""
440 self.pg_enable_capture(self.pg_interfaces)
445 intvl = self._default_adv
446 intvl_s = intvl * 0.01
447 vr = VppVRRPVirtualRouter(
453 flags=self._default_flags,
454 vips=[self.pg0.remote_ip4],
459 vr.start_stop(is_start=1)
461 vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
462 # watch for advertisements for 2x the master down preemption timeout
463 end_time = vr.start_time() + 2 * vr.master_down_seconds()
465 # Init -> Backup: An IGMP join should be sent
466 pkts = self.pg0.get_capture(1)
467 self.verify_vrrp4_igmp(pkts[0])
469 # send higher prio advertisements, should not receive any
470 src_ip = self.pg0.remote_ip4
471 pkts = [vr.vrrp_adv_packet(prio=prio + 10, src_ip=src_ip)]
472 while time.time() < end_time:
473 self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl_s)
474 self.logger.info(self.vapi.cli("show trace"))
476 vr.start_stop(is_start=0)
477 self.logger.info(self.vapi.cli("show vrrp vr"))
478 vr.remove_vpp_config()
481 def test_vrrp4_master_arp(self):
482 """IPv4 Master VR replies to ARP"""
485 # VR virtual IP is the default, which is the pg local IP
488 intvl = self._default_adv
489 vr = VppVRRPVirtualRouter(
490 self, self.pg0, 100, prio=prio, intvl=intvl, flags=self._default_flags
496 # before the VR is up, ARP should resolve to interface MAC
497 self.pg0.resolve_arp()
498 self.assertNotEqual(self.pg0.local_mac, vr.virtual_mac())
500 # start the VR, ARP should now resolve to virtual MAC
501 vr.start_stop(is_start=1)
502 self.pg0.resolve_arp()
503 self.assertEqual(self.pg0.local_mac, vr.virtual_mac())
505 # stop the VR, ARP should resolve to interface MAC again
506 vr.start_stop(is_start=0)
507 self.pg0.resolve_arp()
508 self.assertNotEqual(self.pg0.local_mac, vr.virtual_mac())
510 vr.remove_vpp_config()
513 @unittest.skipUnless(config.extended, "part of extended tests")
514 def test_vrrp4_backup_noarp(self):
515 """IPv4 Backup VR ignores ARP"""
516 # We need an address for a virtual IP that is not the IP that
517 # ARP requests will originate from
521 intvl = self._default_adv
522 vip = self.pg0.remote_hosts[1].ip4
523 vr = VppVRRPVirtualRouter(
529 flags=self._default_flags,
535 arp_req = Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg0.remote_mac) / ARP(
538 psrc=self.pg0.remote_ip4,
539 hwsrc=self.pg0.remote_mac,
542 # Before the VR is started make sure no reply to request for VIP
544 self.pg_enable_capture(self.pg_interfaces)
545 self.send_and_assert_no_replies(self.pg0, [arp_req], timeout=1)
547 # VR should start in backup state and still should not reply to ARP
548 # send a higher priority adv to make sure it does not become master
549 adv = vr.vrrp_adv_packet(prio=prio + 10, src_ip=self.pg0.remote_ip4)
550 vr.start_stop(is_start=1)
551 self.send_and_assert_no_replies(self.pg0, [adv, arp_req], timeout=1)
553 vr.start_stop(is_start=0)
554 vr.remove_vpp_config()
557 @unittest.skipUnless(config.extended, "part of extended tests")
558 def test_vrrp4_election(self):
559 """IPv4 Backup VR becomes master if no advertisements received"""
563 intvl = self._default_adv
564 intvl_s = intvl * 0.01
565 vip = self.pg0.remote_ip4
566 vr = VppVRRPVirtualRouter(
572 flags=self._default_flags,
578 # After adding the VR, it should be in the init state
579 vr.assert_state_equals(VRRP_VR_STATE_INIT)
582 vr.start_stop(is_start=1)
584 # VR should be in backup state after starting
585 vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
586 end_time = vr.start_time() + vr.master_down_seconds()
588 # should not receive adverts until timer expires & state transition
589 self.pg_enable_capture(self.pg_interfaces)
590 while (time.time() + intvl_s) < end_time:
592 self.pg0.assert_nothing_captured(filter_out_fn=is_not_adv)
594 # VR should be in master state, should send an adv
595 self.pg0.enable_capture()
596 self.pg0.wait_for_packet(intvl_s, is_not_adv)
597 vr.assert_state_equals(VRRP_VR_STATE_MASTER)
599 @unittest.skipUnless(config.extended, "part of extended tests")
600 def test_vrrp4_backup_preempts(self):
601 """IPv4 Backup VR preempts lower priority master"""
605 intvl = self._default_adv
606 intvl_s = intvl * 0.01
607 vip = self.pg0.remote_ip4
608 vr = VppVRRPVirtualRouter(
614 flags=self._default_flags,
620 # After adding the VR, it should be in the init state
621 vr.assert_state_equals(VRRP_VR_STATE_INIT)
624 vr.start_stop(is_start=1)
626 # VR should be in backup state after starting
627 vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
628 end_time = vr.start_time() + vr.master_down_seconds()
630 # send lower prio advertisements until timer expires
631 src_ip = self.pg0.remote_ip4
632 pkts = [vr.vrrp_adv_packet(prio=prio - 10, src_ip=src_ip)]
633 while time.time() + intvl_s < end_time:
634 self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl_s)
635 self.logger.info(self.vapi.cli("show trace"))
637 # when timer expires, VR should take over as master
638 self.pg0.enable_capture()
639 self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
640 vr.assert_state_equals(VRRP_VR_STATE_MASTER)
642 @unittest.skipUnless(config.extended, "part of extended tests")
643 def test_vrrp4_master_preempted(self):
644 """IPv4 Master VR preempted by higher priority backup"""
646 # A prio 255 VR cannot be preempted so the prio has to be lower and
647 # we have to wait for it to take over
650 intvl = self._default_adv
651 vip = self.pg0.remote_ip4
652 vr = VppVRRPVirtualRouter(
658 flags=self._default_flags,
664 # After adding the VR, it should be in the init state
665 vr.assert_state_equals(VRRP_VR_STATE_INIT)
668 vr.start_stop(is_start=1)
669 vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
671 # wait for VR to take over as master
672 end_time = vr.start_time() + vr.master_down_seconds()
673 sleep_s = end_time - time.time()
675 vr.assert_state_equals(VRRP_VR_STATE_MASTER)
677 # Build advertisement packet and send it
678 pkts = [vr.vrrp_adv_packet(prio=255, src_ip=self.pg0.remote_ip4)]
679 self.pg_send(self.pg0, pkts)
681 # VR should be in backup state again
682 vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
684 @unittest.skipUnless(config.extended, "part of extended tests")
685 def test_vrrp4_accept_mode_disabled(self):
686 """IPv4 Master VR does not reply for VIP w/ accept mode off"""
688 # accept mode only matters when prio < 255, so it will have to
689 # come up as a backup and take over as master after the timeout
692 intvl = self._default_adv
693 vip = self.pg0.remote_hosts[4].ip4
694 vr = VppVRRPVirtualRouter(
700 flags=self._default_flags,
706 # After adding the VR, it should be in the init state
707 vr.assert_state_equals(VRRP_VR_STATE_INIT)
710 vr.start_stop(is_start=1)
711 vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
713 # wait for VR to take over as master
714 end_time = vr.start_time() + vr.master_down_seconds()
715 sleep_s = end_time - time.time()
717 vr.assert_state_equals(VRRP_VR_STATE_MASTER)
719 # send an ICMP echo to the VR virtual IP address
721 Ether(dst=vr.virtual_mac(), src=self.pg0.remote_mac)
722 / IP(dst=vip, src=self.pg0.remote_ip4)
723 / ICMP(seq=1, id=self.pg0.sw_if_index, type="echo-request")
725 self.pg_send(self.pg0, [echo])
727 # wait for an echo reply. none should be received
729 self.pg0.assert_nothing_captured(filter_out_fn=is_not_echo_reply)
731 @unittest.skipUnless(config.extended, "part of extended tests")
732 def test_vrrp4_accept_mode_enabled(self):
733 """IPv4 Master VR replies for VIP w/ accept mode on"""
735 # A prio 255 VR cannot be preempted so the prio has to be lower and
736 # we have to wait for it to take over
739 intvl = self._default_adv
740 vip = self.pg0.remote_hosts[4].ip4
741 flags = VRRP_VR_FLAG_PREEMPT | VRRP_VR_FLAG_ACCEPT
742 vr = VppVRRPVirtualRouter(
743 self, self.pg0, vr_id, prio=prio, intvl=intvl, flags=flags, vips=[vip]
748 # After adding the VR, it should be in the init state
749 vr.assert_state_equals(VRRP_VR_STATE_INIT)
752 vr.start_stop(is_start=1)
753 vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
755 # wait for VR to take over as master
756 end_time = vr.start_time() + vr.master_down_seconds()
757 sleep_s = end_time - time.time()
759 vr.assert_state_equals(VRRP_VR_STATE_MASTER)
761 # send an ICMP echo to the VR virtual IP address
763 Ether(dst=vr.virtual_mac(), src=self.pg0.remote_mac)
764 / IP(dst=vip, src=self.pg0.remote_ip4)
765 / ICMP(seq=1, id=self.pg0.sw_if_index, type="echo-request")
767 self.pg_send(self.pg0, [echo])
769 # wait for an echo reply.
771 rx_pkts = self.pg0.get_capture(
772 expected_count=1, timeout=1, filter_out_fn=is_not_echo_reply
775 self.assertEqual(rx_pkts[0][IP].src, vip)
776 self.assertEqual(rx_pkts[0][IP].dst, self.pg0.remote_ip4)
777 self.assertEqual(icmptypes[rx_pkts[0][ICMP].type], "echo-reply")
778 self.assertEqual(rx_pkts[0][ICMP].seq, 1)
779 self.assertEqual(rx_pkts[0][ICMP].id, self.pg0.sw_if_index)
781 @unittest.skipUnless(config.extended, "part of extended tests")
782 def test_vrrp4_intf_tracking(self):
783 """IPv4 Master VR adjusts priority based on tracked interface"""
787 intvl = self._default_adv
788 intvl_s = intvl * 0.01
789 vip = self.pg0.local_ip4
790 vr = VppVRRPVirtualRouter(
796 flags=self._default_flags,
802 # After adding the VR, it should be in the init state
803 vr.assert_state_equals(VRRP_VR_STATE_INIT)
805 # add pg1 as a tracked interface and start the VR
807 adjusted_prio = prio - adjustment
808 vr.add_del_tracked_interface(
809 is_add=1, sw_if_index=self.pg1.sw_if_index, prio=adjustment
811 vr.start_stop(is_start=1)
812 vr.assert_state_equals(VRRP_VR_STATE_MASTER)
814 adv_configured = vr.vrrp_adv_packet(prio=prio)
815 adv_adjusted = vr.vrrp_adv_packet(prio=adjusted_prio)
817 # tracked intf is up -> advertised priority == configured priority
818 self.pg0.enable_capture()
819 rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
820 self.assertEqual(rx, adv_configured)
822 # take down pg1, verify priority is now being adjusted
823 self.pg1.admin_down()
824 self.pg0.enable_capture()
825 rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
826 self.assertEqual(rx, adv_adjusted)
828 # bring up pg1, verify priority now matches configured value
830 self.pg0.enable_capture()
831 rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
832 self.assertEqual(rx, adv_configured)
834 # remove IP address from pg1, verify priority now being adjusted
835 self.pg1.unconfig_ip4()
836 self.pg0.enable_capture()
837 rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
838 self.assertEqual(rx, adv_adjusted)
840 # add IP address to pg1, verify priority now matches configured value
841 self.pg1.config_ip4()
842 self.pg0.enable_capture()
843 rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
844 self.assertEqual(rx, adv_configured)
846 @unittest.skipUnless(config.extended, "part of extended tests")
847 def test_vrrp4_master_adv_unicast(self):
848 """IPv4 Master VR advertises (unicast)"""
852 intvl = self._default_adv
853 intvl_s = intvl * 0.01
854 vip = self.pg0.local_ip4
855 flags = self._default_flags | VRRP_VR_FLAG_UNICAST
856 unicast_peer = self.pg0.remote_hosts[4]
857 vr = VppVRRPVirtualRouter(
858 self, self.pg0, vr_id, prio=prio, intvl=intvl, flags=flags, vips=[vip]
862 vr.set_unicast_peers([unicast_peer.ip4])
864 # After adding the VR, it should be in the init state
865 vr.assert_state_equals(VRRP_VR_STATE_INIT)
867 # Start VR, transition to master
868 vr.start_stop(is_start=1)
869 vr.assert_state_equals(VRRP_VR_STATE_MASTER)
871 self.pg0.enable_capture()
872 rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
874 self.assertTrue(rx.haslayer(Ether))
875 self.assertTrue(rx.haslayer(IP))
876 self.assertTrue(rx.haslayer(VRRPv3))
877 self.assertEqual(rx[Ether].src, self.pg0.local_mac)
878 self.assertEqual(rx[Ether].dst, unicast_peer.mac)
879 self.assertEqual(rx[IP].src, self.pg0.local_ip4)
880 self.assertEqual(rx[IP].dst, unicast_peer.ip4)
881 self.assertEqual(rx[VRRPv3].vrid, vr_id)
882 self.assertEqual(rx[VRRPv3].priority, prio)
883 self.assertEqual(rx[VRRPv3].ipcount, 1)
884 self.assertEqual(rx[VRRPv3].addrlist, [vip])
887 class TestVRRP6(VppTestCase):
888 """IPv6 VRRP Test Case"""
892 super(TestVRRP6, cls).setUpClass()
895 def tearDownClass(cls):
896 super(TestVRRP6, cls).tearDownClass()
899 super(TestVRRP6, self).setUp()
901 self.create_pg_interfaces(range(2))
903 for i in self.pg_interfaces:
906 i.generate_remote_hosts(5)
907 i.configure_ipv6_neighbors()
910 self._default_flags = VRRP_VR_FLAG_IPV6 | VRRP_VR_FLAG_PREEMPT
911 self._default_adv = 100
916 vr_api = vr.query_vpp_config()
917 if vr_api.runtime.state != VRRP_VR_STATE_INIT:
918 vr.start_stop(is_start=0)
919 vr.remove_vpp_config()
921 self.logger.error("Error cleaning up")
923 for i in self.pg_interfaces:
930 super(TestVRRP6, self).tearDown()
932 def verify_vrrp6_mlr(self, pkt, vr):
934 self.assertEqual(ip6.dst, "ff02::16")
935 self.assertEqual(ipv6nh[ip6.nh], "Hop-by-Hop Option Header")
937 hbh = pkt[IPv6ExtHdrHopByHop]
938 self.assertEqual(ipv6nh[hbh.nh], "ICMPv6")
940 self.assertTrue(pkt.haslayer(ICMPv6MLReport2))
941 mlr = pkt[ICMPv6MLReport2]
942 # should contain mc addr records for:
943 # - VRRPv3 multicast addr
944 # - solicited node mc addr record for each VR virtual IPv6 address
945 vips = vr.virtual_ips()
946 self.assertEqual(mlr.records_number, len(vips) + 1)
947 self.assertEqual(mlr.records[0].dst, vr.adv_dest_ip())
949 def verify_vrrp6_adv(self, rx_pkt, vr, prio=None):
950 self.assertTrue(rx_pkt.haslayer(Ether))
951 self.assertTrue(rx_pkt.haslayer(IPv6))
952 self.assertTrue(rx_pkt.haslayer(VRRPv3))
954 # generate a packet for this VR and compare it to the one received
955 pkt = vr.vrrp_adv_packet(prio=prio)
956 self.assertTrue(rx_pkt.haslayer(Ether))
957 self.assertTrue(rx_pkt.haslayer(IPv6))
958 self.assertTrue(rx_pkt.haslayer(VRRPv3))
960 self.assertEqual(pkt, rx_pkt)
962 def verify_vrrp6_gna(self, pkt, vr):
963 self.assertTrue(pkt.haslayer(Ether))
964 self.assertTrue(pkt.haslayer(IPv6))
965 self.assertTrue(pkt.haslayer(ICMPv6ND_NA))
966 self.assertTrue(pkt.haslayer(ICMPv6NDOptDstLLAddr))
968 self.assertEqual(pkt[Ether].dst, "33:33:00:00:00:01")
970 self.assertEqual(pkt[IPv6].dst, "ff02::1")
971 # convert addrs to packed format since string versions could differ
972 src_addr = inet_pton(socket.AF_INET6, pkt[IPv6].src)
973 vr_ll_addr = inet_pton(socket.AF_INET6, vr.interface().local_ip6_ll)
974 self.assertEqual(src_addr, vr_ll_addr)
976 self.assertTrue(pkt[ICMPv6ND_NA].tgt in vr.virtual_ips())
977 self.assertEqual(pkt[ICMPv6NDOptDstLLAddr].lladdr, vr.virtual_mac())
979 # VR with priority 255 owns the virtual address and should
980 # become master and start advertising immediately.
981 @unittest.skipUnless(config.extended, "part of extended tests")
982 def test_vrrp6_master_adv(self):
983 """IPv6 Master VR advertises"""
984 self.pg_enable_capture(self.pg_interfaces)
988 intvl = self._default_adv
989 vr = VppVRRPVirtualRouter(
990 self, self.pg0, 100, prio=prio, intvl=intvl, flags=self._default_flags
995 self.logger.info(self.vapi.cli("show vrrp vr"))
996 vr.start_stop(is_start=1)
997 self.logger.info(self.vapi.cli("show vrrp vr"))
998 vr.start_stop(is_start=0)
999 self.logger.info(self.vapi.cli("show vrrp vr"))
1001 pkts = self.pg0.get_capture(4, filter_out_fn=None)
1003 # Init -> Master: Multicast group Join, VRRP adv, gratuitous NAs sent
1004 self.verify_vrrp6_mlr(pkts[0], vr)
1005 self.verify_vrrp6_adv(pkts[1], vr, prio=prio)
1006 self.verify_vrrp6_gna(pkts[2], vr)
1007 # Master -> Init: Adv with priority 0 sent to force an election
1008 self.verify_vrrp6_adv(pkts[3], vr, prio=0)
1010 vr.remove_vpp_config()
1013 # Same as above but with the update API, and add a change
1014 # of parameters to test that too
1015 @unittest.skipUnless(config.extended, "part of extended tests")
1016 def test_vrrp6_master_adv_update(self):
1017 """IPv6 Master VR adv + Update to Backup"""
1018 self.pg_enable_capture(self.pg_interfaces)
1022 intvl = self._default_adv
1023 vr = VppVRRPVirtualRouter(
1024 self, self.pg0, 100, prio=prio, intvl=intvl, flags=self._default_flags
1027 vr.update_vpp_config()
1028 vr.start_stop(is_start=1)
1029 self.logger.info(self.vapi.cli("show vrrp vr"))
1030 # Update VR with lower prio and larger interval
1031 # we need to keep old VR for the adv checks
1032 upd_vr = VppVRRPVirtualRouter(
1038 flags=self._default_flags,
1039 vips=[self.pg0.remote_ip6],
1041 upd_vr._vrrp_index = vr._vrrp_index
1042 upd_vr.update_vpp_config()
1043 start_time = time.time()
1044 self.logger.info(self.vapi.cli("show vrrp vr"))
1045 upd_vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
1046 self._vrs = [upd_vr]
1048 pkts = self.pg0.get_capture(5, filter_out_fn=None)
1050 # Init -> Master: Multicast group Join, VRRP adv, gratuitous NAs sent
1051 self.verify_vrrp6_mlr(pkts[0], vr)
1052 self.verify_vrrp6_adv(pkts[1], vr, prio=prio)
1053 self.verify_vrrp6_gna(pkts[2], vr)
1054 # Master -> Init: Adv with priority 0 sent to force an election
1055 self.verify_vrrp6_adv(pkts[3], vr, prio=0)
1056 # Init -> Backup: A multicast listener report should be sent
1057 # not actually verified in the test below, where I took this from
1059 # send higher prio advertisements, should not see VPP send any
1060 src_ip = self.pg0.remote_ip6_ll
1061 pkts = [upd_vr.vrrp_adv_packet(prio=110, src_ip=src_ip)]
1062 self.logger.info(self.vapi.cli("show vlib graph"))
1063 end_time = start_time + 2 * upd_vr.master_down_seconds()
1064 while time.time() < end_time:
1065 self.send_and_assert_no_replies(
1066 self.pg0, pkts, timeout=0.01 * upd_vr._intvl
1068 self.logger.info(self.vapi.cli("show trace"))
1070 vr.start_stop(is_start=0)
1071 self.logger.info(self.vapi.cli("show vrrp vr"))
1073 # VR with priority < 255 enters backup state and does not advertise as
1074 # long as it receives higher priority advertisements
1075 @unittest.skipUnless(config.extended, "part of extended tests")
1076 def test_vrrp6_backup_noadv(self):
1077 """IPv6 Backup VR does not advertise"""
1078 self.pg_enable_capture(self.pg_interfaces)
1083 intvl = self._default_adv
1084 intvl_s = intvl * 0.01
1085 vr = VppVRRPVirtualRouter(
1091 flags=self._default_flags,
1092 vips=[self.pg0.remote_ip6],
1095 self._vrs.append(vr)
1097 vr.start_stop(is_start=1)
1099 vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
1100 # watch for advertisements for 2x the master down preemption timeout
1101 end_time = vr.start_time() + 2 * vr.master_down_seconds()
1103 # Init -> Backup: A multicast listener report should be sent
1104 pkts = self.pg0.get_capture(1, filter_out_fn=None)
1106 # send higher prio advertisements, should not see VPP send any
1107 src_ip = self.pg0.remote_ip6_ll
1109 pkts = [vr.vrrp_adv_packet(prio=prio + 10, src_ip=src_ip)]
1110 self.logger.info(self.vapi.cli("show vlib graph"))
1111 while time.time() < end_time:
1112 self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl_s)
1113 self.logger.info(self.vapi.cli("show trace"))
1116 vr.start_stop(is_start=0)
1117 self.logger.info(self.vapi.cli("show vrrp vr"))
1118 vr.remove_vpp_config()
1121 def test_vrrp6_master_nd(self):
1122 """IPv6 Master VR replies to NDP"""
1125 # VR virtual IP is the default, which is the pg local IP
1128 intvl = self._default_adv
1129 vr = VppVRRPVirtualRouter(
1130 self, self.pg0, 100, prio=prio, intvl=intvl, flags=self._default_flags
1133 self._vrs.append(vr)
1135 # before the VR is up, NDP should resolve to interface MAC
1136 self.pg0.resolve_ndp()
1137 self.assertNotEqual(self.pg0.local_mac, vr.virtual_mac())
1139 # start the VR, NDP should now resolve to virtual MAC
1140 vr.start_stop(is_start=1)
1141 self.pg0.resolve_ndp()
1142 self.assertEqual(self.pg0.local_mac, vr.virtual_mac())
1144 # stop the VR, ARP should resolve to interface MAC again
1145 vr.start_stop(is_start=0)
1146 self.pg0.resolve_ndp()
1147 self.assertNotEqual(self.pg0.local_mac, vr.virtual_mac())
1149 vr.remove_vpp_config()
1152 @unittest.skipUnless(config.extended, "part of extended tests")
1153 def test_vrrp6_backup_nond(self):
1154 """IPv6 Backup VR ignores NDP"""
1155 # We need an address for a virtual IP that is not the IP that
1156 # ARP requests will originate from
1160 intvl = self._default_adv
1161 intvl_s = intvl * 0.01
1162 vip = self.pg0.remote_hosts[1].ip6
1163 vr = VppVRRPVirtualRouter(
1169 flags=self._default_flags,
1173 self._vrs.append(vr)
1175 nsma = in6_getnsma(inet_pton(socket.AF_INET6, vip))
1176 dmac = in6_getnsmac(nsma)
1177 dst_ip = inet_ntop(socket.AF_INET6, nsma)
1180 Ether(dst=dmac, src=self.pg0.remote_mac)
1181 / IPv6(dst=dst_ip, src=self.pg0.remote_ip6)
1182 / ICMPv6ND_NS(tgt=vip)
1183 / ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac)
1186 # Before the VR is started make sure no reply to request for VIP
1187 self.send_and_assert_no_replies(self.pg0, [ndp_req], timeout=1)
1189 # VR should start in backup state and still should not reply to NDP
1190 # send a higher priority adv to make sure it does not become master
1191 adv = vr.vrrp_adv_packet(prio=prio + 10, src_ip=self.pg0.remote_ip6)
1192 pkts = [adv, ndp_req]
1193 vr.start_stop(is_start=1)
1194 self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl_s)
1196 vr.start_stop(is_start=0)
1198 @unittest.skipUnless(config.extended, "part of extended tests")
1199 def test_vrrp6_election(self):
1200 """IPv6 Backup VR becomes master if no advertisements received"""
1204 intvl = self._default_adv
1205 intvl_s = intvl * 0.01
1206 vip = self.pg0.remote_ip6
1207 vr = VppVRRPVirtualRouter(
1213 flags=self._default_flags,
1216 self._vrs.append(vr)
1219 # After adding the VR, it should be in the init state
1220 vr.assert_state_equals(VRRP_VR_STATE_INIT)
1223 vr.start_stop(is_start=1)
1225 # VR should be in backup state after starting
1226 vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
1227 end_time = vr.start_time() + vr.master_down_seconds()
1229 # no advertisements should arrive until timer expires
1230 self.pg0.enable_capture()
1231 while (time.time() + intvl_s) < end_time:
1233 self.pg0.assert_nothing_captured(filter_out_fn=is_not_adv)
1235 # VR should be in master state after timer expires
1236 self.pg0.enable_capture()
1237 self.pg0.wait_for_packet(intvl_s, is_not_adv)
1238 vr.assert_state_equals(VRRP_VR_STATE_MASTER)
1240 @unittest.skipUnless(config.extended, "part of extended tests")
1241 def test_vrrp6_backup_preempts(self):
1242 """IPv6 Backup VR preempts lower priority master"""
1246 intvl = self._default_adv
1247 intvl_s = intvl * 0.01
1248 vip = self.pg0.remote_ip6
1249 vr = VppVRRPVirtualRouter(
1255 flags=self._default_flags,
1258 self._vrs.append(vr)
1261 # After adding the VR, it should be in the init state
1262 vr.assert_state_equals(VRRP_VR_STATE_INIT)
1265 vr.start_stop(is_start=1)
1267 # VR should be in backup state after starting
1268 vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
1269 end_time = vr.start_time() + vr.master_down_seconds()
1271 # send lower prio advertisements until timer expires
1272 src_ip = self.pg0.remote_ip6
1273 pkts = [vr.vrrp_adv_packet(prio=prio - 10, src_ip=src_ip)]
1274 while (time.time() + intvl_s) < end_time:
1275 self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl_s)
1276 self.logger.info(self.vapi.cli("show trace"))
1278 # when timer expires, VR should take over as master
1279 self.pg0.enable_capture()
1280 self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
1281 vr.assert_state_equals(VRRP_VR_STATE_MASTER)
1283 @unittest.skipUnless(config.extended, "part of extended tests")
1284 def test_vrrp6_master_preempted(self):
1285 """IPv6 Master VR preempted by higher priority backup"""
1287 # A prio 255 VR cannot be preempted so the prio has to be lower and
1288 # we have to wait for it to take over
1291 intvl = self._default_adv
1292 vip = self.pg0.remote_ip6
1293 vr = VppVRRPVirtualRouter(
1299 flags=self._default_flags,
1302 self._vrs.append(vr)
1305 # After adding the VR, it should be in the init state
1306 vr.assert_state_equals(VRRP_VR_STATE_INIT)
1309 vr.start_stop(is_start=1)
1310 vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
1312 # wait for VR to take over as master
1313 end_time = vr.start_time() + vr.master_down_seconds()
1314 sleep_s = end_time - time.time()
1316 vr.assert_state_equals(VRRP_VR_STATE_MASTER)
1318 # Build advertisement packet and send it
1319 pkts = [vr.vrrp_adv_packet(prio=255, src_ip=self.pg0.remote_ip6)]
1320 self.pg_send(self.pg0, pkts)
1322 # VR should be in backup state again
1323 vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
1325 @unittest.skipUnless(config.extended, "part of extended tests")
1326 def test_vrrp6_accept_mode_disabled(self):
1327 """IPv6 Master VR does not reply for VIP w/ accept mode off"""
1329 # accept mode only matters when prio < 255, so it will have to
1330 # come up as a backup and take over as master after the timeout
1333 intvl = self._default_adv
1334 vip = self.pg0.remote_hosts[4].ip6
1335 vr = VppVRRPVirtualRouter(
1341 flags=self._default_flags,
1344 self._vrs.append(vr)
1347 # After adding the VR, it should be in the init state
1348 vr.assert_state_equals(VRRP_VR_STATE_INIT)
1351 vr.start_stop(is_start=1)
1352 vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
1354 # wait for VR to take over as master
1355 end_time = vr.start_time() + vr.master_down_seconds()
1356 sleep_s = end_time - time.time()
1358 vr.assert_state_equals(VRRP_VR_STATE_MASTER)
1360 # send an ICMPv6 echo to the VR virtual IP address
1362 Ether(dst=vr.virtual_mac(), src=self.pg0.remote_mac)
1363 / IPv6(dst=vip, src=self.pg0.remote_ip6)
1364 / ICMPv6EchoRequest(seq=1, id=self.pg0.sw_if_index)
1366 self.pg_send(self.pg0, [echo])
1368 # wait for an echo reply. none should be received
1370 self.pg0.assert_nothing_captured(filter_out_fn=is_not_echo_reply)
1372 @unittest.skipUnless(config.extended, "part of extended tests")
1373 def test_vrrp6_accept_mode_enabled(self):
1374 """IPv6 Master VR replies for VIP w/ accept mode on"""
1376 # A prio 255 VR cannot be preempted so the prio has to be lower and
1377 # we have to wait for it to take over
1380 intvl = self._default_adv
1381 vip = self.pg0.remote_hosts[4].ip6
1382 flags = self._default_flags | VRRP_VR_FLAG_ACCEPT
1383 vr = VppVRRPVirtualRouter(
1384 self, self.pg0, vr_id, prio=prio, intvl=intvl, flags=flags, vips=[vip]
1386 self._vrs.append(vr)
1389 # After adding the VR, it should be in the init state
1390 vr.assert_state_equals(VRRP_VR_STATE_INIT)
1393 vr.start_stop(is_start=1)
1394 vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
1396 # wait for VR to take over as master
1397 end_time = vr.start_time() + vr.master_down_seconds()
1398 sleep_s = end_time - time.time()
1400 vr.assert_state_equals(VRRP_VR_STATE_MASTER)
1402 # send an ICMP echo to the VR virtual IP address
1404 Ether(dst=vr.virtual_mac(), src=self.pg0.remote_mac)
1405 / IPv6(dst=vip, src=self.pg0.remote_ip6)
1406 / ICMPv6EchoRequest(seq=1, id=self.pg0.sw_if_index)
1408 self.pg_send(self.pg0, [echo])
1410 # wait for an echo reply.
1412 rx_pkts = self.pg0.get_capture(
1413 expected_count=1, timeout=1, filter_out_fn=is_not_echo_reply
1416 self.assertEqual(rx_pkts[0][IPv6].src, vip)
1417 self.assertEqual(rx_pkts[0][IPv6].dst, self.pg0.remote_ip6)
1418 self.assertEqual(rx_pkts[0][ICMPv6EchoReply].seq, 1)
1419 self.assertEqual(rx_pkts[0][ICMPv6EchoReply].id, self.pg0.sw_if_index)
1421 @unittest.skipUnless(config.extended, "part of extended tests")
1422 def test_vrrp6_intf_tracking(self):
1423 """IPv6 Master VR adjusts priority based on tracked interface"""
1427 intvl = self._default_adv
1428 intvl_s = intvl * 0.01
1429 vip = self.pg0.local_ip6
1430 vr = VppVRRPVirtualRouter(
1436 flags=self._default_flags,
1439 self._vrs.append(vr)
1442 # After adding the VR, it should be in the init state
1443 vr.assert_state_equals(VRRP_VR_STATE_INIT)
1445 # add pg1 as a tracked interface and start the VR
1447 adjusted_prio = prio - adjustment
1448 vr.add_del_tracked_interface(
1449 is_add=1, sw_if_index=self.pg1.sw_if_index, prio=adjustment
1451 vr.start_stop(is_start=1)
1452 vr.assert_state_equals(VRRP_VR_STATE_MASTER)
1454 adv_configured = vr.vrrp_adv_packet(prio=prio)
1455 adv_adjusted = vr.vrrp_adv_packet(prio=adjusted_prio)
1457 # tracked intf is up -> advertised priority == configured priority
1458 self.pg0.enable_capture()
1459 rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
1460 self.assertEqual(rx, adv_configured)
1462 # take down pg1, verify priority is now being adjusted
1463 self.pg1.admin_down()
1464 self.pg0.enable_capture()
1465 rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
1466 self.assertEqual(rx, adv_adjusted)
1468 # bring up pg1, verify priority now matches configured value
1470 self.pg0.enable_capture()
1471 rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
1472 self.assertEqual(rx, adv_configured)
1474 # remove IP address from pg1, verify priority now being adjusted
1475 self.pg1.unconfig_ip6()
1476 self.pg0.enable_capture()
1477 rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
1478 self.assertEqual(rx, adv_adjusted)
1480 # add IP address to pg1, verify priority now matches configured value
1481 self.pg1.config_ip6()
1482 self.pg0.enable_capture()
1483 rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
1484 self.assertEqual(rx, adv_configured)
1486 @unittest.skipUnless(config.extended, "part of extended tests")
1487 def test_vrrp6_master_adv_unicast(self):
1488 """IPv6 Master VR advertises (unicast)"""
1492 intvl = self._default_adv
1493 intvl_s = intvl * 0.01
1494 vip = self.pg0.local_ip6
1495 flags = self._default_flags | VRRP_VR_FLAG_UNICAST
1496 unicast_peer = self.pg0.remote_hosts[4]
1497 vr = VppVRRPVirtualRouter(
1498 self, self.pg0, vr_id, prio=prio, intvl=intvl, flags=flags, vips=[vip]
1500 self._vrs.append(vr)
1502 vr.set_unicast_peers([unicast_peer.ip6])
1504 # After adding the VR, it should be in the init state
1505 vr.assert_state_equals(VRRP_VR_STATE_INIT)
1507 # Start VR, transition to master
1508 vr.start_stop(is_start=1)
1509 vr.assert_state_equals(VRRP_VR_STATE_MASTER)
1511 self.pg0.enable_capture()
1512 rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
1514 self.assertTrue(rx.haslayer(Ether))
1515 self.assertTrue(rx.haslayer(IPv6))
1516 self.assertTrue(rx.haslayer(VRRPv3))
1517 self.assertEqual(rx[Ether].src, self.pg0.local_mac)
1518 self.assertEqual(rx[Ether].dst, unicast_peer.mac)
1520 ip6_normalize(rx[IPv6].src), ip6_normalize(self.pg0.local_ip6_ll)
1522 self.assertEqual(ip6_normalize(rx[IPv6].dst), ip6_normalize(unicast_peer.ip6))
1523 self.assertEqual(rx[VRRPv3].vrid, vr_id)
1524 self.assertEqual(rx[VRRPv3].priority, prio)
1525 self.assertEqual(rx[VRRPv3].ipcount, 1)
1526 self.assertEqual(rx[VRRPv3].addrlist, [vip])
1529 if __name__ == "__main__":
1530 unittest.main(testRunner=VppTestRunner)