Python3: resources and libraries
[csit.git] / resources / libraries / python / LispUtil.py
1 # Copyright (c) 2019 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
5 #
6 #     http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
13
14 """Lisp utilities library."""
15
16 from ipaddress import IPv4Address, IPv6Address
17 from robot.api import logger
18
19 from resources.libraries.python.L2Util import L2Util
20 from resources.libraries.python.PapiExecutor import PapiSocketExecutor
21 from resources.libraries.python.topology import Topology
22
23 class LispUtil:
24     """Implements keywords for Lisp tests."""
25
26     @staticmethod
27     def vpp_show_lisp_state(node):
28         """Get lisp state from VPP node.
29
30         :param node: VPP node.
31         :type node: dict
32         :returns: Lisp gpe state.
33         :rtype: dict
34         """
35         cmd = u"show_lisp_status"
36         err_msg = f"Failed to get LISP status on host {node['host']}"
37
38         with PapiSocketExecutor(node) as papi_exec:
39             reply = papi_exec.add(cmd).get_reply(err_msg)
40
41         data = dict()
42         data[u"feature_status"] = u"enabled" if reply[u"feature_status"] \
43             else u"disabled"
44         data[u"gpe_status"] = u"enabled" if reply[u"gpe_status"] \
45             else u"disabled"
46         return data
47
48     @staticmethod
49     def vpp_show_lisp_locator_set(node, items_filter):
50         """Get lisp locator_set from VPP node.
51
52         :param node: VPP node.
53         :param items_filter: Filter which specifies which items should be
54             retrieved - local, remote, empty string = both.
55         :type node: dict
56         :type items_filter: str
57         :returns: Lisp locator_set data as python list.
58         :rtype: list
59         """
60         ifilter = {u"_": 0, u"_local": 1, u"_remote": 2}
61         args = dict(
62             filter=ifilter[u"_" + items_filter]
63         )
64
65         cmd = u"lisp_locator_set_dump"
66         err_msg = f"Failed to get LISP locator set on host {node['host']}"
67
68         try:
69             with PapiSocketExecutor(node) as papi_exec:
70                 details = papi_exec.add(cmd, **args).get_details(err_msg)
71             data = list()
72             for locator in details:
73                 data.append(
74                     {u"ls_name": locator[u"ls_name"].rstrip(b'\0'),
75                      u"ls_index": locator[u"ls_index"]}
76                 )
77             return data
78         except (ValueError, LookupError) as err:
79             logger.warn(f"Failed to get LISP locator set {err}")
80             return list()
81
82     @staticmethod
83     def vpp_show_lisp_eid_table(node):
84         """Get lisp eid table from VPP node.
85
86         :param node: VPP node.
87         :type node: dict
88         :returns: Lisp eid table as python list.
89         :rtype: list
90         """
91         cmd = u"lisp_eid_table_dump"
92         err_msg = f"Failed to get LISP eid table on host {node[u'host']}"
93
94         with PapiSocketExecutor(node) as papi_exec:
95             details = papi_exec.add(cmd).get_details(err_msg)
96
97         data = list()
98         for eid_details in details:
99             eid = u"Bad eid type"
100             if eid_details[u"eid_type"] == 0:
101                 prefix = str(eid_details[u"eid_prefix_len"])
102                 eid = str(IPv4Address(eid_details[u"eid"][0:4])) + u"/" + \
103                       prefix
104             elif eid_details[u"eid_type"] == 1:
105                 prefix = str(eid_details[u"eid_prefix_len"])
106                 eid = str(IPv6Address(eid_details[u"eid"])) + u"/" + prefix
107             elif eid_details[u"eid_type"] == 2:
108                 eid = str(L2Util.bin_to_mac(eid_details[u"eid"][0:6]))
109             data.append(
110                 {
111                     u"action": eid_details[u"action"],
112                     u"is_local": eid_details[u"is_local"],
113                     u"eid": eid,
114                     u"vni": eid_details[u"vni"],
115                     u"ttl": eid_details[u"ttl"],
116                     u"authoritative": eid_details[u"authoritative"]
117                 }
118             )
119         return data
120
121     @staticmethod
122     def vpp_show_lisp_map_resolver(node):
123         """Get lisp map resolver from VPP node.
124
125         :param node: VPP node.
126         :type node: dict
127         :returns: Lisp map resolver as python list.
128         :rtype: list
129         """
130         cmd = u"lisp_map_resolver_dump"
131         err_msg = f"Failed to get LISP map resolver on host {node[u'host']}"
132
133         with PapiSocketExecutor(node) as papi_exec:
134             details = papi_exec.add(cmd).get_details(err_msg)
135
136         data = list()
137         for resolver in details:
138             address = u"Bad is_ipv6 flag"
139             if resolver[u"is_ipv6"] == 0:
140                 address = str(IPv4Address(resolver[u"ip_address"][0:4]))
141             elif resolver[u"is_ipv6"] == 1:
142                 address = str(IPv6Address(resolver[u"ip_address"]))
143             data.append({u"map resolver": address})
144         return data
145
146     @staticmethod
147     def vpp_show_lisp_map_register(node):
148         """Get LISP Map Register from VPP node.
149
150         :param node: VPP node.
151         :type node: dict
152         :returns: LISP Map Register as python dict.
153         :rtype: dict
154         """
155         cmd = u"show_lisp_map_register_state"
156         err_msg = f"Failed to get LISP map register state on host " \
157             f"{node[u'host']}"
158
159         with PapiSocketExecutor(node) as papi_exec:
160             reply = papi_exec.add(cmd).get_reply(err_msg)
161
162         data = dict()
163         data[u"state"] = u"enabled" if reply[u"is_enabled"] else u"disabled"
164         logger.info(data)
165         return data
166
167     @staticmethod
168     def vpp_show_lisp_map_request_mode(node):
169         """Get LISP Map Request mode from VPP node.
170
171         :param node: VPP node.
172         :type node: dict
173         :returns: LISP Map Request mode as python dict.
174         :rtype: dict
175         """
176         cmd = u"show_lisp_map_request_mode"
177         err_msg = f"Failed to get LISP map request mode on host {node[u'host']}"
178
179         with PapiSocketExecutor(node) as papi_exec:
180             reply = papi_exec.add(cmd).get_reply(err_msg)
181
182         data = dict()
183         data[u"map_request_mode"] = u"src-dst" if reply[u"mode"] \
184             else u"dst-only"
185         logger.info(data)
186         return data
187
188     @staticmethod
189     def vpp_show_lisp_map_server(node):
190         """Get LISP Map Server from VPP node.
191
192         :param node: VPP node.
193         :type node: dict
194         :returns: LISP Map Server as python list.
195         :rtype: list
196         """
197         cmd = u"lisp_map_server_dump"
198         err_msg = f"Failed to get LISP map server on host {node[u'host']}"
199
200         with PapiSocketExecutor(node) as papi_exec:
201             details = papi_exec.add(cmd).get_details(err_msg)
202
203         data = list()
204         for server in details:
205             address = u"Bad is_ipv6 flag"
206             if server[u"is_ipv6"] == 0:
207                 address = str(IPv4Address(server[u"ip_address"][0:4]))
208             elif server[u"is_ipv6"] == 1:
209                 address = str(IPv6Address(server[u"ip_address"]))
210             data.append({u"map-server": address})
211         logger.info(data)
212         return data
213
214     @staticmethod
215     def vpp_show_lisp_petr_config(node):
216         """Get LISP PETR configuration from VPP node.
217
218         :param node: VPP node.
219         :type node: dict
220         :returns: LISP PETR configuration as python dict.
221         :rtype: dict
222         """
223         # Note: VAT is returning ipv6 address instead of ipv4
224         cmd = u"show_lisp_use_petr"
225         err_msg = f"Failed to get LISP petr config on host {node[u'host']}"
226
227         with PapiSocketExecutor(node) as papi_exec:
228             reply = papi_exec.add(cmd).get_reply(err_msg)
229
230         data = dict()
231         data[u"status"] = u"enabled" if reply[u"status"] else u"disabled"
232         address = u"Bad is_ip4 flag"
233         if reply[u"is_ip4"] == 0:
234             address = str(IPv6Address(reply[u"address"]))
235         elif reply[u"is_ip4"] == 1:
236             address = str(IPv4Address(reply[u"address"][0:4]))
237         data[u"address"] = address
238         logger.info(data)
239         return data
240
241     @staticmethod
242     def vpp_show_lisp_rloc_config(node):
243         """Get LISP RLOC configuration from VPP node.
244
245         :param node: VPP node.
246         :type node: dict
247         :returns: LISP RLOC configuration as python dict.
248         :rtype: dict
249         """
250         cmd = u"show_lisp_rloc_probe_state"
251         err_msg = f"Failed to get LISP rloc config on host {node[u'host']}"
252
253         with PapiSocketExecutor(node) as papi_exec:
254             reply = papi_exec.add(cmd).get_reply(err_msg)
255
256         data = dict()
257         data[u"state"] = u"enabled" if reply[u"is_enabled"] else u"disabled"
258         logger.info(data)
259         return data
260
261     @staticmethod
262     def vpp_show_lisp_pitr(node):
263         """Get Lisp PITR feature config from VPP node.
264
265         :param node: VPP node.
266         :type node: dict
267         :returns: Lisp PITR config data.
268         :rtype: dict
269         """
270         cmd = u"show_lisp_pitr"
271         err_msg = f"Failed to get LISP pitr on host {node[u'host']}"
272
273         with PapiSocketExecutor(node) as papi_exec:
274             reply = papi_exec.add(cmd).get_reply(err_msg)
275
276         data = dict()
277         data[u"status"] = u"enabled" if reply[u"status"] else u"disabled"
278         return data
279
280     @staticmethod
281     def lisp_should_be_equal(lisp_val1, lisp_val2):
282         """Fail if the lisp values are not equal.
283
284         :param lisp_val1: First lisp value.
285         :param lisp_val2: Second lisp value.
286         :type lisp_val1: list
287         :type lisp_val2: list
288         """
289         len1 = len(lisp_val1)
290         len2 = len(lisp_val2)
291
292         if len1 != len2:
293             raise RuntimeError(
294                 f"Values are not same. Value 1 {lisp_val1} \n"
295                 f"Value 2 {lisp_val2}."
296             )
297
298         for tmp in lisp_val1:
299             if tmp not in lisp_val2:
300                 raise RuntimeError(
301                     f"Value {tmp} not found in vpp:\n{lisp_val2}"
302                 )
303
304     def lisp_locator_s_should_be_equal(self, locator_set1, locator_set2):
305         """Fail if the lisp values are not equal.
306
307         :param locator_set1: Generate lisp value.
308         :param locator_set2: Lisp value from VPP.
309         :type locator_set1: list
310         :type locator_set2: list
311         """
312         locator_set_list = list()
313
314         for item in locator_set1:
315             if item not in locator_set_list:
316                 locator_set_list.append(item)
317
318         self.lisp_should_be_equal(locator_set_list, locator_set2)
319
320     @staticmethod
321     def generate_unique_lisp_locator_set_data(node, locator_set_number):
322         """Generate a list of lisp locator_set we want set to VPP and
323         then check if it is set correctly. All locator_sets are unique.
324
325         :param node: VPP node.
326         :param locator_set_number: Generate n locator_set.
327         :type node: dict
328         :type locator_set_number: str
329         :returns: list of lisp locator_set, list of lisp locator_set expected
330             from VAT.
331         :rtype: tuple
332         """
333         topo = Topology()
334         locator_set_list = list()
335         locator_set_list_vat = list()
336
337         i = 0
338         for num in range(0, int(locator_set_number)):
339             locator_list = list()
340             for interface in list(node[u"interfaces"].values()):
341                 link = interface.get(u"link")
342                 i += 1
343                 if link is None:
344                     continue
345
346                 if_name = topo.get_interface_by_link_name(node, link)
347                 sw_if_index = topo.get_interface_sw_index(node, if_name)
348                 if if_name is not None:
349                     locator = {
350                         u"locator-index": sw_if_index,
351                         u"priority": i,
352                         u"weight": i
353                     }
354                     locator_list.append(locator)
355
356             l_name = f"ls{num}"
357             locator_set = {
358                 u"locator-set": l_name,
359                 u"locator": locator_list
360             }
361             locator_set_list.append(locator_set)
362
363             locator_set_vat = {
364                 u"ls_name": l_name,
365                 u"ls_index": num
366             }
367             locator_set_list_vat.append(locator_set_vat)
368
369         return locator_set_list, locator_set_list_vat
370
371     @staticmethod
372     def generate_duplicate_lisp_locator_set_data(node, locator_set_number):
373         """Generate a list of lisp locator_set we want set to VPP and
374         then check if it is set correctly. Some locator_sets are duplicated.
375
376         :param node: VPP node.
377         :param locator_set_number: Generate n locator_set.
378         :type node: dict
379         :type locator_set_number: str
380         :returns: list of lisp locator_set, list of lisp locator_set expected
381             from VAT.
382         :rtype: tuple
383         """
384         topo = Topology()
385         locator_set_list = list()
386         locator_set_list_vat = list()
387
388         i = 0
389         for num in range(0, int(locator_set_number)):
390             locator_list = []
391             for interface in list(node[u"interfaces"].values()):
392                 link = interface.get(u"link")
393                 i += 1
394                 if link is None:
395                     continue
396
397                 if_name = topo.get_interface_by_link_name(node, link)
398                 sw_if_index = topo.get_interface_sw_index(node, if_name)
399                 if if_name is not None:
400                     l_name = f"ls{num}"
401                     locator = {
402                         u"locator-index": sw_if_index,
403                         u"priority": i,
404                         u"weight": i
405                     }
406                     locator_list.append(locator)
407                     locator_set = {
408                         u"locator-set": l_name,
409                         u"locator": locator_list
410                     }
411                     locator_set_list.append(locator_set)
412
413                     locator_set_vat = {
414                         u"ls_name": l_name,
415                         u"ls_index": num
416                     }
417                     locator_set_list_vat.append(locator_set_vat)
418
419         return locator_set_list, locator_set_list_vat
420
421     def lisp_is_empty(self, lisp_params):
422         """Check if the input param are empty.
423
424         :param lisp_params: Should be empty list.
425         :type lisp_params: list
426         """
427         self.lisp_should_be_equal([], lisp_params)