tests: Add support for getting corefile patterns on FreeBSD
[vpp.git] / test / test_vrrp.py
1 #!/usr/bin/env python3
2
3 #
4 # Copyright 2019-2020 Rubicon Communications, LLC (Netgate)
5 #
6 # SPDX-License-Identifier: Apache-2.0
7 #
8
9 import unittest
10 import time
11 import socket
12 from socket import inet_pton, inet_ntop
13
14 from vpp_object import VppObject
15
16 from scapy.packet import raw
17 from scapy.layers.l2 import Ether, ARP
18 from scapy.layers.inet import IP, ICMP, icmptypes
19 from scapy.layers.inet6 import (
20     IPv6,
21     ipv6nh,
22     IPv6ExtHdrHopByHop,
23     ICMPv6MLReport2,
24     ICMPv6ND_NA,
25     ICMPv6ND_NS,
26     ICMPv6NDOptDstLLAddr,
27     ICMPv6NDOptSrcLLAddr,
28     ICMPv6EchoRequest,
29     ICMPv6EchoReply,
30 )
31 from scapy.contrib.igmpv3 import IGMPv3, IGMPv3mr
32 from scapy.layers.vrrp import IPPROTO_VRRP, VRRPv3
33 from scapy.utils6 import in6_getnsma, in6_getnsmac
34 from config import config
35 from framework import VppTestCase
36 from asfframework import VppTestRunner
37 from util import ip6_normalize
38
39 VRRP_VR_FLAG_PREEMPT = 1
40 VRRP_VR_FLAG_ACCEPT = 2
41 VRRP_VR_FLAG_UNICAST = 4
42 VRRP_VR_FLAG_IPV6 = 8
43
44 VRRP_VR_STATE_INIT = 0
45 VRRP_VR_STATE_BACKUP = 1
46 VRRP_VR_STATE_MASTER = 2
47 VRRP_VR_STATE_INTF_DOWN = 3
48
49 VRRP_INDEX_INVALID = 0xFFFFFFFF
50
51
52 def is_non_arp(p):
53     """Want to filter out advertisements, igmp, etc"""
54     if p.haslayer(ARP):
55         return False
56
57     return True
58
59
60 def is_not_adv(p):
61     """Filter out everything but advertisements. E.g. multicast RD/ND"""
62     if p.haslayer(VRRPv3):
63         return False
64
65     return True
66
67
68 def is_not_echo_reply(p):
69     """filter out advertisements and other while waiting for echo reply"""
70     if p.haslayer(IP) and p.haslayer(ICMP):
71         if icmptypes[p[ICMP].type] == "echo-reply":
72             return False
73     elif p.haslayer(IPv6) and p.haslayer(ICMPv6EchoReply):
74         return False
75
76     return True
77
78
79 class VppVRRPVirtualRouter(VppObject):
80     def __init__(
81         self,
82         test,
83         intf,
84         vr_id,
85         prio=100,
86         intvl=100,
87         flags=VRRP_VR_FLAG_PREEMPT,
88         vips=None,
89     ):
90         self._test = test
91         self._intf = intf
92         self._sw_if_index = self._intf.sw_if_index
93         self._vr_id = vr_id
94         self._prio = prio
95         self._intvl = intvl
96         self._flags = flags
97         if flags & VRRP_VR_FLAG_IPV6:
98             self._is_ipv6 = 1
99             self._adv_dest_mac = "33:33:00:00:00:12"
100             self._virtual_mac = "00:00:5e:00:02:%02x" % vr_id
101             self._adv_dest_ip = "ff02::12"
102             self._vips = [intf.local_ip6] if vips is None else vips
103         else:
104             self._is_ipv6 = 0
105             self._adv_dest_mac = "01:00:5e:00:00:12"
106             self._virtual_mac = "00:00:5e:00:01:%02x" % vr_id
107             self._adv_dest_ip = "224.0.0.18"
108             self._vips = [intf.local_ip4] if vips is None else vips
109         self._tracked_ifs = []
110         self._vrrp_index = VRRP_INDEX_INVALID
111
112     def add_vpp_config(self):
113         self._test.vapi.vrrp_vr_add_del(
114             is_add=1,
115             sw_if_index=self._intf.sw_if_index,
116             vr_id=self._vr_id,
117             priority=self._prio,
118             interval=self._intvl,
119             flags=self._flags,
120             n_addrs=len(self._vips),
121             addrs=self._vips,
122         )
123
124     def update_vpp_config(self):
125         r = self._test.vapi.vrrp_vr_update(
126             vrrp_index=self._vrrp_index,
127             sw_if_index=self._intf.sw_if_index,
128             vr_id=self._vr_id,
129             priority=self._prio,
130             interval=self._intvl,
131             flags=self._flags,
132             n_addrs=len(self._vips),
133             addrs=self._vips,
134         )
135         self._vrrp_index = r.vrrp_index
136
137     def delete_vpp_config(self):
138         self._test.vapi.vrrp_vr_del(vrrp_index=self._vrrp_index)
139
140     def query_vpp_config(self):
141         vrs = self._test.vapi.vrrp_vr_dump(sw_if_index=self._intf.sw_if_index)
142         for vr in vrs:
143             if vr.config.vr_id != self._vr_id:
144                 continue
145
146             is_ipv6 = 1 if (vr.config.flags & VRRP_VR_FLAG_IPV6) else 0
147             if is_ipv6 != self._is_ipv6:
148                 continue
149
150             return vr
151
152         return None
153
154     def remove_vpp_config(self):
155         self._test.vapi.vrrp_vr_add_del(
156             is_add=0,
157             sw_if_index=self._intf.sw_if_index,
158             vr_id=self._vr_id,
159             priority=self._prio,
160             interval=self._intvl,
161             flags=self._flags,
162             n_addrs=len(self._vips),
163             addrs=self._vips,
164         )
165
166     def start_stop(self, is_start):
167         self._test.vapi.vrrp_vr_start_stop(
168             is_start=is_start,
169             sw_if_index=self._intf.sw_if_index,
170             vr_id=self._vr_id,
171             is_ipv6=self._is_ipv6,
172         )
173         self._start_time = time.time() if is_start else None
174
175     def add_del_tracked_interface(self, is_add, sw_if_index, prio):
176         args = {
177             "sw_if_index": self._intf.sw_if_index,
178             "is_ipv6": self._is_ipv6,
179             "vr_id": self._vr_id,
180             "is_add": is_add,
181             "n_ifs": 1,
182             "ifs": [{"sw_if_index": sw_if_index, "priority": prio}],
183         }
184         self._test.vapi.vrrp_vr_track_if_add_del(**args)
185         self._tracked_ifs.append(args["ifs"][0])
186
187     def set_unicast_peers(self, addrs):
188         args = {
189             "sw_if_index": self._intf.sw_if_index,
190             "is_ipv6": self._is_ipv6,
191             "vr_id": self._vr_id,
192             "n_addrs": len(addrs),
193             "addrs": addrs,
194         }
195         self._test.vapi.vrrp_vr_set_peers(**args)
196         self._unicast_peers = addrs
197
198     def start_time(self):
199         return self._start_time
200
201     def virtual_mac(self):
202         return self._virtual_mac
203
204     def virtual_ips(self):
205         return self._vips
206
207     def adv_dest_mac(self):
208         return self._adv_dest_mac
209
210     def adv_dest_ip(self):
211         return self._adv_dest_ip
212
213     def priority(self):
214         return self._prio
215
216     def vr_id(self):
217         return self._vr_id
218
219     def adv_interval(self):
220         return self._intvl
221
222     def interface(self):
223         return self._intf
224
225     def assert_state_equals(self, state):
226         vr_details = self.query_vpp_config()
227         self._test.assertEqual(vr_details.runtime.state, state)
228
229     def master_down_seconds(self):
230         vr_details = self.query_vpp_config()
231         return vr_details.runtime.master_down_int * 0.01
232
233     def vrrp_adv_packet(self, prio=None, src_ip=None):
234         dst_ip = self._adv_dest_ip
235         if prio is None:
236             prio = self._prio
237         eth = Ether(dst=self._adv_dest_mac, src=self._virtual_mac)
238         vrrp = VRRPv3(
239             vrid=self._vr_id, priority=prio, ipcount=len(self._vips), adv=self._intvl
240         )
241         if self._is_ipv6:
242             src_ip = self._intf.local_ip6_ll if src_ip is None else src_ip
243             ip = IPv6(src=src_ip, dst=dst_ip, nh=IPPROTO_VRRP, hlim=255)
244             vrrp.addrlist = self._vips
245         else:
246             src_ip = self._intf.local_ip4 if src_ip is None else src_ip
247             ip = IP(src=src_ip, dst=dst_ip, proto=IPPROTO_VRRP, ttl=255, id=0)
248             vrrp.addrlist = self._vips
249
250         # Fill in default values & checksums
251         pkt = Ether(raw(eth / ip / vrrp))
252         return pkt
253
254
255 class TestVRRP4(VppTestCase):
256     """IPv4 VRRP Test Case"""
257
258     @classmethod
259     def setUpClass(cls):
260         super(TestVRRP4, cls).setUpClass()
261
262     @classmethod
263     def tearDownClass(cls):
264         super(TestVRRP4, cls).tearDownClass()
265
266     def setUp(self):
267         super(TestVRRP4, self).setUp()
268
269         self.create_pg_interfaces(range(2))
270
271         for i in self.pg_interfaces:
272             i.admin_up()
273             i.config_ip4()
274             i.generate_remote_hosts(5)
275             i.configure_ipv4_neighbors()
276
277         self._vrs = []
278         self._default_flags = VRRP_VR_FLAG_PREEMPT
279         self._default_adv = 100
280
281     def tearDown(self):
282         for vr in self._vrs:
283             try:
284                 vr_api = vr.query_vpp_config()
285                 if vr_api.runtime.state != VRRP_VR_STATE_INIT:
286                     vr.start_stop(is_start=0)
287                 vr.remove_vpp_config()
288             except:
289                 self.logger.error("Error cleaning up")
290
291         for i in self.pg_interfaces:
292             i.admin_down()
293             i.unconfig_ip4()
294             i.unconfig_ip6()
295
296         self._vrs = []
297
298         super(TestVRRP4, self).tearDown()
299
300     def verify_vrrp4_igmp(self, pkt):
301         ip = pkt[IP]
302         self.assertEqual(ip.dst, "224.0.0.22")
303         self.assertEqual(ip.proto, 2)
304
305         igmp = pkt[IGMPv3]
306         self.assertEqual(IGMPv3.igmpv3types[igmp.type], "Version 3 Membership Report")
307
308         igmpmr = pkt[IGMPv3mr]
309         self.assertEqual(igmpmr.numgrp, 1)
310         self.assertEqual(igmpmr.records[0].maddr, "224.0.0.18")
311
312     def verify_vrrp4_garp(self, pkt, vip, vmac):
313         arp = pkt[ARP]
314
315         # ARP "who-has" op == 1
316         self.assertEqual(arp.op, 1)
317         self.assertEqual(arp.pdst, arp.psrc)
318         self.assertEqual(arp.pdst, vip)
319         self.assertEqual(arp.hwsrc, vmac)
320
321     def verify_vrrp4_adv(self, rx_pkt, vr, prio=None):
322         vips = vr.virtual_ips()
323         eth = rx_pkt[Ether]
324         ip = rx_pkt[IP]
325         vrrp = rx_pkt[VRRPv3]
326
327         pkt = vr.vrrp_adv_packet(prio=prio)
328
329         # Source MAC is virtual MAC, destination is multicast MAC
330         self.assertEqual(eth.src, vr.virtual_mac())
331         self.assertEqual(eth.dst, vr.adv_dest_mac())
332
333         self.assertEqual(ip.dst, "224.0.0.18")
334         self.assertEqual(ip.ttl, 255)
335         self.assertEqual(ip.proto, IPPROTO_VRRP)
336
337         self.assertEqual(vrrp.version, 3)
338         self.assertEqual(vrrp.type, 1)
339         self.assertEqual(vrrp.vrid, vr.vr_id())
340         if prio is None:
341             prio = vr.priority()
342         self.assertEqual(vrrp.priority, prio)
343         self.assertEqual(vrrp.ipcount, len(vips))
344         self.assertEqual(vrrp.adv, vr.adv_interval())
345         self.assertListEqual(vrrp.addrlist, vips)
346
347     # VR with priority 255 owns the virtual address and should
348     # become master and start advertising immediately.
349     @unittest.skipUnless(config.extended, "part of extended tests")
350     def test_vrrp4_master_adv(self):
351         """IPv4 Master VR advertises"""
352         self.pg_enable_capture(self.pg_interfaces)
353         self.pg_start()
354
355         prio = 255
356         intvl = self._default_adv
357         vr = VppVRRPVirtualRouter(
358             self, self.pg0, 100, prio=prio, intvl=intvl, flags=self._default_flags
359         )
360
361         vr.add_vpp_config()
362         vr.start_stop(is_start=1)
363         self.logger.info(self.vapi.cli("show vrrp vr"))
364         vr.start_stop(is_start=0)
365         self.logger.info(self.vapi.cli("show vrrp vr"))
366
367         pkts = self.pg0.get_capture(4)
368
369         # Init -> Master: IGMP Join, VRRP adv, gratuitous ARP are sent
370         self.verify_vrrp4_igmp(pkts[0])
371         self.verify_vrrp4_adv(pkts[1], vr, prio=prio)
372         self.verify_vrrp4_garp(pkts[2], vr.virtual_ips()[0], vr.virtual_mac())
373         # Master -> Init: Adv with priority 0 sent to force an election
374         self.verify_vrrp4_adv(pkts[3], vr, prio=0)
375
376         vr.remove_vpp_config()
377         self._vrs = []
378
379     # Same as above but with the update API, and add a change
380     # of parameters to test that too
381     @unittest.skipUnless(config.extended, "part of extended tests")
382     def test_vrrp4_master_adv_update(self):
383         """IPv4 Master VR adv + Update to Backup"""
384         self.pg_enable_capture(self.pg_interfaces)
385         self.pg_start()
386
387         prio = 255
388         intvl = self._default_adv
389         vr = VppVRRPVirtualRouter(
390             self, self.pg0, 100, prio=prio, intvl=intvl, flags=self._default_flags
391         )
392
393         vr.update_vpp_config()
394         vr.start_stop(is_start=1)
395         self.logger.info(self.vapi.cli("show vrrp vr"))
396         # Update VR with lower prio and larger interval
397         # we need to keep old VR for the adv checks
398         upd_vr = VppVRRPVirtualRouter(
399             self,
400             self.pg0,
401             100,
402             prio=100,
403             intvl=2 * intvl,
404             flags=self._default_flags,
405             vips=[self.pg0.remote_ip4],
406         )
407         upd_vr._vrrp_index = vr._vrrp_index
408         upd_vr.update_vpp_config()
409         start_time = time.time()
410         self.logger.info(self.vapi.cli("show vrrp vr"))
411         upd_vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
412         self._vrs = [upd_vr]
413
414         pkts = self.pg0.get_capture(5)
415         # Init -> Master: IGMP Join, VRRP adv, gratuitous ARP are sent
416         self.verify_vrrp4_igmp(pkts[0])
417         self.verify_vrrp4_adv(pkts[1], vr, prio=prio)
418         self.verify_vrrp4_garp(pkts[2], vr.virtual_ips()[0], vr.virtual_mac())
419         # Master -> Init: Adv with priority 0 sent to force an election
420         self.verify_vrrp4_adv(pkts[3], vr, prio=0)
421         # Init -> Backup: An IGMP join should be sent
422         self.verify_vrrp4_igmp(pkts[4])
423
424         # send higher prio advertisements, should not receive any
425         end_time = start_time + 2 * upd_vr.master_down_seconds()
426         src_ip = self.pg0.remote_ip4
427         pkts = [upd_vr.vrrp_adv_packet(prio=110, src_ip=src_ip)]
428         while time.time() < end_time:
429             self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl * 0.01)
430             self.logger.info(self.vapi.cli("show trace"))
431
432         upd_vr.start_stop(is_start=0)
433         self.logger.info(self.vapi.cli("show vrrp vr"))
434
435     # VR with priority < 255 enters backup state and does not advertise as
436     # long as it receives higher priority advertisements
437     @unittest.skipUnless(config.extended, "part of extended tests")
438     def test_vrrp4_backup_noadv(self):
439         """IPv4 Backup VR does not advertise"""
440         self.pg_enable_capture(self.pg_interfaces)
441         self.pg_start()
442
443         vr_id = 100
444         prio = 100
445         intvl = self._default_adv
446         intvl_s = intvl * 0.01
447         vr = VppVRRPVirtualRouter(
448             self,
449             self.pg0,
450             vr_id,
451             prio=prio,
452             intvl=intvl,
453             flags=self._default_flags,
454             vips=[self.pg0.remote_ip4],
455         )
456         self._vrs.append(vr)
457         vr.add_vpp_config()
458
459         vr.start_stop(is_start=1)
460
461         vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
462         # watch for advertisements for 2x the master down preemption timeout
463         end_time = vr.start_time() + 2 * vr.master_down_seconds()
464
465         # Init -> Backup: An IGMP join should be sent
466         pkts = self.pg0.get_capture(1)
467         self.verify_vrrp4_igmp(pkts[0])
468
469         # send higher prio advertisements, should not receive any
470         src_ip = self.pg0.remote_ip4
471         pkts = [vr.vrrp_adv_packet(prio=prio + 10, src_ip=src_ip)]
472         while time.time() < end_time:
473             self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl_s)
474             self.logger.info(self.vapi.cli("show trace"))
475
476         vr.start_stop(is_start=0)
477         self.logger.info(self.vapi.cli("show vrrp vr"))
478         vr.remove_vpp_config()
479         self._vrs = []
480
481     def test_vrrp4_master_arp(self):
482         """IPv4 Master VR replies to ARP"""
483         self.pg_start()
484
485         # VR virtual IP is the default, which is the pg local IP
486         vr_id = 100
487         prio = 255
488         intvl = self._default_adv
489         vr = VppVRRPVirtualRouter(
490             self, self.pg0, 100, prio=prio, intvl=intvl, flags=self._default_flags
491         )
492         self._vrs.append(vr)
493
494         vr.add_vpp_config()
495
496         # before the VR is up, ARP should resolve to interface MAC
497         self.pg0.resolve_arp()
498         self.assertNotEqual(self.pg0.local_mac, vr.virtual_mac())
499
500         # start the VR, ARP should now resolve to virtual MAC
501         vr.start_stop(is_start=1)
502         self.pg0.resolve_arp()
503         self.assertEqual(self.pg0.local_mac, vr.virtual_mac())
504
505         # stop the VR, ARP should resolve to interface MAC again
506         vr.start_stop(is_start=0)
507         self.pg0.resolve_arp()
508         self.assertNotEqual(self.pg0.local_mac, vr.virtual_mac())
509
510         vr.remove_vpp_config()
511         self._vrs = []
512
513     @unittest.skipUnless(config.extended, "part of extended tests")
514     def test_vrrp4_backup_noarp(self):
515         """IPv4 Backup VR ignores ARP"""
516         # We need an address for a virtual IP that is not the IP that
517         # ARP requests will originate from
518
519         vr_id = 100
520         prio = 100
521         intvl = self._default_adv
522         vip = self.pg0.remote_hosts[1].ip4
523         vr = VppVRRPVirtualRouter(
524             self,
525             self.pg0,
526             vr_id,
527             prio=prio,
528             intvl=intvl,
529             flags=self._default_flags,
530             vips=[vip],
531         )
532         self._vrs.append(vr)
533         vr.add_vpp_config()
534
535         arp_req = Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg0.remote_mac) / ARP(
536             op=ARP.who_has,
537             pdst=vip,
538             psrc=self.pg0.remote_ip4,
539             hwsrc=self.pg0.remote_mac,
540         )
541
542         # Before the VR is started make sure no reply to request for VIP
543         self.pg_start()
544         self.pg_enable_capture(self.pg_interfaces)
545         self.send_and_assert_no_replies(self.pg0, [arp_req], timeout=1)
546
547         # VR should start in backup state and still should not reply to ARP
548         # send a higher priority adv to make sure it does not become master
549         adv = vr.vrrp_adv_packet(prio=prio + 10, src_ip=self.pg0.remote_ip4)
550         vr.start_stop(is_start=1)
551         self.send_and_assert_no_replies(self.pg0, [adv, arp_req], timeout=1)
552
553         vr.start_stop(is_start=0)
554         vr.remove_vpp_config()
555         self._vrs = []
556
557     @unittest.skipUnless(config.extended, "part of extended tests")
558     def test_vrrp4_election(self):
559         """IPv4 Backup VR becomes master if no advertisements received"""
560
561         vr_id = 100
562         prio = 100
563         intvl = self._default_adv
564         intvl_s = intvl * 0.01
565         vip = self.pg0.remote_ip4
566         vr = VppVRRPVirtualRouter(
567             self,
568             self.pg0,
569             vr_id,
570             prio=prio,
571             intvl=intvl,
572             flags=self._default_flags,
573             vips=[vip],
574         )
575         self._vrs.append(vr)
576         vr.add_vpp_config()
577
578         # After adding the VR, it should be in the init state
579         vr.assert_state_equals(VRRP_VR_STATE_INIT)
580
581         self.pg_start()
582         vr.start_stop(is_start=1)
583
584         # VR should be in backup state after starting
585         vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
586         end_time = vr.start_time() + vr.master_down_seconds()
587
588         # should not receive adverts until timer expires & state transition
589         self.pg_enable_capture(self.pg_interfaces)
590         while (time.time() + intvl_s) < end_time:
591             time.sleep(intvl_s)
592             self.pg0.assert_nothing_captured(filter_out_fn=is_not_adv)
593
594         # VR should be in master state, should send an adv
595         self.pg0.enable_capture()
596         self.pg0.wait_for_packet(intvl_s, is_not_adv)
597         vr.assert_state_equals(VRRP_VR_STATE_MASTER)
598
599     @unittest.skipUnless(config.extended, "part of extended tests")
600     def test_vrrp4_backup_preempts(self):
601         """IPv4 Backup VR preempts lower priority master"""
602
603         vr_id = 100
604         prio = 100
605         intvl = self._default_adv
606         intvl_s = intvl * 0.01
607         vip = self.pg0.remote_ip4
608         vr = VppVRRPVirtualRouter(
609             self,
610             self.pg0,
611             vr_id,
612             prio=prio,
613             intvl=intvl,
614             flags=self._default_flags,
615             vips=[vip],
616         )
617         self._vrs.append(vr)
618         vr.add_vpp_config()
619
620         # After adding the VR, it should be in the init state
621         vr.assert_state_equals(VRRP_VR_STATE_INIT)
622
623         self.pg_start()
624         vr.start_stop(is_start=1)
625
626         # VR should be in backup state after starting
627         vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
628         end_time = vr.start_time() + vr.master_down_seconds()
629
630         # send lower prio advertisements until timer expires
631         src_ip = self.pg0.remote_ip4
632         pkts = [vr.vrrp_adv_packet(prio=prio - 10, src_ip=src_ip)]
633         while time.time() + intvl_s < end_time:
634             self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl_s)
635             self.logger.info(self.vapi.cli("show trace"))
636
637         # when timer expires, VR should take over as master
638         self.pg0.enable_capture()
639         self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
640         vr.assert_state_equals(VRRP_VR_STATE_MASTER)
641
642     @unittest.skipUnless(config.extended, "part of extended tests")
643     def test_vrrp4_master_preempted(self):
644         """IPv4 Master VR preempted by higher priority backup"""
645
646         # A prio 255 VR cannot be preempted so the prio has to be lower and
647         # we have to wait for it to take over
648         vr_id = 100
649         prio = 100
650         intvl = self._default_adv
651         vip = self.pg0.remote_ip4
652         vr = VppVRRPVirtualRouter(
653             self,
654             self.pg0,
655             vr_id,
656             prio=prio,
657             intvl=intvl,
658             flags=self._default_flags,
659             vips=[vip],
660         )
661         self._vrs.append(vr)
662         vr.add_vpp_config()
663
664         # After adding the VR, it should be in the init state
665         vr.assert_state_equals(VRRP_VR_STATE_INIT)
666
667         # start VR
668         vr.start_stop(is_start=1)
669         vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
670
671         # wait for VR to take over as master
672         end_time = vr.start_time() + vr.master_down_seconds()
673         sleep_s = end_time - time.time()
674         time.sleep(sleep_s)
675         vr.assert_state_equals(VRRP_VR_STATE_MASTER)
676
677         # Build advertisement packet and send it
678         pkts = [vr.vrrp_adv_packet(prio=255, src_ip=self.pg0.remote_ip4)]
679         self.pg_send(self.pg0, pkts)
680
681         # VR should be in backup state again
682         vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
683
684     @unittest.skipUnless(config.extended, "part of extended tests")
685     def test_vrrp4_accept_mode_disabled(self):
686         """IPv4 Master VR does not reply for VIP w/ accept mode off"""
687
688         # accept mode only matters when prio < 255, so it will have to
689         # come up as a backup and take over as master after the timeout
690         vr_id = 100
691         prio = 100
692         intvl = self._default_adv
693         vip = self.pg0.remote_hosts[4].ip4
694         vr = VppVRRPVirtualRouter(
695             self,
696             self.pg0,
697             vr_id,
698             prio=prio,
699             intvl=intvl,
700             flags=self._default_flags,
701             vips=[vip],
702         )
703         self._vrs.append(vr)
704         vr.add_vpp_config()
705
706         # After adding the VR, it should be in the init state
707         vr.assert_state_equals(VRRP_VR_STATE_INIT)
708
709         # start VR
710         vr.start_stop(is_start=1)
711         vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
712
713         # wait for VR to take over as master
714         end_time = vr.start_time() + vr.master_down_seconds()
715         sleep_s = end_time - time.time()
716         time.sleep(sleep_s)
717         vr.assert_state_equals(VRRP_VR_STATE_MASTER)
718
719         # send an ICMP echo to the VR virtual IP address
720         echo = (
721             Ether(dst=vr.virtual_mac(), src=self.pg0.remote_mac)
722             / IP(dst=vip, src=self.pg0.remote_ip4)
723             / ICMP(seq=1, id=self.pg0.sw_if_index, type="echo-request")
724         )
725         self.pg_send(self.pg0, [echo])
726
727         # wait for an echo reply. none should be received
728         time.sleep(1)
729         self.pg0.assert_nothing_captured(filter_out_fn=is_not_echo_reply)
730
731     @unittest.skipUnless(config.extended, "part of extended tests")
732     def test_vrrp4_accept_mode_enabled(self):
733         """IPv4 Master VR replies for VIP w/ accept mode on"""
734
735         # A prio 255 VR cannot be preempted so the prio has to be lower and
736         # we have to wait for it to take over
737         vr_id = 100
738         prio = 100
739         intvl = self._default_adv
740         vip = self.pg0.remote_hosts[4].ip4
741         flags = VRRP_VR_FLAG_PREEMPT | VRRP_VR_FLAG_ACCEPT
742         vr = VppVRRPVirtualRouter(
743             self, self.pg0, vr_id, prio=prio, intvl=intvl, flags=flags, vips=[vip]
744         )
745         self._vrs.append(vr)
746         vr.add_vpp_config()
747
748         # After adding the VR, it should be in the init state
749         vr.assert_state_equals(VRRP_VR_STATE_INIT)
750
751         # start VR
752         vr.start_stop(is_start=1)
753         vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
754
755         # wait for VR to take over as master
756         end_time = vr.start_time() + vr.master_down_seconds()
757         sleep_s = end_time - time.time()
758         time.sleep(sleep_s)
759         vr.assert_state_equals(VRRP_VR_STATE_MASTER)
760
761         # send an ICMP echo to the VR virtual IP address
762         echo = (
763             Ether(dst=vr.virtual_mac(), src=self.pg0.remote_mac)
764             / IP(dst=vip, src=self.pg0.remote_ip4)
765             / ICMP(seq=1, id=self.pg0.sw_if_index, type="echo-request")
766         )
767         self.pg_send(self.pg0, [echo])
768
769         # wait for an echo reply.
770         time.sleep(1)
771         rx_pkts = self.pg0.get_capture(
772             expected_count=1, timeout=1, filter_out_fn=is_not_echo_reply
773         )
774
775         self.assertEqual(rx_pkts[0][IP].src, vip)
776         self.assertEqual(rx_pkts[0][IP].dst, self.pg0.remote_ip4)
777         self.assertEqual(icmptypes[rx_pkts[0][ICMP].type], "echo-reply")
778         self.assertEqual(rx_pkts[0][ICMP].seq, 1)
779         self.assertEqual(rx_pkts[0][ICMP].id, self.pg0.sw_if_index)
780
781     @unittest.skipUnless(config.extended, "part of extended tests")
782     def test_vrrp4_intf_tracking(self):
783         """IPv4 Master VR adjusts priority based on tracked interface"""
784
785         vr_id = 100
786         prio = 255
787         intvl = self._default_adv
788         intvl_s = intvl * 0.01
789         vip = self.pg0.local_ip4
790         vr = VppVRRPVirtualRouter(
791             self,
792             self.pg0,
793             vr_id,
794             prio=prio,
795             intvl=intvl,
796             flags=self._default_flags,
797             vips=[vip],
798         )
799         self._vrs.append(vr)
800         vr.add_vpp_config()
801
802         # After adding the VR, it should be in the init state
803         vr.assert_state_equals(VRRP_VR_STATE_INIT)
804
805         # add pg1 as a tracked interface and start the VR
806         adjustment = 50
807         adjusted_prio = prio - adjustment
808         vr.add_del_tracked_interface(
809             is_add=1, sw_if_index=self.pg1.sw_if_index, prio=adjustment
810         )
811         vr.start_stop(is_start=1)
812         vr.assert_state_equals(VRRP_VR_STATE_MASTER)
813
814         adv_configured = vr.vrrp_adv_packet(prio=prio)
815         adv_adjusted = vr.vrrp_adv_packet(prio=adjusted_prio)
816
817         # tracked intf is up ->  advertised priority == configured priority
818         self.pg0.enable_capture()
819         rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
820         self.assertEqual(rx, adv_configured)
821
822         # take down pg1, verify priority is now being adjusted
823         self.pg1.admin_down()
824         self.pg0.enable_capture()
825         rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
826         self.assertEqual(rx, adv_adjusted)
827
828         # bring up pg1, verify priority now matches configured value
829         self.pg1.admin_up()
830         self.pg0.enable_capture()
831         rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
832         self.assertEqual(rx, adv_configured)
833
834         # remove IP address from pg1, verify priority now being adjusted
835         self.pg1.unconfig_ip4()
836         self.pg0.enable_capture()
837         rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
838         self.assertEqual(rx, adv_adjusted)
839
840         # add IP address to pg1, verify priority now matches configured value
841         self.pg1.config_ip4()
842         self.pg0.enable_capture()
843         rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
844         self.assertEqual(rx, adv_configured)
845
846     @unittest.skipUnless(config.extended, "part of extended tests")
847     def test_vrrp4_master_adv_unicast(self):
848         """IPv4 Master VR advertises (unicast)"""
849
850         vr_id = 100
851         prio = 255
852         intvl = self._default_adv
853         intvl_s = intvl * 0.01
854         vip = self.pg0.local_ip4
855         flags = self._default_flags | VRRP_VR_FLAG_UNICAST
856         unicast_peer = self.pg0.remote_hosts[4]
857         vr = VppVRRPVirtualRouter(
858             self, self.pg0, vr_id, prio=prio, intvl=intvl, flags=flags, vips=[vip]
859         )
860         self._vrs.append(vr)
861         vr.add_vpp_config()
862         vr.set_unicast_peers([unicast_peer.ip4])
863
864         # After adding the VR, it should be in the init state
865         vr.assert_state_equals(VRRP_VR_STATE_INIT)
866
867         # Start VR, transition to master
868         vr.start_stop(is_start=1)
869         vr.assert_state_equals(VRRP_VR_STATE_MASTER)
870
871         self.pg0.enable_capture()
872         rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
873
874         self.assertTrue(rx.haslayer(Ether))
875         self.assertTrue(rx.haslayer(IP))
876         self.assertTrue(rx.haslayer(VRRPv3))
877         self.assertEqual(rx[Ether].src, self.pg0.local_mac)
878         self.assertEqual(rx[Ether].dst, unicast_peer.mac)
879         self.assertEqual(rx[IP].src, self.pg0.local_ip4)
880         self.assertEqual(rx[IP].dst, unicast_peer.ip4)
881         self.assertEqual(rx[VRRPv3].vrid, vr_id)
882         self.assertEqual(rx[VRRPv3].priority, prio)
883         self.assertEqual(rx[VRRPv3].ipcount, 1)
884         self.assertEqual(rx[VRRPv3].addrlist, [vip])
885
886
887 class TestVRRP6(VppTestCase):
888     """IPv6 VRRP Test Case"""
889
890     @classmethod
891     def setUpClass(cls):
892         super(TestVRRP6, cls).setUpClass()
893
894     @classmethod
895     def tearDownClass(cls):
896         super(TestVRRP6, cls).tearDownClass()
897
898     def setUp(self):
899         super(TestVRRP6, self).setUp()
900
901         self.create_pg_interfaces(range(2))
902
903         for i in self.pg_interfaces:
904             i.admin_up()
905             i.config_ip6()
906             i.generate_remote_hosts(5)
907             i.configure_ipv6_neighbors()
908
909         self._vrs = []
910         self._default_flags = VRRP_VR_FLAG_IPV6 | VRRP_VR_FLAG_PREEMPT
911         self._default_adv = 100
912
913     def tearDown(self):
914         for vr in self._vrs:
915             try:
916                 vr_api = vr.query_vpp_config()
917                 if vr_api.runtime.state != VRRP_VR_STATE_INIT:
918                     vr.start_stop(is_start=0)
919                 vr.remove_vpp_config()
920             except:
921                 self.logger.error("Error cleaning up")
922
923         for i in self.pg_interfaces:
924             i.admin_down()
925             i.unconfig_ip4()
926             i.unconfig_ip6()
927
928         self._vrs = []
929
930         super(TestVRRP6, self).tearDown()
931
932     def verify_vrrp6_mlr(self, pkt, vr):
933         ip6 = pkt[IPv6]
934         self.assertEqual(ip6.dst, "ff02::16")
935         self.assertEqual(ipv6nh[ip6.nh], "Hop-by-Hop Option Header")
936
937         hbh = pkt[IPv6ExtHdrHopByHop]
938         self.assertEqual(ipv6nh[hbh.nh], "ICMPv6")
939
940         self.assertTrue(pkt.haslayer(ICMPv6MLReport2))
941         mlr = pkt[ICMPv6MLReport2]
942         # should contain mc addr records for:
943         # - VRRPv3 multicast addr
944         # - solicited node mc addr record for each VR virtual IPv6 address
945         vips = vr.virtual_ips()
946         self.assertEqual(mlr.records_number, len(vips) + 1)
947         self.assertEqual(mlr.records[0].dst, vr.adv_dest_ip())
948
949     def verify_vrrp6_adv(self, rx_pkt, vr, prio=None):
950         self.assertTrue(rx_pkt.haslayer(Ether))
951         self.assertTrue(rx_pkt.haslayer(IPv6))
952         self.assertTrue(rx_pkt.haslayer(VRRPv3))
953
954         # generate a packet for this VR and compare it to the one received
955         pkt = vr.vrrp_adv_packet(prio=prio)
956         self.assertTrue(rx_pkt.haslayer(Ether))
957         self.assertTrue(rx_pkt.haslayer(IPv6))
958         self.assertTrue(rx_pkt.haslayer(VRRPv3))
959
960         self.assertEqual(pkt, rx_pkt)
961
962     def verify_vrrp6_gna(self, pkt, vr):
963         self.assertTrue(pkt.haslayer(Ether))
964         self.assertTrue(pkt.haslayer(IPv6))
965         self.assertTrue(pkt.haslayer(ICMPv6ND_NA))
966         self.assertTrue(pkt.haslayer(ICMPv6NDOptDstLLAddr))
967
968         self.assertEqual(pkt[Ether].dst, "33:33:00:00:00:01")
969
970         self.assertEqual(pkt[IPv6].dst, "ff02::1")
971         # convert addrs to packed format since string versions could differ
972         src_addr = inet_pton(socket.AF_INET6, pkt[IPv6].src)
973         vr_ll_addr = inet_pton(socket.AF_INET6, vr.interface().local_ip6_ll)
974         self.assertEqual(src_addr, vr_ll_addr)
975
976         self.assertTrue(pkt[ICMPv6ND_NA].tgt in vr.virtual_ips())
977         self.assertEqual(pkt[ICMPv6NDOptDstLLAddr].lladdr, vr.virtual_mac())
978
979     # VR with priority 255 owns the virtual address and should
980     # become master and start advertising immediately.
981     @unittest.skipUnless(config.extended, "part of extended tests")
982     def test_vrrp6_master_adv(self):
983         """IPv6 Master VR advertises"""
984         self.pg_enable_capture(self.pg_interfaces)
985         self.pg_start()
986
987         prio = 255
988         intvl = self._default_adv
989         vr = VppVRRPVirtualRouter(
990             self, self.pg0, 100, prio=prio, intvl=intvl, flags=self._default_flags
991         )
992         self._vrs.append(vr)
993
994         vr.add_vpp_config()
995         self.logger.info(self.vapi.cli("show vrrp vr"))
996         vr.start_stop(is_start=1)
997         self.logger.info(self.vapi.cli("show vrrp vr"))
998         vr.start_stop(is_start=0)
999         self.logger.info(self.vapi.cli("show vrrp vr"))
1000
1001         pkts = self.pg0.get_capture(4, filter_out_fn=None)
1002
1003         # Init -> Master: Multicast group Join, VRRP adv, gratuitous NAs sent
1004         self.verify_vrrp6_mlr(pkts[0], vr)
1005         self.verify_vrrp6_adv(pkts[1], vr, prio=prio)
1006         self.verify_vrrp6_gna(pkts[2], vr)
1007         # Master -> Init: Adv with priority 0 sent to force an election
1008         self.verify_vrrp6_adv(pkts[3], vr, prio=0)
1009
1010         vr.remove_vpp_config()
1011         self._vrs = []
1012
1013     # Same as above but with the update API, and add a change
1014     # of parameters to test that too
1015     @unittest.skipUnless(config.extended, "part of extended tests")
1016     def test_vrrp6_master_adv_update(self):
1017         """IPv6 Master VR adv + Update to Backup"""
1018         self.pg_enable_capture(self.pg_interfaces)
1019         self.pg_start()
1020
1021         prio = 255
1022         intvl = self._default_adv
1023         vr = VppVRRPVirtualRouter(
1024             self, self.pg0, 100, prio=prio, intvl=intvl, flags=self._default_flags
1025         )
1026
1027         vr.update_vpp_config()
1028         vr.start_stop(is_start=1)
1029         self.logger.info(self.vapi.cli("show vrrp vr"))
1030         # Update VR with lower prio and larger interval
1031         # we need to keep old VR for the adv checks
1032         upd_vr = VppVRRPVirtualRouter(
1033             self,
1034             self.pg0,
1035             100,
1036             prio=100,
1037             intvl=2 * intvl,
1038             flags=self._default_flags,
1039             vips=[self.pg0.remote_ip6],
1040         )
1041         upd_vr._vrrp_index = vr._vrrp_index
1042         upd_vr.update_vpp_config()
1043         start_time = time.time()
1044         self.logger.info(self.vapi.cli("show vrrp vr"))
1045         upd_vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
1046         self._vrs = [upd_vr]
1047
1048         pkts = self.pg0.get_capture(5, filter_out_fn=None)
1049
1050         # Init -> Master: Multicast group Join, VRRP adv, gratuitous NAs sent
1051         self.verify_vrrp6_mlr(pkts[0], vr)
1052         self.verify_vrrp6_adv(pkts[1], vr, prio=prio)
1053         self.verify_vrrp6_gna(pkts[2], vr)
1054         # Master -> Init: Adv with priority 0 sent to force an election
1055         self.verify_vrrp6_adv(pkts[3], vr, prio=0)
1056         # Init -> Backup: A multicast listener report should be sent
1057         # not actually verified in the test below, where I took this from
1058
1059         # send higher prio advertisements, should not see VPP send any
1060         src_ip = self.pg0.remote_ip6_ll
1061         pkts = [upd_vr.vrrp_adv_packet(prio=110, src_ip=src_ip)]
1062         self.logger.info(self.vapi.cli("show vlib graph"))
1063         end_time = start_time + 2 * upd_vr.master_down_seconds()
1064         while time.time() < end_time:
1065             self.send_and_assert_no_replies(
1066                 self.pg0, pkts, timeout=0.01 * upd_vr._intvl
1067             )
1068             self.logger.info(self.vapi.cli("show trace"))
1069
1070         vr.start_stop(is_start=0)
1071         self.logger.info(self.vapi.cli("show vrrp vr"))
1072
1073     # VR with priority < 255 enters backup state and does not advertise as
1074     # long as it receives higher priority advertisements
1075     @unittest.skipUnless(config.extended, "part of extended tests")
1076     def test_vrrp6_backup_noadv(self):
1077         """IPv6 Backup VR does not advertise"""
1078         self.pg_enable_capture(self.pg_interfaces)
1079         self.pg_start()
1080
1081         vr_id = 100
1082         prio = 100
1083         intvl = self._default_adv
1084         intvl_s = intvl * 0.01
1085         vr = VppVRRPVirtualRouter(
1086             self,
1087             self.pg0,
1088             vr_id,
1089             prio=prio,
1090             intvl=intvl,
1091             flags=self._default_flags,
1092             vips=[self.pg0.remote_ip6],
1093         )
1094         vr.add_vpp_config()
1095         self._vrs.append(vr)
1096
1097         vr.start_stop(is_start=1)
1098
1099         vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
1100         # watch for advertisements for 2x the master down preemption timeout
1101         end_time = vr.start_time() + 2 * vr.master_down_seconds()
1102
1103         # Init -> Backup: A multicast listener report should be sent
1104         pkts = self.pg0.get_capture(1, filter_out_fn=None)
1105
1106         # send higher prio advertisements, should not see VPP send any
1107         src_ip = self.pg0.remote_ip6_ll
1108         num_advs = 5
1109         pkts = [vr.vrrp_adv_packet(prio=prio + 10, src_ip=src_ip)]
1110         self.logger.info(self.vapi.cli("show vlib graph"))
1111         while time.time() < end_time:
1112             self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl_s)
1113             self.logger.info(self.vapi.cli("show trace"))
1114             num_advs -= 1
1115
1116         vr.start_stop(is_start=0)
1117         self.logger.info(self.vapi.cli("show vrrp vr"))
1118         vr.remove_vpp_config()
1119         self._vrs = []
1120
1121     def test_vrrp6_master_nd(self):
1122         """IPv6 Master VR replies to NDP"""
1123         self.pg_start()
1124
1125         # VR virtual IP is the default, which is the pg local IP
1126         vr_id = 100
1127         prio = 255
1128         intvl = self._default_adv
1129         vr = VppVRRPVirtualRouter(
1130             self, self.pg0, 100, prio=prio, intvl=intvl, flags=self._default_flags
1131         )
1132         vr.add_vpp_config()
1133         self._vrs.append(vr)
1134
1135         # before the VR is up, NDP should resolve to interface MAC
1136         self.pg0.resolve_ndp()
1137         self.assertNotEqual(self.pg0.local_mac, vr.virtual_mac())
1138
1139         # start the VR, NDP should now resolve to virtual MAC
1140         vr.start_stop(is_start=1)
1141         self.pg0.resolve_ndp()
1142         self.assertEqual(self.pg0.local_mac, vr.virtual_mac())
1143
1144         # stop the VR, ARP should resolve to interface MAC again
1145         vr.start_stop(is_start=0)
1146         self.pg0.resolve_ndp()
1147         self.assertNotEqual(self.pg0.local_mac, vr.virtual_mac())
1148
1149         vr.remove_vpp_config()
1150         self._vrs = []
1151
1152     @unittest.skipUnless(config.extended, "part of extended tests")
1153     def test_vrrp6_backup_nond(self):
1154         """IPv6 Backup VR ignores NDP"""
1155         # We need an address for a virtual IP that is not the IP that
1156         # ARP requests will originate from
1157
1158         vr_id = 100
1159         prio = 100
1160         intvl = self._default_adv
1161         intvl_s = intvl * 0.01
1162         vip = self.pg0.remote_hosts[1].ip6
1163         vr = VppVRRPVirtualRouter(
1164             self,
1165             self.pg0,
1166             vr_id,
1167             prio=prio,
1168             intvl=intvl,
1169             flags=self._default_flags,
1170             vips=[vip],
1171         )
1172         vr.add_vpp_config()
1173         self._vrs.append(vr)
1174
1175         nsma = in6_getnsma(inet_pton(socket.AF_INET6, vip))
1176         dmac = in6_getnsmac(nsma)
1177         dst_ip = inet_ntop(socket.AF_INET6, nsma)
1178
1179         ndp_req = (
1180             Ether(dst=dmac, src=self.pg0.remote_mac)
1181             / IPv6(dst=dst_ip, src=self.pg0.remote_ip6)
1182             / ICMPv6ND_NS(tgt=vip)
1183             / ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac)
1184         )
1185
1186         # Before the VR is started make sure no reply to request for VIP
1187         self.send_and_assert_no_replies(self.pg0, [ndp_req], timeout=1)
1188
1189         # VR should start in backup state and still should not reply to NDP
1190         # send a higher priority adv to make sure it does not become master
1191         adv = vr.vrrp_adv_packet(prio=prio + 10, src_ip=self.pg0.remote_ip6)
1192         pkts = [adv, ndp_req]
1193         vr.start_stop(is_start=1)
1194         self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl_s)
1195
1196         vr.start_stop(is_start=0)
1197
1198     @unittest.skipUnless(config.extended, "part of extended tests")
1199     def test_vrrp6_election(self):
1200         """IPv6 Backup VR becomes master if no advertisements received"""
1201
1202         vr_id = 100
1203         prio = 100
1204         intvl = self._default_adv
1205         intvl_s = intvl * 0.01
1206         vip = self.pg0.remote_ip6
1207         vr = VppVRRPVirtualRouter(
1208             self,
1209             self.pg0,
1210             vr_id,
1211             prio=prio,
1212             intvl=intvl,
1213             flags=self._default_flags,
1214             vips=[vip],
1215         )
1216         self._vrs.append(vr)
1217         vr.add_vpp_config()
1218
1219         # After adding the VR, it should be in the init state
1220         vr.assert_state_equals(VRRP_VR_STATE_INIT)
1221
1222         self.pg_start()
1223         vr.start_stop(is_start=1)
1224
1225         # VR should be in backup state after starting
1226         vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
1227         end_time = vr.start_time() + vr.master_down_seconds()
1228
1229         # no advertisements should arrive until timer expires
1230         self.pg0.enable_capture()
1231         while (time.time() + intvl_s) < end_time:
1232             time.sleep(intvl_s)
1233             self.pg0.assert_nothing_captured(filter_out_fn=is_not_adv)
1234
1235         # VR should be in master state after timer expires
1236         self.pg0.enable_capture()
1237         self.pg0.wait_for_packet(intvl_s, is_not_adv)
1238         vr.assert_state_equals(VRRP_VR_STATE_MASTER)
1239
1240     @unittest.skipUnless(config.extended, "part of extended tests")
1241     def test_vrrp6_backup_preempts(self):
1242         """IPv6 Backup VR preempts lower priority master"""
1243
1244         vr_id = 100
1245         prio = 100
1246         intvl = self._default_adv
1247         intvl_s = intvl * 0.01
1248         vip = self.pg0.remote_ip6
1249         vr = VppVRRPVirtualRouter(
1250             self,
1251             self.pg0,
1252             vr_id,
1253             prio=prio,
1254             intvl=intvl,
1255             flags=self._default_flags,
1256             vips=[vip],
1257         )
1258         self._vrs.append(vr)
1259         vr.add_vpp_config()
1260
1261         # After adding the VR, it should be in the init state
1262         vr.assert_state_equals(VRRP_VR_STATE_INIT)
1263
1264         self.pg_start()
1265         vr.start_stop(is_start=1)
1266
1267         # VR should be in backup state after starting
1268         vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
1269         end_time = vr.start_time() + vr.master_down_seconds()
1270
1271         # send lower prio advertisements until timer expires
1272         src_ip = self.pg0.remote_ip6
1273         pkts = [vr.vrrp_adv_packet(prio=prio - 10, src_ip=src_ip)]
1274         while (time.time() + intvl_s) < end_time:
1275             self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl_s)
1276             self.logger.info(self.vapi.cli("show trace"))
1277
1278         # when timer expires, VR should take over as master
1279         self.pg0.enable_capture()
1280         self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
1281         vr.assert_state_equals(VRRP_VR_STATE_MASTER)
1282
1283     @unittest.skipUnless(config.extended, "part of extended tests")
1284     def test_vrrp6_master_preempted(self):
1285         """IPv6 Master VR preempted by higher priority backup"""
1286
1287         # A prio 255 VR cannot be preempted so the prio has to be lower and
1288         # we have to wait for it to take over
1289         vr_id = 100
1290         prio = 100
1291         intvl = self._default_adv
1292         vip = self.pg0.remote_ip6
1293         vr = VppVRRPVirtualRouter(
1294             self,
1295             self.pg0,
1296             vr_id,
1297             prio=prio,
1298             intvl=intvl,
1299             flags=self._default_flags,
1300             vips=[vip],
1301         )
1302         self._vrs.append(vr)
1303         vr.add_vpp_config()
1304
1305         # After adding the VR, it should be in the init state
1306         vr.assert_state_equals(VRRP_VR_STATE_INIT)
1307
1308         # start VR
1309         vr.start_stop(is_start=1)
1310         vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
1311
1312         # wait for VR to take over as master
1313         end_time = vr.start_time() + vr.master_down_seconds()
1314         sleep_s = end_time - time.time()
1315         time.sleep(sleep_s)
1316         vr.assert_state_equals(VRRP_VR_STATE_MASTER)
1317
1318         # Build advertisement packet and send it
1319         pkts = [vr.vrrp_adv_packet(prio=255, src_ip=self.pg0.remote_ip6)]
1320         self.pg_send(self.pg0, pkts)
1321
1322         # VR should be in backup state again
1323         vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
1324
1325     @unittest.skipUnless(config.extended, "part of extended tests")
1326     def test_vrrp6_accept_mode_disabled(self):
1327         """IPv6 Master VR does not reply for VIP w/ accept mode off"""
1328
1329         # accept mode only matters when prio < 255, so it will have to
1330         # come up as a backup and take over as master after the timeout
1331         vr_id = 100
1332         prio = 100
1333         intvl = self._default_adv
1334         vip = self.pg0.remote_hosts[4].ip6
1335         vr = VppVRRPVirtualRouter(
1336             self,
1337             self.pg0,
1338             vr_id,
1339             prio=prio,
1340             intvl=intvl,
1341             flags=self._default_flags,
1342             vips=[vip],
1343         )
1344         self._vrs.append(vr)
1345         vr.add_vpp_config()
1346
1347         # After adding the VR, it should be in the init state
1348         vr.assert_state_equals(VRRP_VR_STATE_INIT)
1349
1350         # start VR
1351         vr.start_stop(is_start=1)
1352         vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
1353
1354         # wait for VR to take over as master
1355         end_time = vr.start_time() + vr.master_down_seconds()
1356         sleep_s = end_time - time.time()
1357         time.sleep(sleep_s)
1358         vr.assert_state_equals(VRRP_VR_STATE_MASTER)
1359
1360         # send an ICMPv6 echo to the VR virtual IP address
1361         echo = (
1362             Ether(dst=vr.virtual_mac(), src=self.pg0.remote_mac)
1363             / IPv6(dst=vip, src=self.pg0.remote_ip6)
1364             / ICMPv6EchoRequest(seq=1, id=self.pg0.sw_if_index)
1365         )
1366         self.pg_send(self.pg0, [echo])
1367
1368         # wait for an echo reply. none should be received
1369         time.sleep(1)
1370         self.pg0.assert_nothing_captured(filter_out_fn=is_not_echo_reply)
1371
1372     @unittest.skipUnless(config.extended, "part of extended tests")
1373     def test_vrrp6_accept_mode_enabled(self):
1374         """IPv6 Master VR replies for VIP w/ accept mode on"""
1375
1376         # A prio 255 VR cannot be preempted so the prio has to be lower and
1377         # we have to wait for it to take over
1378         vr_id = 100
1379         prio = 100
1380         intvl = self._default_adv
1381         vip = self.pg0.remote_hosts[4].ip6
1382         flags = self._default_flags | VRRP_VR_FLAG_ACCEPT
1383         vr = VppVRRPVirtualRouter(
1384             self, self.pg0, vr_id, prio=prio, intvl=intvl, flags=flags, vips=[vip]
1385         )
1386         self._vrs.append(vr)
1387         vr.add_vpp_config()
1388
1389         # After adding the VR, it should be in the init state
1390         vr.assert_state_equals(VRRP_VR_STATE_INIT)
1391
1392         # start VR
1393         vr.start_stop(is_start=1)
1394         vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
1395
1396         # wait for VR to take over as master
1397         end_time = vr.start_time() + vr.master_down_seconds()
1398         sleep_s = end_time - time.time()
1399         time.sleep(sleep_s)
1400         vr.assert_state_equals(VRRP_VR_STATE_MASTER)
1401
1402         # send an ICMP echo to the VR virtual IP address
1403         echo = (
1404             Ether(dst=vr.virtual_mac(), src=self.pg0.remote_mac)
1405             / IPv6(dst=vip, src=self.pg0.remote_ip6)
1406             / ICMPv6EchoRequest(seq=1, id=self.pg0.sw_if_index)
1407         )
1408         self.pg_send(self.pg0, [echo])
1409
1410         # wait for an echo reply.
1411         time.sleep(1)
1412         rx_pkts = self.pg0.get_capture(
1413             expected_count=1, timeout=1, filter_out_fn=is_not_echo_reply
1414         )
1415
1416         self.assertEqual(rx_pkts[0][IPv6].src, vip)
1417         self.assertEqual(rx_pkts[0][IPv6].dst, self.pg0.remote_ip6)
1418         self.assertEqual(rx_pkts[0][ICMPv6EchoReply].seq, 1)
1419         self.assertEqual(rx_pkts[0][ICMPv6EchoReply].id, self.pg0.sw_if_index)
1420
1421     @unittest.skipUnless(config.extended, "part of extended tests")
1422     def test_vrrp6_intf_tracking(self):
1423         """IPv6 Master VR adjusts priority based on tracked interface"""
1424
1425         vr_id = 100
1426         prio = 255
1427         intvl = self._default_adv
1428         intvl_s = intvl * 0.01
1429         vip = self.pg0.local_ip6
1430         vr = VppVRRPVirtualRouter(
1431             self,
1432             self.pg0,
1433             vr_id,
1434             prio=prio,
1435             intvl=intvl,
1436             flags=self._default_flags,
1437             vips=[vip],
1438         )
1439         self._vrs.append(vr)
1440         vr.add_vpp_config()
1441
1442         # After adding the VR, it should be in the init state
1443         vr.assert_state_equals(VRRP_VR_STATE_INIT)
1444
1445         # add pg1 as a tracked interface and start the VR
1446         adjustment = 50
1447         adjusted_prio = prio - adjustment
1448         vr.add_del_tracked_interface(
1449             is_add=1, sw_if_index=self.pg1.sw_if_index, prio=adjustment
1450         )
1451         vr.start_stop(is_start=1)
1452         vr.assert_state_equals(VRRP_VR_STATE_MASTER)
1453
1454         adv_configured = vr.vrrp_adv_packet(prio=prio)
1455         adv_adjusted = vr.vrrp_adv_packet(prio=adjusted_prio)
1456
1457         # tracked intf is up ->  advertised priority == configured priority
1458         self.pg0.enable_capture()
1459         rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
1460         self.assertEqual(rx, adv_configured)
1461
1462         # take down pg1, verify priority is now being adjusted
1463         self.pg1.admin_down()
1464         self.pg0.enable_capture()
1465         rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
1466         self.assertEqual(rx, adv_adjusted)
1467
1468         # bring up pg1, verify priority now matches configured value
1469         self.pg1.admin_up()
1470         self.pg0.enable_capture()
1471         rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
1472         self.assertEqual(rx, adv_configured)
1473
1474         # remove IP address from pg1, verify priority now being adjusted
1475         self.pg1.unconfig_ip6()
1476         self.pg0.enable_capture()
1477         rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
1478         self.assertEqual(rx, adv_adjusted)
1479
1480         # add IP address to pg1, verify priority now matches configured value
1481         self.pg1.config_ip6()
1482         self.pg0.enable_capture()
1483         rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
1484         self.assertEqual(rx, adv_configured)
1485
1486     @unittest.skipUnless(config.extended, "part of extended tests")
1487     def test_vrrp6_master_adv_unicast(self):
1488         """IPv6 Master VR advertises (unicast)"""
1489
1490         vr_id = 100
1491         prio = 255
1492         intvl = self._default_adv
1493         intvl_s = intvl * 0.01
1494         vip = self.pg0.local_ip6
1495         flags = self._default_flags | VRRP_VR_FLAG_UNICAST
1496         unicast_peer = self.pg0.remote_hosts[4]
1497         vr = VppVRRPVirtualRouter(
1498             self, self.pg0, vr_id, prio=prio, intvl=intvl, flags=flags, vips=[vip]
1499         )
1500         self._vrs.append(vr)
1501         vr.add_vpp_config()
1502         vr.set_unicast_peers([unicast_peer.ip6])
1503
1504         # After adding the VR, it should be in the init state
1505         vr.assert_state_equals(VRRP_VR_STATE_INIT)
1506
1507         # Start VR, transition to master
1508         vr.start_stop(is_start=1)
1509         vr.assert_state_equals(VRRP_VR_STATE_MASTER)
1510
1511         self.pg0.enable_capture()
1512         rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
1513
1514         self.assertTrue(rx.haslayer(Ether))
1515         self.assertTrue(rx.haslayer(IPv6))
1516         self.assertTrue(rx.haslayer(VRRPv3))
1517         self.assertEqual(rx[Ether].src, self.pg0.local_mac)
1518         self.assertEqual(rx[Ether].dst, unicast_peer.mac)
1519         self.assertEqual(
1520             ip6_normalize(rx[IPv6].src), ip6_normalize(self.pg0.local_ip6_ll)
1521         )
1522         self.assertEqual(ip6_normalize(rx[IPv6].dst), ip6_normalize(unicast_peer.ip6))
1523         self.assertEqual(rx[VRRPv3].vrid, vr_id)
1524         self.assertEqual(rx[VRRPv3].priority, prio)
1525         self.assertEqual(rx[VRRPv3].ipcount, 1)
1526         self.assertEqual(rx[VRRPv3].addrlist, [vip])
1527
1528
1529 if __name__ == "__main__":
1530     unittest.main(testRunner=VppTestRunner)