5 from vpp_object import VppObject
# Scan `dump` for an entry whose sw_if_index equals the requested one.
# NOTE(review): the loop header that binds `d` and the statement executed on
# a match are not visible in this view; presumably `d` iterates over `dump`
# and the matching entry is returned -- confirm against the full file.
19 def get_if_dump(dump, sw_if_index):
# Match on the interface index carried by the dump entry.
21 if (d.sw_if_index == sw_if_index):
def query_all_memif_vpp_config(_test):
    """Return the memif dump obtained through the test case's VPP API.

    :param _test: object exposing ``vapi.memif_dump()``.
    :returns: whatever ``memif_dump()`` returns, unmodified.
    """
    memif_dump = _test.vapi.memif_dump
    return memif_dump()
# Remove memif configuration through the test case's VPP API: memif
# interfaces are handled first, socket filename entries second.
# NOTE(review): the loop headers that bind `d` (and any condition guarding
# the socket filename removal) are not visible in this view; `d` is
# presumably one entry of the preceding dump -- confirm against the full file.
29 def remove_all_memif_vpp_config(_test):
# Fetch the memif dump.
30 dump = _test.vapi.memif_dump()
# Delete by the interface index carried in the dump entry.
32 _test.vapi.memif_delete(d.sw_if_index)
# `dump` is rebound to the socket filename dump from here on.
33 dump = _test.vapi.memif_socket_filename_dump()
# First positional argument is 0 here; VppSocketFilename.add_vpp_config
# passes 1 in the same position.
36 _test.vapi.memif_socket_filename_add_del(
37 0, d.socket_id, d.socket_filename)
# VppObject subclass holding a memif socket filename together with its
# socket_id.
40 class VppSocketFilename(VppObject):
# Constructor arguments:
#   test               -- test case object; the other methods of this class
#                         read self._test.vapi and self._test.tempdir
#   socket_id          -- stored as self.socket_id
#   socket_filename    -- stored as self.socket_filename
#   add_default_folder -- stored as self.add_default_folder (default False)
# NOTE(review): the assignment of self._test is not visible in this view;
# presumably it is set from `test` -- confirm against the full file.
41 def __init__(self, test, socket_id, socket_filename,
42 add_default_folder=False):
44 self.socket_id = socket_id
45 self.socket_filename = socket_filename
47 # if True insert default socket folder before socket filename,
48 # after adding vpp config
49 self.add_default_folder = add_default_folder
# Call memif_socket_filename_add_del with first argument 1, then, when
# add_default_folder is set, rebuild self.socket_filename as a bytes value
# of the form b"<tempdir>/<...>".
# The attribute is rewritten only after the API call, so the call itself
# receives the socket filename as given to the constructor.
# NOTE(review): the second operand of the bytes %-format and the end of this
# method (what becomes of `rv`) are not visible in this view; presumably the
# original socket_filename is the second operand and `rv` is returned --
# confirm against the full file.
51 def add_vpp_config(self):
52 rv = self._test.vapi.memif_socket_filename_add_del(
53 1, self.socket_id, self.socket_filename)
54 if self.add_default_folder:
55 self.socket_filename = b"%s/%s" % (
56 six.ensure_binary(self._test.tempdir, encoding='utf-8'),
def remove_vpp_config(self):
    """Call the socket-filename add/del API with first argument 0.

    :returns: the reply from ``memif_socket_filename_add_del``.
    """
    add_del = self._test.vapi.memif_socket_filename_add_del
    # add_vpp_config passes 1 in the first position; 0 is used here.
    reply = add_del(0, self.socket_id, self.socket_filename)
    return reply
def query_vpp_config(self):
    """Return the socket filename dump obtained from the VPP API.

    The dump is handed back as-is; it is not filtered by ``socket_id``.
    """
    dump_socket_filenames = self._test.vapi.memif_socket_filename_dump
    return dump_socket_filenames()
# Build an identifier string from socket_id (%d) and socket_filename (%s).
# NOTE(review): the `def` line this return belongs to is not visible in this
# view; presumably it is the object's id method -- confirm against the full
# file.
68 return "socket-filename-%d-%s" % (self.socket_id, self.socket_filename)
# VppObject subclass holding the parameters of one memif interface.
71 class VppMemif(VppObject):
# Constructor: stores the given parameters on the instance. The interface
# index starts out as None and is filled in by add_vpp_config.
# NOTE(review): the end of the signature and the assignments of self._test,
# self.role, self.mode, self.if_id and self.secret are not visible in this
# view; self.if_id and self.role are read below, so they are presumably
# assigned in the missing lines -- confirm against the full file.
72 def __init__(self, test, role, mode, rx_queues=0, tx_queues=0, if_id=0,
73 socket_id=0, secret="", ring_size=0, buffer_size=0,
78 self.rx_queues = rx_queues
79 self.tx_queues = tx_queues
81 self.socket_id = socket_id
83 self.ring_size = ring_size
84 self.buffer_size = buffer_size
85 self.hw_addr = hw_addr
# No interface index until the interface has been created.
86 self.sw_if_index = None
# IPv4 address derived from the instance: 192.168.<if_id + 1>.<role + 1>,
# with a prefix length of 24.
87 self.ip4_addr = "192.168.%d.%d" % (self.if_id + 1, self.role + 1)
88 self.ip4_addr_len = 24
# Create the memif interface via memif_create, remember the sw_if_index
# from the reply on the instance, and return that index.
# NOTE(review): several keyword arguments of the memif_create call and the
# `try:` lines belonging to the two handlers below are not visible in this
# view.
90 def add_vpp_config(self):
91 rv = self._test.vapi.memif_create(
94 rx_queues=self.rx_queues,
95 tx_queues=self.tx_queues,
97 socket_id=self.socket_id,
99 ring_size=self.ring_size,
100 buffer_size=self.buffer_size,
101 hw_addr=self.hw_addr)
# Re-raised with the instance's attribute dict as the message.
104 except AttributeError:
105 raise AttributeError('self: %s' % self.__dict__)
107 self.sw_if_index = rv.sw_if_index
# NOTE(review): the raise below passes the format string and its arguments
# as three separate exception args, so "%s %s" is never interpolated;
# '"%s %s" % (self, rv)' was probably intended. Two AttributeError handlers
# are visible in this method; the second looks redundant -- confirm.
108 except AttributeError:
109 raise AttributeError("%s %s", self, rv)
111 return self.sw_if_index
# Call sw_interface_set_flags with second argument 1 for this interface's
# sw_if_index.
# NOTE(review): the `def` line (and any guard before this return) is not
# visible in this view; by symmetry with admin_down, which passes 0, this is
# presumably the admin-up method -- confirm against the full file.
115 return self._test.vapi.sw_interface_set_flags(self.sw_if_index, 1)
# Call sw_interface_set_flags with second argument 0 for this interface's
# sw_if_index and return the reply.
# NOTE(review): one line between the `def` and the return is not visible in
# this view (possibly a guard on self.sw_if_index) -- confirm.
117 def admin_down(self):
119 return self._test.vapi.sw_interface_set_flags(self.sw_if_index, 0)
# Check the interface's link state via query_vpp_config, sleeping `step`
# between checks.
#   timeout -- not referenced by the visible lines; TODO confirm how it
#              bounds the wait
#   step    -- value passed to self._test.sleep between checks (default 1)
# NOTE(review): the body of the guard below, the loop construct, the action
# taken when the link is up, and the timeout handling are not visible in
# this view.
121 def wait_for_link_up(self, timeout, step=1):
# Guard for a falsy sw_if_index (None until add_vpp_config has stored one).
122 if not self.sw_if_index:
125 dump = self.query_vpp_config()
# link_up_down == 1 is the value tested for.
126 if dump.link_up_down == 1:
128 self._test.sleep(step)
def config_ip4(self):
    """Send this interface's IPv4 address to VPP.

    ``self.ip4_addr`` (dotted-quad text) is packed with ``socket.inet_pton``
    and passed with ``self.sw_if_index`` and ``self.ip4_addr_len`` to
    ``sw_interface_add_del_address``. No add/del flag is passed, so the
    API's default applies.

    :returns: the reply from ``sw_interface_add_del_address``.
    """
    # Values are gathered in the order the call would evaluate them.
    add_del_address = self._test.vapi.sw_interface_add_del_address
    if_index = self.sw_if_index
    packed_addr = socket.inet_pton(socket.AF_INET, self.ip4_addr)
    prefix_len = self.ip4_addr_len
    return add_del_address(
        sw_if_index=if_index,
        address=packed_addr,
        address_length=prefix_len)
def remove_vpp_config(self):
    """Call ``memif_delete`` for this interface and clear its stored index.

    Returns ``None``; afterwards ``self.sw_if_index`` is ``None`` again.
    """
    delete_memif = self._test.vapi.memif_delete
    delete_memif(self.sw_if_index)
    # Forget the index so later truthiness checks see no interface.
    self.sw_if_index = None
# Fetch the memif dump and pass it with this interface's sw_if_index to
# get_if_dump, returning that helper's result.
# NOTE(review): the body of the guard below is not visible in this view;
# presumably it returns early when there is no interface index -- confirm.
143 def query_vpp_config(self):
144 if not self.sw_if_index:
146 dump = self._test.vapi.memif_dump()
147 return get_if_dump(dump, self.sw_if_index)
# Build an identifier string of the form "<role>:<if_id>:<sw_if_index>",
# or "<role>:<if_id>:None" in the second return.
# NOTE(review): the `def` line and the condition choosing between the two
# returns are not visible in this view; presumably the choice depends on
# whether self.sw_if_index is set -- confirm against the full file.
151 return "%d:%d:%d" % (self.role, self.if_id, self.sw_if_index)
153 return "%d:%d:None" % (self.role, self.if_id)