X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;ds=sidebyside;f=src%2Fvpp-api%2Fpython%2Fvpp_papi%2Fvpp_serializer.py;h=d8461398765ff2157ea8868cd786ae843f47a761;hb=ac0babd412e3b5282136a5c5c5be2c4cc4be6895;hp=7c7f331b04c68d934a02dbecc3c4eb293fda9585;hpb=a089ae1294c8a49555fd95a905e2caa04cb7f900;p=vpp.git diff --git a/src/vpp-api/python/vpp_papi/vpp_serializer.py b/src/vpp-api/python/vpp_papi/vpp_serializer.py index 7c7f331b04c..d8461398765 100644 --- a/src/vpp-api/python/vpp_papi/vpp_serializer.py +++ b/src/vpp-api/python/vpp_papi/vpp_serializer.py @@ -13,35 +13,25 @@ # limitations under the License. # import collections +from enum import IntFlag import logging import socket import struct import sys -if sys.version_info <= (3, 4): - from aenum import IntEnum # noqa: F401 -else: - from enum import IntEnum # noqa: F401 +from . import vpp_format -if sys.version_info <= (3, 6): - from aenum import IntFlag # noqa: F401 -else: - - from enum import IntFlag # noqa: F401 - -from . import vpp_format # noqa: E402 # # Set log-level in application by doing e.g.: # logger = logging.getLogger('vpp_serializer') # logger.setLevel(logging.DEBUG) # -logger = logging.getLogger(__name__) +logger = logging.getLogger("vpp_papi.serializer") + -if sys.version[0] == '2': - def check(d): type(d) is dict -else: - def check(d): type(d) is dict or type(d) is bytes +def check(d): + return type(d) is dict or type(d) is bytes def conversion_required(data, field_type): @@ -56,8 +46,7 @@ def conversion_required(data, field_type): def conversion_packer(data, field_type): t = type(data).__name__ - return types[field_type].pack(vpp_format. - conversion_table[field_type][t](data)) + return types[field_type].pack(vpp_format.conversion_table[field_type][t](data)) def conversion_unpacker(data, field_type): @@ -66,33 +55,63 @@ def conversion_unpacker(data, field_type): return vpp_format.conversion_unpacker_table[field_type](data) -class BaseTypes(object): +class Packer: + options = {} + + def pack(self, data, kwargs): + raise NotImplementedError + + def unpack(self, data, offset, result=None, ntc=False): + raise NotImplementedError + + # override as appropriate in subclasses + @staticmethod + def _get_packer_with_options(f_type, options): + return types[f_type] + + def get_packer_with_options(self, f_type, options): + if options is not None: + try: + c = types[f_type].__class__ + return c._get_packer_with_options(f_type, options) + except IndexError: + raise VPPSerializerValueError( + "Options not supported for {}{} ({})".format( + f_type, types[f_type].__class__, options + ) + ) + + +class BaseTypes(Packer): def __init__(self, type, elements=0, options=None): - base_types = {'u8': '>B', - 'string': '>s', - 'u16': '>H', - 'u32': '>I', - 'i32': '>i', - 'u64': '>Q', - 'f64': '=d', - 'bool': '>?', - 'header': '>HI'} - - if elements > 0 and (type == 'u8' or type == 'string'): - self.packer = struct.Struct('>%ss' % elements) + self._type = type + self._elements = elements + base_types = { + "u8": ">B", + "i8": ">b", + "string": ">s", + "u16": ">H", + "i16": ">h", + "u32": ">I", + "i32": ">i", + "u64": ">Q", + "i64": ">q", + "f64": "=d", + "bool": ">?", + "header": ">HI", + } + + if elements > 0 and (type == "u8" or type == "string"): + self.packer = struct.Struct(">%ss" % elements) else: self.packer = struct.Struct(base_types[type]) self.size = self.packer.size self.options = options - def __call__(self, args): - self.options = args - return self - def pack(self, data, kwargs=None): - if not data: # Default to zero if not specified - if self.options and 'default' in self.options: - data = self.options['default'] + if data is None: # Default to zero if not specified + if self.options and "default" in self.options: + data = self.options["default"] else: data = 0 return self.packer.pack(data) @@ -100,39 +119,78 @@ class BaseTypes(object): def unpack(self, data, offset, result=None, ntc=False): return self.packer.unpack_from(data, offset)[0], self.packer.size + @staticmethod + def _get_packer_with_options(f_type, options): + return BaseTypes(f_type, options=options) + + def __repr__(self): + return "BaseTypes(type=%s, elements=%s, options=%s)" % ( + self._type, + self._elements, + self.options, + ) + -class String(object): - def __init__(self, options): - self.name = 'string' +class String(Packer): + def __init__(self, name, num, options): + self.name = name + self.num = num self.size = 1 - self.length_field_packer = BaseTypes('u32') - self.limit = options['limit'] if 'limit' in options else None + self.length_field_packer = BaseTypes("u32") + self.limit = options["limit"] if "limit" in options else num + self.fixed = True if num else False + if self.fixed and not self.limit: + raise VPPSerializerValueError( + "Invalid combination for: {}, {} fixed:{} limit:{}".format( + name, options, self.fixed, self.limit + ) + ) def pack(self, list, kwargs=None): if not list: + if self.fixed: + return b"\x00" * self.limit return self.length_field_packer.pack(0) + b"" - if self.limit and len(list) > self.limit: + if self.limit and len(list) > self.limit - 1: raise VPPSerializerValueError( - "Invalid argument length for: {}, {} maximum {}". - format(list, len(list), self.limit)) - - return self.length_field_packer.pack(len(list)) + list.encode('utf8') + "Invalid argument length for: {}, {} maximum {}".format( + list, len(list), self.limit - 1 + ) + ) + if self.fixed: + return list.encode("ascii").ljust(self.limit, b"\x00") + return self.length_field_packer.pack(len(list)) + list.encode("ascii") def unpack(self, data, offset=0, result=None, ntc=False): - length, length_field_size = self.length_field_packer.unpack(data, - offset) + if self.fixed: + p = BaseTypes("u8", self.num) + s = p.unpack(data, offset) + s2 = s[0].split(b"\0", 1)[0] + return (s2.decode("ascii"), self.num) + + length, length_field_size = self.length_field_packer.unpack(data, offset) if length == 0: - return b'', 0 - p = BaseTypes('u8', length) + return "", 0 + p = BaseTypes("u8", length) x, size = p.unpack(data, offset + length_field_size) - x2 = x.split(b'\0', 1)[0] - return (x2.decode('utf8'), size + length_field_size) + return (x.decode("ascii", errors="replace"), size + length_field_size) -types = {'u8': BaseTypes('u8'), 'u16': BaseTypes('u16'), - 'u32': BaseTypes('u32'), 'i32': BaseTypes('i32'), - 'u64': BaseTypes('u64'), 'f64': BaseTypes('f64'), - 'bool': BaseTypes('bool'), 'string': String} +types = { + "u8": BaseTypes("u8"), + "i8": BaseTypes("i8"), + "u16": BaseTypes("u16"), + "i16": BaseTypes("i16"), + "u32": BaseTypes("u32"), + "i32": BaseTypes("i32"), + "u64": BaseTypes("u64"), + "i64": BaseTypes("i64"), + "f64": BaseTypes("f64"), + "bool": BaseTypes("bool"), + "string": String, +} + +class_types = {} def vpp_get_type(name): @@ -146,7 +204,7 @@ class VPPSerializerValueError(ValueError): pass -class FixedList_u8(object): +class FixedList_u8(Packer): def __init__(self, name, field_type, num): self.name = name self.num = num @@ -154,38 +212,42 @@ class FixedList_u8(object): self.size = self.packer.size self.field_type = field_type - def __call__(self, args): - self.options = args - return self - def pack(self, data, kwargs=None): """Packs a fixed length bytestring. Left-pads with zeros if input data is too short.""" if not data: - return b'\x00' * self.size + return b"\x00" * self.size if len(data) > self.num: raise VPPSerializerValueError( 'Fixed list length error for "{}", got: {}' - ' expected: {}' - .format(self.name, len(data), self.num)) + " expected: {}".format(self.name, len(data), self.num) + ) - return self.packer.pack(data) + try: + return self.packer.pack(data) + except struct.error: + raise VPPSerializerValueError( + 'Packing failed for "{}" {}'.format(self.name, kwargs) + ) def unpack(self, data, offset=0, result=None, ntc=False): if len(data[offset:]) < self.num: raise VPPSerializerValueError( 'Invalid array length for "{}" got {}' - ' expected {}' - .format(self.name, len(data[offset:]), self.num)) - if self.field_type == 'string': - s = self.packer.unpack(data, offset) - s2 = s[0].split(b'\0', 1)[0] - return (s2.decode('utf-8'), self.num) + " expected {}".format(self.name, len(data[offset:]), self.num) + ) return self.packer.unpack(data, offset) + def __repr__(self): + return "FixedList_u8(name=%s, field_type=%s, num=%s)" % ( + self.name, + self.field_type, + self.num, + ) + -class FixedList(object): +class FixedList(Packer): def __init__(self, name, field_type, num): self.num = num self.packer = types[field_type] @@ -193,19 +255,17 @@ class FixedList(object): self.name = name self.field_type = field_type - def __call__(self, args): - self.options = args - return self - def pack(self, list, kwargs): if len(list) != self.num: raise VPPSerializerValueError( - 'Fixed list length error, got: {} expected: {}' - .format(len(list), self.num)) - b = bytes() + "Fixed list length error, got: {} expected: {}".format( + len(list), self.num + ) + ) + b = bytearray() for e in list: b += self.packer.pack(e) - return b + return bytes(b) def unpack(self, data, offset=0, result=None, ntc=False): # Return a list of arguments @@ -218,8 +278,15 @@ class FixedList(object): total += size return result, total + def __repr__(self): + return "FixedList(name=%s, field_type=%s, num=%s)" % ( + self.name, + self.field_type, + self.num, + ) + -class VLAList(object): +class VLAList(Packer): def __init__(self, name, field_type, len_field_name, index): self.name = name self.field_type = field_type @@ -228,37 +295,35 @@ class VLAList(object): self.size = self.packer.size self.length_field = len_field_name - def __call__(self, args): - self.options = args - return self - - def pack(self, list, kwargs=None): - if not list: + def pack(self, lst, kwargs=None): + if not lst: return b"" - if len(list) != kwargs[self.length_field]: + if len(lst) != kwargs[self.length_field]: raise VPPSerializerValueError( - 'Variable length error, got: {} expected: {}' - .format(len(list), kwargs[self.length_field])) - b = bytes() - + "Variable length error, got: {} expected: {}".format( + len(lst), kwargs[self.length_field] + ) + ) # u8 array + if self.packer.size == 1 and self.field_type == "u8": + if isinstance(lst, list): + return b"".join(lst) + return bytes(lst) - if self.packer.size == 1: - return bytearray(list) - - for e in list: + b = bytearray() + for e in lst: b += self.packer.pack(e) - return b + return bytes(b) def unpack(self, data, offset=0, result=None, ntc=False): # Return a list of arguments total = 0 # u8 array - if self.packer.size == 1: + if self.packer.size == 1 and self.field_type == "u8": if result[self.index] == 0: - return b'', 0 - p = BaseTypes('u8', result[self.index]) + return b"", 0 + p = BaseTypes("u8", result[self.index]) return p.unpack(data, offset, ntc=ntc) r = [] @@ -269,31 +334,38 @@ class VLAList(object): total += size return r, total + def __repr__(self): + return "VLAList(name=%s, field_type=%s, " "len_field_name=%s, index=%s)" % ( + self.name, + self.field_type, + self.length_field, + self.index, + ) -class VLAList_legacy(): + +class VLAList_legacy(Packer): def __init__(self, name, field_type): + self.name = name + self.field_type = field_type self.packer = types[field_type] self.size = self.packer.size - def __call__(self, args): - self.options = args - return self - def pack(self, list, kwargs=None): if self.packer.size == 1: return bytes(list) - b = bytes() + b = bytearray() for e in list: b += self.packer.pack(e) - return b + return bytes(b) def unpack(self, data, offset=0, result=None, ntc=False): total = 0 # Return a list of arguments if (len(data) - offset) % self.packer.size: raise VPPSerializerValueError( - 'Legacy Variable Length Array length mismatch.') + "Legacy Variable Length Array length mismatch." + ) elements = int((len(data) - offset) / self.packer.size) r = [] for e in range(elements): @@ -303,26 +375,32 @@ class VLAList_legacy(): total += size return r, total + def __repr__(self): + return "VLAList_legacy(name=%s, field_type=%s)" % (self.name, self.field_type) -class VPPEnumType(object): - def __init__(self, name, msgdef): - self.size = types['u32'].size - self.enumtype = 'u32' + +# Will change to IntEnum after 21.04 release +class VPPEnumType(Packer): + output_class = IntFlag + + def __init__(self, name, msgdef, options=None): + self.size = types["u32"].size + self.name = name + self.enumtype = "u32" + self.msgdef = msgdef e_hash = {} for f in msgdef: - if type(f) is dict and 'enumtype' in f: - if f['enumtype'] != 'u32': - self.size = types[f['enumtype']].size - self.enumtype = f['enumtype'] + if type(f) is dict and "enumtype" in f: + if f["enumtype"] != "u32": + self.size = types[f["enumtype"]].size + self.enumtype = f["enumtype"] continue ename, evalue = f e_hash[ename] = evalue - self.enum = IntFlag(name, e_hash) + self.enum = self.output_class(name, e_hash) types[name] = self - - def __call__(self, args): - self.options = args - return self + class_types[name] = self.__class__ + self.options = options def __getattr__(self, name): return self.enum[name] @@ -330,33 +408,55 @@ class VPPEnumType(object): def __bool__(self): return True - if sys.version[0] == '2': - __nonzero__ = __bool__ - def pack(self, data, kwargs=None): + if data is None: # Default to zero if not specified + if self.options and "default" in self.options: + data = self.options["default"] + else: + data = 0 + return types[self.enumtype].pack(data) def unpack(self, data, offset=0, result=None, ntc=False): x, size = types[self.enumtype].unpack(data, offset) return self.enum(x), size + @classmethod + def _get_packer_with_options(cls, f_type, options): + return cls(f_type, types[f_type].msgdef, options=options) + + def __repr__(self): + return "%s(name=%s, msgdef=%s, options=%s)" % ( + self.__class__.__name__, + self.name, + self.msgdef, + self.options, + ) + + +class VPPEnumFlagType(VPPEnumType): + output_class = IntFlag + + def __init__(self, name, msgdef, options=None): + super(VPPEnumFlagType, self).__init__(name, msgdef, options) -class VPPUnionType(object): + +class VPPUnionType(Packer): def __init__(self, name, msgdef): self.name = name + self.msgdef = msgdef self.size = 0 self.maxindex = 0 fields = [] self.packers = collections.OrderedDict() for i, f in enumerate(msgdef): - if type(f) is dict and 'crc' in f: - self.crc = f['crc'] + if type(f) is dict and "crc" in f: + self.crc = f["crc"] continue f_type, f_name = f if f_type not in types: - logger.debug('Unknown union type {}'.format(f_type)) - raise VPPSerializerValueError( - 'Unknown message type {}'.format(f_type)) + logger.debug("Unknown union type {}".format(f_type)) + raise VPPSerializerValueError("Unknown message type {}".format(f_type)) fields.append(f_name) size = types[f_type].size self.packers[f_name] = types[f_type] @@ -367,21 +467,17 @@ class VPPUnionType(object): types[name] = self self.tuple = collections.namedtuple(name, fields, rename=True) - def __call__(self, args): - self.options = args - return self - # Union of variable length? def pack(self, data, kwargs=None): if not data: - return b'\x00' * self.size + return b"\x00" * self.size for k, v in data.items(): logger.debug("Key: {} Value: {}".format(k, v)) b = self.packers[k].pack(v, kwargs) break r = bytearray(self.size) - r[:len(b)] = b + r[: len(b)] = b return r def unpack(self, data, offset=0, result=None, ntc=False): @@ -394,50 +490,72 @@ class VPPUnionType(object): r.append(x) return self.tuple._make(r), maxsize + def __repr__(self): + return "VPPUnionType(name=%s, msgdef=%r)" % (self.name, self.msgdef) -class VPPTypeAlias(object): - def __init__(self, name, msgdef): + +class VPPTypeAlias(Packer): + def __init__(self, name, msgdef, options=None): self.name = name - t = vpp_get_type(msgdef['type']) + self.msgdef = msgdef + t = vpp_get_type(msgdef["type"]) if not t: - raise ValueError() - if 'length' in msgdef: - if msgdef['length'] == 0: + raise ValueError("No such type: {}".format(msgdef["type"])) + if "length" in msgdef: + if msgdef["length"] == 0: raise ValueError() - if msgdef['type'] == 'u8': - self.packer = FixedList_u8(name, msgdef['type'], - msgdef['length']) + if msgdef["type"] == "u8": + self.packer = FixedList_u8(name, msgdef["type"], msgdef["length"]) self.size = self.packer.size else: - self.packer = FixedList(name, msgdef['type'], msgdef['length']) + self.packer = FixedList(name, msgdef["type"], msgdef["length"]) else: self.packer = t self.size = t.size types[name] = self - - def __call__(self, args): - self.options = args - return self + self.toplevelconversion = False + self.options = options def pack(self, data, kwargs=None): if data and conversion_required(data, self.name): try: return conversion_packer(data, self.name) # Python 2 and 3 raises different exceptions from inet_pton - except(OSError, socket.error, TypeError): + except (OSError, socket.error, TypeError): pass + if data is None: # Default to zero if not specified + if self.options and "default" in self.options: + data = self.options["default"] + else: + data = 0 return self.packer.pack(data, kwargs) + @staticmethod + def _get_packer_with_options(f_type, options): + return VPPTypeAlias(f_type, types[f_type].msgdef, options=options) + def unpack(self, data, offset=0, result=None, ntc=False): + if ntc is False and self.name in vpp_format.conversion_unpacker_table: + # Disable type conversion for dependent types + ntc = True + self.toplevelconversion = True t, size = self.packer.unpack(data, offset, result, ntc=ntc) - if not ntc: + if self.toplevelconversion: + self.toplevelconversion = False return conversion_unpacker(t, self.name), size return t, size + def __repr__(self): + return "VPPTypeAlias(name=%s, msgdef=%s, options=%s)" % ( + self.name, + self.msgdef, + self.options, + ) -class VPPType(object): + +class VPPType(Packer): # Set everything up to be able to pack / unpack def __init__(self, name, msgdef): self.name = name @@ -448,17 +566,16 @@ class VPPType(object): self.field_by_name = {} size = 0 for i, f in enumerate(msgdef): - if type(f) is dict and 'crc' in f: - self.crc = f['crc'] + if type(f) is dict and "crc" in f: + self.crc = f["crc"] continue f_type, f_name = f[:2] self.fields.append(f_name) self.field_by_name[f_name] = None self.fieldtypes.append(f_type) if f_type not in types: - logger.debug('Unknown type {}'.format(f_type)) - raise VPPSerializerValueError( - 'Unknown message type {}'.format(f_type)) + logger.debug("Unknown type {}".format(f_type)) + raise VPPSerializerValueError("Unknown message type {}".format(f_type)) fieldlen = len(f) options = [x for x in f if type(x) is dict] @@ -470,12 +587,19 @@ class VPPType(object): if fieldlen == 3: # list list_elements = f[2] if list_elements == 0: - p = VLAList_legacy(f_name, f_type) + if f_type == "string": + p = String(f_name, 0, self.options) + else: + p = VLAList_legacy(f_name, f_type) self.packers.append(p) - elif f_type == 'u8' or f_type == 'string': + elif f_type == "u8": p = FixedList_u8(f_name, f_type, list_elements) self.packers.append(p) size += p.size + elif f_type == "string": + p = String(f_name, list_elements, self.options) + self.packers.append(p) + size += p.size else: p = FixedList(f_name, f_type, list_elements) self.packers.append(p) @@ -485,22 +609,23 @@ class VPPType(object): p = VLAList(f_name, f_type, f[3], length_index) self.packers.append(p) else: - p = types[f_type](self.options) + # default support for types that decay to basetype + if "default" in self.options: + p = self.get_packer_with_options(f_type, self.options) + else: + p = types[f_type] + self.packers.append(p) size += p.size - self.size = size self.tuple = collections.namedtuple(name, self.fields, rename=True) types[name] = self - - def __call__(self, args): - self.options = args - return self + self.toplevelconversion = False def pack(self, data, kwargs=None): if not kwargs: kwargs = data - b = bytes() + b = bytearray() # Try one of the format functions if data and conversion_required(data, self.name): @@ -509,8 +634,8 @@ class VPPType(object): for i, a in enumerate(self.fields): if data and type(data) is not dict and a not in data: raise VPPSerializerValueError( - "Invalid argument: {} expected {}.{}". - format(data, self.name, a)) + "Invalid argument: {} expected {}.{}".format(data, self.name, a) + ) # Defaulting to zero. if not data or a not in data: # Default to 0 @@ -524,12 +649,17 @@ class VPPType(object): else: b += self.packers[i].pack(arg, kwargs) - return b + return bytes(b) def unpack(self, data, offset=0, result=None, ntc=False): # Return a list of arguments result = [] total = 0 + if ntc is False and self.name in vpp_format.conversion_unpacker_table: + # Disable type conversion for dependent types + ntc = True + self.toplevelconversion = True + for p in self.packers: x, size = p.unpack(data, offset, result, ntc) if type(x) is tuple and len(x) == 1: @@ -538,10 +668,19 @@ class VPPType(object): offset += size total += size t = self.tuple._make(result) - if not ntc: + + if self.toplevelconversion: + self.toplevelconversion = False t = conversion_unpacker(t, self.name) return t, total + def __repr__(self): + return "%s(name=%s, msgdef=%s)" % ( + self.__class__.__name__, + self.name, + self.msgdef, + ) + class VPPMessage(VPPType): pass