-# Copyright (c) 2019 Cisco and/or its affiliates.
+# Copyright (c) 2021 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
from robot.api import logger
from resources.libraries.python.Constants import Constants
-from resources.libraries.python.InterfaceUtil import InterfaceUtil
+from resources.libraries.python.InterfaceUtil import InterfaceUtil, \
+ InterfaceStatusFlags
+from resources.libraries.python.IPAddress import IPAddress
from resources.libraries.python.IPUtil import IPUtil
-from resources.libraries.python.PapiExecutor import PapiExecutor
+from resources.libraries.python.PapiExecutor import PapiSocketExecutor
from resources.libraries.python.topology import Topology
from resources.libraries.python.VatExecutor import VatExecutor
-class TestConfig(object):
+class TestConfig:
"""Contains special test configurations implemented in python for faster
execution."""
# configure IPs, create VXLAN interfaces and VLAN sub-interfaces
vxlan_count = TestConfig.vpp_create_vxlan_and_vlan_interfaces(
node, node_vxlan_if, node_vlan_if, n_tunnels, vni_start,
- src_ip_start, dst_ip_start, ip_step)
+ src_ip_start, dst_ip_start, ip_step
+ )
# update topology with VXLAN interfaces and VLAN sub-interfaces data
# and put interfaces up
TestConfig.vpp_put_vxlan_and_vlan_interfaces_up(
- node, vxlan_count, node_vlan_if)
+ node, vxlan_count, node_vlan_if
+ )
# configure bridge domains, ARPs and routes
TestConfig.vpp_put_vxlan_and_vlan_interfaces_to_bridge_domain(
node, node_vxlan_if, vxlan_count, op_node, op_node_if, dst_ip_start,
- ip_step, bd_id_start)
+ ip_step, bd_id_start
+ )
@staticmethod
def vpp_create_vxlan_and_vlan_interfaces(
:returns: Number of created VXLAN interfaces.
:rtype: int
"""
- src_ip_addr_start = ip_address(unicode(src_ip_start))
- dst_ip_addr_start = ip_address(unicode(dst_ip_start))
+ src_ip_start = ip_address(src_ip_start)
+ dst_ip_start = ip_address(dst_ip_start)
if vxlan_count > 10:
commands = list()
- tmp_fn = '/tmp/create_vxlan_interfaces.config'
- for i in xrange(0, vxlan_count):
+ for i in range(0, vxlan_count):
try:
- src_ip = src_ip_addr_start + i * ip_step
- dst_ip = dst_ip_addr_start + i * ip_step
+ src_ip = src_ip_start + i * ip_step
+ dst_ip = dst_ip_start + i * ip_step
except AddressValueError:
- logger.warn("Can't do more iterations - IP address limit "
- "has been reached.")
+ logger.warn(
+ u"Can't do more iterations - IP address limit "
+ u"has been reached."
+ )
vxlan_count = i
break
commands.append(
- 'sw_interface_add_del_address sw_if_index {sw_idx} '
- '{ip}/{ip_len}\n'.format(
- sw_idx=Topology.get_interface_sw_index(
- node, node_vxlan_if),
- ip=src_ip,
- ip_len=128 if src_ip.version == 6 else 32))
+ f"sw_interface_add_del_address sw_if_index "
+ f"{Topology.get_interface_sw_index(node, node_vxlan_if)} "
+ f"{src_ip}/{128 if src_ip.version == 6 else 32}\n"
+ )
commands.append(
- 'vxlan_add_del_tunnel src {src_ip} dst {dst_ip} vni {vni}\n'
- .format(src_ip=src_ip, dst_ip=dst_ip,
- vni=vni_start + i))
+ f"vxlan_add_del_tunnel src {src_ip} dst {dst_ip} "
+ f"vni {vni_start + i}\n"
+ )
commands.append(
- 'create_vlan_subif sw_if_index {sw_idx} vlan {vlan}\n'
- .format(sw_idx=Topology.get_interface_sw_index(
- node, node_vlan_if), vlan=i + 1))
- VatExecutor().write_and_execute_script(node, tmp_fn, commands)
+ f"create_vlan_subif sw_if_index "
+ f"{Topology.get_interface_sw_index(node, node_vlan_if)} "
+ f"vlan {i + 1}\n"
+ )
+ VatExecutor().write_and_execute_script(
+ node, u"/tmp/create_vxlan_interfaces.config", commands
+ )
return vxlan_count
- cmd1 = 'sw_interface_add_del_address'
+ cmd1 = u"sw_interface_add_del_address"
args1 = dict(
sw_if_index=InterfaceUtil.get_interface_index(node, node_vxlan_if),
- is_add=1,
- is_ipv6=1 if src_ip_addr_start.version == 6 else 0,
- del_all=0,
- address_length=128 if src_ip_addr_start.version == 6 else 32,
- address=None)
- cmd2 = 'vxlan_add_del_tunnel'
+ is_add=True,
+ del_all=False,
+ prefix=None
+ )
+ cmd2 = u"vxlan_add_del_tunnel_v3"
args2 = dict(
- is_add=1,
- is_ipv6=0,
+ is_add=True,
instance=Constants.BITWISE_NON_ZERO,
src_address=None,
dst_address=None,
mcast_sw_if_index=Constants.BITWISE_NON_ZERO,
encap_vrf_id=0,
decap_next_index=Constants.BITWISE_NON_ZERO,
- vni=None)
- cmd3 = 'create_vlan_subif'
+ vni=None
+ )
+ cmd3 = u"create_vlan_subif"
args3 = dict(
sw_if_index=InterfaceUtil.get_interface_index(
node, node_vlan_if),
- vlan_id=None)
- err_msg = 'Failed to create VXLAN and VLAN interfaces on host {host}'.\
- format(host=node['host'])
+ vlan_id=None
+ )
- with PapiExecutor(node) as papi_exec:
- for i in xrange(0, vxlan_count):
+ with PapiSocketExecutor(node) as papi_exec:
+ for i in range(0, vxlan_count):
try:
- src_ip = src_ip_addr_start + i * ip_step
- dst_ip = dst_ip_addr_start + i * ip_step
+ src_ip = src_ip_start + i * ip_step
+ dst_ip = dst_ip_start + i * ip_step
except AddressValueError:
- logger.warn("Can't do more iterations - IP address limit "
- "has been reached.")
+ logger.warn(
+ u"Can't do more iterations - IP address limit "
+ u"has been reached."
+ )
vxlan_count = i
break
- args1['address'] = src_ip.packed
- args2['src_address'] = src_ip.packed
- args2['dst_address'] = dst_ip.packed
- args2['vni'] = int(vni_start) + i
- args3['vlan_id'] = i + 1
- history = False if 1 < i < vxlan_count else True
+ args1[u"prefix"] = IPUtil.create_prefix_object(
+ src_ip, 128 if src_ip_start.version == 6 else 32
+ )
+ args2[u"src_address"] = IPAddress.create_ip_address_object(
+ src_ip
+ )
+ args2[u"dst_address"] = IPAddress.create_ip_address_object(
+ dst_ip
+ )
+ args2[u"vni"] = int(vni_start) + i
+ args3[u"vlan_id"] = i + 1
+ history = bool(not 1 < i < vxlan_count - 1)
papi_exec.add(cmd1, history=history, **args1).\
add(cmd2, history=history, **args2).\
add(cmd3, history=history, **args3)
- if i > 0 and i % (Constants.PAPI_MAX_API_BULK / 3) == 0:
- papi_exec.get_replies(err_msg).verify_replies(
- err_msg=err_msg)
- papi_exec.get_replies().verify_replies()
+ papi_exec.get_replies()
return vxlan_count
:type node_vlan_if: str
"""
if_data = InterfaceUtil.vpp_get_interface_data(node)
- vlan_if_name = Topology.get_interface_name(node, node_vlan_if)
-
if vxlan_count > 10:
- tmp_fn = '/tmp/put_subinterfaces_up.config'
commands = list()
- for i in xrange(0, vxlan_count):
- vxlan_subif_key = Topology.add_new_port(node, 'vxlan_tunnel')
- vxlan_subif_name = 'vxlan_tunnel{nr}'.format(nr=i)
- vxlan_found = False
+ for i in range(0, vxlan_count):
+ vxlan_subif_key = Topology.add_new_port(node, u"vxlan_tunnel")
+ vxlan_subif_name = f"vxlan_tunnel{i}"
+ founds = dict(vxlan=False, vlan=False)
vxlan_subif_idx = None
- vlan_subif_key = Topology.add_new_port(node, 'vlan_subif')
- vlan_subif_name = '{if_name}.{vlan}'.format(
- if_name=vlan_if_name, vlan=i + 1)
- vlan_found = False
+ vlan_subif_key = Topology.add_new_port(node, u"vlan_subif")
+ vlan_subif_name = \
+ f"{Topology.get_interface_name(node, node_vlan_if)}.{i + 1}"
vlan_idx = None
for data in if_data:
- if_name = data['interface_name']
- if not vxlan_found and if_name == vxlan_subif_name:
- vxlan_subif_idx = data['sw_if_index']
- vxlan_found = True
- elif not vlan_found and if_name == vlan_subif_name:
- vlan_idx = data['sw_if_index']
- vlan_found = True
- if vxlan_found and vlan_found:
+ if_name = data[u"interface_name"]
+ if not founds[u"vxlan"] and if_name == vxlan_subif_name:
+ vxlan_subif_idx = data[u"sw_if_index"]
+ founds[u"vxlan"] = True
+ elif not founds[u"vlan"] and if_name == vlan_subif_name:
+ vlan_idx = data[u"sw_if_index"]
+ founds[u"vlan"] = True
+ if founds[u"vxlan"] and founds[u"vlan"]:
break
Topology.update_interface_sw_if_index(
node, vxlan_subif_key, vxlan_subif_idx)
Topology.update_interface_name(
node, vxlan_subif_key, vxlan_subif_name)
commands.append(
- 'sw_interface_set_flags sw_if_index {sw_idx} admin-up '
- 'link-up\n'.format(sw_idx=vxlan_subif_idx))
+ f"sw_interface_set_flags sw_if_index {vxlan_subif_idx} "
+ f"admin-up link-up\n"
+ )
Topology.update_interface_sw_if_index(
- node, vlan_subif_key, vlan_idx)
+ node, vlan_subif_key, vlan_idx
+ )
Topology.update_interface_name(
- node, vlan_subif_key, vlan_subif_name)
+ node, vlan_subif_key, vlan_subif_name
+ )
commands.append(
- 'sw_interface_set_flags sw_if_index {sw_idx} admin-up '
- 'link-up\n'.format(sw_idx=vlan_idx))
- VatExecutor().write_and_execute_script(node, tmp_fn, commands)
+ f"sw_interface_set_flags sw_if_index {vlan_idx} admin-up "
+ f"link-up\n"
+ )
+ VatExecutor().write_and_execute_script(
+ node, u"/tmp/put_subinterfaces_up.config", commands
+ )
return
- cmd = 'sw_interface_set_flags'
+ cmd = u"sw_interface_set_flags"
args1 = dict(
sw_if_index=None,
- admin_up_down=1)
+ flags=InterfaceStatusFlags.IF_STATUS_API_FLAG_ADMIN_UP.value
+ )
args2 = dict(
sw_if_index=None,
- admin_up_down=1)
- err_msg = 'Failed to put VXLAN and VLAN interfaces up on host {host}'. \
- format(host=node['host'])
+ flags=InterfaceStatusFlags.IF_STATUS_API_FLAG_ADMIN_UP.value
+ )
- with PapiExecutor(node) as papi_exec:
- for i in xrange(0, vxlan_count):
- vxlan_subif_key = Topology.add_new_port(node, 'vxlan_tunnel')
- vxlan_subif_name = 'vxlan_tunnel{nr}'.format(nr=i)
- vxlan_found = False
+ with PapiSocketExecutor(node) as papi_exec:
+ for i in range(0, vxlan_count):
+ vxlan_subif_key = Topology.add_new_port(node, u"vxlan_tunnel")
+ vxlan_subif_name = f"vxlan_tunnel{i}"
+ founds = dict(vxlan=False, vlan=False)
vxlan_subif_idx = None
- vlan_subif_key = Topology.add_new_port(node, 'vlan_subif')
- vlan_subif_name = '{if_name}.{vlan}'.format(
- if_name=vlan_if_name, vlan=i+1)
- vlan_found = False
+ vlan_subif_key = Topology.add_new_port(node, u"vlan_subif")
+ vlan_subif_name = \
+ f"{Topology.get_interface_name(node, node_vlan_if)}.{i+1}"
vlan_idx = None
for data in if_data:
- if not vxlan_found \
- and data['interface_name'] == vxlan_subif_name:
- vxlan_subif_idx = data['sw_if_index']
- vxlan_found = True
- elif not vlan_found \
- and data['interface_name'] == vlan_subif_name:
- vlan_idx = data['sw_if_index']
- vlan_found = True
- if vxlan_found and vlan_found:
+ if not founds[u"vxlan"] \
+ and data[u"interface_name"] == vxlan_subif_name:
+ vxlan_subif_idx = data[u"sw_if_index"]
+ founds[u"vxlan"] = True
+ elif not founds[u"vlan"] \
+ and data[u"interface_name"] == vlan_subif_name:
+ vlan_idx = data[u"sw_if_index"]
+ founds[u"vlan"] = True
+ if founds[u"vxlan"] and founds[u"vlan"]:
break
Topology.update_interface_sw_if_index(
- node, vxlan_subif_key, vxlan_subif_idx)
+ node, vxlan_subif_key, vxlan_subif_idx
+ )
Topology.update_interface_name(
- node, vxlan_subif_key, vxlan_subif_name)
- args1['sw_if_index'] = vxlan_subif_idx
+ node, vxlan_subif_key, vxlan_subif_name
+ )
+ args1[u"sw_if_index"] = vxlan_subif_idx
Topology.update_interface_sw_if_index(
- node, vlan_subif_key, vlan_idx)
+ node, vlan_subif_key, vlan_idx
+ )
Topology.update_interface_name(
- node, vlan_subif_key, vlan_subif_name)
- args2['sw_if_index'] = vlan_idx
- history = False if 1 < i < vxlan_count else True
+ node, vlan_subif_key, vlan_subif_name
+ )
+ args2[u"sw_if_index"] = vlan_idx
+ history = bool(not 1 < i < vxlan_count - 1)
papi_exec.add(cmd, history=history, **args1). \
add(cmd, history=history, **args2)
- if i > 0 and i % (Constants.PAPI_MAX_API_BULK / 2) == 0:
- papi_exec.get_replies(err_msg).verify_replies(
- err_msg=err_msg)
papi_exec.add(cmd, **args1).add(cmd, **args2)
- papi_exec.get_replies().verify_replies()
+ papi_exec.get_replies()
@staticmethod
def vpp_put_vxlan_and_vlan_interfaces_to_bridge_domain(
:type ip_step: int
:type bd_id_start: int
"""
- dst_ip_addr_start = ip_address(unicode(dst_ip_start))
+ dst_ip_start = ip_address(dst_ip_start)
if vxlan_count > 1:
- sw_idx_vxlan = Topology.get_interface_sw_index(node, node_vxlan_if)
- tmp_fn = '/tmp/configure_routes_and_bridge_domains.config'
+ idx_vxlan_if = Topology.get_interface_sw_index(node, node_vxlan_if)
commands = list()
- for i in xrange(0, vxlan_count):
- dst_ip = dst_ip_addr_start + i * ip_step
+ for i in range(0, vxlan_count):
+ dst_ip = dst_ip_start + i * ip_step
commands.append(
- 'ip_neighbor_add_del sw_if_index {sw_idx} dst {ip} '
- 'mac {mac}\n'.format(
- sw_idx=sw_idx_vxlan,
- ip=dst_ip,
- mac=Topology.get_interface_mac(op_node, op_node_if)))
+ f"exec ip neighbor "
+ f"{Topology.get_interface_name(node, node_vxlan_if)} "
+ f"{dst_ip} "
+ f"{Topology.get_interface_mac(op_node, op_node_if)} static "
+ f"\n"
+ )
commands.append(
- 'ip_route_add_del {ip}/{ip_len} count 1 via {ip} '
- 'sw_if_index {sw_idx}\n'.format(
- ip=dst_ip,
- ip_len=128 if dst_ip.version == 6 else 32,
- sw_idx=sw_idx_vxlan))
+ f"ip_route_add_del "
+ f"{dst_ip}/{128 if dst_ip.version == 6 else 32} count 1 "
+ f"via {dst_ip} sw_if_index {idx_vxlan_if}\n"
+ )
+ sw_idx_vxlan = Topology.get_interface_sw_index(
+ node, f"vxlan_tunnel{i + 1}"
+ )
commands.append(
- 'sw_interface_set_l2_bridge sw_if_index {sw_idx} '
- 'bd_id {bd_id} shg 0 enable\n'.format(
- sw_idx=Topology.get_interface_sw_index(
- node, 'vxlan_tunnel{nr}'.format(nr=i + 1)),
- bd_id=bd_id_start + i))
+ f"sw_interface_set_l2_bridge sw_if_index {sw_idx_vxlan} "
+ f"bd_id {bd_id_start + i} shg 0 enable\n"
+ )
+ sw_idx_vlan = Topology.get_interface_sw_index(
+ node, f"vlan_subif{i + 1}"
+ )
commands.append(
- 'sw_interface_set_l2_bridge sw_if_index {sw_idx} '
- 'bd_id {bd_id} shg 0 enable\n'.format(
- sw_idx=Topology.get_interface_sw_index(
- node, 'vlan_subif{nr}'.format(nr=i + 1)),
- bd_id=bd_id_start + i))
- VatExecutor().write_and_execute_script(node, tmp_fn, commands)
+ f"sw_interface_set_l2_bridge sw_if_index {sw_idx_vlan} "
+ f"bd_id {bd_id_start + i} shg 0 enable\n"
+ )
+ VatExecutor().write_and_execute_script(
+ node, u"/tmp/configure_routes_and_bridge_domains.config",
+ commands
+ )
return
- cmd1 = 'ip_neighbor_add_del'
+ cmd1 = u"ip_neighbor_add_del"
neighbor = dict(
sw_if_index=Topology.get_interface_sw_index(node, node_vxlan_if),
flags=0,
mac_address=Topology.get_interface_mac(op_node, op_node_if),
- ip_address='')
+ ip_address=u""
+ )
args1 = dict(
is_add=1,
- neighbor=neighbor)
- cmd2 = 'ip_route_add_del'
+ neighbor=neighbor
+ )
+ cmd2 = u"ip_route_add_del"
kwargs = dict(
interface=node_vxlan_if,
- gateway=str(dst_ip_addr_start))
+ gateway=str(dst_ip_start)
+ )
route = IPUtil.compose_vpp_route_structure(
- node,
- str(dst_ip_addr_start),
- 128 if dst_ip_addr_start.version == 6 else 32,
- **kwargs)
+ node, str(dst_ip_start),
+ 128 if dst_ip_start.version == 6 else 32, **kwargs
+ )
args2 = dict(
is_add=1,
is_multipath=0,
- route=route)
- cmd3 = 'sw_interface_set_l2_bridge'
+ route=route
+ )
+ cmd3 = u"sw_interface_set_l2_bridge"
args3 = dict(
rx_sw_if_index=None,
bd_id=None,
shg=0,
port_type=0,
- enable=1)
+ enable=1
+ )
args4 = dict(
rx_sw_if_index=None,
bd_id=None,
shg=0,
port_type=0,
- enable=1)
- err_msg = 'Failed to put VXLAN and VLAN interfaces to bridge domain ' \
- 'on host {host}'.format(host=node['host'])
+ enable=1
+ )
- with PapiExecutor(node) as papi_exec:
- for i in xrange(0, vxlan_count):
- dst_ip = dst_ip_addr_start + i * ip_step
- args1['neighbor']['ip_address'] = str(dst_ip)
- args2['route']['prefix']['address']['un'] = \
- IPUtil.union_addr(dst_ip)
- args2['route']['paths'][0]['nh']['address'] = \
- IPUtil.union_addr(dst_ip)
- args3['rx_sw_if_index'] = Topology.get_interface_sw_index(
- node, 'vxlan_tunnel{nr}'.format(nr=i+1))
- args3['bd_id'] = int(bd_id_start+i)
- args4['rx_sw_if_index'] = Topology.get_interface_sw_index(
- node, 'vlan_subif{nr}'.format(nr=i+1))
- args4['bd_id'] = int(bd_id_start+i)
- history = False if 1 < i < vxlan_count else True
+ with PapiSocketExecutor(node) as papi_exec:
+ for i in range(0, vxlan_count):
+ args1[u"neighbor"][u"ip_address"] = \
+ str(dst_ip_start + i * ip_step)
+ args2[u"route"][u"prefix"][u"address"][u"un"] = \
+ IPAddress.union_addr(dst_ip_start + i * ip_step)
+ args2[u"route"][u"paths"][0][u"nh"][u"address"] = \
+ IPAddress.union_addr(dst_ip_start + i * ip_step)
+ args3[u"rx_sw_if_index"] = Topology.get_interface_sw_index(
+ node, f"vxlan_tunnel{i+1}"
+ )
+ args3[u"bd_id"] = int(bd_id_start+i)
+ args4[u"rx_sw_if_index"] = Topology.get_interface_sw_index(
+ node, f"vlan_subif{i+1}"
+ )
+ args4[u"bd_id"] = int(bd_id_start+i)
+ history = bool(not 1 < i < vxlan_count - 1)
papi_exec.add(cmd1, history=history, **args1). \
add(cmd2, history=history, **args2). \
add(cmd3, history=history, **args3). \
add(cmd3, history=history, **args4)
- if i > 0 and i % (Constants.PAPI_MAX_API_BULK / 4) == 0:
- papi_exec.get_replies(err_msg).verify_replies(
- err_msg=err_msg)
- papi_exec.get_replies().verify_replies()
+ papi_exec.get_replies()