import time
from collections import deque
-from six import moves
+from six import moves, iteritems
from vpp_papi import VPP, mac_pton
from hook import Hook
from vpp_l2 import L2_PORT_TYPE
INFO = 6
DBG = 7
+#
+# Dictionary keyed on message name to override default values for
+# named parameters
+#
+defaultmapping = {'map_add_domain': {'mtu': 1280},
+ 'syslog_set_sender': {'collector_port': 514,
+ 'max_msg_size': 480}}
+
class UnexpectedApiReturnValueError(Exception):
""" exception raised when the API return value is unexpected """
self.test_class.logger.debug("New event: %s: %s" % (name, event))
self._events.append(event)
+ def factory(self, name, apifn):
+ def f(*a, **ka):
+ fields = apifn._func.msg.fields
+
+ # add positional and kw arguments
+ d = ka
+ for i, o in enumerate(fields[3:]):
+ try:
+ d[o] = a[i]
+ except:
+ break
+
+ # Default override
+ if name in defaultmapping:
+ for k, v in iteritems(defaultmapping[name]):
+ if k in d:
+ continue
+ d[k] = v
+ return self.api(apifn, d)
+ return f
+
+ def __getattr__(self, name):
+ try:
+ return getattr(self, name)
+ except:
+ return self.factory(name, getattr(self.papi, name))
+
def connect(self):
"""Connect the API to VPP"""
self.vpp.connect(self.name, self.shm_prefix)
return self.api(self.papi.ip_unnumbered_dump,
{'sw_if_index': sw_if_index})
- def sw_interface_enable_disable_mpls(self, sw_if_index,
- is_enable=1):
+ def sw_interface_set_mpls_enable(self, sw_if_index,
+ is_enable=1):
"""
Enable/Disable MPLS on the interface
:param sw_if_index:
{'sw_if_index': sw_if_index,
'enable': is_enable})
- def sw_interface_ra_suppress(self, sw_if_index, suppress=1):
- return self.api(self.papi.sw_interface_ip6nd_ra_config,
- {'sw_if_index': sw_if_index,
- 'suppress': suppress})
-
def set_ip_flow_hash(self,
table_id,
src=1,
'reverse': reverse,
'is_ipv6': is_ip6})
- def ip6_nd_proxy(self, ip, sw_if_index, is_del=0):
+ def ip6nd_proxy_add_del(self, ip, sw_if_index, is_del=0):
return self.api(self.papi.ip6nd_proxy_add_del,
{'ip': ip,
'sw_if_index': sw_if_index,
'is_del': is_del})
- def ip6_sw_interface_ra_config(self, sw_if_index,
- no,
- suppress,
- send_unicast):
- return self.api(self.papi.sw_interface_ip6nd_ra_config,
- {'sw_if_index': sw_if_index,
- 'is_no': no,
- 'suppress': suppress,
- 'send_unicast': send_unicast})
-
- def ip6_sw_interface_ra_prefix(self,
- sw_if_index,
- address,
- address_length,
- use_default=0,
- no_advertise=0,
- off_link=0,
- no_autoconfig=0,
- no_onlink=0,
- is_no=0,
- val_lifetime=0xffffffff,
- pref_lifetime=0xffffffff):
+ def sw_interface_ip6nd_ra_prefix(self,
+ sw_if_index,
+ address,
+ address_length,
+ use_default=0,
+ no_advertise=0,
+ off_link=0,
+ no_autoconfig=0,
+ no_onlink=0,
+ is_no=0,
+ val_lifetime=0xffffffff,
+ pref_lifetime=0xffffffff):
return self.api(self.papi.sw_interface_ip6nd_ra_prefix,
{'sw_if_index': sw_if_index,
'prefix': {
'val_lifetime': val_lifetime,
'pref_lifetime': pref_lifetime})
- def ip6_sw_interface_enable_disable(self, sw_if_index, enable):
+ def sw_interface_ip6_enable_disable(self, sw_if_index, enable):
"""
Enable/Disable An interface for IPv6
"""
{'enable_disable': enable_disable,
'pid': os.getpid(), })
- def want_macs_learn_events(self, enable_disable=1, scan_delay=0,
- max_macs_in_event=0, learn_limit=0):
+ def want_l2_macs_events(self, enable_disable=1, scan_delay=0,
+ max_macs_in_event=0, learn_limit=0):
return self.api(self.papi.want_l2_macs_events,
{'enable_disable': enable_disable,
'scan_delay': scan_delay,
'tx_sw_if_index': tx_sw_if_index,
'enable': enable})
- def sw_interface_set_l2_tag_rewrite(
+ def l2_interface_vlan_tag_rewrite(
self,
sw_if_index,
vtr_oper,
'tag1': tag1,
'tag2': tag2})
- def sw_interface_set_l2_emulation(
+ def l2_emulation(
self,
sw_if_index,
enable=1):
{'sw_if_index': sw_if_index,
'mtu': mtu})
- def sw_interface_set_promiscuous(self, sw_if_index, enable):
- """
- :param sw_if_index:
- :param enable:
-
- """
- return self.api(self.papi.sw_interface_set_promiscuous,
- {'sw_if_index': sw_if_index,
- 'enable': enable})
-
def sw_interface_set_mac_address(self, sw_if_index, mac):
return self.api(self.papi.sw_interface_set_mac_address,
{'sw_if_index': sw_if_index,
'outer_vlan_id': outer_vlan,
'inner_vlan_id': inner_vlan})
- def create_p2pethernet_subif(self, sw_if_index, remote_mac, subif_id):
+ def p2p_ethernet_add(self, sw_if_index, remote_mac, subif_id):
"""Create p2p ethernet subinterface
:param sw_if_index: main (parent) interface
return self.api(self.papi.delete_subif,
{'sw_if_index': sw_if_index})
- def delete_p2pethernet_subif(self, sw_if_index, remote_mac):
+ def p2p_ethernet_del(self, sw_if_index, remote_mac):
"""Delete p2p ethernet subinterface
:param sw_if_index: main (parent) interface
}
)
- def reset_vrf(self,
- vrf_id,
- is_ipv6=0,
- ):
- """ Reset VRF (remove all routes etc.) request.
-
- :param int vrf_id: ID of the FIB table / VRF to reset.
- :param int is_ipv6: 1 for IPv6 neighbor, 0 for IPv4. (Default = 0)
- """
-
- return self.api(
- self.papi.reset_vrf,
- {'vrf_id': vrf_id,
- 'is_ipv6': is_ipv6,
- }
- )
-
def reset_fib(self,
vrf_id,
is_ipv6=0,
'is_l2': is_l2,
})
- def gre_tunnel_add_del(self,
+ def gre_add_del_tunnel(self,
src_address,
dst_address,
outer_fib_id=0,
def udp_encap_dump(self):
return self.api(self.papi.udp_encap_dump, {})
- def want_udp_encap_stats(self, enable=1):
- return self.api(self.papi.want_udp_encap_stats,
- {'enable': enable,
- 'pid': os.getpid()})
-
def mpls_fib_dump(self):
return self.api(self.papi.mpls_fib_dump, {})
"""
return self.api(self.papi.nat_show_config, {})
- def nat44_add_interface_addr(
+ def nat44_add_del_interface_addr(
self,
sw_if_index,
twice_nat=0,
"""
return self.api(self.papi.nat44_interface_addr_dump, {})
- def nat_ipfix(
+ def nat_ipfix_enable_disable(
self,
domain_id=1,
src_port=4739,
"""
return self.api(self.papi.nat64_bib_dump, {'proto': protocol})
- def nat64_set_timeouts(self, udp=300, icmp=60, tcp_trans=240, tcp_est=7440,
- tcp_incoming_syn=6):
- """Set values of timeouts for NAT64 (in seconds)
-
- :param udpi: UDP timeout (Default value = 300)
- :param icmp: ICMP timeout (Default value = 60)
- :param tcp_trans: TCP transitory timeout (Default value = 240)
- :param tcp_est: TCP established timeout (Default value = 7440)
- :param tcp_incoming_syn: TCP incoming SYN timeout (Default value = 6)
- """
- return self.api(
- self.papi.nat64_set_timeouts,
- {'udp': udp,
- 'icmp': icmp,
- 'tcp_trans': tcp_trans,
- 'tcp_est': tcp_est,
- 'tcp_incoming_syn': tcp_incoming_syn})
-
- def nat64_get_timeouts(self):
- """Get values of timeouts for NAT64
-
- :return: Timeouts for NAT64 (in seconds)
- """
- return self.api(self.papi.nat64_get_timeouts, {})
-
def nat64_st_dump(self, protocol=255):
"""Dump NAT64 session table
"""
return self.api(self.papi.nat64_prefix_dump, {})
- def nat64_add_interface_addr(
+ def nat64_add_del_interface_addr(
self,
sw_if_index,
is_add=1):
def dhcp_client_dump(self):
return self.api(self.papi.dhcp_client_dump, {})
- def dhcp_client(self,
- sw_if_index,
- hostname,
- client_id='',
- is_add=1,
- set_broadcast_flag=1,
- want_dhcp_events=0):
+ def dhcp_client_config(self,
+ sw_if_index,
+ hostname,
+ client_id='',
+ is_add=1,
+ set_broadcast_flag=1,
+ want_dhcp_events=0):
return self.api(
self.papi.dhcp_client_config,
{
'is_en': is_enabled,
})
- def lisp_locator_set(self,
- ls_name,
- is_add=1):
+ def lisp_add_del_locator_set(self,
+ ls_name,
+ is_add=1):
return self.api(
self.papi.lisp_add_del_locator_set,
{
def lisp_locator_set_dump(self):
return self.api(self.papi.lisp_locator_set_dump, {})
- def lisp_locator(self,
- ls_name,
- sw_if_index,
- priority=1,
- weight=1,
- is_add=1):
+ def lisp_add_del_locator(self,
+ ls_name,
+ sw_if_index,
+ priority=1,
+ weight=1,
+ is_add=1):
return self.api(
self.papi.lisp_add_del_locator,
{
'ls_index': ls_index,
})
- def lisp_local_mapping(self,
- ls_name,
- eid_type,
- eid,
- prefix_len,
- vni=0,
- key_id=0,
- key="",
- is_add=1):
+ def lisp_add_del_local_eid(self,
+ ls_name,
+ eid_type,
+ eid,
+ prefix_len,
+ vni=0,
+ key_id=0,
+ key="",
+ is_add=1):
return self.api(
self.papi.lisp_add_del_local_eid,
{
'filter': filter_opt,
})
- def lisp_remote_mapping(self,
- eid_type,
- eid,
- eid_prefix_len=0,
- vni=0,
- rlocs=[],
- rlocs_num=0,
- is_src_dst=0,
- is_add=1):
+ def lisp_add_del_remote_mapping(self,
+ eid_type,
+ eid,
+ eid_prefix_len=0,
+ vni=0,
+ rlocs=[],
+ rlocs_num=0,
+ is_src_dst=0,
+ is_add=1):
return self.api(
self.papi.lisp_add_del_remote_mapping,
{
'is_src_dst': is_src_dst,
})
- def lisp_adjacency(self,
- leid,
- reid,
- leid_len,
- reid_len,
- eid_type,
- is_add=1,
- vni=0):
+ def lisp_add_del_adjacency(self,
+ leid,
+ reid,
+ leid_len,
+ reid_len,
+ eid_type,
+ is_add=1,
+ vni=0):
return self.api(
self.papi.lisp_add_del_adjacency,
{
'vni': vni
})
- def map_add_domain(self,
- ip6_prefix,
- ip6_src,
- ip4_prefix,
- ea_bits_len=0,
- psid_offset=0,
- psid_length=0,
- mtu=1280):
-
- return self.api(
- self.papi.map_add_domain,
- {
- 'ip6_prefix': ip6_prefix,
- 'ip4_prefix': ip4_prefix,
- 'ip6_src': ip6_src,
- 'ea_bits_len': ea_bits_len,
- 'psid_offset': psid_offset,
- 'psid_length': psid_length,
- 'mtu': mtu
- })
-
- def map_if_enable_disable(self, is_enable, sw_if_index, is_translation):
- return self.api(
- self.papi.map_if_enable_disable,
- {
- 'is_enable': is_enable,
- 'sw_if_index': sw_if_index,
- 'is_translation': is_translation,
- })
-
- def map_param_set_tcp(self, tcp_mss):
- return self.api(
- self.papi.map_param_set_tcp,
- {
- 'tcp_mss': tcp_mss,
- })
-
def gtpu_add_del_tunnel(
self,
src_addr,
def ipsec_backend_dump(self):
return self.api(self.papi.ipsec_backend_dump, {})
- def app_namespace_add(self,
- namespace_id,
- ip4_fib_id=0,
- ip6_fib_id=0,
- sw_if_index=0xFFFFFFFF,
- secret=0):
+ def app_namespace_add_del(self,
+ namespace_id,
+ ip4_fib_id=0,
+ ip6_fib_id=0,
+ sw_if_index=0xFFFFFFFF,
+ secret=0):
return self.api(
self.papi.app_namespace_add_del,
{'secret': secret,
{'_no_type_conversion': True})
def gbp_endpoint_group_add(self, epg, sclass, bd,
- rd, uplink_sw_if_index):
+ rd, uplink_sw_if_index,
+ retention):
""" GBP endpoint group Add """
return self.api(self.papi.gbp_endpoint_group_add,
{'epg':
'bd_id': bd,
'rd_id': rd,
'epg_id': epg,
- 'sclass': sclass
+ 'sclass': sclass,
+ 'retention': retention
}})
def gbp_endpoint_group_del(self, epg):
""" GBP contract Dump """
return self.api(self.papi.gbp_contract_dump, {})
- def gbp_endpoint_learn_set_inactive_threshold(self, threshold):
- """ GBP set inactive threshold """
- return self.api(self.papi.gbp_endpoint_learn_set_inactive_threshold,
- {'threshold': threshold})
-
def gbp_vxlan_tunnel_add(self, vni, bd_rd_id, mode):
""" GBP VXLAN tunnel add """
return self.api(self.papi.gbp_vxlan_tunnel_add,
def svs_dump(self):
return self.api(self.papi.svs_dump, {})
-
- def syslog_set_sender(
- self,
- collector,
- src,
- collector_port=514,
- vrf_id=0,
- max_msg_size=480):
- """Set syslog sender configuration
-
- :param collector: colector IP address
- :param src: source IP address
- :param collector_port: collector UDP port (Default value = 514)
- :param vrf_id: VRF id (Default value = 0)
- :param max_msg_size: maximum message length (Default value = 480)
- """
- return self.api(self.papi.syslog_set_sender,
- {'collector_address': collector,
- 'src_address': src,
- 'collector_port': collector_port,
- 'vrf_id': vrf_id,
- 'max_msg_size': max_msg_size})
-
- def syslog_get_sender(self):
- """Return syslog sender configuration"""
- return self.api(self.papi.syslog_get_sender, {})
-
- def syslog_set_filter(self, severity):
- """Set syslog filter parameters
-
- :param severity: severity filter (specified severity and greater match)
- """
- return self.api(self.papi.syslog_set_filter, {'severity': severity})
-
- def syslog_get_filter(self):
- """Return syslog filter parameters"""
- return self.api(self.papi.syslog_get_filter, {})