nat: nat44-ed add session timing out indicator in api
[vpp.git] / test / vpp_memif.py
1 import socket
2 from ipaddress import IPv4Network
3
4 from vpp_object import VppObject
5 from vpp_papi import VppEnum
6
7
8 def get_if_dump(dump, sw_if_index):
9     for d in dump:
10         if (d.sw_if_index == sw_if_index):
11             return d
12
13
14 def query_all_memif_vpp_config(_test):
15     return _test.vapi.memif_dump()
16
17
18 def remove_all_memif_vpp_config(_test):
19     dump = _test.vapi.memif_dump()
20     for d in dump:
21         _test.vapi.memif_delete(d.sw_if_index)
22     dump = _test.vapi.memif_socket_filename_dump()
23     for d in dump:
24         if d.socket_id != 0:
25             _test.vapi.memif_socket_filename_add_del(
26                 0, d.socket_id, d.socket_filename)
27
28
29 class VppSocketFilename(VppObject):
30     def __init__(self, test, socket_id, socket_filename,
31                  add_default_folder=False):
32         self._test = test
33         self.socket_id = socket_id
34         self.socket_filename = socket_filename
35
36         # if True insert default socket folder before socket filename,
37         # after adding vpp config
38         self.add_default_folder = add_default_folder
39
40     def add_vpp_config(self):
41         rv = self._test.vapi.memif_socket_filename_add_del(
42             1, self.socket_id, self.socket_filename)
43         if self.add_default_folder:
44             self.socket_filename = "%s/%s" % (self._test.tempdir,
45                                               self.socket_filename)
46         return rv
47
48     def remove_vpp_config(self):
49         return self._test.vapi.memif_socket_filename_add_del(
50             0, self.socket_id, self.socket_filename)
51
52     def query_vpp_config(self):
53         return self._test.vapi.memif_socket_filename_dump()
54
55     def object_id(self):
56         return "socket-filename-%d-%s" % (self.socket_id, self.socket_filename)
57
58
59 class VppMemif(VppObject):
60     def __init__(self, test, role, mode, rx_queues=0, tx_queues=0, if_id=0,
61                  socket_id=0, secret="", ring_size=0, buffer_size=0,
62                  hw_addr=""):
63         self._test = test
64         self.role = role
65         self.mode = mode
66         self.rx_queues = rx_queues
67         self.tx_queues = tx_queues
68         self.if_id = if_id
69         self.socket_id = socket_id
70         self.secret = secret
71         self.ring_size = ring_size
72         self.buffer_size = buffer_size
73         self.hw_addr = hw_addr
74         self.sw_if_index = None
75         self.ip_prefix = IPv4Network("192.168.%d.%d/24" %
76                                      (self.if_id + 1, self.role + 1),
77                                      strict=False)
78
79     def add_vpp_config(self):
80         rv = self._test.vapi.memif_create(
81             role=self.role,
82             mode=self.mode,
83             rx_queues=self.rx_queues,
84             tx_queues=self.tx_queues,
85             id=self.if_id,
86             socket_id=self.socket_id,
87             secret=self.secret,
88             ring_size=self.ring_size,
89             buffer_size=self.buffer_size,
90             hw_addr=self.hw_addr)
91         try:
92             self.sw_if_index = rv.sw_if_index
93         except AttributeError:
94             # rv doesn't have .sw_if_index attribute
95             raise AttributeError("%s %s" % (self, rv))
96
97         return self.sw_if_index
98
99     def admin_up(self):
100         if self.sw_if_index:
101             return self._test.vapi.sw_interface_set_flags(
102                 sw_if_index=self.sw_if_index, flags=1)
103
104     def admin_down(self):
105         if self.sw_if_index:
106             return self._test.vapi.sw_interface_set_flags(
107                 sw_if_index=self.sw_if_index, flags=0)
108
109     def wait_for_link_up(self, timeout, step=1):
110         if not self.sw_if_index:
111             return False
112         while True:
113             dump = self.query_vpp_config()
114             f = VppEnum.vl_api_if_status_flags_t.IF_STATUS_API_FLAG_LINK_UP
115             if dump.flags & f:
116                 return True
117             self._test.sleep(step)
118             timeout -= step
119             if timeout <= 0:
120                 return False
121
122     def config_ip4(self):
123         return self._test.vapi.sw_interface_add_del_address(
124             sw_if_index=self.sw_if_index, prefix=self.ip_prefix)
125
126     def remove_vpp_config(self):
127         self._test.vapi.memif_delete(self.sw_if_index)
128         self.sw_if_index = None
129
130     def query_vpp_config(self):
131         if not self.sw_if_index:
132             return None
133         dump = self._test.vapi.memif_dump()
134         return get_if_dump(dump, self.sw_if_index)
135
136     def object_id(self):
137         if self.sw_if_index:
138             return "%d:%d:%d" % (self.role, self.if_id, self.sw_if_index)
139         else:
140             return "%d:%d:None" % (self.role, self.if_id)