class FibPathFlags(IntEnum):
"""FIB path flags."""
FIB_PATH_FLAG_NONE = 0
- FIB_PATH_FLAG_RESOLVE_VIA_ATTACHED = 1
+ FIB_PATH_FLAG_RESOLVE_VIA_ATTACHED = 1 #pylint: disable=invalid-name
FIB_PATH_FLAG_RESOLVE_VIA_HOST = 2
"""Get dump of all IP FIB tables on a VPP node.
:param node: VPP node.
+ :param address: IP address.
:type node: dict
+ :type address: str
"""
addr = ip_address(unicode(address))
"""
sw_if_index = InterfaceUtil.get_interface_index(node, interface)
- is_ipv6 = 1 if ip_version == 'ipv6' else 0
-
cmd = 'sw_interface_get_table'
- args = dict(sw_if_index=sw_if_index,
- is_ipv6=is_ipv6)
+ args = dict(
+ sw_if_index=sw_if_index,
+ is_ipv6=True if ip_version == 'ipv6' else False
+ )
err_msg = 'Failed to get VRF id assigned to interface {ifc}'.format(
ifc=interface)
args = dict(
sw_if_index=InterfaceUtil.get_interface_index(node, if_name),
is_add=1,
- loose=0)
+ loose=0
+ )
err_msg = 'Failed to enable source check on interface {ifc}'.format(
ifc=if_name)
with PapiSocketExecutor(node) as papi_exec:
cmd = 'sw_interface_add_del_address'
args = dict(
sw_if_index=InterfaceUtil.get_interface_index(node, interface),
- is_add=1,
- is_ipv6=1 if ip_addr.version == 6 else 0,
- del_all=0,
- address_length=int(prefix_length) if prefix_length else 128
- if ip_addr.version == 6 else 32,
- address=ip_addr.packed)
+ is_add=True,
+ del_all=False,
+ prefix=IPUtil.create_prefix_object(
+ ip_addr,
+ prefix_length if prefix_length else 128
+ if ip_addr.version == 6 else 32)
+ )
err_msg = 'Failed to add IP address on interface {ifc}'.format(
ifc=interface)
with PapiSocketExecutor(node) as papi_exec:
else 'ADDRESS_IP4').value,
un=IPUtil.union_addr(ip_addr))
+ @staticmethod
+ def create_prefix_object(ip_addr, addr_len):
+ """Create prefix object.
+
+ :param ip_addr: IPv4 or IPv6 address.
+ :para, addr_len: Length of IP address.
+ :type ip_addr: IPv4Address or IPv6Address
+ :type addr_len: int
+ :returns: Prefix object.
+ :rtype: dict
+ """
+ addr = IPUtil.create_ip_address_object(ip_addr)
+
+ return dict(
+ len=int(addr_len),
+ address=addr
+ )
+
@staticmethod
def compose_vpp_route_structure(node, network, prefix_len, **kwargs):
"""Create route object for ip_route_add_del api call.
net_addr = ip_address(unicode(network))
- addr = IPUtil.create_ip_address_object(net_addr)
- prefix = dict(
- len=int(prefix_len),
- address=addr)
+ prefix = IPUtil.create_prefix_object(net_addr, prefix_len)
paths = list()
n_hop = dict(
address=IPUtil.union_addr(ip_address(unicode(gateway))) if gateway
else 0,
via_label=MPLS_LABEL_INVALID,
- obj_id=Constants.BITWISE_NON_ZERO)
+ obj_id=Constants.BITWISE_NON_ZERO
+ )
path = dict(
sw_if_index=InterfaceUtil.get_interface_index(node, interface)
if interface else Constants.BITWISE_NON_ZERO,
else 'FIB_PATH_NH_PROTO_IP4').value,
nh=n_hop,
n_labels=0,
- label_stack=list(0 for _ in range(16)))
+ label_stack=list(0 for _ in range(16))
+ )
paths.append(path)
route = dict(
table_id=int(kwargs.get('vrf', 0)),
prefix=prefix,
n_paths=len(paths),
- paths=paths)
+ paths=paths
+ )
return route
@staticmethod
net_addr = ip_address(unicode(network))
cmd = 'ip_route_add_del'
- route = IPUtil.compose_vpp_route_structure(
- node, network, prefix_len, **kwargs)
args = dict(
is_add=1,
is_multipath=int(kwargs.get('multipath', False)),
- route=route)
+ route=None
+ )
err_msg = 'Failed to add route(s) on host {host}'.format(
host=node['host'])
with PapiSocketExecutor(node) as papi_exec:
for i in xrange(kwargs.get('count', 1)):
- args['route']['prefix']['address']['un'] = \
- IPUtil.union_addr(net_addr + i)
+ args['route'] = IPUtil.compose_vpp_route_structure(
+ node, net_addr + i, prefix_len, **kwargs)
history = False if 1 < i < kwargs.get('count', 1) else True
papi_exec.add(cmd, history=history, **args)
papi_exec.get_replies(err_msg)
cmd = 'sw_interface_add_del_address'
args = dict(
sw_if_index=InterfaceUtil.get_interface_index(node, interface),
- del_all=1)
+ is_add=False,
+ del_all=True
+ )
err_msg = 'Failed to flush IP address on interface {ifc}'.format(
ifc=interface)
with PapiSocketExecutor(node) as papi_exec: