"""
sw_if_index = InterfaceUtil.get_interface_index(node, interface)
- data = list()
if sw_if_index:
is_ipv6 = 1 if ip_version == 'ipv6' else 0
cmd = 'ip_address_dump'
- cmd_reply = 'ip_address_details'
args = dict(sw_if_index=sw_if_index,
is_ipv6=is_ipv6)
err_msg = 'Failed to get L2FIB dump on host {host}'.format(
host=node['host'])
with PapiExecutor(node) as papi_exec:
- papi_resp = papi_exec.add(cmd, **args).get_dump(err_msg)
-
- for item in papi_resp.reply[0]['api_reply']:
- item[cmd_reply]['ip'] = item[cmd_reply]['prefix'].split('/')[0]
- item[cmd_reply]['prefix_length'] = int(
- item[cmd_reply]['prefix'].split('/')[1])
- item[cmd_reply]['is_ipv6'] = is_ipv6
- item[cmd_reply]['netmask'] = \
+ details = papi_exec.add(cmd, **args).get_details(err_msg)
+
+ for item in details:
+ item['ip'] = item['prefix'].split('/')[0]
+ item['prefix_length'] = int(item['prefix'].split('/')[1])
+ item['is_ipv6'] = is_ipv6
+ item['netmask'] = \
str(IPv6Network(unicode('::/{pl}'.format(
- pl=item[cmd_reply]['prefix_length']))).netmask) \
+ pl=item['prefix_length']))).netmask) \
if is_ipv6 \
else str(IPv4Network(unicode('0.0.0.0/{pl}'.format(
- pl=item[cmd_reply]['prefix_length']))).netmask)
- data.append(item[cmd_reply])
+ pl=item['prefix_length']))).netmask)
- return data
+ return details
@staticmethod
def vpp_get_ip_tables(node):
PapiExecutor.run_cli_cmd(node, 'show ip6 fib')
PapiExecutor.run_cli_cmd(node, 'show ip6 fib summary')
+ @staticmethod
+ def vpp_get_ip_tables_prefix(node, address):
+ """Get dump of all IP FIB tables on a VPP node.
+
+ :param node: VPP node.
+ :type node: dict
+ """
+ addr = ip_address(unicode(address))
+
+ PapiExecutor.run_cli_cmd(
+ node, 'show {ip_ver} fib {addr}/{addr_len}'.format(
+ ip_ver='ip6' if addr.version == 6 else 'ip',
+ addr=addr,
+ addr_len=addr.max_prefixlen))
+
@staticmethod
def get_interface_vrf_table(node, interface, ip_version='ipv4'):
"""Get vrf ID for the given interface.
ifc=interface)
with PapiExecutor(node) as papi_exec:
- papi_resp = papi_exec.add(cmd, **args).get_replies(err_msg). \
- verify_reply(err_msg=err_msg)
+ reply = papi_exec.add(cmd, **args).get_reply(err_msg)
- return papi_resp['vrf_id']
+ return reply['vrf_id']
@staticmethod
def vpp_ip_source_check_setup(node, if_name):
err_msg = 'Failed to enable source check on interface {ifc}'.format(
ifc=if_name)
with PapiExecutor(node) as papi_exec:
- papi_exec.add(cmd, **args).get_replies(err_msg). \
- verify_reply(err_msg=err_msg)
+ papi_exec.add(cmd, **args).get_reply(err_msg)
@staticmethod
def vpp_ip_probe(node, interface, addr):
:type addr: str
"""
cmd = 'ip_probe_neighbor'
- cmd_reply = 'proxy_arp_intfc_enable_disable_reply'
args = dict(
sw_if_index=InterfaceUtil.get_interface_index(node, interface),
dst=str(addr))
dev=interface, ip=addr, h=node['host'])
with PapiExecutor(node) as papi_exec:
- papi_exec.add(cmd, **args).get_replies(err_msg). \
- verify_reply(cmd_reply=cmd_reply, err_msg=err_msg)
+ papi_exec.add(cmd, **args).get_reply(err_msg)
@staticmethod
def ip_addresses_should_be_equal(ip1, ip2):
err_msg = 'Failed to add IP address on interface {ifc}'.format(
ifc=interface)
with PapiExecutor(node) as papi_exec:
- papi_exec.add(cmd, **args).get_replies(err_msg). \
- verify_reply(err_msg=err_msg)
+ papi_exec.add(cmd, **args).get_reply(err_msg)
@staticmethod
def vpp_add_ip_neighbor(node, iface_key, ip_addr, mac_address):
err_msg = 'Failed to add IP neighbor on interface {ifc}'.format(
ifc=iface_key)
with PapiExecutor(node) as papi_exec:
- papi_exec.add(cmd, **args).get_replies(err_msg). \
- verify_reply(err_msg=err_msg)
+ papi_exec.add(cmd, **args).get_reply(err_msg)
@staticmethod
def union_addr(ip_addr):
if count > 100:
gateway = kwargs.get("gateway", '')
-
+ interface = kwargs.get("interface", '')
vrf = kwargs.get("vrf", None)
multipath = kwargs.get("multipath", False)
network=network,
prefix_length=prefix_len,
via='via {}'.format(gateway) if gateway else '',
+ sw_if_index='sw_if_index {}'.format(
+ InterfaceUtil.get_interface_index(node, interface))
+ if interface else '',
vrf='vrf {}'.format(vrf) if vrf else '',
count='count {}'.format(count) if count else '',
multipath='multipath' if multipath else '')
history = False if 1 < i < kwargs.get('count', 1) else True
papi_exec.add(cmd, history=history, **args)
if i > 0 and i % Constants.PAPI_MAX_API_BULK == 0:
- papi_exec.get_replies(err_msg).verify_replies(
- err_msg=err_msg)
- papi_exec.get_replies(err_msg).verify_replies(err_msg=err_msg)
+ papi_exec.get_replies(err_msg)
+ papi_exec.get_replies(err_msg)
@staticmethod
def flush_ip_addresses(node, interface):
err_msg = 'Failed to flush IP address on interface {ifc}'.format(
ifc=interface)
with PapiExecutor(node) as papi_exec:
- papi_exec.add(cmd, **args).get_replies(err_msg). \
- verify_reply(err_msg=err_msg)
+ papi_exec.add(cmd, **args).get_reply(err_msg)
@staticmethod
def add_fib_table(node, table_id, ipv6=False):
err_msg = 'Failed to add FIB table on host {host}'.format(
host=node['host'])
with PapiExecutor(node) as papi_exec:
- papi_exec.add(cmd, **args).get_replies(err_msg). \
- verify_reply(err_msg=err_msg)
+ papi_exec.add(cmd, **args).get_reply(err_msg)