X-Git-Url: https://gerrit.fd.io/r/gitweb?p=csit.git;a=blobdiff_plain;f=resources%2Flibraries%2Fpython%2FL2Util.py;h=a49c55665340bed3b9f70f3807dbd9713fb55d53;hp=4ca0c47308ce3dbab9739400edcdb2a08ee0c3be;hb=d68951ac245150eeefa6e0f4156e4c1b5c9e9325;hpb=ed0258a440cfad7023d643f717ab78ac568dc59b diff --git a/resources/libraries/python/L2Util.py b/resources/libraries/python/L2Util.py index 4ca0c47308..a49c556653 100644 --- a/resources/libraries/python/L2Util.py +++ b/resources/libraries/python/L2Util.py @@ -13,9 +13,6 @@ """L2 Utilities Library.""" -import binascii -from textwrap import wrap - from enum import IntEnum from resources.libraries.python.Constants import Constants @@ -37,7 +34,7 @@ class L2VtrOp(IntEnum): L2_VTR_TRANSLATE_2_2 = 8 -class L2Util(object): +class L2Util: """Utilities for l2 configuration.""" @staticmethod @@ -50,7 +47,7 @@ class L2Util(object): :returns: Integer representation of MAC address. :rtype: int """ - return int(mac_str.replace(':', ''), 16) + return int(mac_str.replace(u":", u""), 16) @staticmethod def int_to_mac(mac_int): @@ -62,7 +59,9 @@ class L2Util(object): :returns: String representation of MAC address. :rtype: str """ - return ':'.join(wrap("{:012x}".format(mac_int), width=2)) + return u":".join( + f"{hex(mac_int)[2:]:0>12}"[i:i+2] for i in range(0, 12, 2) + ) @staticmethod def mac_to_bin(mac_str): @@ -72,9 +71,9 @@ class L2Util(object): :param mac_str: MAC address in string representation. :type mac_str: str :returns: Binary representation of MAC address. - :rtype: binary + :rtype: bytes """ - return binascii.unhexlify(mac_str.replace(':', '')) + return bytes.fromhex(mac_str.replace(u":", u"")) @staticmethod def bin_to_mac(mac_bin): @@ -82,17 +81,15 @@ class L2Util(object): (\x01\x02\x03\x04\x05\x06) to string format (e.g. 01:02:03:04:05:06). :param mac_bin: MAC address in binary representation. - :type mac_bin: binary + :type mac_bin: bytes :returns: String representation of MAC address. :rtype: str """ - mac_str = ':'.join(binascii.hexlify(mac_bin)[i:i + 2] - for i in range(0, 12, 2)) - return str(mac_str.decode('ascii')) + return u":".join(mac_bin.hex()[i:i + 2] for i in range(0, 12, 2)) @staticmethod - def vpp_add_l2fib_entry(node, mac, interface, bd_id, static_mac=1, - filter_mac=0, bvi_mac=0): + def vpp_add_l2fib_entry( + node, mac, interface, bd_id, static_mac=1, filter_mac=0, bvi_mac=0): """ Create a static L2FIB entry on a VPP node. :param node: Node to add L2FIB entry on. @@ -113,28 +110,29 @@ class L2Util(object): :type filter_mac: int or str :type bvi_mac: int or str """ - - if isinstance(interface, basestring): + if isinstance(interface, str): sw_if_index = Topology.get_interface_sw_index(node, interface) else: sw_if_index = interface - cmd = 'l2fib_add_del' - err_msg = 'Failed to add L2FIB entry on host {host}'.format( - host=node['host']) - args = dict(mac=L2Util.mac_to_bin(mac), - bd_id=int(bd_id), - sw_if_index=sw_if_index, - is_add=1, - static_mac=int(static_mac), - filter_mac=int(filter_mac), - bvi_mac=int(bvi_mac)) + cmd = u"l2fib_add_del" + err_msg = f"Failed to add L2FIB entry on host {node[u'host']}" + args = dict( + mac=L2Util.mac_to_bin(mac), + bd_id=int(bd_id), + sw_if_index=sw_if_index, + is_add=1, + static_mac=int(static_mac), + filter_mac=int(filter_mac), + bvi_mac=int(bvi_mac) + + ) with PapiSocketExecutor(node) as papi_exec: papi_exec.add(cmd, **args).get_reply(err_msg) @staticmethod - def create_l2_bd(node, bd_id, flood=1, uu_flood=1, forward=1, learn=1, - arp_term=0): + def create_l2_bd( + node, bd_id, flood=1, uu_flood=1, forward=1, learn=1, arp_term=0): """Create an L2 bridge domain on a VPP node. :param node: Node where we wish to crate the L2 bridge domain. @@ -157,17 +155,17 @@ class L2Util(object): :type learn: int or str :type arp_term: int or str """ - - cmd = 'bridge_domain_add_del' - err_msg = 'Failed to create L2 bridge domain on host {host}'.format( - host=node['host']) - args = dict(bd_id=int(bd_id), - flood=int(flood), - uu_flood=int(uu_flood), - forward=int(forward), - learn=int(learn), - arp_term=int(arp_term), - is_add=1) + cmd = u"bridge_domain_add_del" + err_msg = f"Failed to create L2 bridge domain on host {node[u'host']}" + args = dict( + bd_id=int(bd_id), + flood=int(flood), + uu_flood=int(uu_flood), + forward=int(forward), + learn=int(learn), + arp_term=int(arp_term), + is_add=1 + ) with PapiSocketExecutor(node) as papi_exec: papi_exec.add(cmd, **args).get_reply(err_msg) @@ -189,17 +187,19 @@ class L2Util(object): :type shg: int or str :type port_type: int or str """ - sw_if_index = Topology.get_interface_sw_index(node, interface) - cmd = 'sw_interface_set_l2_bridge' - err_msg = 'Failed to add interface {ifc} to L2 bridge domain on host ' \ - '{host}'.format(ifc=interface, host=node['host']) - args = dict(rx_sw_if_index=sw_if_index, - bd_id=int(bd_id), - shg=int(shg), - port_type=int(port_type), - enable=1) + cmd = u"sw_interface_set_l2_bridge" + err_msg = f"Failed to add interface {interface} to L2 bridge domain " \ + f"on host {node[u'host']}" + args = dict( + rx_sw_if_index=sw_if_index, + bd_id=int(bd_id), + shg=int(shg), + port_type=int(port_type), + enable=1 + ) + with PapiSocketExecutor(node) as papi_exec: papi_exec.add(cmd, **args).get_reply(err_msg) @@ -218,35 +218,40 @@ class L2Util(object): :type port_2: str :type learn: bool """ - sw_if_index1 = Topology.get_interface_sw_index(node, port_1) sw_if_index2 = Topology.get_interface_sw_index(node, port_2) learn_int = 1 if learn else 0 - cmd1 = 'bridge_domain_add_del' - args1 = dict(bd_id=int(bd_id), - flood=1, - uu_flood=1, - forward=1, - learn=learn_int, - arp_term=0, - is_add=1) - - cmd2 = 'sw_interface_set_l2_bridge' - args2 = dict(rx_sw_if_index=sw_if_index1, - bd_id=int(bd_id), - shg=0, - port_type=0, - enable=1) - - args3 = dict(rx_sw_if_index=sw_if_index2, - bd_id=int(bd_id), - shg=0, - port_type=0, - enable=1) - - err_msg = 'Failed to add L2 bridge domain with 2 interfaces on host' \ - ' {host}'.format(host=node['host']) + cmd1 = u"bridge_domain_add_del" + args1 = dict( + bd_id=int(bd_id), + flood=1, + uu_flood=1, + forward=1, + learn=learn_int, + arp_term=0, + is_add=1 + ) + + cmd2 = u"sw_interface_set_l2_bridge" + args2 = dict( + rx_sw_if_index=sw_if_index1, + bd_id=int(bd_id), + shg=0, + port_type=0, + enable=1 + ) + + args3 = dict( + rx_sw_if_index=sw_if_index2, + bd_id=int(bd_id), + shg=0, + port_type=0, + enable=1 + ) + + err_msg = f"Failed to add L2 bridge domain with 2 interfaces " \ + f"on host {node[u'host']}" with PapiSocketExecutor(node) as papi_exec: papi_exec.add(cmd1, **args1).add(cmd2, **args2).add(cmd2, **args3) @@ -263,27 +268,29 @@ class L2Util(object): :type interface1: str or int :type interface2: str or int """ - - if isinstance(interface1, basestring): + if isinstance(interface1, str): sw_iface1 = Topology().get_interface_sw_index(node, interface1) else: sw_iface1 = interface1 - if isinstance(interface2, basestring): + if isinstance(interface2, str): sw_iface2 = Topology().get_interface_sw_index(node, interface2) else: sw_iface2 = interface2 - cmd = 'sw_interface_set_l2_xconnect' - args1 = dict(rx_sw_if_index=sw_iface1, - tx_sw_if_index=sw_iface2, - enable=1) - args2 = dict(rx_sw_if_index=sw_iface2, - tx_sw_if_index=sw_iface1, - enable=1) - - err_msg = 'Failed to add L2 cross-connect between two interfaces on' \ - ' host {host}'.format(host=node['host']) + cmd = u"sw_interface_set_l2_xconnect" + args1 = dict( + rx_sw_if_index=sw_iface1, + tx_sw_if_index=sw_iface2, + enable=1 + ) + args2 = dict( + rx_sw_if_index=sw_iface2, + tx_sw_if_index=sw_iface1, + enable=1 + ) + err_msg = f"Failed to add L2 cross-connect between two interfaces " \ + f"on host {node['host']}" with PapiSocketExecutor(node) as papi_exec: papi_exec.add(cmd, **args1).add(cmd, **args2).get_replies(err_msg) @@ -299,27 +306,29 @@ class L2Util(object): :type interface1: str or int :type interface2: str or int """ - - if isinstance(interface1, basestring): + if isinstance(interface1, str): sw_iface1 = Topology().get_interface_sw_index(node, interface1) else: sw_iface1 = interface1 - if isinstance(interface2, basestring): + if isinstance(interface2, str): sw_iface2 = Topology().get_interface_sw_index(node, interface2) else: sw_iface2 = interface2 - cmd = 'l2_patch_add_del' - args1 = dict(rx_sw_if_index=sw_iface1, - tx_sw_if_index=sw_iface2, - is_add=1) - args2 = dict(rx_sw_if_index=sw_iface2, - tx_sw_if_index=sw_iface1, - is_add=1) - - err_msg = 'Failed to add L2 patch between two interfaces on' \ - ' host {host}'.format(host=node['host']) + cmd = u"l2_patch_add_del" + args1 = dict( + rx_sw_if_index=sw_iface1, + tx_sw_if_index=sw_iface2, + is_add=1 + ) + args2 = dict( + rx_sw_if_index=sw_iface2, + tx_sw_if_index=sw_iface1, + is_add=1 + ) + err_msg = f"Failed to add L2 patch between two interfaces " \ + f"on host {node['host']}" with PapiSocketExecutor(node) as papi_exec: papi_exec.add(cmd, **args1).add(cmd, **args2).get_replies(err_msg) @@ -340,15 +349,17 @@ class L2Util(object): :type if_2: str :type set_up: bool """ - - cmd = 'brctl addbr {0}'.format(br_name) + cmd = f"brctl addbr {br_name}" exec_cmd_no_error(node, cmd, sudo=True) - cmd = 'brctl addif {0} {1}'.format(br_name, if_1) + + cmd = f"brctl addif {br_name} {if_1}" exec_cmd_no_error(node, cmd, sudo=True) - cmd = 'brctl addif {0} {1}'.format(br_name, if_2) + + cmd = f"brctl addif {br_name} {if_2}" exec_cmd_no_error(node, cmd, sudo=True) + if set_up: - cmd = 'ip link set dev {0} up'.format(br_name) + cmd = f"ip link set dev {br_name} up" exec_cmd_no_error(node, cmd, sudo=True) @staticmethod @@ -366,15 +377,15 @@ class L2Util(object): :type br_name: str :type set_down: bool """ - if set_down: - cmd = 'ip link set dev {0} down'.format(br_name) + cmd = f"ip link set dev {br_name} down" exec_cmd_no_error(node, cmd, sudo=True) - cmd = 'brctl delbr {0}'.format(br_name) + + cmd = f"brctl delbr {br_name}" exec_cmd_no_error(node, cmd, sudo=True) @staticmethod - def vpp_get_bridge_domain_data(node, bd_id=0xffffffff): + def vpp_get_bridge_domain_data(node, bd_id=Constants.BITWISE_NON_ZERO): """Get all bridge domain data from a VPP node. If a domain ID number is provided, return only data for the matching bridge domain. @@ -386,23 +397,27 @@ class L2Util(object): or a single dictionary for the specified bridge domain. :rtype: list or dict """ + cmd = u"bridge_domain_dump" + args = dict( + bd_id=int(bd_id) + ) + err_msg = f"Failed to get L2FIB dump on host {node[u'host']}" - cmd = 'bridge_domain_dump' - args = dict(bd_id=int(bd_id)) - err_msg = 'Failed to get L2FIB dump on host {host}'.format( - host=node['host']) with PapiSocketExecutor(node) as papi_exec: details = papi_exec.add(cmd, **args).get_details(err_msg) - if bd_id == Constants.BITWISE_NON_ZERO: - return details + retval = details if bd_id == Constants.BITWISE_NON_ZERO else None + for bridge_domain in details: - if bridge_domain['bd_id'] == bd_id: - return bridge_domain + if bridge_domain[u"bd_id"] == bd_id: + retval = bridge_domain + + return retval @staticmethod - def l2_vlan_tag_rewrite(node, interface, tag_rewrite_method, - push_dot1q=True, tag1_id=None, tag2_id=None): + def l2_vlan_tag_rewrite( + node, interface, tag_rewrite_method, push_dot1q=True, tag1_id=None, + tag2_id=None): """Rewrite tags in ethernet frame. :param node: Node to rewrite tags. @@ -419,27 +434,29 @@ class L2Util(object): :type tag1_id: int :type tag2_id: int """ - tag1_id = int(tag1_id) if tag1_id else 0 tag2_id = int(tag2_id) if tag2_id else 0 - vtr_oper = getattr(L2VtrOp, 'L2_VTR_{}'.format( - tag_rewrite_method.replace('-', '_').upper())) + vtr_oper = getattr( + L2VtrOp, f"L2_VTR_{tag_rewrite_method.replace(u'-', u'_').upper()}" + ) - if isinstance(interface, basestring): + if isinstance(interface, str): iface_key = Topology.get_interface_by_name(node, interface) sw_if_index = Topology.get_interface_sw_index(node, iface_key) else: sw_if_index = interface - cmd = 'l2_interface_vlan_tag_rewrite' - args = dict(sw_if_index=sw_if_index, - vtr_op=int(vtr_oper), - push_dot1q=int(push_dot1q), - tag1=tag1_id, - tag2=tag2_id) - err_msg = 'Failed to set VLAN TAG rewrite on host {host}'.format( - host=node['host']) + cmd = u"l2_interface_vlan_tag_rewrite" + args = dict( + sw_if_index=sw_if_index, + vtr_op=int(vtr_oper), + push_dot1q=int(push_dot1q), + tag1=tag1_id, + tag2=tag2_id + ) + err_msg = f"Failed to set VLAN TAG rewrite on host {node['host']}" + with PapiSocketExecutor(node) as papi_exec: papi_exec.add(cmd, **args).get_reply(err_msg) @@ -454,16 +471,17 @@ class L2Util(object): :returns: L2 FIB table. :rtype: list """ + cmd = u"l2_fib_table_dump" + args = dict( + bd_id=int(bd_id) + ) + err_msg = f"Failed to get L2FIB dump on host {node['host']}" - cmd = 'l2_fib_table_dump' - args = dict(bd_id=int(bd_id)) - err_msg = 'Failed to get L2FIB dump on host {host}'.format( - host=node['host']) with PapiSocketExecutor(node) as papi_exec: details = papi_exec.add(cmd, **args).get_details(err_msg) for fib_item in details: - fib_item['mac'] = L2Util.bin_to_mac(fib_item['mac']) + fib_item[u"mac"] = L2Util.bin_to_mac(fib_item[u"mac"]) return details @@ -480,13 +498,12 @@ class L2Util(object): :returns: L2 FIB entry :rtype: dict """ - bd_data = L2Util.vpp_get_bridge_domain_data(node) - bd_id = bd_data[bd_index-1]['bd_id'] + bd_id = bd_data[bd_index-1][u"bd_id"] table = L2Util.get_l2_fib_table(node, bd_id) for entry in table: - if entry['mac'] == mac: + if entry[u"mac"] == mac: return entry return {}