4 # Copyright 2019-2020 Rubicon Communications, LLC (Netgate)
6 # SPDX-License-Identifier: Apache-2.0
12 from socket import inet_pton, inet_ntop
14 from vpp_object import VppObject
15 from vpp_papi import VppEnum
17 from scapy.packet import raw
18 from scapy.layers.l2 import Ether, ARP
19 from scapy.layers.inet import IP, ICMP, icmptypes
20 from scapy.layers.inet6 import IPv6, ipv6nh, IPv6ExtHdrHopByHop, \
21 ICMPv6MLReport2, ICMPv6ND_NA, ICMPv6ND_NS, ICMPv6NDOptDstLLAddr, \
22 ICMPv6NDOptSrcLLAddr, ICMPv6EchoRequest, ICMPv6EchoReply
23 from scapy.contrib.igmpv3 import IGMPv3, IGMPv3mr, IGMPv3gr
24 from scapy.layers.vrrp import IPPROTO_VRRP, VRRPv3
25 from scapy.utils6 import in6_getnsma, in6_getnsmac
26 from config import config
27 from framework import VppTestCase, VppTestRunner
28 from util import ip6_normalize
30 VRRP_VR_FLAG_PREEMPT = 1
31 VRRP_VR_FLAG_ACCEPT = 2
32 VRRP_VR_FLAG_UNICAST = 4
35 VRRP_VR_STATE_INIT = 0
36 VRRP_VR_STATE_BACKUP = 1
37 VRRP_VR_STATE_MASTER = 2
38 VRRP_VR_STATE_INTF_DOWN = 3
40 VRRP_INDEX_INVALID = 0xffffffff
44 """ Want to filter out advertisements, igmp, etc"""
52 """ Filter out everything but advertisements. E.g. multicast RD/ND """
53 if p.haslayer(VRRPv3):
59 def is_not_echo_reply(p):
60 """ filter out advertisements and other while waiting for echo reply """
61 if p.haslayer(IP) and p.haslayer(ICMP):
62 if icmptypes[p[ICMP].type] == "echo-reply":
64 elif p.haslayer(IPv6) and p.haslayer(ICMPv6EchoReply):
70 class VppVRRPVirtualRouter(VppObject):
78 flags=VRRP_VR_FLAG_PREEMPT,
82 self._sw_if_index = self._intf.sw_if_index
87 if (flags & VRRP_VR_FLAG_IPV6):
89 self._adv_dest_mac = "33:33:00:00:00:12"
90 self._virtual_mac = "00:00:5e:00:02:%02x" % vr_id
91 self._adv_dest_ip = "ff02::12"
92 self._vips = ([intf.local_ip6] if vips is None else vips)
95 self._adv_dest_mac = "01:00:5e:00:00:12"
96 self._virtual_mac = "00:00:5e:00:01:%02x" % vr_id
97 self._adv_dest_ip = "224.0.0.18"
98 self._vips = ([intf.local_ip4] if vips is None else vips)
99 self._tracked_ifs = []
100 self._vrrp_index = VRRP_INDEX_INVALID
102 def add_vpp_config(self):
103 self._test.vapi.vrrp_vr_add_del(is_add=1,
104 sw_if_index=self._intf.sw_if_index,
107 interval=self._intvl,
109 n_addrs=len(self._vips),
112 def update_vpp_config(self):
113 r = self._test.vapi.vrrp_vr_update(vrrp_index=self._vrrp_index,
114 sw_if_index=self._intf.sw_if_index,
117 interval=self._intvl,
119 n_addrs=len(self._vips),
121 self._vrrp_index = r.vrrp_index
123 def delete_vpp_config(self):
124 self._test.vapi.vrrp_vr_del(vrrp_index=self._vrrp_index)
126 def query_vpp_config(self):
127 vrs = self._test.vapi.vrrp_vr_dump(sw_if_index=self._intf.sw_if_index)
129 if vr.config.vr_id != self._vr_id:
132 is_ipv6 = (1 if (vr.config.flags & VRRP_VR_FLAG_IPV6) else 0)
133 if is_ipv6 != self._is_ipv6:
140 def remove_vpp_config(self):
141 self._test.vapi.vrrp_vr_add_del(is_add=0,
142 sw_if_index=self._intf.sw_if_index,
145 interval=self._intvl,
147 n_addrs=len(self._vips),
150 def start_stop(self, is_start):
151 self._test.vapi.vrrp_vr_start_stop(is_start=is_start,
152 sw_if_index=self._intf.sw_if_index,
154 is_ipv6=self._is_ipv6)
155 self._start_time = (time.time() if is_start else None)
157 def add_del_tracked_interface(self, is_add, sw_if_index, prio):
159 'sw_if_index': self._intf.sw_if_index,
160 'is_ipv6': self._is_ipv6,
161 'vr_id': self._vr_id,
164 'ifs': [{'sw_if_index': sw_if_index, 'priority': prio}]
166 self._test.vapi.vrrp_vr_track_if_add_del(**args)
167 self._tracked_ifs.append(args['ifs'][0])
169 def set_unicast_peers(self, addrs):
171 'sw_if_index': self._intf.sw_if_index,
172 'is_ipv6': self._is_ipv6,
173 'vr_id': self._vr_id,
174 'n_addrs': len(addrs),
177 self._test.vapi.vrrp_vr_set_peers(**args)
178 self._unicast_peers = addrs
180 def start_time(self):
181 return self._start_time
183 def virtual_mac(self):
184 return self._virtual_mac
186 def virtual_ips(self):
189 def adv_dest_mac(self):
190 return self._adv_dest_mac
192 def adv_dest_ip(self):
193 return self._adv_dest_ip
201 def adv_interval(self):
207 def assert_state_equals(self, state):
208 vr_details = self.query_vpp_config()
209 self._test.assertEqual(vr_details.runtime.state, state)
211 def master_down_seconds(self):
212 vr_details = self.query_vpp_config()
213 return (vr_details.runtime.master_down_int * 0.01)
215 def vrrp_adv_packet(self, prio=None, src_ip=None):
216 dst_ip = self._adv_dest_ip
219 eth = Ether(dst=self._adv_dest_mac, src=self._virtual_mac)
220 vrrp = VRRPv3(vrid=self._vr_id, priority=prio,
221 ipcount=len(self._vips), adv=self._intvl)
223 src_ip = (self._intf.local_ip6_ll if src_ip is None else src_ip)
224 ip = IPv6(src=src_ip, dst=dst_ip, nh=IPPROTO_VRRP, hlim=255)
225 vrrp.addrlist = self._vips
227 src_ip = (self._intf.local_ip4 if src_ip is None else src_ip)
228 ip = IP(src=src_ip, dst=dst_ip, proto=IPPROTO_VRRP, ttl=255, id=0)
229 vrrp.addrlist = self._vips
231 # Fill in default values & checksums
232 pkt = Ether(raw(eth / ip / vrrp))
236 class TestVRRP4(VppTestCase):
237 """ IPv4 VRRP Test Case """
241 super(TestVRRP4, cls).setUpClass()
244 def tearDownClass(cls):
245 super(TestVRRP4, cls).tearDownClass()
248 super(TestVRRP4, self).setUp()
250 self.create_pg_interfaces(range(2))
252 for i in self.pg_interfaces:
255 i.generate_remote_hosts(5)
256 i.configure_ipv4_neighbors()
259 self._default_flags = VRRP_VR_FLAG_PREEMPT
260 self._default_adv = 100
265 vr_api = vr.query_vpp_config()
266 if vr_api.runtime.state != VRRP_VR_STATE_INIT:
267 vr.start_stop(is_start=0)
268 vr.remove_vpp_config()
270 self.logger.error("Error cleaning up")
272 for i in self.pg_interfaces:
279 super(TestVRRP4, self).tearDown()
281 def verify_vrrp4_igmp(self, pkt):
283 self.assertEqual(ip.dst, "224.0.0.22")
284 self.assertEqual(ip.proto, 2)
287 self.assertEqual(IGMPv3.igmpv3types[igmp.type],
288 "Version 3 Membership Report")
290 igmpmr = pkt[IGMPv3mr]
291 self.assertEqual(igmpmr.numgrp, 1)
292 self.assertEqual(igmpmr.records[0].maddr, "224.0.0.18")
294 def verify_vrrp4_garp(self, pkt, vip, vmac):
297 # ARP "who-has" op == 1
298 self.assertEqual(arp.op, 1)
299 self.assertEqual(arp.pdst, arp.psrc)
300 self.assertEqual(arp.pdst, vip)
301 self.assertEqual(arp.hwsrc, vmac)
303 def verify_vrrp4_adv(self, rx_pkt, vr, prio=None):
304 vips = vr.virtual_ips()
307 vrrp = rx_pkt[VRRPv3]
309 pkt = vr.vrrp_adv_packet(prio=prio)
311 # Source MAC is virtual MAC, destination is multicast MAC
312 self.assertEqual(eth.src, vr.virtual_mac())
313 self.assertEqual(eth.dst, vr.adv_dest_mac())
315 self.assertEqual(ip.dst, "224.0.0.18")
316 self.assertEqual(ip.ttl, 255)
317 self.assertEqual(ip.proto, IPPROTO_VRRP)
319 self.assertEqual(vrrp.version, 3)
320 self.assertEqual(vrrp.type, 1)
321 self.assertEqual(vrrp.vrid, vr.vr_id())
324 self.assertEqual(vrrp.priority, prio)
325 self.assertEqual(vrrp.ipcount, len(vips))
326 self.assertEqual(vrrp.adv, vr.adv_interval())
327 self.assertListEqual(vrrp.addrlist, vips)
329 # VR with priority 255 owns the virtual address and should
330 # become master and start advertising immediately.
331 @unittest.skipUnless(config.extended, "part of extended tests")
332 def test_vrrp4_master_adv(self):
333 """ IPv4 Master VR advertises """
334 self.pg_enable_capture(self.pg_interfaces)
338 intvl = self._default_adv
339 vr = VppVRRPVirtualRouter(self, self.pg0, 100,
340 prio=prio, intvl=intvl,
341 flags=self._default_flags)
344 vr.start_stop(is_start=1)
345 self.logger.info(self.vapi.cli("show vrrp vr"))
346 vr.start_stop(is_start=0)
347 self.logger.info(self.vapi.cli("show vrrp vr"))
349 pkts = self.pg0.get_capture(4)
351 # Init -> Master: IGMP Join, VRRP adv, gratuitous ARP are sent
352 self.verify_vrrp4_igmp(pkts[0])
353 self.verify_vrrp4_adv(pkts[1], vr, prio=prio)
354 self.verify_vrrp4_garp(pkts[2], vr.virtual_ips()[0], vr.virtual_mac())
355 # Master -> Init: Adv with priority 0 sent to force an election
356 self.verify_vrrp4_adv(pkts[3], vr, prio=0)
358 vr.remove_vpp_config()
361 # Same as above but with the update API, and add a change
362 # of parameters to test that too
363 @unittest.skipUnless(config.extended, "part of extended tests")
364 def test_vrrp4_master_adv_update(self):
365 """ IPv4 Master VR adv + Update to Backup """
366 self.pg_enable_capture(self.pg_interfaces)
370 intvl = self._default_adv
371 vr = VppVRRPVirtualRouter(self, self.pg0, 100,
372 prio=prio, intvl=intvl,
373 flags=self._default_flags)
375 vr.update_vpp_config()
376 vr.start_stop(is_start=1)
377 self.logger.info(self.vapi.cli("show vrrp vr"))
378 # Update VR with lower prio and larger interval
379 # we need to keep old VR for the adv checks
380 upd_vr = VppVRRPVirtualRouter(self, self.pg0, 100,
381 prio=100, intvl=2*intvl,
382 flags=self._default_flags,
383 vips=[self.pg0.remote_ip4])
384 upd_vr._vrrp_index = vr._vrrp_index
385 upd_vr.update_vpp_config()
386 start_time = time.time()
387 self.logger.info(self.vapi.cli("show vrrp vr"))
388 upd_vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
391 pkts = self.pg0.get_capture(5)
392 # Init -> Master: IGMP Join, VRRP adv, gratuitous ARP are sent
393 self.verify_vrrp4_igmp(pkts[0])
394 self.verify_vrrp4_adv(pkts[1], vr, prio=prio)
395 self.verify_vrrp4_garp(pkts[2], vr.virtual_ips()[0], vr.virtual_mac())
396 # Master -> Init: Adv with priority 0 sent to force an election
397 self.verify_vrrp4_adv(pkts[3], vr, prio=0)
398 # Init -> Backup: An IGMP join should be sent
399 self.verify_vrrp4_igmp(pkts[4])
401 # send higher prio advertisements, should not receive any
402 end_time = start_time + 2 * upd_vr.master_down_seconds()
403 src_ip = self.pg0.remote_ip4
404 pkts = [upd_vr.vrrp_adv_packet(prio=110, src_ip=src_ip)]
405 while time.time() < end_time:
406 self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl*0.01)
407 self.logger.info(self.vapi.cli("show trace"))
409 upd_vr.start_stop(is_start=0)
410 self.logger.info(self.vapi.cli("show vrrp vr"))
412 # VR with priority < 255 enters backup state and does not advertise as
413 # long as it receives higher priority advertisements
414 @unittest.skipUnless(config.extended, "part of extended tests")
415 def test_vrrp4_backup_noadv(self):
416 """ IPv4 Backup VR does not advertise """
417 self.pg_enable_capture(self.pg_interfaces)
422 intvl = self._default_adv
423 intvl_s = intvl * 0.01
424 vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
425 prio=prio, intvl=intvl,
426 flags=self._default_flags,
427 vips=[self.pg0.remote_ip4])
431 vr.start_stop(is_start=1)
433 vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
434 # watch for advertisements for 2x the master down preemption timeout
435 end_time = vr.start_time() + 2 * vr.master_down_seconds()
437 # Init -> Backup: An IGMP join should be sent
438 pkts = self.pg0.get_capture(1)
439 self.verify_vrrp4_igmp(pkts[0])
441 # send higher prio advertisements, should not receive any
442 src_ip = self.pg0.remote_ip4
443 pkts = [vr.vrrp_adv_packet(prio=prio+10, src_ip=src_ip)]
444 while time.time() < end_time:
445 self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl_s)
446 self.logger.info(self.vapi.cli("show trace"))
448 vr.start_stop(is_start=0)
449 self.logger.info(self.vapi.cli("show vrrp vr"))
450 vr.remove_vpp_config()
453 def test_vrrp4_master_arp(self):
454 """ IPv4 Master VR replies to ARP """
457 # VR virtual IP is the default, which is the pg local IP
460 intvl = self._default_adv
461 vr = VppVRRPVirtualRouter(self, self.pg0, 100,
462 prio=prio, intvl=intvl,
463 flags=self._default_flags)
468 # before the VR is up, ARP should resolve to interface MAC
469 self.pg0.resolve_arp()
470 self.assertNotEqual(self.pg0.local_mac, vr.virtual_mac())
472 # start the VR, ARP should now resolve to virtual MAC
473 vr.start_stop(is_start=1)
474 self.pg0.resolve_arp()
475 self.assertEqual(self.pg0.local_mac, vr.virtual_mac())
477 # stop the VR, ARP should resolve to interface MAC again
478 vr.start_stop(is_start=0)
479 self.pg0.resolve_arp()
480 self.assertNotEqual(self.pg0.local_mac, vr.virtual_mac())
482 vr.remove_vpp_config()
485 @unittest.skipUnless(config.extended, "part of extended tests")
486 def test_vrrp4_backup_noarp(self):
487 """ IPv4 Backup VR ignores ARP """
488 # We need an address for a virtual IP that is not the IP that
489 # ARP requests will originate from
493 intvl = self._default_adv
494 vip = self.pg0.remote_hosts[1].ip4
495 vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
496 prio=prio, intvl=intvl,
497 flags=self._default_flags,
502 arp_req = (Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg0.remote_mac) /
503 ARP(op=ARP.who_has, pdst=vip,
504 psrc=self.pg0.remote_ip4, hwsrc=self.pg0.remote_mac))
506 # Before the VR is started make sure no reply to request for VIP
508 self.pg_enable_capture(self.pg_interfaces)
509 self.send_and_assert_no_replies(self.pg0, [arp_req], timeout=1)
511 # VR should start in backup state and still should not reply to ARP
512 # send a higher priority adv to make sure it does not become master
513 adv = vr.vrrp_adv_packet(prio=prio+10, src_ip=self.pg0.remote_ip4)
514 vr.start_stop(is_start=1)
515 self.send_and_assert_no_replies(self.pg0, [adv, arp_req], timeout=1)
517 vr.start_stop(is_start=0)
518 vr.remove_vpp_config()
521 @unittest.skipUnless(config.extended, "part of extended tests")
522 def test_vrrp4_election(self):
523 """ IPv4 Backup VR becomes master if no advertisements received """
527 intvl = self._default_adv
528 intvl_s = intvl * 0.01
529 vip = self.pg0.remote_ip4
530 vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
531 prio=prio, intvl=intvl,
532 flags=self._default_flags,
537 # After adding the VR, it should be in the init state
538 vr.assert_state_equals(VRRP_VR_STATE_INIT)
541 vr.start_stop(is_start=1)
543 # VR should be in backup state after starting
544 vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
545 end_time = vr.start_time() + vr.master_down_seconds()
547 # should not receive adverts until timer expires & state transition
548 self.pg_enable_capture(self.pg_interfaces)
549 while (time.time() + intvl_s) < end_time:
551 self.pg0.assert_nothing_captured(filter_out_fn=is_not_adv)
553 # VR should be in master state, should send an adv
554 self.pg0.enable_capture()
555 self.pg0.wait_for_packet(intvl_s, is_not_adv)
556 vr.assert_state_equals(VRRP_VR_STATE_MASTER)
558 @unittest.skipUnless(config.extended, "part of extended tests")
559 def test_vrrp4_backup_preempts(self):
560 """ IPv4 Backup VR preempts lower priority master """
564 intvl = self._default_adv
565 intvl_s = intvl * 0.01
566 vip = self.pg0.remote_ip4
567 vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
568 prio=prio, intvl=intvl,
569 flags=self._default_flags,
574 # After adding the VR, it should be in the init state
575 vr.assert_state_equals(VRRP_VR_STATE_INIT)
578 vr.start_stop(is_start=1)
580 # VR should be in backup state after starting
581 vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
582 end_time = vr.start_time() + vr.master_down_seconds()
584 # send lower prio advertisements until timer expires
585 src_ip = self.pg0.remote_ip4
586 pkts = [vr.vrrp_adv_packet(prio=prio-10, src_ip=src_ip)]
587 while time.time() + intvl_s < end_time:
588 self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl_s)
589 self.logger.info(self.vapi.cli("show trace"))
591 # when timer expires, VR should take over as master
592 self.pg0.enable_capture()
593 self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
594 vr.assert_state_equals(VRRP_VR_STATE_MASTER)
596 @unittest.skipUnless(config.extended, "part of extended tests")
597 def test_vrrp4_master_preempted(self):
598 """ IPv4 Master VR preempted by higher priority backup """
600 # A prio 255 VR cannot be preempted so the prio has to be lower and
601 # we have to wait for it to take over
604 intvl = self._default_adv
605 vip = self.pg0.remote_ip4
606 vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
607 prio=prio, intvl=intvl,
608 flags=self._default_flags,
613 # After adding the VR, it should be in the init state
614 vr.assert_state_equals(VRRP_VR_STATE_INIT)
617 vr.start_stop(is_start=1)
618 vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
620 # wait for VR to take over as master
621 end_time = vr.start_time() + vr.master_down_seconds()
622 sleep_s = end_time - time.time()
624 vr.assert_state_equals(VRRP_VR_STATE_MASTER)
626 # Build advertisement packet and send it
627 pkts = [vr.vrrp_adv_packet(prio=255, src_ip=self.pg0.remote_ip4)]
628 self.pg_send(self.pg0, pkts)
630 # VR should be in backup state again
631 vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
633 @unittest.skipUnless(config.extended, "part of extended tests")
634 def test_vrrp4_accept_mode_disabled(self):
635 """ IPv4 Master VR does not reply for VIP w/ accept mode off """
637 # accept mode only matters when prio < 255, so it will have to
638 # come up as a backup and take over as master after the timeout
641 intvl = self._default_adv
642 vip = self.pg0.remote_hosts[4].ip4
643 vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
644 prio=prio, intvl=intvl,
645 flags=self._default_flags,
650 # After adding the VR, it should be in the init state
651 vr.assert_state_equals(VRRP_VR_STATE_INIT)
654 vr.start_stop(is_start=1)
655 vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
657 # wait for VR to take over as master
658 end_time = vr.start_time() + vr.master_down_seconds()
659 sleep_s = end_time - time.time()
661 vr.assert_state_equals(VRRP_VR_STATE_MASTER)
663 # send an ICMP echo to the VR virtual IP address
664 echo = (Ether(dst=vr.virtual_mac(), src=self.pg0.remote_mac) /
665 IP(dst=vip, src=self.pg0.remote_ip4) /
666 ICMP(seq=1, id=self.pg0.sw_if_index, type='echo-request'))
667 self.pg_send(self.pg0, [echo])
669 # wait for an echo reply. none should be received
671 self.pg0.assert_nothing_captured(filter_out_fn=is_not_echo_reply)
673 @unittest.skipUnless(config.extended, "part of extended tests")
674 def test_vrrp4_accept_mode_enabled(self):
675 """ IPv4 Master VR replies for VIP w/ accept mode on """
677 # A prio 255 VR cannot be preempted so the prio has to be lower and
678 # we have to wait for it to take over
681 intvl = self._default_adv
682 vip = self.pg0.remote_hosts[4].ip4
683 flags = (VRRP_VR_FLAG_PREEMPT | VRRP_VR_FLAG_ACCEPT)
684 vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
685 prio=prio, intvl=intvl,
691 # After adding the VR, it should be in the init state
692 vr.assert_state_equals(VRRP_VR_STATE_INIT)
695 vr.start_stop(is_start=1)
696 vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
698 # wait for VR to take over as master
699 end_time = vr.start_time() + vr.master_down_seconds()
700 sleep_s = end_time - time.time()
702 vr.assert_state_equals(VRRP_VR_STATE_MASTER)
704 # send an ICMP echo to the VR virtual IP address
705 echo = (Ether(dst=vr.virtual_mac(), src=self.pg0.remote_mac) /
706 IP(dst=vip, src=self.pg0.remote_ip4) /
707 ICMP(seq=1, id=self.pg0.sw_if_index, type='echo-request'))
708 self.pg_send(self.pg0, [echo])
710 # wait for an echo reply.
712 rx_pkts = self.pg0.get_capture(expected_count=1, timeout=1,
713 filter_out_fn=is_not_echo_reply)
715 self.assertEqual(rx_pkts[0][IP].src, vip)
716 self.assertEqual(rx_pkts[0][IP].dst, self.pg0.remote_ip4)
717 self.assertEqual(icmptypes[rx_pkts[0][ICMP].type], "echo-reply")
718 self.assertEqual(rx_pkts[0][ICMP].seq, 1)
719 self.assertEqual(rx_pkts[0][ICMP].id, self.pg0.sw_if_index)
721 @unittest.skipUnless(config.extended, "part of extended tests")
722 def test_vrrp4_intf_tracking(self):
723 """ IPv4 Master VR adjusts priority based on tracked interface """
727 intvl = self._default_adv
728 intvl_s = intvl * 0.01
729 vip = self.pg0.local_ip4
730 vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
731 prio=prio, intvl=intvl,
732 flags=self._default_flags,
737 # After adding the VR, it should be in the init state
738 vr.assert_state_equals(VRRP_VR_STATE_INIT)
740 # add pg1 as a tracked interface and start the VR
742 adjusted_prio = prio - adjustment
743 vr.add_del_tracked_interface(is_add=1,
744 sw_if_index=self.pg1.sw_if_index,
746 vr.start_stop(is_start=1)
747 vr.assert_state_equals(VRRP_VR_STATE_MASTER)
749 adv_configured = vr.vrrp_adv_packet(prio=prio)
750 adv_adjusted = vr.vrrp_adv_packet(prio=adjusted_prio)
752 # tracked intf is up -> advertised priority == configured priority
753 self.pg0.enable_capture()
754 rx = self.pg0.wait_for_packet(timeout=intvl_s,
755 filter_out_fn=is_not_adv)
756 self.assertEqual(rx, adv_configured)
758 # take down pg1, verify priority is now being adjusted
759 self.pg1.admin_down()
760 self.pg0.enable_capture()
761 rx = self.pg0.wait_for_packet(timeout=intvl_s,
762 filter_out_fn=is_not_adv)
763 self.assertEqual(rx, adv_adjusted)
765 # bring up pg1, verify priority now matches configured value
767 self.pg0.enable_capture()
768 rx = self.pg0.wait_for_packet(timeout=intvl_s,
769 filter_out_fn=is_not_adv)
770 self.assertEqual(rx, adv_configured)
772 # remove IP address from pg1, verify priority now being adjusted
773 self.pg1.unconfig_ip4()
774 self.pg0.enable_capture()
775 rx = self.pg0.wait_for_packet(timeout=intvl_s,
776 filter_out_fn=is_not_adv)
777 self.assertEqual(rx, adv_adjusted)
779 # add IP address to pg1, verify priority now matches configured value
780 self.pg1.config_ip4()
781 self.pg0.enable_capture()
782 rx = self.pg0.wait_for_packet(timeout=intvl_s,
783 filter_out_fn=is_not_adv)
784 self.assertEqual(rx, adv_configured)
786 @unittest.skipUnless(config.extended, "part of extended tests")
787 def test_vrrp4_master_adv_unicast(self):
788 """ IPv4 Master VR advertises (unicast) """
792 intvl = self._default_adv
793 intvl_s = intvl * 0.01
794 vip = self.pg0.local_ip4
795 flags = (self._default_flags | VRRP_VR_FLAG_UNICAST)
796 unicast_peer = self.pg0.remote_hosts[4]
797 vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
798 prio=prio, intvl=intvl,
803 vr.set_unicast_peers([unicast_peer.ip4])
805 # After adding the VR, it should be in the init state
806 vr.assert_state_equals(VRRP_VR_STATE_INIT)
808 # Start VR, transition to master
809 vr.start_stop(is_start=1)
810 vr.assert_state_equals(VRRP_VR_STATE_MASTER)
812 self.pg0.enable_capture()
813 rx = self.pg0.wait_for_packet(timeout=intvl_s,
814 filter_out_fn=is_not_adv)
816 self.assertTrue(rx.haslayer(Ether))
817 self.assertTrue(rx.haslayer(IP))
818 self.assertTrue(rx.haslayer(VRRPv3))
819 self.assertEqual(rx[Ether].src, self.pg0.local_mac)
820 self.assertEqual(rx[Ether].dst, unicast_peer.mac)
821 self.assertEqual(rx[IP].src, self.pg0.local_ip4)
822 self.assertEqual(rx[IP].dst, unicast_peer.ip4)
823 self.assertEqual(rx[VRRPv3].vrid, vr_id)
824 self.assertEqual(rx[VRRPv3].priority, prio)
825 self.assertEqual(rx[VRRPv3].ipcount, 1)
826 self.assertEqual(rx[VRRPv3].addrlist, [vip])
829 class TestVRRP6(VppTestCase):
830 """ IPv6 VRRP Test Case """
834 super(TestVRRP6, cls).setUpClass()
837 def tearDownClass(cls):
838 super(TestVRRP6, cls).tearDownClass()
841 super(TestVRRP6, self).setUp()
843 self.create_pg_interfaces(range(2))
845 for i in self.pg_interfaces:
848 i.generate_remote_hosts(5)
849 i.configure_ipv6_neighbors()
852 self._default_flags = (VRRP_VR_FLAG_IPV6 | VRRP_VR_FLAG_PREEMPT)
853 self._default_adv = 100
858 vr_api = vr.query_vpp_config()
859 if vr_api.runtime.state != VRRP_VR_STATE_INIT:
860 vr.start_stop(is_start=0)
861 vr.remove_vpp_config()
863 self.logger.error("Error cleaning up")
865 for i in self.pg_interfaces:
872 super(TestVRRP6, self).tearDown()
874 def verify_vrrp6_mlr(self, pkt, vr):
876 self.assertEqual(ip6.dst, "ff02::16")
877 self.assertEqual(ipv6nh[ip6.nh], "Hop-by-Hop Option Header")
879 hbh = pkt[IPv6ExtHdrHopByHop]
880 self.assertEqual(ipv6nh[hbh.nh], "ICMPv6")
882 self.assertTrue(pkt.haslayer(ICMPv6MLReport2))
883 mlr = pkt[ICMPv6MLReport2]
884 # should contain mc addr records for:
885 # - VRRPv3 multicast addr
886 # - solicited node mc addr record for each VR virtual IPv6 address
887 vips = vr.virtual_ips()
888 self.assertEqual(mlr.records_number, len(vips) + 1)
889 self.assertEqual(mlr.records[0].dst, vr.adv_dest_ip())
891 def verify_vrrp6_adv(self, rx_pkt, vr, prio=None):
892 self.assertTrue(rx_pkt.haslayer(Ether))
893 self.assertTrue(rx_pkt.haslayer(IPv6))
894 self.assertTrue(rx_pkt.haslayer(VRRPv3))
896 # generate a packet for this VR and compare it to the one received
897 pkt = vr.vrrp_adv_packet(prio=prio)
898 self.assertTrue(rx_pkt.haslayer(Ether))
899 self.assertTrue(rx_pkt.haslayer(IPv6))
900 self.assertTrue(rx_pkt.haslayer(VRRPv3))
902 self.assertEqual(pkt, rx_pkt)
904 def verify_vrrp6_gna(self, pkt, vr):
905 self.assertTrue(pkt.haslayer(Ether))
906 self.assertTrue(pkt.haslayer(IPv6))
907 self.assertTrue(pkt.haslayer(ICMPv6ND_NA))
908 self.assertTrue(pkt.haslayer(ICMPv6NDOptDstLLAddr))
910 self.assertEqual(pkt[Ether].dst, "33:33:00:00:00:01")
912 self.assertEqual(pkt[IPv6].dst, "ff02::1")
913 # convert addrs to packed format since string versions could differ
914 src_addr = inet_pton(socket.AF_INET6, pkt[IPv6].src)
915 vr_ll_addr = inet_pton(socket.AF_INET6, vr.interface().local_ip6_ll)
916 self.assertEqual(src_addr, vr_ll_addr)
918 self.assertTrue(pkt[ICMPv6ND_NA].tgt in vr.virtual_ips())
919 self.assertEqual(pkt[ICMPv6NDOptDstLLAddr].lladdr, vr.virtual_mac())
921 # VR with priority 255 owns the virtual address and should
922 # become master and start advertising immediately.
923 @unittest.skipUnless(config.extended, "part of extended tests")
924 def test_vrrp6_master_adv(self):
925 """ IPv6 Master VR advertises """
926 self.pg_enable_capture(self.pg_interfaces)
930 intvl = self._default_adv
931 vr = VppVRRPVirtualRouter(self, self.pg0, 100,
932 prio=prio, intvl=intvl,
933 flags=self._default_flags)
937 self.logger.info(self.vapi.cli("show vrrp vr"))
938 vr.start_stop(is_start=1)
939 self.logger.info(self.vapi.cli("show vrrp vr"))
940 vr.start_stop(is_start=0)
941 self.logger.info(self.vapi.cli("show vrrp vr"))
943 pkts = self.pg0.get_capture(4, filter_out_fn=None)
945 # Init -> Master: Multicast group Join, VRRP adv, gratuitous NAs sent
946 self.verify_vrrp6_mlr(pkts[0], vr)
947 self.verify_vrrp6_adv(pkts[1], vr, prio=prio)
948 self.verify_vrrp6_gna(pkts[2], vr)
949 # Master -> Init: Adv with priority 0 sent to force an election
950 self.verify_vrrp6_adv(pkts[3], vr, prio=0)
952 vr.remove_vpp_config()
955 # Same as above but with the update API, and add a change
956 # of parameters to test that too
957 @unittest.skipUnless(config.extended, "part of extended tests")
958 def test_vrrp6_master_adv_update(self):
959 """ IPv6 Master VR adv + Update to Backup """
960 self.pg_enable_capture(self.pg_interfaces)
964 intvl = self._default_adv
965 vr = VppVRRPVirtualRouter(self, self.pg0, 100,
966 prio=prio, intvl=intvl,
967 flags=self._default_flags)
969 vr.update_vpp_config()
970 vr.start_stop(is_start=1)
971 self.logger.info(self.vapi.cli("show vrrp vr"))
972 # Update VR with lower prio and larger interval
973 # we need to keep old VR for the adv checks
974 upd_vr = VppVRRPVirtualRouter(self, self.pg0, 100,
975 prio=100, intvl=2*intvl,
976 flags=self._default_flags,
977 vips=[self.pg0.remote_ip6])
978 upd_vr._vrrp_index = vr._vrrp_index
979 upd_vr.update_vpp_config()
980 start_time = time.time()
981 self.logger.info(self.vapi.cli("show vrrp vr"))
982 upd_vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
985 pkts = self.pg0.get_capture(5, filter_out_fn=None)
987 # Init -> Master: Multicast group Join, VRRP adv, gratuitous NAs sent
988 self.verify_vrrp6_mlr(pkts[0], vr)
989 self.verify_vrrp6_adv(pkts[1], vr, prio=prio)
990 self.verify_vrrp6_gna(pkts[2], vr)
991 # Master -> Init: Adv with priority 0 sent to force an election
992 self.verify_vrrp6_adv(pkts[3], vr, prio=0)
993 # Init -> Backup: A multicast listener report should be sent
994 # not actually verified in the test below, where I took this from
996 # send higher prio advertisements, should not see VPP send any
997 src_ip = self.pg0.remote_ip6_ll
998 pkts = [upd_vr.vrrp_adv_packet(prio=110, src_ip=src_ip)]
999 self.logger.info(self.vapi.cli("show vlib graph"))
1000 end_time = start_time + 2 * upd_vr.master_down_seconds()
1001 while time.time() < end_time:
1002 self.send_and_assert_no_replies(
1003 self.pg0, pkts, timeout=0.01*upd_vr._intvl)
1004 self.logger.info(self.vapi.cli("show trace"))
1006 vr.start_stop(is_start=0)
1007 self.logger.info(self.vapi.cli("show vrrp vr"))
1009 # VR with priority < 255 enters backup state and does not advertise as
1010 # long as it receives higher priority advertisements
1011 @unittest.skipUnless(config.extended, "part of extended tests")
1012 def test_vrrp6_backup_noadv(self):
1013 """ IPv6 Backup VR does not advertise """
1014 self.pg_enable_capture(self.pg_interfaces)
1019 intvl = self._default_adv
1020 intvl_s = intvl * 0.01
1021 vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
1022 prio=prio, intvl=intvl,
1023 flags=self._default_flags,
1024 vips=[self.pg0.remote_ip6])
1026 self._vrs.append(vr)
1028 vr.start_stop(is_start=1)
1030 vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
1031 # watch for advertisements for 2x the master down preemption timeout
1032 end_time = vr.start_time() + 2 * vr.master_down_seconds()
1034 # Init -> Backup: A multicast listener report should be sent
1035 pkts = self.pg0.get_capture(1, filter_out_fn=None)
1037 # send higher prio advertisements, should not see VPP send any
1038 src_ip = self.pg0.remote_ip6_ll
1040 pkts = [vr.vrrp_adv_packet(prio=prio+10, src_ip=src_ip)]
1041 self.logger.info(self.vapi.cli("show vlib graph"))
1042 while time.time() < end_time:
1043 self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl_s)
1044 self.logger.info(self.vapi.cli("show trace"))
1047 vr.start_stop(is_start=0)
1048 self.logger.info(self.vapi.cli("show vrrp vr"))
1049 vr.remove_vpp_config()
1052 def test_vrrp6_master_nd(self):
1053 """ IPv6 Master VR replies to NDP """
1056 # VR virtual IP is the default, which is the pg local IP
1059 intvl = self._default_adv
1060 vr = VppVRRPVirtualRouter(self, self.pg0, 100,
1061 prio=prio, intvl=intvl,
1062 flags=self._default_flags)
1064 self._vrs.append(vr)
1066 # before the VR is up, NDP should resolve to interface MAC
1067 self.pg0.resolve_ndp()
1068 self.assertNotEqual(self.pg0.local_mac, vr.virtual_mac())
1070 # start the VR, NDP should now resolve to virtual MAC
1071 vr.start_stop(is_start=1)
1072 self.pg0.resolve_ndp()
1073 self.assertEqual(self.pg0.local_mac, vr.virtual_mac())
1075 # stop the VR, ARP should resolve to interface MAC again
1076 vr.start_stop(is_start=0)
1077 self.pg0.resolve_ndp()
1078 self.assertNotEqual(self.pg0.local_mac, vr.virtual_mac())
1080 vr.remove_vpp_config()
1083 @unittest.skipUnless(config.extended, "part of extended tests")
1084 def test_vrrp6_backup_nond(self):
1085 """ IPv6 Backup VR ignores NDP """
1086 # We need an address for a virtual IP that is not the IP that
1087 # ARP requests will originate from
1091 intvl = self._default_adv
1092 intvl_s = intvl * 0.01
1093 vip = self.pg0.remote_hosts[1].ip6
1094 vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
1095 prio=prio, intvl=intvl,
1096 flags=self._default_flags,
1099 self._vrs.append(vr)
1101 nsma = in6_getnsma(inet_pton(socket.AF_INET6, vip))
1102 dmac = in6_getnsmac(nsma)
1103 dst_ip = inet_ntop(socket.AF_INET6, nsma)
1105 ndp_req = (Ether(dst=dmac, src=self.pg0.remote_mac) /
1106 IPv6(dst=dst_ip, src=self.pg0.remote_ip6) /
1107 ICMPv6ND_NS(tgt=vip) /
1108 ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac))
1110 # Before the VR is started make sure no reply to request for VIP
1111 self.send_and_assert_no_replies(self.pg0, [ndp_req], timeout=1)
1113 # VR should start in backup state and still should not reply to NDP
1114 # send a higher priority adv to make sure it does not become master
1115 adv = vr.vrrp_adv_packet(prio=prio+10, src_ip=self.pg0.remote_ip6)
1116 pkts = [adv, ndp_req]
1117 vr.start_stop(is_start=1)
1118 self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl_s)
1120 vr.start_stop(is_start=0)
1122 @unittest.skipUnless(config.extended, "part of extended tests")
1123 def test_vrrp6_election(self):
1124 """ IPv6 Backup VR becomes master if no advertisements received """
1128 intvl = self._default_adv
1129 intvl_s = intvl * 0.01
1130 vip = self.pg0.remote_ip6
1131 vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
1132 prio=prio, intvl=intvl,
1133 flags=self._default_flags,
1135 self._vrs.append(vr)
1138 # After adding the VR, it should be in the init state
1139 vr.assert_state_equals(VRRP_VR_STATE_INIT)
1142 vr.start_stop(is_start=1)
1144 # VR should be in backup state after starting
1145 vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
1146 end_time = vr.start_time() + vr.master_down_seconds()
1148 # no advertisements should arrive until timer expires
1149 self.pg0.enable_capture()
1150 while (time.time() + intvl_s) < end_time:
1152 self.pg0.assert_nothing_captured(filter_out_fn=is_not_adv)
1154 # VR should be in master state after timer expires
1155 self.pg0.enable_capture()
1156 self.pg0.wait_for_packet(intvl_s, is_not_adv)
1157 vr.assert_state_equals(VRRP_VR_STATE_MASTER)
1159 @unittest.skipUnless(config.extended, "part of extended tests")
1160 def test_vrrp6_backup_preempts(self):
1161 """ IPv6 Backup VR preempts lower priority master """
1165 intvl = self._default_adv
1166 intvl_s = intvl * 0.01
1167 vip = self.pg0.remote_ip6
1168 vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
1169 prio=prio, intvl=intvl,
1170 flags=self._default_flags,
1172 self._vrs.append(vr)
1175 # After adding the VR, it should be in the init state
1176 vr.assert_state_equals(VRRP_VR_STATE_INIT)
1179 vr.start_stop(is_start=1)
1181 # VR should be in backup state after starting
1182 vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
1183 end_time = vr.start_time() + vr.master_down_seconds()
1185 # send lower prio advertisements until timer expires
1186 src_ip = self.pg0.remote_ip6
1187 pkts = [vr.vrrp_adv_packet(prio=prio-10, src_ip=src_ip)]
1188 while (time.time() + intvl_s) < end_time:
1189 self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl_s)
1190 self.logger.info(self.vapi.cli("show trace"))
1192 # when timer expires, VR should take over as master
1193 self.pg0.enable_capture()
1194 self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
1195 vr.assert_state_equals(VRRP_VR_STATE_MASTER)
1197 @unittest.skipUnless(config.extended, "part of extended tests")
1198 def test_vrrp6_master_preempted(self):
1199 """ IPv6 Master VR preempted by higher priority backup """
1201 # A prio 255 VR cannot be preempted so the prio has to be lower and
1202 # we have to wait for it to take over
1205 intvl = self._default_adv
1206 vip = self.pg0.remote_ip6
1207 vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
1208 prio=prio, intvl=intvl,
1209 flags=self._default_flags,
1211 self._vrs.append(vr)
1214 # After adding the VR, it should be in the init state
1215 vr.assert_state_equals(VRRP_VR_STATE_INIT)
1218 vr.start_stop(is_start=1)
1219 vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
1221 # wait for VR to take over as master
1222 end_time = vr.start_time() + vr.master_down_seconds()
1223 sleep_s = end_time - time.time()
1225 vr.assert_state_equals(VRRP_VR_STATE_MASTER)
1227 # Build advertisement packet and send it
1228 pkts = [vr.vrrp_adv_packet(prio=255, src_ip=self.pg0.remote_ip6)]
1229 self.pg_send(self.pg0, pkts)
1231 # VR should be in backup state again
1232 vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
1234 @unittest.skipUnless(config.extended, "part of extended tests")
1235 def test_vrrp6_accept_mode_disabled(self):
1236 """ IPv6 Master VR does not reply for VIP w/ accept mode off """
1238 # accept mode only matters when prio < 255, so it will have to
1239 # come up as a backup and take over as master after the timeout
1242 intvl = self._default_adv
1243 vip = self.pg0.remote_hosts[4].ip6
1244 vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
1245 prio=prio, intvl=intvl,
1246 flags=self._default_flags,
1248 self._vrs.append(vr)
1251 # After adding the VR, it should be in the init state
1252 vr.assert_state_equals(VRRP_VR_STATE_INIT)
1255 vr.start_stop(is_start=1)
1256 vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
1258 # wait for VR to take over as master
1259 end_time = vr.start_time() + vr.master_down_seconds()
1260 sleep_s = end_time - time.time()
1262 vr.assert_state_equals(VRRP_VR_STATE_MASTER)
1264 # send an ICMPv6 echo to the VR virtual IP address
1265 echo = (Ether(dst=vr.virtual_mac(), src=self.pg0.remote_mac) /
1266 IPv6(dst=vip, src=self.pg0.remote_ip6) /
1267 ICMPv6EchoRequest(seq=1, id=self.pg0.sw_if_index))
1268 self.pg_send(self.pg0, [echo])
1270 # wait for an echo reply. none should be received
1272 self.pg0.assert_nothing_captured(filter_out_fn=is_not_echo_reply)
1274 @unittest.skipUnless(config.extended, "part of extended tests")
1275 def test_vrrp6_accept_mode_enabled(self):
1276 """ IPv6 Master VR replies for VIP w/ accept mode on """
1278 # A prio 255 VR cannot be preempted so the prio has to be lower and
1279 # we have to wait for it to take over
1282 intvl = self._default_adv
1283 vip = self.pg0.remote_hosts[4].ip6
1284 flags = (self._default_flags | VRRP_VR_FLAG_ACCEPT)
1285 vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
1286 prio=prio, intvl=intvl,
1289 self._vrs.append(vr)
1292 # After adding the VR, it should be in the init state
1293 vr.assert_state_equals(VRRP_VR_STATE_INIT)
1296 vr.start_stop(is_start=1)
1297 vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
1299 # wait for VR to take over as master
1300 end_time = vr.start_time() + vr.master_down_seconds()
1301 sleep_s = end_time - time.time()
1303 vr.assert_state_equals(VRRP_VR_STATE_MASTER)
1305 # send an ICMP echo to the VR virtual IP address
1306 echo = (Ether(dst=vr.virtual_mac(), src=self.pg0.remote_mac) /
1307 IPv6(dst=vip, src=self.pg0.remote_ip6) /
1308 ICMPv6EchoRequest(seq=1, id=self.pg0.sw_if_index))
1309 self.pg_send(self.pg0, [echo])
1311 # wait for an echo reply.
1313 rx_pkts = self.pg0.get_capture(expected_count=1, timeout=1,
1314 filter_out_fn=is_not_echo_reply)
1316 self.assertEqual(rx_pkts[0][IPv6].src, vip)
1317 self.assertEqual(rx_pkts[0][IPv6].dst, self.pg0.remote_ip6)
1318 self.assertEqual(rx_pkts[0][ICMPv6EchoReply].seq, 1)
1319 self.assertEqual(rx_pkts[0][ICMPv6EchoReply].id, self.pg0.sw_if_index)
1321 @unittest.skipUnless(config.extended, "part of extended tests")
1322 def test_vrrp6_intf_tracking(self):
1323 """ IPv6 Master VR adjusts priority based on tracked interface """
1327 intvl = self._default_adv
1328 intvl_s = intvl * 0.01
1329 vip = self.pg0.local_ip6
1330 vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
1331 prio=prio, intvl=intvl,
1332 flags=self._default_flags,
1334 self._vrs.append(vr)
1337 # After adding the VR, it should be in the init state
1338 vr.assert_state_equals(VRRP_VR_STATE_INIT)
1340 # add pg1 as a tracked interface and start the VR
1342 adjusted_prio = prio - adjustment
1343 vr.add_del_tracked_interface(is_add=1,
1344 sw_if_index=self.pg1.sw_if_index,
1346 vr.start_stop(is_start=1)
1347 vr.assert_state_equals(VRRP_VR_STATE_MASTER)
1349 adv_configured = vr.vrrp_adv_packet(prio=prio)
1350 adv_adjusted = vr.vrrp_adv_packet(prio=adjusted_prio)
1352 # tracked intf is up -> advertised priority == configured priority
1353 self.pg0.enable_capture()
1354 rx = self.pg0.wait_for_packet(timeout=intvl_s,
1355 filter_out_fn=is_not_adv)
1356 self.assertEqual(rx, adv_configured)
1358 # take down pg1, verify priority is now being adjusted
1359 self.pg1.admin_down()
1360 self.pg0.enable_capture()
1361 rx = self.pg0.wait_for_packet(timeout=intvl_s,
1362 filter_out_fn=is_not_adv)
1363 self.assertEqual(rx, adv_adjusted)
1365 # bring up pg1, verify priority now matches configured value
1367 self.pg0.enable_capture()
1368 rx = self.pg0.wait_for_packet(timeout=intvl_s,
1369 filter_out_fn=is_not_adv)
1370 self.assertEqual(rx, adv_configured)
1372 # remove IP address from pg1, verify priority now being adjusted
1373 self.pg1.unconfig_ip6()
1374 self.pg0.enable_capture()
1375 rx = self.pg0.wait_for_packet(timeout=intvl_s,
1376 filter_out_fn=is_not_adv)
1377 self.assertEqual(rx, adv_adjusted)
1379 # add IP address to pg1, verify priority now matches configured value
1380 self.pg1.config_ip6()
1381 self.pg0.enable_capture()
1382 rx = self.pg0.wait_for_packet(timeout=intvl_s,
1383 filter_out_fn=is_not_adv)
1384 self.assertEqual(rx, adv_configured)
1386 @unittest.skipUnless(config.extended, "part of extended tests")
1387 def test_vrrp6_master_adv_unicast(self):
1388 """ IPv6 Master VR advertises (unicast) """
1392 intvl = self._default_adv
1393 intvl_s = intvl * 0.01
1394 vip = self.pg0.local_ip6
1395 flags = (self._default_flags | VRRP_VR_FLAG_UNICAST)
1396 unicast_peer = self.pg0.remote_hosts[4]
1397 vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
1398 prio=prio, intvl=intvl,
1401 self._vrs.append(vr)
1403 vr.set_unicast_peers([unicast_peer.ip6])
1405 # After adding the VR, it should be in the init state
1406 vr.assert_state_equals(VRRP_VR_STATE_INIT)
1408 # Start VR, transition to master
1409 vr.start_stop(is_start=1)
1410 vr.assert_state_equals(VRRP_VR_STATE_MASTER)
1412 self.pg0.enable_capture()
1413 rx = self.pg0.wait_for_packet(timeout=intvl_s,
1414 filter_out_fn=is_not_adv)
1416 self.assertTrue(rx.haslayer(Ether))
1417 self.assertTrue(rx.haslayer(IPv6))
1418 self.assertTrue(rx.haslayer(VRRPv3))
1419 self.assertEqual(rx[Ether].src, self.pg0.local_mac)
1420 self.assertEqual(rx[Ether].dst, unicast_peer.mac)
1421 self.assertEqual(ip6_normalize(rx[IPv6].src),
1422 ip6_normalize(self.pg0.local_ip6_ll))
1423 self.assertEqual(ip6_normalize(rx[IPv6].dst),
1424 ip6_normalize(unicast_peer.ip6))
1425 self.assertEqual(rx[VRRPv3].vrid, vr_id)
1426 self.assertEqual(rx[VRRPv3].priority, prio)
1427 self.assertEqual(rx[VRRPv3].ipcount, 1)
1428 self.assertEqual(rx[VRRPv3].addrlist, [vip])
1431 if __name__ == '__main__':
1432 unittest.main(testRunner=VppTestRunner)