VAT-to-PAPI: LISPUtil
[csit.git] / resources / libraries / python / LispUtil.py
1 # Copyright (c) 2019 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
5 #
6 #     http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
13
14 """Lisp utilities library."""
15
16 from robot.api import logger
17 from ipaddress import IPv4Address, IPv6Address
18
19 from resources.libraries.python.topology import Topology
20 from resources.libraries.python.PapiExecutor import PapiSocketExecutor
21 from resources.libraries.python.L2Util import L2Util
22
23 class LispUtil(object):
24     """Implements keywords for Lisp tests."""
25
26     def __init__(self):
27         pass
28
29     @staticmethod
30     def vpp_show_lisp_state(node):
31         """Get lisp state from VPP node.
32
33         :param node: VPP node.
34         :type node: dict
35         :returns: Lisp gpe state.
36         :rtype: dict
37         """
38         cmd = 'show_lisp_status'
39         err_msg = "Failed to get LISP status on host {host}".format(
40             host=node['host'])
41
42         with PapiSocketExecutor(node) as papi_exec:
43             reply = papi_exec.add(cmd).get_reply(err_msg)
44
45         data = dict()
46         data["feature_status"] = "enabled" if reply["feature_status"] else \
47             "disabled"
48         data["gpe_status"] = "enabled" if reply["gpe_status"] else "disabled"
49         return data
50
51     @staticmethod
52     def vpp_show_lisp_locator_set(node, items_filter):
53         """Get lisp locator_set from VPP node.
54
55         :param node: VPP node.
56         :param items_filter: Filter which specifies which items should be
57             retrieved - local, remote, empty string = both.
58         :type node: dict
59         :type items_filter: str
60         :returns: Lisp locator_set data as python list.
61         :rtype: list
62         """
63
64         ifilter = {"_": 0, "_local": 1, "_remote": 2}
65         args = dict(filter=ifilter["_" + items_filter])
66
67         cmd = 'lisp_locator_set_dump'
68         err_msg = "Failed to get LISP locator set on host {host}".format(
69             host=node['host'])
70
71         try:
72             with PapiSocketExecutor(node) as papi_exec:
73                 details = papi_exec.add(cmd, **args).get_details(err_msg)
74             data = []
75             for locator in details:
76                 data.append({"ls_name": locator["ls_name"].rstrip('\x00'),
77                              "ls_index": locator["ls_index"]})
78             return data
79         except (ValueError, LookupError):
80             return []
81
82     @staticmethod
83     def vpp_show_lisp_eid_table(node):
84         """Get lisp eid table from VPP node.
85
86         :param node: VPP node.
87         :type node: dict
88         :returns: Lisp eid table as python list.
89         :rtype: list
90         """
91
92         cmd = 'lisp_eid_table_dump'
93         err_msg = "Failed to get LISP eid table on host {host}".format(
94             host=node['host'])
95
96         with PapiSocketExecutor(node) as papi_exec:
97             details = papi_exec.add(cmd).get_details(err_msg)
98
99         data = []
100         for eid_details in details:
101             eid = 'Bad eid type'
102             if eid_details["eid_type"] == 0:
103                 prefix = str(eid_details["eid_prefix_len"])
104                 eid = str(IPv4Address(eid_details["eid"][0:4])) + "/" + prefix
105             elif eid_details["eid_type"] == 1:
106                 prefix = str(eid_details["eid_prefix_len"])
107                 eid = str(IPv6Address(eid_details["eid"])) + "/" + prefix
108             elif eid_details["eid_type"] == 2:
109                 eid = str(L2Util.bin_to_mac(eid_details["eid"][0:6]))
110             data.append({"action": eid_details["action"],
111                          "is_local": eid_details["is_local"],
112                          "eid": eid,
113                          "vni": eid_details["vni"],
114                          "ttl": eid_details["ttl"],
115                          "authoritative": eid_details["authoritative"]})
116         return data
117
118     @staticmethod
119     def vpp_show_lisp_map_resolver(node):
120         """Get lisp map resolver from VPP node.
121
122         :param node: VPP node.
123         :type node: dict
124         :returns: Lisp map resolver as python list.
125         :rtype: list
126         """
127
128         cmd = 'lisp_map_resolver_dump'
129         err_msg = "Failed to get LISP map resolver on host {host}".format(
130             host=node['host'])
131
132         with PapiSocketExecutor(node) as papi_exec:
133             details = papi_exec.add(cmd).get_details(err_msg)
134
135         data = []
136         for resolver in details:
137             address = 'Bad is_ipv6 flag'
138             if resolver["is_ipv6"] == 0:
139                 address = str(IPv4Address(resolver["ip_address"][0:4]))
140             elif resolver["is_ipv6"] == 1:
141                 address = str(IPv6Address(resolver["ip_address"]))
142             data.append({"map resolver": address})
143         return data
144
145     @staticmethod
146     def vpp_show_lisp_map_register(node):
147         """Get LISP Map Register from VPP node.
148
149         :param node: VPP node.
150         :type node: dict
151         :returns: LISP Map Register as python dict.
152         :rtype: dict
153         """
154
155         cmd = 'show_lisp_map_register_state'
156         err_msg = "Failed to get LISP map register state on host {host}".format(
157             host=node['host'])
158
159         with PapiSocketExecutor(node) as papi_exec:
160             reply = papi_exec.add(cmd).get_reply(err_msg)
161
162         data = dict()
163         data["state"] = "enabled" if reply["is_enabled"] else "disabled"
164         logger.info(data)
165         return data
166
167     @staticmethod
168     def vpp_show_lisp_map_request_mode(node):
169         """Get LISP Map Request mode from VPP node.
170
171         :param node: VPP node.
172         :type node: dict
173         :returns: LISP Map Request mode as python dict.
174         :rtype: dict
175         """
176
177         cmd = 'show_lisp_map_request_mode'
178         err_msg = "Failed to get LISP map request mode on host {host}".format(
179             host=node['host'])
180
181         with PapiSocketExecutor(node) as papi_exec:
182             reply = papi_exec.add(cmd).get_reply(err_msg)
183
184         data = dict()
185         data["map_request_mode"] = "src-dst" if reply["mode"] else "dst-only"
186         logger.info(data)
187         return data
188
189     @staticmethod
190     def vpp_show_lisp_map_server(node):
191         """Get LISP Map Server from VPP node.
192
193         :param node: VPP node.
194         :type node: dict
195         :returns: LISP Map Server as python list.
196         :rtype: list
197         """
198
199         cmd = 'lisp_map_server_dump'
200         err_msg = "Failed to get LISP map server on host {host}".format(
201             host=node['host'])
202
203         with PapiSocketExecutor(node) as papi_exec:
204             details = papi_exec.add(cmd).get_details(err_msg)
205
206         data = []
207         for server in details:
208             address = 'Bad is_ipv6 flag'
209             if server["is_ipv6"] == 0:
210                 address = str(IPv4Address(server["ip_address"][0:4]))
211             elif server["is_ipv6"] == 1:
212                 address = str(IPv6Address(server["ip_address"]))
213             data.append({"map-server": address})
214         logger.info(data)
215         return data
216
217     @staticmethod
218     def vpp_show_lisp_petr_config(node):
219         """Get LISP PETR configuration from VPP node.
220
221         :param node: VPP node.
222         :type node: dict
223         :returns: LISP PETR configuration as python dict.
224         :rtype: dict
225         """
226
227 # Note: VAT is returning ipv6 address instead of ipv4
228
229         cmd = 'show_lisp_use_petr'
230         err_msg = "Failed to get LISP petr config on host {host}".format(
231             host=node['host'])
232
233         with PapiSocketExecutor(node) as papi_exec:
234             reply = papi_exec.add(cmd).get_reply(err_msg)
235
236         data = dict()
237         data["status"] = "enabled" if reply["status"] else "disabled"
238         address = 'Bad is_ip4 flag'
239         if reply["is_ip4"] == 0:
240             address = str(IPv6Address(reply["address"]))
241         elif reply["is_ip4"] == 1:
242             address = str(IPv4Address(reply["address"][0:4]))
243         data["address"] = address
244         logger.info(data)
245         return data
246
247     @staticmethod
248     def vpp_show_lisp_rloc_config(node):
249         """Get LISP RLOC configuration from VPP node.
250
251         :param node: VPP node.
252         :type node: dict
253         :returns: LISP RLOC configuration as python dict.
254         :rtype: dict
255         """
256
257         cmd = 'show_lisp_rloc_probe_state'
258         err_msg = "Failed to get LISP rloc config on host {host}".format(
259             host=node['host'])
260
261         with PapiSocketExecutor(node) as papi_exec:
262             reply = papi_exec.add(cmd).get_reply(err_msg)
263
264         data = dict()
265         data["state"] = "enabled" if reply["is_enabled"] else "disabled"
266         logger.info(data)
267         return data
268
269     @staticmethod
270     def vpp_show_lisp_pitr(node):
271         """Get Lisp PITR feature config from VPP node.
272
273         :param node: VPP node.
274         :type node: dict
275         :returns: Lisp PITR config data.
276         :rtype: dict
277         """
278
279         cmd = 'show_lisp_pitr'
280         err_msg = "Failed to get LISP pitr on host {host}".format(
281             host=node['host'])
282
283         with PapiSocketExecutor(node) as papi_exec:
284             reply = papi_exec.add(cmd).get_reply(err_msg)
285
286         data = dict()
287         data["status"] = "enabled" if reply["status"] else "disabled"
288         return data
289
290     @staticmethod
291     def lisp_should_be_equal(lisp_val1, lisp_val2):
292         """Fail if the lisp values are not equal.
293
294         :param lisp_val1: First lisp value.
295         :param lisp_val2: Second lisp value.
296         :type lisp_val1: list
297         :type lisp_val2: list
298         """
299
300         len1 = len(lisp_val1)
301         len2 = len(lisp_val2)
302         if len1 != len2:
303             raise RuntimeError('Values are not same. '
304                                'Value 1 {} \n'
305                                'Value 2 {}.'.format(lisp_val1,
306                                                     lisp_val2))
307
308         for tmp in lisp_val1:
309             if tmp not in lisp_val2:
310                 raise RuntimeError('Value {} not found in vpp:\n'
311                                    '{}'.format(tmp, lisp_val2))
312
313     def lisp_locator_s_should_be_equal(self, locator_set1, locator_set2):
314         """Fail if the lisp values are not equal.
315
316         :param locator_set1: Generate lisp value.
317         :param locator_set2: Lisp value from VPP.
318         :type locator_set1: list
319         :type locator_set2: list
320         """
321
322         locator_set_list = []
323         for item in locator_set1:
324             if item not in locator_set_list:
325                 locator_set_list.append(item)
326         self.lisp_should_be_equal(locator_set_list, locator_set2)
327
328     @staticmethod
329     def generate_unique_lisp_locator_set_data(node, locator_set_number):
330         """Generate a list of lisp locator_set we want set to VPP and
331         then check if it is set correctly. All locator_sets are unique.
332
333         :param node: VPP node.
334         :param locator_set_number: Generate n locator_set.
335         :type node: dict
336         :type locator_set_number: str
337         :returns: list of lisp locator_set, list of lisp locator_set expected
338             from VAT.
339         :rtype: tuple
340         """
341
342         topo = Topology()
343
344         locator_set_list = []
345         locator_set_list_vat = []
346         i = 0
347         for num in range(0, int(locator_set_number)):
348             locator_list = []
349             for interface in node['interfaces'].values():
350                 link = interface.get('link')
351                 i += 1
352                 if link is None:
353                     continue
354
355                 if_name = topo.get_interface_by_link_name(node, link)
356                 sw_if_index = topo.get_interface_sw_index(node, if_name)
357                 if if_name is not None:
358                     locator = {'locator-index': sw_if_index,
359                                'priority': i,
360                                'weight': i}
361                     locator_list.append(locator)
362
363             l_name = 'ls{0}'.format(num)
364             locator_set = {'locator-set': l_name,
365                            'locator': locator_list}
366             locator_set_list.append(locator_set)
367
368             locator_set_vat = {"ls_name": l_name,
369                                "ls_index": num}
370             locator_set_list_vat.append(locator_set_vat)
371
372         return locator_set_list, locator_set_list_vat
373
374     @staticmethod
375     def generate_duplicate_lisp_locator_set_data(node, locator_set_number):
376         """Generate a list of lisp locator_set we want set to VPP and
377         then check if it is set correctly. Some locator_sets are duplicated.
378
379         :param node: VPP node.
380         :param locator_set_number: Generate n locator_set.
381         :type node: dict
382         :type locator_set_number: str
383         :returns: list of lisp locator_set, list of lisp locator_set expected
384             from VAT.
385         :rtype: tuple
386         """
387
388         topo = Topology()
389         locator_set_list = []
390         locator_set_list_vat = []
391         i = 0
392         for num in range(0, int(locator_set_number)):
393             locator_list = []
394             for interface in node['interfaces'].values():
395                 link = interface.get('link')
396                 i += 1
397                 if link is None:
398                     continue
399
400                 if_name = topo.get_interface_by_link_name(node, link)
401                 sw_if_index = topo.get_interface_sw_index(node, if_name)
402                 if if_name is not None:
403                     l_name = 'ls{0}'.format(num)
404                     locator = {'locator-index': sw_if_index,
405                                'priority': i,
406                                'weight': i}
407                     locator_list.append(locator)
408                     locator_set = {'locator-set': l_name,
409                                    'locator': locator_list}
410                     locator_set_list.append(locator_set)
411
412                     locator_set_vat = {"ls_name": l_name,
413                                        "ls_index": num}
414                     locator_set_list_vat.append(locator_set_vat)
415
416         return locator_set_list, locator_set_list_vat
417
418     def lisp_is_empty(self, lisp_params):
419         """Check if the input param are empty.
420
421         :param lisp_params: Should be empty list.
422         :type lisp_params: list
423         """
424
425         self.lisp_should_be_equal([], lisp_params)