X-Git-Url: https://gerrit.fd.io/r/gitweb?p=csit.git;a=blobdiff_plain;f=resources%2Flibraries%2Fpython%2FTestConfig.py;h=9f83fbb5fc6dfa1729224b1b79dd62a73b793d33;hp=48f67e5a0f420e5f6af61a2b167270c8d8ddc290;hb=d68951ac245150eeefa6e0f4156e4c1b5c9e9325;hpb=ed0258a440cfad7023d643f717ab78ac568dc59b diff --git a/resources/libraries/python/TestConfig.py b/resources/libraries/python/TestConfig.py index 48f67e5a0f..9f83fbb5fc 100644 --- a/resources/libraries/python/TestConfig.py +++ b/resources/libraries/python/TestConfig.py @@ -25,7 +25,7 @@ from resources.libraries.python.topology import Topology from resources.libraries.python.VatExecutor import VatExecutor -class TestConfig(object): +class TestConfig: """Contains special test configurations implemented in python for faster execution.""" @@ -69,17 +69,20 @@ class TestConfig(object): # configure IPs, create VXLAN interfaces and VLAN sub-interfaces vxlan_count = TestConfig.vpp_create_vxlan_and_vlan_interfaces( node, node_vxlan_if, node_vlan_if, n_tunnels, vni_start, - src_ip_start, dst_ip_start, ip_step) + src_ip_start, dst_ip_start, ip_step + ) # update topology with VXLAN interfaces and VLAN sub-interfaces data # and put interfaces up TestConfig.vpp_put_vxlan_and_vlan_interfaces_up( - node, vxlan_count, node_vlan_if) + node, vxlan_count, node_vlan_if + ) # configure bridge domains, ARPs and routes TestConfig.vpp_put_vxlan_and_vlan_interfaces_to_bridge_domain( node, node_vxlan_if, vxlan_count, op_node, op_node_if, dst_ip_start, - ip_step, bd_id_start) + ip_step, bd_id_start + ) @staticmethod def vpp_create_vxlan_and_vlan_interfaces( @@ -110,47 +113,49 @@ class TestConfig(object): :returns: Number of created VXLAN interfaces. :rtype: int """ - src_ip_addr_start = ip_address(unicode(src_ip_start)) - dst_ip_addr_start = ip_address(unicode(dst_ip_start)) + src_ip_start = ip_address(src_ip_start) + dst_ip_start = ip_address(dst_ip_start) if vxlan_count > 10: commands = list() - tmp_fn = '/tmp/create_vxlan_interfaces.config' - for i in xrange(0, vxlan_count): + for i in range(0, vxlan_count): try: - src_ip = src_ip_addr_start + i * ip_step - dst_ip = dst_ip_addr_start + i * ip_step + src_ip = src_ip_start + i * ip_step + dst_ip = dst_ip_start + i * ip_step except AddressValueError: - logger.warn("Can't do more iterations - IP address limit " - "has been reached.") + logger.warn( + u"Can't do more iterations - IP address limit " + u"has been reached." + ) vxlan_count = i break commands.append( - 'sw_interface_add_del_address sw_if_index {sw_idx} ' - '{ip}/{ip_len}\n'.format( - sw_idx=Topology.get_interface_sw_index( - node, node_vxlan_if), - ip=src_ip, - ip_len=128 if src_ip.version == 6 else 32)) + f"sw_interface_add_del_address sw_if_index " + f"{Topology.get_interface_sw_index(node, node_vxlan_if)} " + f"{src_ip}/{128 if src_ip.version == 6 else 32}\n" + ) commands.append( - 'vxlan_add_del_tunnel src {srcip} dst {dstip} vni {vni}\n'\ - .format(srcip=src_ip, dstip=dst_ip, - vni=vni_start + i)) + f"vxlan_add_del_tunnel src {src_ip} dst {dst_ip} " + f"vni {vni_start + i}\n" + ) commands.append( - 'create_vlan_subif sw_if_index {sw_idx} vlan {vlan}\n'\ - .format(sw_idx=Topology.get_interface_sw_index( - node, node_vlan_if), vlan=i + 1)) - VatExecutor().write_and_execute_script(node, tmp_fn, commands) + f"create_vlan_subif sw_if_index " + f"{Topology.get_interface_sw_index(node, node_vlan_if)} " + f"vlan {i + 1}\n" + ) + VatExecutor().write_and_execute_script( + node, u"/tmp/create_vxlan_interfaces.config", commands + ) return vxlan_count - cmd1 = 'sw_interface_add_del_address' + cmd1 = u"sw_interface_add_del_address" args1 = dict( sw_if_index=InterfaceUtil.get_interface_index(node, node_vxlan_if), is_add=True, del_all=False, prefix=None ) - cmd2 = 'vxlan_add_del_tunnel' + cmd2 = u"vxlan_add_del_tunnel" args2 = dict( is_add=1, is_ipv6=0, @@ -162,7 +167,7 @@ class TestConfig(object): decap_next_index=Constants.BITWISE_NON_ZERO, vni=None ) - cmd3 = 'create_vlan_subif' + cmd3 = u"create_vlan_subif" args3 = dict( sw_if_index=InterfaceUtil.get_interface_index( node, node_vlan_if), @@ -170,22 +175,25 @@ class TestConfig(object): ) with PapiSocketExecutor(node) as papi_exec: - for i in xrange(0, vxlan_count): + for i in range(0, vxlan_count): try: - src_ip = src_ip_addr_start + i * ip_step - dst_ip = dst_ip_addr_start + i * ip_step + src_ip = src_ip_start + i * ip_step + dst_ip = dst_ip_start + i * ip_step except AddressValueError: - logger.warn("Can't do more iterations - IP address limit " - "has been reached.") + logger.warn( + u"Can't do more iterations - IP address limit " + u"has been reached." + ) vxlan_count = i break - args1['prefix'] = IPUtil.create_prefix_object( - src_ip, 128 if src_ip_addr_start.version == 6 else 32) - args2['src_address'] = getattr(src_ip, 'packed') - args2['dst_address'] = getattr(dst_ip, 'packed') - args2['vni'] = int(vni_start) + i - args3['vlan_id'] = i + 1 - history = False if 1 < i < vxlan_count else True + args1[u"prefix"] = IPUtil.create_prefix_object( + src_ip, 128 if src_ip_start.version == 6 else 32 + ) + args2[u"src_address"] = getattr(src_ip, u"packed") + args2[u"dst_address"] = getattr(dst_ip, u"packed") + args2[u"vni"] = int(vni_start) + i + args3[u"vlan_id"] = i + 1 + history = bool(not 1 < i < vxlan_count - 1) papi_exec.add(cmd1, history=history, **args1).\ add(cmd2, history=history, **args2).\ add(cmd3, history=history, **args3) @@ -207,50 +215,51 @@ class TestConfig(object): :type vxlan_count: int :type node_vlan_if: str """ - if_data = InterfaceUtil.vpp_get_interface_data(node) - vlan_if_name = Topology.get_interface_name(node, node_vlan_if) - if vxlan_count > 10: - tmp_fn = '/tmp/put_subinterfaces_up.config' commands = list() - for i in xrange(0, vxlan_count): - vxlan_subif_key = Topology.add_new_port(node, 'vxlan_tunnel') - vxlan_subif_name = 'vxlan_tunnel{nr}'.format(nr=i) - vxlan_found = False + for i in range(0, vxlan_count): + vxlan_subif_key = Topology.add_new_port(node, u"vxlan_tunnel") + vxlan_subif_name = f"vxlan_tunnel{i}" + founds = dict(vxlan=False, vlan=False) vxlan_subif_idx = None - vlan_subif_key = Topology.add_new_port(node, 'vlan_subif') - vlan_subif_name = '{if_name}.{vlan}'.format( - if_name=vlan_if_name, vlan=i + 1) - vlan_found = False + vlan_subif_key = Topology.add_new_port(node, u"vlan_subif") + vlan_subif_name = \ + f"{Topology.get_interface_name(node, node_vlan_if)}.{i + 1}" vlan_idx = None - for data in if_data: - if_name = data['interface_name'] - if not vxlan_found and if_name == vxlan_subif_name: - vxlan_subif_idx = data['sw_if_index'] - vxlan_found = True - elif not vlan_found and if_name == vlan_subif_name: - vlan_idx = data['sw_if_index'] - vlan_found = True - if vxlan_found and vlan_found: + for data in InterfaceUtil.vpp_get_interface_data(node): + if_name = data[u"interface_name"] + if not founds[u"vxlan"] and if_name == vxlan_subif_name: + vxlan_subif_idx = data[u"sw_if_index"] + founds[u"vxlan"] = True + elif not founds[u"vlan"] and if_name == vlan_subif_name: + vlan_idx = data[u"sw_if_index"] + founds[u"vlan"] = True + if founds[u"vxlan"] and founds[u"vlan"]: break Topology.update_interface_sw_if_index( node, vxlan_subif_key, vxlan_subif_idx) Topology.update_interface_name( node, vxlan_subif_key, vxlan_subif_name) commands.append( - 'sw_interface_set_flags sw_if_index {sw_idx} admin-up ' - 'link-up\n'.format(sw_idx=vxlan_subif_idx)) + f"sw_interface_set_flags sw_if_index {vxlan_subif_idx} " + f"admin-up link-up\n" + ) Topology.update_interface_sw_if_index( - node, vlan_subif_key, vlan_idx) + node, vlan_subif_key, vlan_idx + ) Topology.update_interface_name( - node, vlan_subif_key, vlan_subif_name) + node, vlan_subif_key, vlan_subif_name + ) commands.append( - 'sw_interface_set_flags sw_if_index {sw_idx} admin-up ' - 'link-up\n'.format(sw_idx=vlan_idx)) - VatExecutor().write_and_execute_script(node, tmp_fn, commands) + f"sw_interface_set_flags sw_if_index {vlan_idx} admin-up " + f"link-up\n" + ) + VatExecutor().write_and_execute_script( + node, u"/tmp/put_subinterfaces_up.config", commands + ) return - cmd = 'sw_interface_set_flags' + cmd = u"sw_interface_set_flags" args1 = dict( sw_if_index=None, flags=InterfaceStatusFlags.IF_STATUS_API_FLAG_ADMIN_UP.value @@ -261,38 +270,41 @@ class TestConfig(object): ) with PapiSocketExecutor(node) as papi_exec: - for i in xrange(0, vxlan_count): - vxlan_subif_key = Topology.add_new_port(node, 'vxlan_tunnel') - vxlan_subif_name = 'vxlan_tunnel{nr}'.format(nr=i) - vxlan_found = False + for i in range(0, vxlan_count): + vxlan_subif_key = Topology.add_new_port(node, u"vxlan_tunnel") + vxlan_subif_name = f"vxlan_tunnel{i}" + founds = dict(vxlan=False, vlan=False) vxlan_subif_idx = None - vlan_subif_key = Topology.add_new_port(node, 'vlan_subif') - vlan_subif_name = '{if_name}.{vlan}'.format( - if_name=vlan_if_name, vlan=i+1) - vlan_found = False + vlan_subif_key = Topology.add_new_port(node, u"vlan_subif") + vlan_subif_name = \ + f"{Topology.get_interface_name(node, node_vlan_if)}.{i+1}" vlan_idx = None - for data in if_data: - if not vxlan_found \ - and data['interface_name'] == vxlan_subif_name: - vxlan_subif_idx = data['sw_if_index'] - vxlan_found = True - elif not vlan_found \ - and data['interface_name'] == vlan_subif_name: - vlan_idx = data['sw_if_index'] - vlan_found = True - if vxlan_found and vlan_found: + for data in InterfaceUtil.vpp_get_interface_data(node): + if not founds[u"vxlan"] \ + and data[u"interface_name"] == vxlan_subif_name: + vxlan_subif_idx = data[u"sw_if_index"] + founds[u"vxlan"] = True + elif not founds[u"vlan"] \ + and data[u"interface_name"] == vlan_subif_name: + vlan_idx = data[u"sw_if_index"] + founds[u"vlan"] = True + if founds[u"vxlan"] and founds[u"vlan"]: break Topology.update_interface_sw_if_index( - node, vxlan_subif_key, vxlan_subif_idx) + node, vxlan_subif_key, vxlan_subif_idx + ) Topology.update_interface_name( - node, vxlan_subif_key, vxlan_subif_name) - args1['sw_if_index'] = vxlan_subif_idx + node, vxlan_subif_key, vxlan_subif_name + ) + args1[u"sw_if_index"] = vxlan_subif_idx Topology.update_interface_sw_if_index( - node, vlan_subif_key, vlan_idx) + node, vlan_subif_key, vlan_idx + ) Topology.update_interface_name( - node, vlan_subif_key, vlan_subif_name) - args2['sw_if_index'] = vlan_idx - history = False if 1 < i < vxlan_count else True + node, vlan_subif_key, vlan_subif_name + ) + args2[u"sw_if_index"] = vlan_idx + history = bool(not 1 < i < vxlan_count - 1) papi_exec.add(cmd, history=history, **args1). \ add(cmd, history=history, **args2) papi_exec.add(cmd, **args1).add(cmd, **args2) @@ -325,92 +337,101 @@ class TestConfig(object): :type ip_step: int :type bd_id_start: int """ - dst_ip_addr_start = ip_address(unicode(dst_ip_start)) + dst_ip_start = ip_address(dst_ip_start) if vxlan_count > 1: sw_idx_vxlan = Topology.get_interface_sw_index(node, node_vxlan_if) - tmp_fn = '/tmp/configure_routes_and_bridge_domains.config' commands = list() - for i in xrange(0, vxlan_count): - dst_ip = dst_ip_addr_start + i * ip_step + for i in range(0, vxlan_count): + dst_ip = dst_ip_start + i * ip_step commands.append( - 'ip_neighbor_add_del sw_if_index {sw_idx} dst {ip} ' - 'mac {mac}\n'.format( - sw_idx=sw_idx_vxlan, - ip=dst_ip, - mac=Topology.get_interface_mac(op_node, op_node_if))) + f"ip_neighbor_add_del sw_if_index {sw_idx_vxlan} " + f"dst {dst_ip} " + f"mac {Topology.get_interface_mac(op_node, op_node_if)}\n" + ) commands.append( - 'ip_route_add_del {ip}/{ip_len} count 1 via {ip} ' - 'sw_if_index {sw_idx}\n'.format( - ip=dst_ip, - ip_len=128 if dst_ip.version == 6 else 32, - sw_idx=sw_idx_vxlan)) + f"ip_route_add_del " + f"{dst_ip}/{128 if dst_ip.version == 6 else 32} count 1 " + f"via {dst_ip} sw_if_index {sw_idx_vxlan}\n" + ) + sw_idx_vxlan = Topology.get_interface_sw_index( + node, f"vxlan_tunnel{i + 1}" + ) commands.append( - 'sw_interface_set_l2_bridge sw_if_index {sw_idx} ' - 'bd_id {bd_id} shg 0 enable\n'.format( - sw_idx=Topology.get_interface_sw_index( - node, 'vxlan_tunnel{nr}'.format(nr=i + 1)), - bd_id=bd_id_start + i)) + f"sw_interface_set_l2_bridge sw_if_index {sw_idx_vxlan} " + f"bd_id {bd_id_start + i} shg 0 enable\n" + ) + sw_idx_vlan = Topology.get_interface_sw_index( + node, f"vlan_subif{i + 1}" + ) commands.append( - 'sw_interface_set_l2_bridge sw_if_index {sw_idx} ' - 'bd_id {bd_id} shg 0 enable\n'.format( - sw_idx=Topology.get_interface_sw_index( - node, 'vlan_subif{nr}'.format(nr=i + 1)), - bd_id=bd_id_start + i)) - VatExecutor().write_and_execute_script(node, tmp_fn, commands) + f"sw_interface_set_l2_bridge sw_if_index {sw_idx_vlan} " + f"bd_id {bd_id_start + i} shg 0 enable\n" + ) + VatExecutor().write_and_execute_script( + node, u"/tmp/configure_routes_and_bridge_domains.config", + commands + ) return - cmd1 = 'ip_neighbor_add_del' + cmd1 = u"ip_neighbor_add_del" neighbor = dict( sw_if_index=Topology.get_interface_sw_index(node, node_vxlan_if), flags=0, mac_address=Topology.get_interface_mac(op_node, op_node_if), - ip_address='') + ip_address=u"" + ) args1 = dict( is_add=1, - neighbor=neighbor) - cmd2 = 'ip_route_add_del' + neighbor=neighbor + ) + cmd2 = u"ip_route_add_del" kwargs = dict( interface=node_vxlan_if, - gateway=str(dst_ip_addr_start)) + gateway=str(dst_ip_start) + ) route = IPUtil.compose_vpp_route_structure( - node, - str(dst_ip_addr_start), - 128 if dst_ip_addr_start.version == 6 else 32, - **kwargs) + node, str(dst_ip_start), + 128 if dst_ip_start.version == 6 else 32, **kwargs + ) args2 = dict( is_add=1, is_multipath=0, - route=route) - cmd3 = 'sw_interface_set_l2_bridge' + route=route + ) + cmd3 = u"sw_interface_set_l2_bridge" args3 = dict( rx_sw_if_index=None, bd_id=None, shg=0, port_type=0, - enable=1) + enable=1 + ) args4 = dict( rx_sw_if_index=None, bd_id=None, shg=0, port_type=0, - enable=1) + enable=1 + ) with PapiSocketExecutor(node) as papi_exec: - for i in xrange(0, vxlan_count): - dst_ip = dst_ip_addr_start + i * ip_step - args1['neighbor']['ip_address'] = str(dst_ip) - args2['route']['prefix']['address']['un'] = \ - IPUtil.union_addr(dst_ip) - args2['route']['paths'][0]['nh']['address'] = \ - IPUtil.union_addr(dst_ip) - args3['rx_sw_if_index'] = Topology.get_interface_sw_index( - node, 'vxlan_tunnel{nr}'.format(nr=i+1)) - args3['bd_id'] = int(bd_id_start+i) - args4['rx_sw_if_index'] = Topology.get_interface_sw_index( - node, 'vlan_subif{nr}'.format(nr=i+1)) - args4['bd_id'] = int(bd_id_start+i) - history = False if 1 < i < vxlan_count else True + for i in range(0, vxlan_count): + args1[u"neighbor"][u"ip_address"] = \ + str(dst_ip_start + i * ip_step) + args2[u"route"][u"prefix"][u"address"][u"un"] = \ + IPUtil.union_addr(dst_ip_start + i * ip_step) + args2[u"route"][u"paths"][0][u"nh"][u"address"] = \ + IPUtil.union_addr(dst_ip_start + i * ip_step) + args3[u"rx_sw_if_index"] = Topology.get_interface_sw_index( + node, f"vxlan_tunnel{i+1}" + ) + args3[u"bd_id"] = int(bd_id_start+i) + args4[u"rx_sw_if_index"] = Topology.get_interface_sw_index( + node, f"vlan_subif{i+1}" + ) + args4[u"bd_id"] = int(bd_id_start+i) + history = bool(not 1 < i < vxlan_count - 1) papi_exec.add(cmd1, history=history, **args1). \ add(cmd2, history=history, **args2). \ add(cmd3, history=history, **args3). \