import socket
-
-import six
+from ipaddress import IPv4Network
from vpp_object import VppObject
-
-
-class MEMIF_ROLE:
- MASTER = 0
- SLAVE = 1
-
-
-class MEMIF_MODE:
- ETHERNET = 0
- IP = 1
- PUNT_INJECT = 2
+from vpp_papi import VppEnum
def get_if_dump(dump, sw_if_index):
rv = self._test.vapi.memif_socket_filename_add_del(
1, self.socket_id, self.socket_filename)
if self.add_default_folder:
- self.socket_filename = b"%s/%s" % (
- six.ensure_binary(self._test.tempdir, encoding='utf-8'),
- self.socket_filename)
+ self.socket_filename = "%s/%s" % (self._test.tempdir,
+ self.socket_filename)
return rv
def remove_vpp_config(self):
def query_vpp_config(self):
return self._test.vapi.memif_socket_filename_dump()
- def __str__(self):
- return self.object_id()
-
def object_id(self):
- return "%d" % (self.socket_id)
+ return "socket-filename-%d-%s" % (self.socket_id, self.socket_filename)
class VppMemif(VppObject):
self.buffer_size = buffer_size
self.hw_addr = hw_addr
self.sw_if_index = None
- self.ip4_addr = "192.168.%d.%d" % (self.if_id + 1, self.role + 1)
- self.ip4_addr_len = 24
+ self.ip_prefix = IPv4Network("192.168.%d.%d/24" %
+ (self.if_id + 1, self.role + 1),
+ strict=False)
def add_vpp_config(self):
- rv = self._test.vapi.memif_create(self.role, self.mode, self.rx_queues,
- self.tx_queues, self.if_id,
- self.socket_id, self.secret,
- self.ring_size, self.buffer_size,
- self.hw_addr)
- self.sw_if_index = rv.sw_if_index
+ rv = self._test.vapi.memif_create(
+ role=self.role,
+ mode=self.mode,
+ rx_queues=self.rx_queues,
+ tx_queues=self.tx_queues,
+ id=self.if_id,
+ socket_id=self.socket_id,
+ secret=self.secret,
+ ring_size=self.ring_size,
+ buffer_size=self.buffer_size,
+ hw_addr=self.hw_addr)
+ try:
+ self.sw_if_index = rv.sw_if_index
+ except AttributeError:
+ # rv doesn't have .sw_if_index attribute
+ raise AttributeError("%s %s" % (self, rv))
+
return self.sw_if_index
def admin_up(self):
if self.sw_if_index:
- return self._test.vapi.sw_interface_set_flags(self.sw_if_index, 1)
+ return self._test.vapi.sw_interface_set_flags(
+ sw_if_index=self.sw_if_index, flags=1)
def admin_down(self):
if self.sw_if_index:
- return self._test.vapi.sw_interface_set_flags(self.sw_if_index, 0)
+ return self._test.vapi.sw_interface_set_flags(
+ sw_if_index=self.sw_if_index, flags=0)
def wait_for_link_up(self, timeout, step=1):
if not self.sw_if_index:
return False
while True:
dump = self.query_vpp_config()
- if dump.link_up_down == 1:
+ f = VppEnum.vl_api_if_status_flags_t.IF_STATUS_API_FLAG_LINK_UP
+ if dump.flags & f:
return True
self._test.sleep(step)
timeout -= step
def config_ip4(self):
return self._test.vapi.sw_interface_add_del_address(
- sw_if_index=self.sw_if_index, address=socket.inet_pton(
- socket.AF_INET, self.ip4_addr),
- address_length=self.ip4_addr_len)
+ sw_if_index=self.sw_if_index, prefix=self.ip_prefix)
def remove_vpp_config(self):
self._test.vapi.memif_delete(self.sw_if_index)
dump = self._test.vapi.memif_dump()
return get_if_dump(dump, self.sw_if_index)
- def __str__(self):
- return self.object_id()
-
def object_id(self):
if self.sw_if_index:
return "%d:%d:%d" % (self.role, self.if_id, self.sw_if_index)