2 from ipaddress import IPv4Network
4 from vpp_object import VppObject
5 from vpp_papi import VppEnum
8 def get_if_dump(dump, sw_if_index):
10 if d.sw_if_index == sw_if_index:
14 def query_all_memif_vpp_config(_test):
15 return _test.vapi.memif_dump()
18 def remove_all_memif_vpp_config(_test):
19 dump = _test.vapi.memif_dump()
21 _test.vapi.memif_delete(d.sw_if_index)
22 dump = _test.vapi.memif_socket_filename_dump()
25 _test.vapi.memif_socket_filename_add_del(0, d.socket_id, d.socket_filename)
28 class VppSocketFilename(VppObject):
29 def __init__(self, test, socket_id, socket_filename, add_default_folder=False):
31 self.socket_id = socket_id
32 self.socket_filename = socket_filename
34 # if True insert default socket folder before socket filename,
35 # after adding vpp config
36 self.add_default_folder = add_default_folder
38 def add_vpp_config(self):
39 rv = self._test.vapi.memif_socket_filename_add_del(
40 1, self.socket_id, self.socket_filename
42 if self.add_default_folder:
43 self.socket_filename = "%s/%s" % (self._test.tempdir, self.socket_filename)
46 def remove_vpp_config(self):
47 return self._test.vapi.memif_socket_filename_add_del(
48 0, self.socket_id, self.socket_filename
51 def query_vpp_config(self):
52 return self._test.vapi.memif_socket_filename_dump()
55 return "socket-filename-%d-%s" % (self.socket_id, self.socket_filename)
58 class VppMemif(VppObject):
76 self.rx_queues = rx_queues
77 self.tx_queues = tx_queues
79 self.socket_id = socket_id
81 self.ring_size = ring_size
82 self.buffer_size = buffer_size
83 self.hw_addr = hw_addr
84 self.sw_if_index = None
85 self.ip_prefix = IPv4Network(
86 "192.168.%d.%d/24" % (self.if_id + 1, self.role + 1), strict=False
89 def add_vpp_config(self):
90 rv = self._test.vapi.memif_create(
93 rx_queues=self.rx_queues,
94 tx_queues=self.tx_queues,
96 socket_id=self.socket_id,
98 ring_size=self.ring_size,
99 buffer_size=self.buffer_size,
100 hw_addr=self.hw_addr,
103 self.sw_if_index = rv.sw_if_index
104 except AttributeError:
105 # rv doesn't have .sw_if_index attribute
106 raise AttributeError("%s %s" % (self, rv))
108 return self.sw_if_index
112 return self._test.vapi.sw_interface_set_flags(
113 sw_if_index=self.sw_if_index, flags=1
116 def admin_down(self):
118 return self._test.vapi.sw_interface_set_flags(
119 sw_if_index=self.sw_if_index, flags=0
122 def wait_for_link_up(self, timeout, step=1):
123 if not self.sw_if_index:
126 dump = self.query_vpp_config()
127 f = VppEnum.vl_api_if_status_flags_t.IF_STATUS_API_FLAG_LINK_UP
130 self._test.sleep(step)
135 def config_ip4(self):
136 return self._test.vapi.sw_interface_add_del_address(
137 sw_if_index=self.sw_if_index, prefix=self.ip_prefix
140 def remove_vpp_config(self):
141 self._test.vapi.memif_delete(self.sw_if_index)
142 self.sw_if_index = None
144 def query_vpp_config(self):
145 if not self.sw_if_index:
147 dump = self._test.vapi.memif_dump()
148 return get_if_dump(dump, self.sw_if_index)
152 return "%d:%d:%d" % (self.role, self.if_id, self.sw_if_index)
154 return "%d:%d:None" % (self.role, self.if_id)