3 from vpp_object import VppObject
17 def get_if_dump(dump, sw_if_index):
19 if (d.sw_if_index == sw_if_index):
23 def query_all_memif_vpp_config(_test):
24 return _test.vapi.memif_dump()
27 def remove_all_memif_vpp_config(_test):
28 dump = _test.vapi.memif_dump()
30 _test.vapi.memif_delete(d.sw_if_index)
31 dump = _test.vapi.memif_socket_filename_dump()
34 _test.vapi.memif_socket_filename_add_del(
35 0, d.socket_id, d.socket_filename)
38 class VppSocketFilename(VppObject):
39 def __init__(self, test, socket_id, socket_filename,
40 add_default_folder=False):
42 self.socket_id = socket_id
43 self.socket_filename = socket_filename
45 # if True insert default socket folder before socket filename,
46 # after adding vpp config
47 self.add_default_folder = add_default_folder
49 def add_vpp_config(self):
50 rv = self._test.vapi.memif_socket_filename_add_del(
51 1, self.socket_id, self.socket_filename)
52 if self.add_default_folder:
53 self.socket_filename = "/run/vpp/" + self.socket_filename
56 def remove_vpp_config(self):
57 return self._test.vapi.memif_socket_filename_add_del(
58 0, self.socket_id, self.socket_filename)
60 def query_vpp_config(self):
61 return self._test.vapi.memif_socket_filename_dump()
64 return self.object_id()
67 return "%d" % (self.socket_id)
70 class VppMemif(VppObject):
71 def __init__(self, test, role, mode, rx_queues=0, tx_queues=0, if_id=0,
72 socket_id=0, secret="", ring_size=0, buffer_size=0,
77 self.rx_queues = rx_queues
78 self.tx_queues = tx_queues
80 self.socket_id = socket_id
82 self.ring_size = ring_size
83 self.buffer_size = buffer_size
84 self.hw_addr = hw_addr
85 self.sw_if_index = None
86 self.ip4_addr = "192.168.%d.%d" % (self.if_id + 1, self.role + 1)
87 self.ip4_addr_len = 24
89 def add_vpp_config(self):
90 rv = self._test.vapi.memif_create(self.role, self.mode, self.rx_queues,
91 self.tx_queues, self.if_id,
92 self.socket_id, self.secret,
93 self.ring_size, self.buffer_size,
95 self.sw_if_index = rv.sw_if_index
96 return self.sw_if_index
100 return self._test.vapi.sw_interface_set_flags(self.sw_if_index, 1)
102 def admin_down(self):
104 return self._test.vapi.sw_interface_set_flags(self.sw_if_index, 0)
106 def wait_for_link_up(self, timeout, step=1):
107 if not self.sw_if_index:
110 dump = self.query_vpp_config()
111 if dump.link_up_down == 1:
113 self._test.sleep(step)
118 def config_ip4(self):
119 return self._test.vapi.sw_interface_add_del_address(
120 self.sw_if_index, socket.inet_pton(
121 socket.AF_INET, self.ip4_addr), self.ip4_addr_len)
123 def remove_vpp_config(self):
124 self._test.vapi.memif_delete(self.sw_if_index)
125 self.sw_if_index = None
127 def query_vpp_config(self):
128 if not self.sw_if_index:
130 dump = self._test.vapi.memif_dump()
131 return get_if_dump(dump, self.sw_if_index)
134 return self.object_id()
138 return "%d:%d:%d" % (self.role, self.if_id, self.sw_if_index)
140 return "%d:%d:None" % (self.role, self.if_id)