5 from vpp_object import VppObject
6 from vpp_ip import VppIpPrefix
7 from vpp_papi import VppEnum
21 def get_if_dump(dump, sw_if_index):
23 if (d.sw_if_index == sw_if_index):
27 def query_all_memif_vpp_config(_test):
28 return _test.vapi.memif_dump()
31 def remove_all_memif_vpp_config(_test):
32 dump = _test.vapi.memif_dump()
34 _test.vapi.memif_delete(d.sw_if_index)
35 dump = _test.vapi.memif_socket_filename_dump()
38 _test.vapi.memif_socket_filename_add_del(
39 0, d.socket_id, d.socket_filename)
42 class VppSocketFilename(VppObject):
43 def __init__(self, test, socket_id, socket_filename,
44 add_default_folder=False):
46 self.socket_id = socket_id
47 self.socket_filename = socket_filename
49 # if True insert default socket folder before socket filename,
50 # after adding vpp config
51 self.add_default_folder = add_default_folder
53 def add_vpp_config(self):
54 rv = self._test.vapi.memif_socket_filename_add_del(
55 1, self.socket_id, self.socket_filename)
56 if self.add_default_folder:
57 self.socket_filename = b"%s/%s" % (
58 six.ensure_binary(self._test.tempdir, encoding='utf-8'),
62 def remove_vpp_config(self):
63 return self._test.vapi.memif_socket_filename_add_del(
64 0, self.socket_id, self.socket_filename)
66 def query_vpp_config(self):
67 return self._test.vapi.memif_socket_filename_dump()
70 return "socket-filename-%d-%s" % (self.socket_id, self.socket_filename)
73 class VppMemif(VppObject):
74 def __init__(self, test, role, mode, rx_queues=0, tx_queues=0, if_id=0,
75 socket_id=0, secret="", ring_size=0, buffer_size=0,
80 self.rx_queues = rx_queues
81 self.tx_queues = tx_queues
83 self.socket_id = socket_id
85 self.ring_size = ring_size
86 self.buffer_size = buffer_size
87 self.hw_addr = hw_addr
88 self.sw_if_index = None
89 self.ip_prefix = VppIpPrefix("192.168.%d.%d" %
90 (self.if_id + 1, self.role + 1), 24)
92 def add_vpp_config(self):
93 rv = self._test.vapi.memif_create(
96 rx_queues=self.rx_queues,
97 tx_queues=self.tx_queues,
99 socket_id=self.socket_id,
101 ring_size=self.ring_size,
102 buffer_size=self.buffer_size,
103 hw_addr=self.hw_addr)
106 except AttributeError:
107 raise AttributeError('self: %s' % self.__dict__)
109 self.sw_if_index = rv.sw_if_index
110 except AttributeError:
111 raise AttributeError("%s %s", self, rv)
113 return self.sw_if_index
117 return self._test.vapi.sw_interface_set_flags(
118 sw_if_index=self.sw_if_index, flags=1)
120 def admin_down(self):
122 return self._test.vapi.sw_interface_set_flags(
123 sw_if_index=self.sw_if_index, flags=0)
125 def wait_for_link_up(self, timeout, step=1):
126 if not self.sw_if_index:
129 dump = self.query_vpp_config()
130 if dump.link_up_down == 1:
132 self._test.sleep(step)
137 def config_ip4(self):
138 return self._test.vapi.sw_interface_add_del_address(
139 sw_if_index=self.sw_if_index, prefix=self.ip_prefix.encode())
141 def remove_vpp_config(self):
142 self._test.vapi.memif_delete(self.sw_if_index)
143 self.sw_if_index = None
145 def query_vpp_config(self):
146 if not self.sw_if_index:
148 dump = self._test.vapi.memif_dump()
149 return get_if_dump(dump, self.sw_if_index)
153 return "%d:%d:%d" % (self.role, self.if_id, self.sw_if_index)
155 return "%d:%d:None" % (self.role, self.if_id)