import os
-import socket
import fnmatch
import time
from hook import Hook
for filename in fnmatch.filter(filenames, '*.api.json'):
jsonfiles.append(os.path.join(root, filename))
- self.papi = VPP(jsonfiles)
+ self.vpp = VPP(jsonfiles)
self._events = deque()
def __enter__(self):
def connect(self):
"""Connect the API to VPP"""
- self.papi.connect(self.name, self.shm_prefix)
- self.papi.register_event_callback(self)
+ self.vpp.connect(self.name, self.shm_prefix)
+ self.papi = self.vpp.api
+ self.vpp.register_event_callback(self)
def disconnect(self):
"""Disconnect the API from VPP"""
- self.papi.disconnect()
+ self.vpp.disconnect()
def api(self, api_fn, api_args, expected_retval=0):
""" Call API function and check it's return value.
def show_version(self):
""" """
- return self.papi.show_version()
+ return self.api(self.papi.show_version, {})
def pg_create_interface(self, pg_index):
"""
'suppress': suppress,
'send_unicast': send_unicast})
+ def ip6_sw_interface_ra_prefix(self,
+ sw_if_index,
+ address,
+ address_length,
+ use_default=0,
+ no_advertise=0,
+ off_link=0,
+ no_autoconfig=0,
+ no_onlink=0,
+ is_no=0,
+ val_lifetime=0xffffffff,
+ pref_lifetime=0xffffffff):
+ return self.api(self.papi.sw_interface_ip6nd_ra_prefix,
+ {'sw_if_index': sw_if_index,
+ 'address': address,
+ 'address_length': address_length,
+ 'use_default': use_default,
+ 'no_advertise': no_advertise,
+ 'off_link': off_link,
+ 'no_autoconfig': no_autoconfig,
+ 'no_onlink': no_onlink,
+ 'is_no': is_no,
+ 'val_lifetime': val_lifetime,
+ 'pref_lifetime': pref_lifetime})
+
def ip6_sw_interface_enable_disable(self, sw_if_index, enable):
"""
Enable/Disable An interface for IPv6
'src_port': src_port,
'enable': enable})
+ def snat_user_session_dump(
+ self,
+ ip_address,
+ vrf_id):
+ """Dump S-NAT user's sessions
+
+ :param ip_address: ip adress of the user to be dumped
+ :param cpu_index: cpu_index on which the user is
+ :param vrf_id: VRF ID
+ :return: Dictionary of S-NAT sessions
+ """
+ return self.api(
+ self.papi.snat_user_session_dump,
+ {'ip_address': ip_address,
+ 'vrf_id': vrf_id})
+
+ def snat_user_dump(self):
+ """Dump S-NAT users
+
+ :return: Dictionary of S-NAT users
+ """
+ return self.api(self.papi.snat_user_dump, {})
+
def control_ping(self):
self.api(self.papi.control_ping)