3 from vpp_object import VppObject
17 def get_if_dump(dump, sw_if_index):
19 if (d.sw_if_index == sw_if_index):
23 def query_all_memif_vpp_config(_test):
24 return _test.vapi.memif_dump()
27 def remove_all_memif_vpp_config(_test):
28 dump = _test.vapi.memif_dump()
30 _test.vapi.memif_delete(d.sw_if_index)
31 dump = _test.vapi.memif_socket_filename_dump()
34 _test.vapi.memif_socket_filename_add_del(
35 0, d.socket_id, d.socket_filename)
38 class VppSocketFilename(VppObject):
39 def __init__(self, test, socket_id, socket_filename,
40 add_default_folder=False):
42 self.socket_id = socket_id
43 self.socket_filename = socket_filename
45 # if True insert default socket folder before socket filename,
46 # after adding vpp config
47 self.add_default_folder = add_default_folder
49 def add_vpp_config(self):
50 rv = self._test.vapi.memif_socket_filename_add_del(
51 1, self.socket_id, self.socket_filename)
52 if self.add_default_folder:
53 self.socket_filename = self._test.tempdir + "/" \
54 + self.socket_filename
57 def remove_vpp_config(self):
58 return self._test.vapi.memif_socket_filename_add_del(
59 0, self.socket_id, self.socket_filename)
61 def query_vpp_config(self):
62 return self._test.vapi.memif_socket_filename_dump()
65 return self.object_id()
68 return "%d" % (self.socket_id)
71 class VppMemif(VppObject):
72 def __init__(self, test, role, mode, rx_queues=0, tx_queues=0, if_id=0,
73 socket_id=0, secret="", ring_size=0, buffer_size=0,
78 self.rx_queues = rx_queues
79 self.tx_queues = tx_queues
81 self.socket_id = socket_id
83 self.ring_size = ring_size
84 self.buffer_size = buffer_size
85 self.hw_addr = hw_addr
86 self.sw_if_index = None
87 self.ip4_addr = "192.168.%d.%d" % (self.if_id + 1, self.role + 1)
88 self.ip4_addr_len = 24
90 def add_vpp_config(self):
91 rv = self._test.vapi.memif_create(self.role, self.mode, self.rx_queues,
92 self.tx_queues, self.if_id,
93 self.socket_id, self.secret,
94 self.ring_size, self.buffer_size,
96 self.sw_if_index = rv.sw_if_index
97 return self.sw_if_index
101 return self._test.vapi.sw_interface_set_flags(self.sw_if_index, 1)
103 def admin_down(self):
105 return self._test.vapi.sw_interface_set_flags(self.sw_if_index, 0)
107 def wait_for_link_up(self, timeout, step=1):
108 if not self.sw_if_index:
111 dump = self.query_vpp_config()
112 if dump.link_up_down == 1:
114 self._test.sleep(step)
119 def config_ip4(self):
120 return self._test.vapi.sw_interface_add_del_address(
121 self.sw_if_index, socket.inet_pton(
122 socket.AF_INET, self.ip4_addr), self.ip4_addr_len)
124 def remove_vpp_config(self):
125 self._test.vapi.memif_delete(self.sw_if_index)
126 self.sw_if_index = None
128 def query_vpp_config(self):
129 if not self.sw_if_index:
131 dump = self._test.vapi.memif_dump()
132 return get_if_dump(dump, self.sw_if_index)
135 return self.object_id()
139 return "%d:%d:%d" % (self.role, self.if_id, self.sw_if_index)
141 return "%d:%d:None" % (self.role, self.if_id)