5 from vpp_object import VppObject
6 from vpp_ip import VppIpPrefix
7 from vpp_papi import VppEnum
10 def get_if_dump(dump, sw_if_index):
12 if (d.sw_if_index == sw_if_index):
16 def query_all_memif_vpp_config(_test):
17 return _test.vapi.memif_dump()
20 def remove_all_memif_vpp_config(_test):
21 dump = _test.vapi.memif_dump()
23 _test.vapi.memif_delete(d.sw_if_index)
24 dump = _test.vapi.memif_socket_filename_dump()
27 _test.vapi.memif_socket_filename_add_del(
28 0, d.socket_id, d.socket_filename)
31 class VppSocketFilename(VppObject):
32 def __init__(self, test, socket_id, socket_filename,
33 add_default_folder=False):
35 self.socket_id = socket_id
36 self.socket_filename = socket_filename
38 # if True insert default socket folder before socket filename,
39 # after adding vpp config
40 self.add_default_folder = add_default_folder
42 def add_vpp_config(self):
43 rv = self._test.vapi.memif_socket_filename_add_del(
44 1, self.socket_id, self.socket_filename)
45 if self.add_default_folder:
46 self.socket_filename = b"%s/%s" % (
47 six.ensure_binary(self._test.tempdir, encoding='utf-8'),
51 def remove_vpp_config(self):
52 return self._test.vapi.memif_socket_filename_add_del(
53 0, self.socket_id, self.socket_filename)
55 def query_vpp_config(self):
56 return self._test.vapi.memif_socket_filename_dump()
59 return "socket-filename-%d-%s" % (self.socket_id, self.socket_filename)
62 class VppMemif(VppObject):
63 def __init__(self, test, role, mode, rx_queues=0, tx_queues=0, if_id=0,
64 socket_id=0, secret="", ring_size=0, buffer_size=0,
69 self.rx_queues = rx_queues
70 self.tx_queues = tx_queues
72 self.socket_id = socket_id
74 self.ring_size = ring_size
75 self.buffer_size = buffer_size
76 self.hw_addr = hw_addr
77 self.sw_if_index = None
78 self.ip_prefix = VppIpPrefix("192.168.%d.%d" %
79 (self.if_id + 1, self.role + 1), 24)
81 def add_vpp_config(self):
82 rv = self._test.vapi.memif_create(
85 rx_queues=self.rx_queues,
86 tx_queues=self.tx_queues,
88 socket_id=self.socket_id,
90 ring_size=self.ring_size,
91 buffer_size=self.buffer_size,
95 except AttributeError:
96 raise AttributeError('self: %s' % self.__dict__)
98 self.sw_if_index = rv.sw_if_index
99 except AttributeError:
100 raise AttributeError("%s %s", self, rv)
102 return self.sw_if_index
106 return self._test.vapi.sw_interface_set_flags(
107 sw_if_index=self.sw_if_index, flags=1)
109 def admin_down(self):
111 return self._test.vapi.sw_interface_set_flags(
112 sw_if_index=self.sw_if_index, flags=0)
114 def wait_for_link_up(self, timeout, step=1):
115 if not self.sw_if_index:
118 dump = self.query_vpp_config()
119 f = VppEnum.vl_api_if_status_flags_t.IF_STATUS_API_FLAG_LINK_UP
122 self._test.sleep(step)
127 def config_ip4(self):
128 return self._test.vapi.sw_interface_add_del_address(
129 sw_if_index=self.sw_if_index, prefix=self.ip_prefix.encode())
131 def remove_vpp_config(self):
132 self._test.vapi.memif_delete(self.sw_if_index)
133 self.sw_if_index = None
135 def query_vpp_config(self):
136 if not self.sw_if_index:
138 dump = self._test.vapi.memif_dump()
139 return get_if_dump(dump, self.sw_if_index)
143 return "%d:%d:%d" % (self.role, self.if_id, self.sw_if_index)
145 return "%d:%d:None" % (self.role, self.if_id)