Code Review
/
vpp.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
review
|
tree
raw
|
inline
| side by side
npt66: add show command and rx/tx counters
[vpp.git]
/
test
/
test_vrrp.py
diff --git
a/test/test_vrrp.py
b/test/test_vrrp.py
index
6a62a88
..
9319b0f
100644
(file)
--- a/
test/test_vrrp.py
+++ b/
test/test_vrrp.py
@@
-17,9
+17,18
@@
from vpp_papi import VppEnum
from scapy.packet import raw
from scapy.layers.l2 import Ether, ARP
from scapy.layers.inet import IP, ICMP, icmptypes
from scapy.packet import raw
from scapy.layers.l2 import Ether, ARP
from scapy.layers.inet import IP, ICMP, icmptypes
-from scapy.layers.inet6 import IPv6, ipv6nh, IPv6ExtHdrHopByHop, \
- ICMPv6MLReport2, ICMPv6ND_NA, ICMPv6ND_NS, ICMPv6NDOptDstLLAddr, \
- ICMPv6NDOptSrcLLAddr, ICMPv6EchoRequest, ICMPv6EchoReply
+from scapy.layers.inet6 import (
+ IPv6,
+ ipv6nh,
+ IPv6ExtHdrHopByHop,
+ ICMPv6MLReport2,
+ ICMPv6ND_NA,
+ ICMPv6ND_NS,
+ ICMPv6NDOptDstLLAddr,
+ ICMPv6NDOptSrcLLAddr,
+ ICMPv6EchoRequest,
+ ICMPv6EchoReply,
+)
from scapy.contrib.igmpv3 import IGMPv3, IGMPv3mr, IGMPv3gr
from scapy.layers.vrrp import IPPROTO_VRRP, VRRPv3
from scapy.utils6 import in6_getnsma, in6_getnsmac
from scapy.contrib.igmpv3 import IGMPv3, IGMPv3mr, IGMPv3gr
from scapy.layers.vrrp import IPPROTO_VRRP, VRRPv3
from scapy.utils6 import in6_getnsma, in6_getnsmac
@@
-37,11
+46,11
@@
VRRP_VR_STATE_BACKUP = 1
VRRP_VR_STATE_MASTER = 2
VRRP_VR_STATE_INTF_DOWN = 3
VRRP_VR_STATE_MASTER = 2
VRRP_VR_STATE_INTF_DOWN = 3
-VRRP_INDEX_INVALID = 0x
ffffffff
+VRRP_INDEX_INVALID = 0x
FFFFFFFF
def is_non_arp(p):
def is_non_arp(p):
- """
Want to filter out advertisements, igmp, etc"""
+ """Want to filter out advertisements, igmp, etc"""
if p.haslayer(ARP):
return False
if p.haslayer(ARP):
return False
@@
-49,7
+58,7
@@
def is_non_arp(p):
def is_not_adv(p):
def is_not_adv(p):
- """
Filter out everything but advertisements. E.g. multicast RD/ND
"""
+ """
Filter out everything but advertisements. E.g. multicast RD/ND
"""
if p.haslayer(VRRPv3):
return False
if p.haslayer(VRRPv3):
return False
@@
-57,7
+66,7
@@
def is_not_adv(p):
def is_not_echo_reply(p):
def is_not_echo_reply(p):
- """
filter out advertisements and other while waiting for echo reply
"""
+ """
filter out advertisements and other while waiting for echo reply
"""
if p.haslayer(IP) and p.haslayer(ICMP):
if icmptypes[p[ICMP].type] == "echo-reply":
return False
if p.haslayer(IP) and p.haslayer(ICMP):
if icmptypes[p[ICMP].type] == "echo-reply":
return False
@@
-68,15
+77,16
@@
def is_not_echo_reply(p):
class VppVRRPVirtualRouter(VppObject):
class VppVRRPVirtualRouter(VppObject):
-
- def __init__(self,
- test,
- intf,
- vr_id,
- prio=100,
- intvl=100,
- flags=VRRP_VR_FLAG_PREEMPT,
- vips=None):
+ def __init__(
+ self,
+ test,
+ intf,
+ vr_id,
+ prio=100,
+ intvl=100,
+ flags=VRRP_VR_FLAG_PREEMPT,
+ vips=None,
+ ):
self._test = test
self._intf = intf
self._sw_if_index = self._intf.sw_if_index
self._test = test
self._intf = intf
self._sw_if_index = self._intf.sw_if_index
@@
-84,40
+94,44
@@
class VppVRRPVirtualRouter(VppObject):
self._prio = prio
self._intvl = intvl
self._flags = flags
self._prio = prio
self._intvl = intvl
self._flags = flags
- if
(flags & VRRP_VR_FLAG_IPV6)
:
+ if
flags & VRRP_VR_FLAG_IPV6
:
self._is_ipv6 = 1
self._adv_dest_mac = "33:33:00:00:00:12"
self._virtual_mac = "00:00:5e:00:02:%02x" % vr_id
self._adv_dest_ip = "ff02::12"
self._is_ipv6 = 1
self._adv_dest_mac = "33:33:00:00:00:12"
self._virtual_mac = "00:00:5e:00:02:%02x" % vr_id
self._adv_dest_ip = "ff02::12"
- self._vips =
([intf.local_ip6] if vips is None else vips)
+ self._vips =
[intf.local_ip6] if vips is None else vips
else:
self._is_ipv6 = 0
self._adv_dest_mac = "01:00:5e:00:00:12"
self._virtual_mac = "00:00:5e:00:01:%02x" % vr_id
self._adv_dest_ip = "224.0.0.18"
else:
self._is_ipv6 = 0
self._adv_dest_mac = "01:00:5e:00:00:12"
self._virtual_mac = "00:00:5e:00:01:%02x" % vr_id
self._adv_dest_ip = "224.0.0.18"
- self._vips =
([intf.local_ip4] if vips is None else vips)
+ self._vips =
[intf.local_ip4] if vips is None else vips
self._tracked_ifs = []
self._vrrp_index = VRRP_INDEX_INVALID
def add_vpp_config(self):
self._tracked_ifs = []
self._vrrp_index = VRRP_INDEX_INVALID
def add_vpp_config(self):
- self._test.vapi.vrrp_vr_add_del(is_add=1,
- sw_if_index=self._intf.sw_if_index,
- vr_id=self._vr_id,
- priority=self._prio,
- interval=self._intvl,
- flags=self._flags,
- n_addrs=len(self._vips),
- addrs=self._vips)
+ self._test.vapi.vrrp_vr_add_del(
+ is_add=1,
+ sw_if_index=self._intf.sw_if_index,
+ vr_id=self._vr_id,
+ priority=self._prio,
+ interval=self._intvl,
+ flags=self._flags,
+ n_addrs=len(self._vips),
+ addrs=self._vips,
+ )
def update_vpp_config(self):
def update_vpp_config(self):
- r = self._test.vapi.vrrp_vr_update(vrrp_index=self._vrrp_index,
- sw_if_index=self._intf.sw_if_index,
- vr_id=self._vr_id,
- priority=self._prio,
- interval=self._intvl,
- flags=self._flags,
- n_addrs=len(self._vips),
- addrs=self._vips)
+ r = self._test.vapi.vrrp_vr_update(
+ vrrp_index=self._vrrp_index,
+ sw_if_index=self._intf.sw_if_index,
+ vr_id=self._vr_id,
+ priority=self._prio,
+ interval=self._intvl,
+ flags=self._flags,
+ n_addrs=len(self._vips),
+ addrs=self._vips,
+ )
self._vrrp_index = r.vrrp_index
def delete_vpp_config(self):
self._vrrp_index = r.vrrp_index
def delete_vpp_config(self):
@@
-129,7
+143,7
@@
class VppVRRPVirtualRouter(VppObject):
if vr.config.vr_id != self._vr_id:
continue
if vr.config.vr_id != self._vr_id:
continue
- is_ipv6 =
(1 if (vr.config.flags & VRRP_VR_FLAG_IPV6) else 0)
+ is_ipv6 =
1 if (vr.config.flags & VRRP_VR_FLAG_IPV6) else 0
if is_ipv6 != self._is_ipv6:
continue
if is_ipv6 != self._is_ipv6:
continue
@@
-138,41
+152,45
@@
class VppVRRPVirtualRouter(VppObject):
return None
def remove_vpp_config(self):
return None
def remove_vpp_config(self):
- self._test.vapi.vrrp_vr_add_del(is_add=0,
- sw_if_index=self._intf.sw_if_index,
- vr_id=self._vr_id,
- priority=self._prio,
- interval=self._intvl,
- flags=self._flags,
- n_addrs=len(self._vips),
- addrs=self._vips)
+ self._test.vapi.vrrp_vr_add_del(
+ is_add=0,
+ sw_if_index=self._intf.sw_if_index,
+ vr_id=self._vr_id,
+ priority=self._prio,
+ interval=self._intvl,
+ flags=self._flags,
+ n_addrs=len(self._vips),
+ addrs=self._vips,
+ )
def start_stop(self, is_start):
def start_stop(self, is_start):
- self._test.vapi.vrrp_vr_start_stop(is_start=is_start,
- sw_if_index=self._intf.sw_if_index,
- vr_id=self._vr_id,
- is_ipv6=self._is_ipv6)
- self._start_time = (time.time() if is_start else None)
+ self._test.vapi.vrrp_vr_start_stop(
+ is_start=is_start,
+ sw_if_index=self._intf.sw_if_index,
+ vr_id=self._vr_id,
+ is_ipv6=self._is_ipv6,
+ )
+ self._start_time = time.time() if is_start else None
def add_del_tracked_interface(self, is_add, sw_if_index, prio):
args = {
def add_del_tracked_interface(self, is_add, sw_if_index, prio):
args = {
-
'sw_if_index'
: self._intf.sw_if_index,
-
'is_ipv6'
: self._is_ipv6,
-
'vr_id'
: self._vr_id,
-
'is_add'
: is_add,
-
'n_ifs'
: 1,
- 'ifs': [{'sw_if_index': sw_if_index, 'priority': prio}]
+
"sw_if_index"
: self._intf.sw_if_index,
+
"is_ipv6"
: self._is_ipv6,
+
"vr_id"
: self._vr_id,
+
"is_add"
: is_add,
+
"n_ifs"
: 1,
+ "ifs": [{"sw_if_index": sw_if_index, "priority": prio}],
}
self._test.vapi.vrrp_vr_track_if_add_del(**args)
}
self._test.vapi.vrrp_vr_track_if_add_del(**args)
- self._tracked_ifs.append(args[
'ifs'
][0])
+ self._tracked_ifs.append(args[
"ifs"
][0])
def set_unicast_peers(self, addrs):
args = {
def set_unicast_peers(self, addrs):
args = {
-
'sw_if_index'
: self._intf.sw_if_index,
-
'is_ipv6'
: self._is_ipv6,
-
'vr_id'
: self._vr_id,
-
'n_addrs'
: len(addrs),
- 'addrs': addrs
+
"sw_if_index"
: self._intf.sw_if_index,
+
"is_ipv6"
: self._is_ipv6,
+
"vr_id"
: self._vr_id,
+
"n_addrs"
: len(addrs),
+ "addrs": addrs,
}
self._test.vapi.vrrp_vr_set_peers(**args)
self._unicast_peers = addrs
}
self._test.vapi.vrrp_vr_set_peers(**args)
self._unicast_peers = addrs
@@
-210,21
+228,22
@@
class VppVRRPVirtualRouter(VppObject):
def master_down_seconds(self):
vr_details = self.query_vpp_config()
def master_down_seconds(self):
vr_details = self.query_vpp_config()
- return
(vr_details.runtime.master_down_int * 0.01)
+ return
vr_details.runtime.master_down_int * 0.01
def vrrp_adv_packet(self, prio=None, src_ip=None):
dst_ip = self._adv_dest_ip
if prio is None:
prio = self._prio
eth = Ether(dst=self._adv_dest_mac, src=self._virtual_mac)
def vrrp_adv_packet(self, prio=None, src_ip=None):
dst_ip = self._adv_dest_ip
if prio is None:
prio = self._prio
eth = Ether(dst=self._adv_dest_mac, src=self._virtual_mac)
- vrrp = VRRPv3(vrid=self._vr_id, priority=prio,
- ipcount=len(self._vips), adv=self._intvl)
+ vrrp = VRRPv3(
+ vrid=self._vr_id, priority=prio, ipcount=len(self._vips), adv=self._intvl
+ )
if self._is_ipv6:
if self._is_ipv6:
- src_ip =
(self._intf.local_ip6_ll if src_ip is None else src_ip)
+ src_ip =
self._intf.local_ip6_ll if src_ip is None else src_ip
ip = IPv6(src=src_ip, dst=dst_ip, nh=IPPROTO_VRRP, hlim=255)
vrrp.addrlist = self._vips
else:
ip = IPv6(src=src_ip, dst=dst_ip, nh=IPPROTO_VRRP, hlim=255)
vrrp.addrlist = self._vips
else:
- src_ip =
(self._intf.local_ip4 if src_ip is None else src_ip)
+ src_ip =
self._intf.local_ip4 if src_ip is None else src_ip
ip = IP(src=src_ip, dst=dst_ip, proto=IPPROTO_VRRP, ttl=255, id=0)
vrrp.addrlist = self._vips
ip = IP(src=src_ip, dst=dst_ip, proto=IPPROTO_VRRP, ttl=255, id=0)
vrrp.addrlist = self._vips
@@
-234,7
+253,7
@@
class VppVRRPVirtualRouter(VppObject):
class TestVRRP4(VppTestCase):
class TestVRRP4(VppTestCase):
- """
IPv4 VRRP Test Case
"""
+ """
IPv4 VRRP Test Case
"""
@classmethod
def setUpClass(cls):
@classmethod
def setUpClass(cls):
@@
-284,8
+303,7
@@
class TestVRRP4(VppTestCase):
self.assertEqual(ip.proto, 2)
igmp = pkt[IGMPv3]
self.assertEqual(ip.proto, 2)
igmp = pkt[IGMPv3]
- self.assertEqual(IGMPv3.igmpv3types[igmp.type],
- "Version 3 Membership Report")
+ self.assertEqual(IGMPv3.igmpv3types[igmp.type], "Version 3 Membership Report")
igmpmr = pkt[IGMPv3mr]
self.assertEqual(igmpmr.numgrp, 1)
igmpmr = pkt[IGMPv3mr]
self.assertEqual(igmpmr.numgrp, 1)
@@
-330,15
+348,15
@@
class TestVRRP4(VppTestCase):
# become master and start advertising immediately.
@unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp4_master_adv(self):
# become master and start advertising immediately.
@unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp4_master_adv(self):
- """
IPv4 Master VR advertises
"""
+ """
IPv4 Master VR advertises
"""
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
prio = 255
intvl = self._default_adv
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
prio = 255
intvl = self._default_adv
- vr = VppVRRPVirtualRouter(
self, self.pg0, 100,
- prio=prio, intvl=intvl,
-
flags=self._default_flags
)
+ vr = VppVRRPVirtualRouter(
+ self, self.pg0, 100, prio=prio, intvl=intvl, flags=self._default_flags
+ )
vr.add_vpp_config()
vr.start_stop(is_start=1)
vr.add_vpp_config()
vr.start_stop(is_start=1)
@@
-362,25
+380,30
@@
class TestVRRP4(VppTestCase):
# of parameters to test that too
@unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp4_master_adv_update(self):
# of parameters to test that too
@unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp4_master_adv_update(self):
- """
IPv4 Master VR adv + Update to Backup
"""
+ """
IPv4 Master VR adv + Update to Backup
"""
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
prio = 255
intvl = self._default_adv
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
prio = 255
intvl = self._default_adv
- vr = VppVRRPVirtualRouter(
self, self.pg0, 100,
- prio=prio, intvl=intvl,
-
flags=self._default_flags
)
+ vr = VppVRRPVirtualRouter(
+ self, self.pg0, 100, prio=prio, intvl=intvl, flags=self._default_flags
+ )
vr.update_vpp_config()
vr.start_stop(is_start=1)
self.logger.info(self.vapi.cli("show vrrp vr"))
# Update VR with lower prio and larger interval
# we need to keep old VR for the adv checks
vr.update_vpp_config()
vr.start_stop(is_start=1)
self.logger.info(self.vapi.cli("show vrrp vr"))
# Update VR with lower prio and larger interval
# we need to keep old VR for the adv checks
- upd_vr = VppVRRPVirtualRouter(self, self.pg0, 100,
- prio=100, intvl=2*intvl,
- flags=self._default_flags,
- vips=[self.pg0.remote_ip4])
+ upd_vr = VppVRRPVirtualRouter(
+ self,
+ self.pg0,
+ 100,
+ prio=100,
+ intvl=2 * intvl,
+ flags=self._default_flags,
+ vips=[self.pg0.remote_ip4],
+ )
upd_vr._vrrp_index = vr._vrrp_index
upd_vr.update_vpp_config()
start_time = time.time()
upd_vr._vrrp_index = vr._vrrp_index
upd_vr.update_vpp_config()
start_time = time.time()
@@
-403,7
+426,7
@@
class TestVRRP4(VppTestCase):
src_ip = self.pg0.remote_ip4
pkts = [upd_vr.vrrp_adv_packet(prio=110, src_ip=src_ip)]
while time.time() < end_time:
src_ip = self.pg0.remote_ip4
pkts = [upd_vr.vrrp_adv_packet(prio=110, src_ip=src_ip)]
while time.time() < end_time:
- self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl
*
0.01)
+ self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl
*
0.01)
self.logger.info(self.vapi.cli("show trace"))
upd_vr.start_stop(is_start=0)
self.logger.info(self.vapi.cli("show trace"))
upd_vr.start_stop(is_start=0)
@@
-413,7
+436,7
@@
class TestVRRP4(VppTestCase):
# long as it receives higher priority advertisements
@unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp4_backup_noadv(self):
# long as it receives higher priority advertisements
@unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp4_backup_noadv(self):
- """
IPv4 Backup VR does not advertise
"""
+ """
IPv4 Backup VR does not advertise
"""
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@
-421,10
+444,15
@@
class TestVRRP4(VppTestCase):
prio = 100
intvl = self._default_adv
intvl_s = intvl * 0.01
prio = 100
intvl = self._default_adv
intvl_s = intvl * 0.01
- vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
- prio=prio, intvl=intvl,
- flags=self._default_flags,
- vips=[self.pg0.remote_ip4])
+ vr = VppVRRPVirtualRouter(
+ self,
+ self.pg0,
+ vr_id,
+ prio=prio,
+ intvl=intvl,
+ flags=self._default_flags,
+ vips=[self.pg0.remote_ip4],
+ )
self._vrs.append(vr)
vr.add_vpp_config()
self._vrs.append(vr)
vr.add_vpp_config()
@@
-440,7
+468,7
@@
class TestVRRP4(VppTestCase):
# send higher prio advertisements, should not receive any
src_ip = self.pg0.remote_ip4
# send higher prio advertisements, should not receive any
src_ip = self.pg0.remote_ip4
- pkts = [vr.vrrp_adv_packet(prio=prio
+
10, src_ip=src_ip)]
+ pkts = [vr.vrrp_adv_packet(prio=prio
+
10, src_ip=src_ip)]
while time.time() < end_time:
self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl_s)
self.logger.info(self.vapi.cli("show trace"))
while time.time() < end_time:
self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl_s)
self.logger.info(self.vapi.cli("show trace"))
@@
-451,16
+479,16
@@
class TestVRRP4(VppTestCase):
self._vrs = []
def test_vrrp4_master_arp(self):
self._vrs = []
def test_vrrp4_master_arp(self):
- """
IPv4 Master VR replies to ARP
"""
+ """
IPv4 Master VR replies to ARP
"""
self.pg_start()
# VR virtual IP is the default, which is the pg local IP
vr_id = 100
prio = 255
intvl = self._default_adv
self.pg_start()
# VR virtual IP is the default, which is the pg local IP
vr_id = 100
prio = 255
intvl = self._default_adv
- vr = VppVRRPVirtualRouter(
self, self.pg0, 100,
- prio=prio, intvl=intvl,
-
flags=self._default_flags
)
+ vr = VppVRRPVirtualRouter(
+ self, self.pg0, 100, prio=prio, intvl=intvl, flags=self._default_flags
+ )
self._vrs.append(vr)
vr.add_vpp_config()
self._vrs.append(vr)
vr.add_vpp_config()
@@
-484,7
+512,7
@@
class TestVRRP4(VppTestCase):
@unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp4_backup_noarp(self):
@unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp4_backup_noarp(self):
- """
IPv4 Backup VR ignores ARP
"""
+ """
IPv4 Backup VR ignores ARP
"""
# We need an address for a virtual IP that is not the IP that
# ARP requests will originate from
# We need an address for a virtual IP that is not the IP that
# ARP requests will originate from
@@
-492,16
+520,24
@@
class TestVRRP4(VppTestCase):
prio = 100
intvl = self._default_adv
vip = self.pg0.remote_hosts[1].ip4
prio = 100
intvl = self._default_adv
vip = self.pg0.remote_hosts[1].ip4
- vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
- prio=prio, intvl=intvl,
- flags=self._default_flags,
- vips=[vip])
+ vr = VppVRRPVirtualRouter(
+ self,
+ self.pg0,
+ vr_id,
+ prio=prio,
+ intvl=intvl,
+ flags=self._default_flags,
+ vips=[vip],
+ )
self._vrs.append(vr)
vr.add_vpp_config()
self._vrs.append(vr)
vr.add_vpp_config()
- arp_req = (Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg0.remote_mac) /
- ARP(op=ARP.who_has, pdst=vip,
- psrc=self.pg0.remote_ip4, hwsrc=self.pg0.remote_mac))
+ arp_req = Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg0.remote_mac) / ARP(
+ op=ARP.who_has,
+ pdst=vip,
+ psrc=self.pg0.remote_ip4,
+ hwsrc=self.pg0.remote_mac,
+ )
# Before the VR is started make sure no reply to request for VIP
self.pg_start()
# Before the VR is started make sure no reply to request for VIP
self.pg_start()
@@
-510,7
+546,7
@@
class TestVRRP4(VppTestCase):
# VR should start in backup state and still should not reply to ARP
# send a higher priority adv to make sure it does not become master
# VR should start in backup state and still should not reply to ARP
# send a higher priority adv to make sure it does not become master
- adv = vr.vrrp_adv_packet(prio=prio
+
10, src_ip=self.pg0.remote_ip4)
+ adv = vr.vrrp_adv_packet(prio=prio
+
10, src_ip=self.pg0.remote_ip4)
vr.start_stop(is_start=1)
self.send_and_assert_no_replies(self.pg0, [adv, arp_req], timeout=1)
vr.start_stop(is_start=1)
self.send_and_assert_no_replies(self.pg0, [adv, arp_req], timeout=1)
@@
-520,17
+556,22
@@
class TestVRRP4(VppTestCase):
@unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp4_election(self):
@unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp4_election(self):
- """
IPv4 Backup VR becomes master if no advertisements received
"""
+ """
IPv4 Backup VR becomes master if no advertisements received
"""
vr_id = 100
prio = 100
intvl = self._default_adv
intvl_s = intvl * 0.01
vip = self.pg0.remote_ip4
vr_id = 100
prio = 100
intvl = self._default_adv
intvl_s = intvl * 0.01
vip = self.pg0.remote_ip4
- vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
- prio=prio, intvl=intvl,
- flags=self._default_flags,
- vips=[vip])
+ vr = VppVRRPVirtualRouter(
+ self,
+ self.pg0,
+ vr_id,
+ prio=prio,
+ intvl=intvl,
+ flags=self._default_flags,
+ vips=[vip],
+ )
self._vrs.append(vr)
vr.add_vpp_config()
self._vrs.append(vr)
vr.add_vpp_config()
@@
-557,17
+598,22
@@
class TestVRRP4(VppTestCase):
@unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp4_backup_preempts(self):
@unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp4_backup_preempts(self):
- """
IPv4 Backup VR preempts lower priority master
"""
+ """
IPv4 Backup VR preempts lower priority master
"""
vr_id = 100
prio = 100
intvl = self._default_adv
intvl_s = intvl * 0.01
vip = self.pg0.remote_ip4
vr_id = 100
prio = 100
intvl = self._default_adv
intvl_s = intvl * 0.01
vip = self.pg0.remote_ip4
- vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
- prio=prio, intvl=intvl,
- flags=self._default_flags,
- vips=[vip])
+ vr = VppVRRPVirtualRouter(
+ self,
+ self.pg0,
+ vr_id,
+ prio=prio,
+ intvl=intvl,
+ flags=self._default_flags,
+ vips=[vip],
+ )
self._vrs.append(vr)
vr.add_vpp_config()
self._vrs.append(vr)
vr.add_vpp_config()
@@
-583,7
+629,7
@@
class TestVRRP4(VppTestCase):
# send lower prio advertisements until timer expires
src_ip = self.pg0.remote_ip4
# send lower prio advertisements until timer expires
src_ip = self.pg0.remote_ip4
- pkts = [vr.vrrp_adv_packet(prio=prio
-
10, src_ip=src_ip)]
+ pkts = [vr.vrrp_adv_packet(prio=prio
-
10, src_ip=src_ip)]
while time.time() + intvl_s < end_time:
self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl_s)
self.logger.info(self.vapi.cli("show trace"))
while time.time() + intvl_s < end_time:
self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl_s)
self.logger.info(self.vapi.cli("show trace"))
@@
-595,7
+641,7
@@
class TestVRRP4(VppTestCase):
@unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp4_master_preempted(self):
@unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp4_master_preempted(self):
- """
IPv4 Master VR preempted by higher priority backup
"""
+ """
IPv4 Master VR preempted by higher priority backup
"""
# A prio 255 VR cannot be preempted so the prio has to be lower and
# we have to wait for it to take over
# A prio 255 VR cannot be preempted so the prio has to be lower and
# we have to wait for it to take over
@@
-603,10
+649,15
@@
class TestVRRP4(VppTestCase):
prio = 100
intvl = self._default_adv
vip = self.pg0.remote_ip4
prio = 100
intvl = self._default_adv
vip = self.pg0.remote_ip4
- vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
- prio=prio, intvl=intvl,
- flags=self._default_flags,
- vips=[vip])
+ vr = VppVRRPVirtualRouter(
+ self,
+ self.pg0,
+ vr_id,
+ prio=prio,
+ intvl=intvl,
+ flags=self._default_flags,
+ vips=[vip],
+ )
self._vrs.append(vr)
vr.add_vpp_config()
self._vrs.append(vr)
vr.add_vpp_config()
@@
-632,7
+683,7
@@
class TestVRRP4(VppTestCase):
@unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp4_accept_mode_disabled(self):
@unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp4_accept_mode_disabled(self):
- """
IPv4 Master VR does not reply for VIP w/ accept mode off
"""
+ """
IPv4 Master VR does not reply for VIP w/ accept mode off
"""
# accept mode only matters when prio < 255, so it will have to
# come up as a backup and take over as master after the timeout
# accept mode only matters when prio < 255, so it will have to
# come up as a backup and take over as master after the timeout
@@
-640,10
+691,15
@@
class TestVRRP4(VppTestCase):
prio = 100
intvl = self._default_adv
vip = self.pg0.remote_hosts[4].ip4
prio = 100
intvl = self._default_adv
vip = self.pg0.remote_hosts[4].ip4
- vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
- prio=prio, intvl=intvl,
- flags=self._default_flags,
- vips=[vip])
+ vr = VppVRRPVirtualRouter(
+ self,
+ self.pg0,
+ vr_id,
+ prio=prio,
+ intvl=intvl,
+ flags=self._default_flags,
+ vips=[vip],
+ )
self._vrs.append(vr)
vr.add_vpp_config()
self._vrs.append(vr)
vr.add_vpp_config()
@@
-661,9
+717,11
@@
class TestVRRP4(VppTestCase):
vr.assert_state_equals(VRRP_VR_STATE_MASTER)
# send an ICMP echo to the VR virtual IP address
vr.assert_state_equals(VRRP_VR_STATE_MASTER)
# send an ICMP echo to the VR virtual IP address
- echo = (Ether(dst=vr.virtual_mac(), src=self.pg0.remote_mac) /
- IP(dst=vip, src=self.pg0.remote_ip4) /
- ICMP(seq=1, id=self.pg0.sw_if_index, type='echo-request'))
+ echo = (
+ Ether(dst=vr.virtual_mac(), src=self.pg0.remote_mac)
+ / IP(dst=vip, src=self.pg0.remote_ip4)
+ / ICMP(seq=1, id=self.pg0.sw_if_index, type="echo-request")
+ )
self.pg_send(self.pg0, [echo])
# wait for an echo reply. none should be received
self.pg_send(self.pg0, [echo])
# wait for an echo reply. none should be received
@@
-672,7
+730,7
@@
class TestVRRP4(VppTestCase):
@unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp4_accept_mode_enabled(self):
@unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp4_accept_mode_enabled(self):
- """
IPv4 Master VR replies for VIP w/ accept mode on
"""
+ """
IPv4 Master VR replies for VIP w/ accept mode on
"""
# A prio 255 VR cannot be preempted so the prio has to be lower and
# we have to wait for it to take over
# A prio 255 VR cannot be preempted so the prio has to be lower and
# we have to wait for it to take over
@@
-680,11
+738,10
@@
class TestVRRP4(VppTestCase):
prio = 100
intvl = self._default_adv
vip = self.pg0.remote_hosts[4].ip4
prio = 100
intvl = self._default_adv
vip = self.pg0.remote_hosts[4].ip4
- flags = (VRRP_VR_FLAG_PREEMPT | VRRP_VR_FLAG_ACCEPT)
- vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
- prio=prio, intvl=intvl,
- flags=flags,
- vips=[vip])
+ flags = VRRP_VR_FLAG_PREEMPT | VRRP_VR_FLAG_ACCEPT
+ vr = VppVRRPVirtualRouter(
+ self, self.pg0, vr_id, prio=prio, intvl=intvl, flags=flags, vips=[vip]
+ )
self._vrs.append(vr)
vr.add_vpp_config()
self._vrs.append(vr)
vr.add_vpp_config()
@@
-702,15
+759,18
@@
class TestVRRP4(VppTestCase):
vr.assert_state_equals(VRRP_VR_STATE_MASTER)
# send an ICMP echo to the VR virtual IP address
vr.assert_state_equals(VRRP_VR_STATE_MASTER)
# send an ICMP echo to the VR virtual IP address
- echo = (Ether(dst=vr.virtual_mac(), src=self.pg0.remote_mac) /
- IP(dst=vip, src=self.pg0.remote_ip4) /
- ICMP(seq=1, id=self.pg0.sw_if_index, type='echo-request'))
+ echo = (
+ Ether(dst=vr.virtual_mac(), src=self.pg0.remote_mac)
+ / IP(dst=vip, src=self.pg0.remote_ip4)
+ / ICMP(seq=1, id=self.pg0.sw_if_index, type="echo-request")
+ )
self.pg_send(self.pg0, [echo])
# wait for an echo reply.
time.sleep(1)
self.pg_send(self.pg0, [echo])
# wait for an echo reply.
time.sleep(1)
- rx_pkts = self.pg0.get_capture(expected_count=1, timeout=1,
- filter_out_fn=is_not_echo_reply)
+ rx_pkts = self.pg0.get_capture(
+ expected_count=1, timeout=1, filter_out_fn=is_not_echo_reply
+ )
self.assertEqual(rx_pkts[0][IP].src, vip)
self.assertEqual(rx_pkts[0][IP].dst, self.pg0.remote_ip4)
self.assertEqual(rx_pkts[0][IP].src, vip)
self.assertEqual(rx_pkts[0][IP].dst, self.pg0.remote_ip4)
@@
-720,17
+780,22
@@
class TestVRRP4(VppTestCase):
@unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp4_intf_tracking(self):
@unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp4_intf_tracking(self):
- """
IPv4 Master VR adjusts priority based on tracked interface
"""
+ """
IPv4 Master VR adjusts priority based on tracked interface
"""
vr_id = 100
prio = 255
intvl = self._default_adv
intvl_s = intvl * 0.01
vip = self.pg0.local_ip4
vr_id = 100
prio = 255
intvl = self._default_adv
intvl_s = intvl * 0.01
vip = self.pg0.local_ip4
- vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
- prio=prio, intvl=intvl,
- flags=self._default_flags,
- vips=[vip])
+ vr = VppVRRPVirtualRouter(
+ self,
+ self.pg0,
+ vr_id,
+ prio=prio,
+ intvl=intvl,
+ flags=self._default_flags,
+ vips=[vip],
+ )
self._vrs.append(vr)
vr.add_vpp_config()
self._vrs.append(vr)
vr.add_vpp_config()
@@
-740,9
+805,9
@@
class TestVRRP4(VppTestCase):
# add pg1 as a tracked interface and start the VR
adjustment = 50
adjusted_prio = prio - adjustment
# add pg1 as a tracked interface and start the VR
adjustment = 50
adjusted_prio = prio - adjustment
- vr.add_del_tracked_interface(
is_add=1,
- sw_if_index=self.pg1.sw_if_index,
-
prio=adjustment
)
+ vr.add_del_tracked_interface(
+ is_add=1, sw_if_index=self.pg1.sw_if_index, prio=adjustment
+ )
vr.start_stop(is_start=1)
vr.assert_state_equals(VRRP_VR_STATE_MASTER)
vr.start_stop(is_start=1)
vr.assert_state_equals(VRRP_VR_STATE_MASTER)
@@
-751,53
+816,47
@@
class TestVRRP4(VppTestCase):
# tracked intf is up -> advertised priority == configured priority
self.pg0.enable_capture()
# tracked intf is up -> advertised priority == configured priority
self.pg0.enable_capture()
- rx = self.pg0.wait_for_packet(timeout=intvl_s,
- filter_out_fn=is_not_adv)
+ rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
self.assertEqual(rx, adv_configured)
# take down pg1, verify priority is now being adjusted
self.pg1.admin_down()
self.pg0.enable_capture()
self.assertEqual(rx, adv_configured)
# take down pg1, verify priority is now being adjusted
self.pg1.admin_down()
self.pg0.enable_capture()
- rx = self.pg0.wait_for_packet(timeout=intvl_s,
- filter_out_fn=is_not_adv)
+ rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
self.assertEqual(rx, adv_adjusted)
# bring up pg1, verify priority now matches configured value
self.pg1.admin_up()
self.pg0.enable_capture()
self.assertEqual(rx, adv_adjusted)
# bring up pg1, verify priority now matches configured value
self.pg1.admin_up()
self.pg0.enable_capture()
- rx = self.pg0.wait_for_packet(timeout=intvl_s,
- filter_out_fn=is_not_adv)
+ rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
self.assertEqual(rx, adv_configured)
# remove IP address from pg1, verify priority now being adjusted
self.pg1.unconfig_ip4()
self.pg0.enable_capture()
self.assertEqual(rx, adv_configured)
# remove IP address from pg1, verify priority now being adjusted
self.pg1.unconfig_ip4()
self.pg0.enable_capture()
- rx = self.pg0.wait_for_packet(timeout=intvl_s,
- filter_out_fn=is_not_adv)
+ rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
self.assertEqual(rx, adv_adjusted)
# add IP address to pg1, verify priority now matches configured value
self.pg1.config_ip4()
self.pg0.enable_capture()
self.assertEqual(rx, adv_adjusted)
# add IP address to pg1, verify priority now matches configured value
self.pg1.config_ip4()
self.pg0.enable_capture()
- rx = self.pg0.wait_for_packet(timeout=intvl_s,
- filter_out_fn=is_not_adv)
+ rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
self.assertEqual(rx, adv_configured)
@unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp4_master_adv_unicast(self):
self.assertEqual(rx, adv_configured)
@unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp4_master_adv_unicast(self):
- """
IPv4 Master VR advertises (unicast)
"""
+ """
IPv4 Master VR advertises (unicast)
"""
vr_id = 100
prio = 255
intvl = self._default_adv
intvl_s = intvl * 0.01
vip = self.pg0.local_ip4
vr_id = 100
prio = 255
intvl = self._default_adv
intvl_s = intvl * 0.01
vip = self.pg0.local_ip4
- flags =
(self._default_flags | VRRP_VR_FLAG_UNICAST)
+ flags =
self._default_flags | VRRP_VR_FLAG_UNICAST
unicast_peer = self.pg0.remote_hosts[4]
unicast_peer = self.pg0.remote_hosts[4]
- vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
- prio=prio, intvl=intvl,
- flags=flags,
- vips=[vip])
+ vr = VppVRRPVirtualRouter(
+ self, self.pg0, vr_id, prio=prio, intvl=intvl, flags=flags, vips=[vip]
+ )
self._vrs.append(vr)
vr.add_vpp_config()
vr.set_unicast_peers([unicast_peer.ip4])
self._vrs.append(vr)
vr.add_vpp_config()
vr.set_unicast_peers([unicast_peer.ip4])
@@
-810,8
+869,7
@@
class TestVRRP4(VppTestCase):
vr.assert_state_equals(VRRP_VR_STATE_MASTER)
self.pg0.enable_capture()
vr.assert_state_equals(VRRP_VR_STATE_MASTER)
self.pg0.enable_capture()
- rx = self.pg0.wait_for_packet(timeout=intvl_s,
- filter_out_fn=is_not_adv)
+ rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
self.assertTrue(rx.haslayer(Ether))
self.assertTrue(rx.haslayer(IP))
self.assertTrue(rx.haslayer(Ether))
self.assertTrue(rx.haslayer(IP))
@@
-827,7
+885,7
@@
class TestVRRP4(VppTestCase):
class TestVRRP6(VppTestCase):
class TestVRRP6(VppTestCase):
- """
IPv6 VRRP Test Case
"""
+ """
IPv6 VRRP Test Case
"""
@classmethod
def setUpClass(cls):
@classmethod
def setUpClass(cls):
@@
-849,7
+907,7
@@
class TestVRRP6(VppTestCase):
i.configure_ipv6_neighbors()
self._vrs = []
i.configure_ipv6_neighbors()
self._vrs = []
- self._default_flags =
(VRRP_VR_FLAG_IPV6 | VRRP_VR_FLAG_PREEMPT)
+ self._default_flags =
VRRP_VR_FLAG_IPV6 | VRRP_VR_FLAG_PREEMPT
self._default_adv = 100
def tearDown(self):
self._default_adv = 100
def tearDown(self):
@@
-922,15
+980,15
@@
class TestVRRP6(VppTestCase):
# become master and start advertising immediately.
@unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp6_master_adv(self):
# become master and start advertising immediately.
@unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp6_master_adv(self):
- """
IPv6 Master VR advertises
"""
+ """
IPv6 Master VR advertises
"""
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
prio = 255
intvl = self._default_adv
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
prio = 255
intvl = self._default_adv
- vr = VppVRRPVirtualRouter(
self, self.pg0, 100,
- prio=prio, intvl=intvl,
-
flags=self._default_flags
)
+ vr = VppVRRPVirtualRouter(
+ self, self.pg0, 100, prio=prio, intvl=intvl, flags=self._default_flags
+ )
self._vrs.append(vr)
vr.add_vpp_config()
self._vrs.append(vr)
vr.add_vpp_config()
@@
-956,25
+1014,30
@@
class TestVRRP6(VppTestCase):
# of parameters to test that too
@unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp6_master_adv_update(self):
# of parameters to test that too
@unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp6_master_adv_update(self):
- """
IPv6 Master VR adv + Update to Backup
"""
+ """
IPv6 Master VR adv + Update to Backup
"""
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
prio = 255
intvl = self._default_adv
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
prio = 255
intvl = self._default_adv
- vr = VppVRRPVirtualRouter(
self, self.pg0, 100,
- prio=prio, intvl=intvl,
-
flags=self._default_flags
)
+ vr = VppVRRPVirtualRouter(
+ self, self.pg0, 100, prio=prio, intvl=intvl, flags=self._default_flags
+ )
vr.update_vpp_config()
vr.start_stop(is_start=1)
self.logger.info(self.vapi.cli("show vrrp vr"))
# Update VR with lower prio and larger interval
# we need to keep old VR for the adv checks
vr.update_vpp_config()
vr.start_stop(is_start=1)
self.logger.info(self.vapi.cli("show vrrp vr"))
# Update VR with lower prio and larger interval
# we need to keep old VR for the adv checks
- upd_vr = VppVRRPVirtualRouter(self, self.pg0, 100,
- prio=100, intvl=2*intvl,
- flags=self._default_flags,
- vips=[self.pg0.remote_ip6])
+ upd_vr = VppVRRPVirtualRouter(
+ self,
+ self.pg0,
+ 100,
+ prio=100,
+ intvl=2 * intvl,
+ flags=self._default_flags,
+ vips=[self.pg0.remote_ip6],
+ )
upd_vr._vrrp_index = vr._vrrp_index
upd_vr.update_vpp_config()
start_time = time.time()
upd_vr._vrrp_index = vr._vrrp_index
upd_vr.update_vpp_config()
start_time = time.time()
@@
-1000,7
+1063,8
@@
class TestVRRP6(VppTestCase):
end_time = start_time + 2 * upd_vr.master_down_seconds()
while time.time() < end_time:
self.send_and_assert_no_replies(
end_time = start_time + 2 * upd_vr.master_down_seconds()
while time.time() < end_time:
self.send_and_assert_no_replies(
- self.pg0, pkts, timeout=0.01*upd_vr._intvl)
+ self.pg0, pkts, timeout=0.01 * upd_vr._intvl
+ )
self.logger.info(self.vapi.cli("show trace"))
vr.start_stop(is_start=0)
self.logger.info(self.vapi.cli("show trace"))
vr.start_stop(is_start=0)
@@
-1010,7
+1074,7
@@
class TestVRRP6(VppTestCase):
# long as it receives higher priority advertisements
@unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp6_backup_noadv(self):
# long as it receives higher priority advertisements
@unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp6_backup_noadv(self):
- """
IPv6 Backup VR does not advertise
"""
+ """
IPv6 Backup VR does not advertise
"""
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@
-1018,10
+1082,15
@@
class TestVRRP6(VppTestCase):
prio = 100
intvl = self._default_adv
intvl_s = intvl * 0.01
prio = 100
intvl = self._default_adv
intvl_s = intvl * 0.01
- vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
- prio=prio, intvl=intvl,
- flags=self._default_flags,
- vips=[self.pg0.remote_ip6])
+ vr = VppVRRPVirtualRouter(
+ self,
+ self.pg0,
+ vr_id,
+ prio=prio,
+ intvl=intvl,
+ flags=self._default_flags,
+ vips=[self.pg0.remote_ip6],
+ )
vr.add_vpp_config()
self._vrs.append(vr)
vr.add_vpp_config()
self._vrs.append(vr)
@@
-1037,7
+1106,7
@@
class TestVRRP6(VppTestCase):
# send higher prio advertisements, should not see VPP send any
src_ip = self.pg0.remote_ip6_ll
num_advs = 5
# send higher prio advertisements, should not see VPP send any
src_ip = self.pg0.remote_ip6_ll
num_advs = 5
- pkts = [vr.vrrp_adv_packet(prio=prio
+
10, src_ip=src_ip)]
+ pkts = [vr.vrrp_adv_packet(prio=prio
+
10, src_ip=src_ip)]
self.logger.info(self.vapi.cli("show vlib graph"))
while time.time() < end_time:
self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl_s)
self.logger.info(self.vapi.cli("show vlib graph"))
while time.time() < end_time:
self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl_s)
@@
-1050,16
+1119,16
@@
class TestVRRP6(VppTestCase):
self._vrs = []
def test_vrrp6_master_nd(self):
self._vrs = []
def test_vrrp6_master_nd(self):
- """
IPv6 Master VR replies to NDP
"""
+ """
IPv6 Master VR replies to NDP
"""
self.pg_start()
# VR virtual IP is the default, which is the pg local IP
vr_id = 100
prio = 255
intvl = self._default_adv
self.pg_start()
# VR virtual IP is the default, which is the pg local IP
vr_id = 100
prio = 255
intvl = self._default_adv
- vr = VppVRRPVirtualRouter(
self, self.pg0, 100,
- prio=prio, intvl=intvl,
-
flags=self._default_flags
)
+ vr = VppVRRPVirtualRouter(
+ self, self.pg0, 100, prio=prio, intvl=intvl, flags=self._default_flags
+ )
vr.add_vpp_config()
self._vrs.append(vr)
vr.add_vpp_config()
self._vrs.append(vr)
@@
-1082,7
+1151,7
@@
class TestVRRP6(VppTestCase):
@unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp6_backup_nond(self):
@unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp6_backup_nond(self):
- """
IPv6 Backup VR ignores NDP
"""
+ """
IPv6 Backup VR ignores NDP
"""
# We need an address for a virtual IP that is not the IP that
# ARP requests will originate from
# We need an address for a virtual IP that is not the IP that
# ARP requests will originate from
@@
-1091,10
+1160,15
@@
class TestVRRP6(VppTestCase):
intvl = self._default_adv
intvl_s = intvl * 0.01
vip = self.pg0.remote_hosts[1].ip6
intvl = self._default_adv
intvl_s = intvl * 0.01
vip = self.pg0.remote_hosts[1].ip6
- vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
- prio=prio, intvl=intvl,
- flags=self._default_flags,
- vips=[vip])
+ vr = VppVRRPVirtualRouter(
+ self,
+ self.pg0,
+ vr_id,
+ prio=prio,
+ intvl=intvl,
+ flags=self._default_flags,
+ vips=[vip],
+ )
vr.add_vpp_config()
self._vrs.append(vr)
vr.add_vpp_config()
self._vrs.append(vr)
@@
-1102,36
+1176,43
@@
class TestVRRP6(VppTestCase):
dmac = in6_getnsmac(nsma)
dst_ip = inet_ntop(socket.AF_INET6, nsma)
dmac = in6_getnsmac(nsma)
dst_ip = inet_ntop(socket.AF_INET6, nsma)
- ndp_req = (Ether(dst=dmac, src=self.pg0.remote_mac) /
- IPv6(dst=dst_ip, src=self.pg0.remote_ip6) /
- ICMPv6ND_NS(tgt=vip) /
- ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac))
+ ndp_req = (
+ Ether(dst=dmac, src=self.pg0.remote_mac)
+ / IPv6(dst=dst_ip, src=self.pg0.remote_ip6)
+ / ICMPv6ND_NS(tgt=vip)
+ / ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac)
+ )
# Before the VR is started make sure no reply to request for VIP
self.send_and_assert_no_replies(self.pg0, [ndp_req], timeout=1)
# VR should start in backup state and still should not reply to NDP
# send a higher priority adv to make sure it does not become master
# Before the VR is started make sure no reply to request for VIP
self.send_and_assert_no_replies(self.pg0, [ndp_req], timeout=1)
# VR should start in backup state and still should not reply to NDP
# send a higher priority adv to make sure it does not become master
- adv = vr.vrrp_adv_packet(prio=prio
+
10, src_ip=self.pg0.remote_ip6)
+ adv = vr.vrrp_adv_packet(prio=prio
+
10, src_ip=self.pg0.remote_ip6)
pkts = [adv, ndp_req]
vr.start_stop(is_start=1)
pkts = [adv, ndp_req]
vr.start_stop(is_start=1)
- self.send_and_assert_no_replies(self.pg0, pkts,
timeout=intvl_s)
+ self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl_s)
vr.start_stop(is_start=0)
@unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp6_election(self):
vr.start_stop(is_start=0)
@unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp6_election(self):
- """
IPv6 Backup VR becomes master if no advertisements received
"""
+ """
IPv6 Backup VR becomes master if no advertisements received
"""
vr_id = 100
prio = 100
intvl = self._default_adv
intvl_s = intvl * 0.01
vip = self.pg0.remote_ip6
vr_id = 100
prio = 100
intvl = self._default_adv
intvl_s = intvl * 0.01
vip = self.pg0.remote_ip6
- vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
- prio=prio, intvl=intvl,
- flags=self._default_flags,
- vips=[vip])
+ vr = VppVRRPVirtualRouter(
+ self,
+ self.pg0,
+ vr_id,
+ prio=prio,
+ intvl=intvl,
+ flags=self._default_flags,
+ vips=[vip],
+ )
self._vrs.append(vr)
vr.add_vpp_config()
self._vrs.append(vr)
vr.add_vpp_config()
@@
-1158,17
+1239,22
@@
class TestVRRP6(VppTestCase):
@unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp6_backup_preempts(self):
@unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp6_backup_preempts(self):
- """
IPv6 Backup VR preempts lower priority master
"""
+ """
IPv6 Backup VR preempts lower priority master
"""
vr_id = 100
prio = 100
intvl = self._default_adv
intvl_s = intvl * 0.01
vip = self.pg0.remote_ip6
vr_id = 100
prio = 100
intvl = self._default_adv
intvl_s = intvl * 0.01
vip = self.pg0.remote_ip6
- vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
- prio=prio, intvl=intvl,
- flags=self._default_flags,
- vips=[vip])
+ vr = VppVRRPVirtualRouter(
+ self,
+ self.pg0,
+ vr_id,
+ prio=prio,
+ intvl=intvl,
+ flags=self._default_flags,
+ vips=[vip],
+ )
self._vrs.append(vr)
vr.add_vpp_config()
self._vrs.append(vr)
vr.add_vpp_config()
@@
-1184,7
+1270,7
@@
class TestVRRP6(VppTestCase):
# send lower prio advertisements until timer expires
src_ip = self.pg0.remote_ip6
# send lower prio advertisements until timer expires
src_ip = self.pg0.remote_ip6
- pkts = [vr.vrrp_adv_packet(prio=prio
-
10, src_ip=src_ip)]
+ pkts = [vr.vrrp_adv_packet(prio=prio
-
10, src_ip=src_ip)]
while (time.time() + intvl_s) < end_time:
self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl_s)
self.logger.info(self.vapi.cli("show trace"))
while (time.time() + intvl_s) < end_time:
self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl_s)
self.logger.info(self.vapi.cli("show trace"))
@@
-1196,7
+1282,7
@@
class TestVRRP6(VppTestCase):
@unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp6_master_preempted(self):
@unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp6_master_preempted(self):
- """
IPv6 Master VR preempted by higher priority backup
"""
+ """
IPv6 Master VR preempted by higher priority backup
"""
# A prio 255 VR cannot be preempted so the prio has to be lower and
# we have to wait for it to take over
# A prio 255 VR cannot be preempted so the prio has to be lower and
# we have to wait for it to take over
@@
-1204,10
+1290,15
@@
class TestVRRP6(VppTestCase):
prio = 100
intvl = self._default_adv
vip = self.pg0.remote_ip6
prio = 100
intvl = self._default_adv
vip = self.pg0.remote_ip6
- vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
- prio=prio, intvl=intvl,
- flags=self._default_flags,
- vips=[vip])
+ vr = VppVRRPVirtualRouter(
+ self,
+ self.pg0,
+ vr_id,
+ prio=prio,
+ intvl=intvl,
+ flags=self._default_flags,
+ vips=[vip],
+ )
self._vrs.append(vr)
vr.add_vpp_config()
self._vrs.append(vr)
vr.add_vpp_config()
@@
-1233,7
+1324,7
@@
class TestVRRP6(VppTestCase):
@unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp6_accept_mode_disabled(self):
@unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp6_accept_mode_disabled(self):
- """
IPv6 Master VR does not reply for VIP w/ accept mode off
"""
+ """
IPv6 Master VR does not reply for VIP w/ accept mode off
"""
# accept mode only matters when prio < 255, so it will have to
# come up as a backup and take over as master after the timeout
# accept mode only matters when prio < 255, so it will have to
# come up as a backup and take over as master after the timeout
@@
-1241,10
+1332,15
@@
class TestVRRP6(VppTestCase):
prio = 100
intvl = self._default_adv
vip = self.pg0.remote_hosts[4].ip6
prio = 100
intvl = self._default_adv
vip = self.pg0.remote_hosts[4].ip6
- vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
- prio=prio, intvl=intvl,
- flags=self._default_flags,
- vips=[vip])
+ vr = VppVRRPVirtualRouter(
+ self,
+ self.pg0,
+ vr_id,
+ prio=prio,
+ intvl=intvl,
+ flags=self._default_flags,
+ vips=[vip],
+ )
self._vrs.append(vr)
vr.add_vpp_config()
self._vrs.append(vr)
vr.add_vpp_config()
@@
-1262,9
+1358,11
@@
class TestVRRP6(VppTestCase):
vr.assert_state_equals(VRRP_VR_STATE_MASTER)
# send an ICMPv6 echo to the VR virtual IP address
vr.assert_state_equals(VRRP_VR_STATE_MASTER)
# send an ICMPv6 echo to the VR virtual IP address
- echo = (Ether(dst=vr.virtual_mac(), src=self.pg0.remote_mac) /
- IPv6(dst=vip, src=self.pg0.remote_ip6) /
- ICMPv6EchoRequest(seq=1, id=self.pg0.sw_if_index))
+ echo = (
+ Ether(dst=vr.virtual_mac(), src=self.pg0.remote_mac)
+ / IPv6(dst=vip, src=self.pg0.remote_ip6)
+ / ICMPv6EchoRequest(seq=1, id=self.pg0.sw_if_index)
+ )
self.pg_send(self.pg0, [echo])
# wait for an echo reply. none should be received
self.pg_send(self.pg0, [echo])
# wait for an echo reply. none should be received
@@
-1273,7
+1371,7
@@
class TestVRRP6(VppTestCase):
@unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp6_accept_mode_enabled(self):
@unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp6_accept_mode_enabled(self):
- """
IPv6 Master VR replies for VIP w/ accept mode on
"""
+ """
IPv6 Master VR replies for VIP w/ accept mode on
"""
# A prio 255 VR cannot be preempted so the prio has to be lower and
# we have to wait for it to take over
# A prio 255 VR cannot be preempted so the prio has to be lower and
# we have to wait for it to take over
@@
-1281,11
+1379,10
@@
class TestVRRP6(VppTestCase):
prio = 100
intvl = self._default_adv
vip = self.pg0.remote_hosts[4].ip6
prio = 100
intvl = self._default_adv
vip = self.pg0.remote_hosts[4].ip6
- flags = (self._default_flags | VRRP_VR_FLAG_ACCEPT)
- vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
- prio=prio, intvl=intvl,
- flags=flags,
- vips=[vip])
+ flags = self._default_flags | VRRP_VR_FLAG_ACCEPT
+ vr = VppVRRPVirtualRouter(
+ self, self.pg0, vr_id, prio=prio, intvl=intvl, flags=flags, vips=[vip]
+ )
self._vrs.append(vr)
vr.add_vpp_config()
self._vrs.append(vr)
vr.add_vpp_config()
@@
-1303,15
+1400,18
@@
class TestVRRP6(VppTestCase):
vr.assert_state_equals(VRRP_VR_STATE_MASTER)
# send an ICMP echo to the VR virtual IP address
vr.assert_state_equals(VRRP_VR_STATE_MASTER)
# send an ICMP echo to the VR virtual IP address
- echo = (Ether(dst=vr.virtual_mac(), src=self.pg0.remote_mac) /
- IPv6(dst=vip, src=self.pg0.remote_ip6) /
- ICMPv6EchoRequest(seq=1, id=self.pg0.sw_if_index))
+ echo = (
+ Ether(dst=vr.virtual_mac(), src=self.pg0.remote_mac)
+ / IPv6(dst=vip, src=self.pg0.remote_ip6)
+ / ICMPv6EchoRequest(seq=1, id=self.pg0.sw_if_index)
+ )
self.pg_send(self.pg0, [echo])
# wait for an echo reply.
time.sleep(1)
self.pg_send(self.pg0, [echo])
# wait for an echo reply.
time.sleep(1)
- rx_pkts = self.pg0.get_capture(expected_count=1, timeout=1,
- filter_out_fn=is_not_echo_reply)
+ rx_pkts = self.pg0.get_capture(
+ expected_count=1, timeout=1, filter_out_fn=is_not_echo_reply
+ )
self.assertEqual(rx_pkts[0][IPv6].src, vip)
self.assertEqual(rx_pkts[0][IPv6].dst, self.pg0.remote_ip6)
self.assertEqual(rx_pkts[0][IPv6].src, vip)
self.assertEqual(rx_pkts[0][IPv6].dst, self.pg0.remote_ip6)
@@
-1320,17
+1420,22
@@
class TestVRRP6(VppTestCase):
@unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp6_intf_tracking(self):
@unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp6_intf_tracking(self):
- """
IPv6 Master VR adjusts priority based on tracked interface
"""
+ """
IPv6 Master VR adjusts priority based on tracked interface
"""
vr_id = 100
prio = 255
intvl = self._default_adv
intvl_s = intvl * 0.01
vip = self.pg0.local_ip6
vr_id = 100
prio = 255
intvl = self._default_adv
intvl_s = intvl * 0.01
vip = self.pg0.local_ip6
- vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
- prio=prio, intvl=intvl,
- flags=self._default_flags,
- vips=[vip])
+ vr = VppVRRPVirtualRouter(
+ self,
+ self.pg0,
+ vr_id,
+ prio=prio,
+ intvl=intvl,
+ flags=self._default_flags,
+ vips=[vip],
+ )
self._vrs.append(vr)
vr.add_vpp_config()
self._vrs.append(vr)
vr.add_vpp_config()
@@
-1340,9
+1445,9
@@
class TestVRRP6(VppTestCase):
# add pg1 as a tracked interface and start the VR
adjustment = 50
adjusted_prio = prio - adjustment
# add pg1 as a tracked interface and start the VR
adjustment = 50
adjusted_prio = prio - adjustment
- vr.add_del_tracked_interface(
is_add=1,
- sw_if_index=self.pg1.sw_if_index,
-
prio=adjustment
)
+ vr.add_del_tracked_interface(
+ is_add=1, sw_if_index=self.pg1.sw_if_index, prio=adjustment
+ )
vr.start_stop(is_start=1)
vr.assert_state_equals(VRRP_VR_STATE_MASTER)
vr.start_stop(is_start=1)
vr.assert_state_equals(VRRP_VR_STATE_MASTER)
@@
-1351,53
+1456,47
@@
class TestVRRP6(VppTestCase):
# tracked intf is up -> advertised priority == configured priority
self.pg0.enable_capture()
# tracked intf is up -> advertised priority == configured priority
self.pg0.enable_capture()
- rx = self.pg0.wait_for_packet(timeout=intvl_s,
- filter_out_fn=is_not_adv)
+ rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
self.assertEqual(rx, adv_configured)
# take down pg1, verify priority is now being adjusted
self.pg1.admin_down()
self.pg0.enable_capture()
self.assertEqual(rx, adv_configured)
# take down pg1, verify priority is now being adjusted
self.pg1.admin_down()
self.pg0.enable_capture()
- rx = self.pg0.wait_for_packet(timeout=intvl_s,
- filter_out_fn=is_not_adv)
+ rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
self.assertEqual(rx, adv_adjusted)
# bring up pg1, verify priority now matches configured value
self.pg1.admin_up()
self.pg0.enable_capture()
self.assertEqual(rx, adv_adjusted)
# bring up pg1, verify priority now matches configured value
self.pg1.admin_up()
self.pg0.enable_capture()
- rx = self.pg0.wait_for_packet(timeout=intvl_s,
- filter_out_fn=is_not_adv)
+ rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
self.assertEqual(rx, adv_configured)
# remove IP address from pg1, verify priority now being adjusted
self.pg1.unconfig_ip6()
self.pg0.enable_capture()
self.assertEqual(rx, adv_configured)
# remove IP address from pg1, verify priority now being adjusted
self.pg1.unconfig_ip6()
self.pg0.enable_capture()
- rx = self.pg0.wait_for_packet(timeout=intvl_s,
- filter_out_fn=is_not_adv)
+ rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
self.assertEqual(rx, adv_adjusted)
# add IP address to pg1, verify priority now matches configured value
self.pg1.config_ip6()
self.pg0.enable_capture()
self.assertEqual(rx, adv_adjusted)
# add IP address to pg1, verify priority now matches configured value
self.pg1.config_ip6()
self.pg0.enable_capture()
- rx = self.pg0.wait_for_packet(timeout=intvl_s,
- filter_out_fn=is_not_adv)
+ rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
self.assertEqual(rx, adv_configured)
@unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp6_master_adv_unicast(self):
self.assertEqual(rx, adv_configured)
@unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp6_master_adv_unicast(self):
- """
IPv6 Master VR advertises (unicast)
"""
+ """
IPv6 Master VR advertises (unicast)
"""
vr_id = 100
prio = 255
intvl = self._default_adv
intvl_s = intvl * 0.01
vip = self.pg0.local_ip6
vr_id = 100
prio = 255
intvl = self._default_adv
intvl_s = intvl * 0.01
vip = self.pg0.local_ip6
- flags =
(self._default_flags | VRRP_VR_FLAG_UNICAST)
+ flags =
self._default_flags | VRRP_VR_FLAG_UNICAST
unicast_peer = self.pg0.remote_hosts[4]
unicast_peer = self.pg0.remote_hosts[4]
- vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
- prio=prio, intvl=intvl,
- flags=flags,
- vips=[vip])
+ vr = VppVRRPVirtualRouter(
+ self, self.pg0, vr_id, prio=prio, intvl=intvl, flags=flags, vips=[vip]
+ )
self._vrs.append(vr)
vr.add_vpp_config()
vr.set_unicast_peers([unicast_peer.ip6])
self._vrs.append(vr)
vr.add_vpp_config()
vr.set_unicast_peers([unicast_peer.ip6])
@@
-1410,23
+1509,22
@@
class TestVRRP6(VppTestCase):
vr.assert_state_equals(VRRP_VR_STATE_MASTER)
self.pg0.enable_capture()
vr.assert_state_equals(VRRP_VR_STATE_MASTER)
self.pg0.enable_capture()
- rx = self.pg0.wait_for_packet(timeout=intvl_s,
- filter_out_fn=is_not_adv)
+ rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
self.assertTrue(rx.haslayer(Ether))
self.assertTrue(rx.haslayer(IPv6))
self.assertTrue(rx.haslayer(VRRPv3))
self.assertEqual(rx[Ether].src, self.pg0.local_mac)
self.assertEqual(rx[Ether].dst, unicast_peer.mac)
self.assertTrue(rx.haslayer(Ether))
self.assertTrue(rx.haslayer(IPv6))
self.assertTrue(rx.haslayer(VRRPv3))
self.assertEqual(rx[Ether].src, self.pg0.local_mac)
self.assertEqual(rx[Ether].dst, unicast_peer.mac)
- self.assertEqual(
ip6_normalize(rx[IPv6].src),
-
ip6_normalize(self.pg0.local_ip6_ll)
)
- self.assertEqual(ip6_normalize(rx[IPv6].dst),
-
ip6_normalize(unicast_peer.ip6))
+ self.assertEqual(
+
ip6_normalize(rx[IPv6].src), ip6_normalize(self.pg0.local_ip6_ll
)
+ )
+
self.assertEqual(ip6_normalize(rx[IPv6].dst),
ip6_normalize(unicast_peer.ip6))
self.assertEqual(rx[VRRPv3].vrid, vr_id)
self.assertEqual(rx[VRRPv3].priority, prio)
self.assertEqual(rx[VRRPv3].ipcount, 1)
self.assertEqual(rx[VRRPv3].addrlist, [vip])
self.assertEqual(rx[VRRPv3].vrid, vr_id)
self.assertEqual(rx[VRRPv3].priority, prio)
self.assertEqual(rx[VRRPv3].ipcount, 1)
self.assertEqual(rx[VRRPv3].addrlist, [vip])
-if __name__ ==
'__main__'
:
+if __name__ ==
"__main__"
:
unittest.main(testRunner=VppTestRunner)
unittest.main(testRunner=VppTestRunner)