X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=src%2Fvpp-api%2Fpython%2Fvpp_papi%2Fvpp_serializer.py;h=644aeac65c63a7747e9382b760b7f54c2a08b415;hb=3825d93af;hp=146a8f6919a1015a4ce6cec7dd5161475595b946;hpb=a5ee900fb75201bbfceaf13c8bc57a13ed094988;p=vpp.git diff --git a/src/vpp-api/python/vpp_papi/vpp_serializer.py b/src/vpp-api/python/vpp_papi/vpp_serializer.py index 146a8f6919a..644aeac65c6 100644 --- a/src/vpp-api/python/vpp_papi/vpp_serializer.py +++ b/src/vpp-api/python/vpp_papi/vpp_serializer.py @@ -12,210 +12,413 @@ # See the License for the specific language governing permissions and # limitations under the License. # - -import struct import collections -from enum import IntEnum +from enum import IntFlag import logging +import socket +import struct +import sys + +from . import vpp_format + # # Set log-level in application by doing e.g.: # logger = logging.getLogger('vpp_serializer') # logger.setLevel(logging.DEBUG) # -logger = logging.getLogger(__name__) -FORMAT = "[%(filename)s:%(lineno)s - %(funcName)s() ] %(message)s" -logging.basicConfig(format=FORMAT) +logger = logging.getLogger('vpp_papi.serializer') + + +def check(d): + return type(d) is dict or type(d) is bytes + + +def conversion_required(data, field_type): + if check(data): + return False + try: + if type(data).__name__ in vpp_format.conversion_table[field_type]: + return True + except KeyError: + return False + + +def conversion_packer(data, field_type): + t = type(data).__name__ + return types[field_type].pack(vpp_format. + conversion_table[field_type][t](data)) + +def conversion_unpacker(data, field_type): + if field_type not in vpp_format.conversion_unpacker_table: + return data + return vpp_format.conversion_unpacker_table[field_type](data) -class BaseTypes(): - def __init__(self, type, elements=0): + +class Packer: + options = {} + + def pack(self, data, kwargs): + raise NotImplementedError + + def unpack(self, data, offset, result=None, ntc=False): + raise NotImplementedError + + # override as appropriate in subclasses + @staticmethod + def _get_packer_with_options(f_type, options): + return types[f_type] + + def get_packer_with_options(self, f_type, options): + if options is not None: + try: + c = types[f_type].__class__ + return c._get_packer_with_options(f_type, options) + except IndexError: + raise VPPSerializerValueError( + "Options not supported for {}{} ({})". + format(f_type, types[f_type].__class__, + options)) + + +class BaseTypes(Packer): + def __init__(self, type, elements=0, options=None): + self._type = type + self._elements = elements base_types = {'u8': '>B', + 'i8': '>b', + 'string': '>s', 'u16': '>H', + 'i16': '>h', 'u32': '>I', 'i32': '>i', 'u64': '>Q', - 'f64': '>d', + 'i64': '>q', + 'f64': '=d', + 'bool': '>?', 'header': '>HI'} - if elements > 0 and type == 'u8': + if elements > 0 and (type == 'u8' or type == 'string'): self.packer = struct.Struct('>%ss' % elements) else: self.packer = struct.Struct(base_types[type]) self.size = self.packer.size - logger.debug('Adding {} with format: {}' - .format(type, base_types[type])) + self.options = options def pack(self, data, kwargs=None): - logger.debug("Data: {} Format: {}".format(data, self.packer.format)) + if data is None: # Default to zero if not specified + if self.options and 'default' in self.options: + data = self.options['default'] + else: + data = 0 return self.packer.pack(data) - def unpack(self, data, offset, result=None): - logger.debug("@ {} Format: {}".format(offset, self.packer.format)) - return self.packer.unpack_from(data, offset)[0] + def unpack(self, data, offset, result=None, ntc=False): + return self.packer.unpack_from(data, offset)[0], self.packer.size + + @staticmethod + def _get_packer_with_options(f_type, options): + return BaseTypes(f_type, options=options) + def __repr__(self): + return "BaseTypes(type=%s, elements=%s, options=%s)" % (self._type, + self._elements, + self.options) -types = {} -types['u8'] = BaseTypes('u8') -types['u16'] = BaseTypes('u16') -types['u32'] = BaseTypes('u32') -types['i32'] = BaseTypes('i32') -types['u64'] = BaseTypes('u64') -types['f64'] = BaseTypes('f64') +class String(Packer): + def __init__(self, name, num, options): + self.name = name + self.num = num + self.size = 1 + self.length_field_packer = BaseTypes('u32') + self.limit = options['limit'] if 'limit' in options else num + self.fixed = True if num else False + if self.fixed and not self.limit: + raise VPPSerializerValueError( + "Invalid combination for: {}, {} fixed:{} limit:{}". + format(name, options, self.fixed, self.limit)) -class FixedList_u8(): + def pack(self, list, kwargs=None): + if not list: + if self.fixed: + return b"\x00" * self.limit + return self.length_field_packer.pack(0) + b"" + if self.limit and len(list) > self.limit - 1: + raise VPPSerializerValueError( + "Invalid argument length for: {}, {} maximum {}". + format(list, len(list), self.limit - 1)) + if self.fixed: + return list.encode('ascii').ljust(self.limit, b'\x00') + return self.length_field_packer.pack(len(list)) + list.encode('ascii') + + def unpack(self, data, offset=0, result=None, ntc=False): + if self.fixed: + p = BaseTypes('u8', self.num) + s = p.unpack(data, offset) + s2 = s[0].split(b'\0', 1)[0] + return (s2.decode('ascii'), self.num) + + length, length_field_size = self.length_field_packer.unpack(data, + offset) + if length == 0: + return '', 0 + p = BaseTypes('u8', length) + x, size = p.unpack(data, offset + length_field_size) + return (x.decode('ascii', errors='replace'), size + length_field_size) + + +types = {'u8': BaseTypes('u8'), 'i8': BaseTypes('i8'), + 'u16': BaseTypes('u16'), 'i16': BaseTypes('i16'), + 'u32': BaseTypes('u32'), 'i32': BaseTypes('i32'), + 'u64': BaseTypes('u64'), 'i64': BaseTypes('i64'), + 'f64': BaseTypes('f64'), + 'bool': BaseTypes('bool'), 'string': String} + +class_types = {} + + +def vpp_get_type(name): + try: + return types[name] + except KeyError: + return None + + +class VPPSerializerValueError(ValueError): + pass + + +class FixedList_u8(Packer): def __init__(self, name, field_type, num): self.name = name self.num = num self.packer = BaseTypes(field_type, num) self.size = self.packer.size + self.field_type = field_type - def pack(self, list, kwargs): - logger.debug("Data: {}".format(list)) - - if len(list) > self.num: - raise ValueError('Fixed list length error for "{}", got: {}' - ' expected: {}' - .format(self.name, len(list), self.num)) - return self.packer.pack(list) - - def unpack(self, data, offset=0, result=None): + def pack(self, data, kwargs=None): + """Packs a fixed length bytestring. Left-pads with zeros + if input data is too short.""" + if not data: + return b'\x00' * self.size + + if len(data) > self.num: + raise VPPSerializerValueError( + 'Fixed list length error for "{}", got: {}' + ' expected: {}' + .format(self.name, len(data), self.num)) + + try: + return self.packer.pack(data) + except struct.error: + raise VPPSerializerValueError( + 'Packing failed for "{}" {}' + .format(self.name, kwargs)) + + def unpack(self, data, offset=0, result=None, ntc=False): if len(data[offset:]) < self.num: - raise ValueError('Invalid array length for "{}" got {}' - ' expected {}' - .format(self.name, len(data), self.num)) + raise VPPSerializerValueError( + 'Invalid array length for "{}" got {}' + ' expected {}' + .format(self.name, len(data[offset:]), self.num)) return self.packer.unpack(data, offset) + def __repr__(self): + return "FixedList_u8(name=%s, field_type=%s, num=%s)" % ( + self.name, self.field_type, self.num + ) + -class FixedList(): +class FixedList(Packer): def __init__(self, name, field_type, num): self.num = num self.packer = types[field_type] self.size = self.packer.size * num + self.name = name + self.field_type = field_type def pack(self, list, kwargs): - logger.debug("Data: {}".format(list)) - if len(list) != self.num: - raise ValueError('Fixed list length error, got: {} expected: {}' - .format(len(list), self.num)) + raise VPPSerializerValueError( + 'Fixed list length error, got: {} expected: {}' + .format(len(list), self.num)) b = bytes() for e in list: b += self.packer.pack(e) return b - def unpack(self, data, offset=0, result=None): + def unpack(self, data, offset=0, result=None, ntc=False): # Return a list of arguments result = [] + total = 0 for e in range(self.num): - x = self.packer.unpack(data, offset) + x, size = self.packer.unpack(data, offset, ntc=ntc) result.append(x) - offset += self.packer.size - return result + offset += size + total += size + return result, total + + def __repr__(self): + return "FixedList(name=%s, field_type=%s, num=%s)" % ( + self.name, self.field_type, self.num) -class VLAList(): +class VLAList(Packer): def __init__(self, name, field_type, len_field_name, index): + self.name = name + self.field_type = field_type self.index = index self.packer = types[field_type] self.size = self.packer.size self.length_field = len_field_name - def pack(self, list, kwargs=None): - logger.debug("Data: {}".format(list)) - if len(list) != kwargs[self.length_field]: - raise ValueError('Variable length error, got: {} expected: {}' - .format(len(list), kwargs[self.length_field])) - b = bytes() + def pack(self, lst, kwargs=None): + if not lst: + return b"" + if len(lst) != kwargs[self.length_field]: + raise VPPSerializerValueError( + 'Variable length error, got: {} expected: {}' + .format(len(lst), kwargs[self.length_field])) # u8 array if self.packer.size == 1: - p = BaseTypes('u8', len(list)) - return p.pack(list) + if isinstance(lst, list): + return b''.join(lst) + return bytes(lst) - for e in list: + b = bytes() + for e in lst: b += self.packer.pack(e) return b - def unpack(self, data, offset=0, result=None): - logger.debug("Data: {} @ {} Result: {}" - .format(list, offset, result[self.index])) + def unpack(self, data, offset=0, result=None, ntc=False): # Return a list of arguments + total = 0 # u8 array if self.packer.size == 1: if result[self.index] == 0: - return b'' + return b'', 0 p = BaseTypes('u8', result[self.index]) - r = p.unpack(data, offset) - return r + return p.unpack(data, offset, ntc=ntc) r = [] for e in range(result[self.index]): - x = self.packer.unpack(data, offset) + x, size = self.packer.unpack(data, offset, ntc=ntc) r.append(x) - offset += self.packer.size - return r + offset += size + total += size + return r, total + + def __repr__(self): + return "VLAList(name=%s, field_type=%s, " \ + "len_field_name=%s, index=%s)" % ( + self.name, self.field_type, self.length_field, self.index + ) -class VLAList_legacy(): +class VLAList_legacy(Packer): def __init__(self, name, field_type): + self.name = name + self.field_type = field_type self.packer = types[field_type] self.size = self.packer.size def pack(self, list, kwargs=None): - logger.debug("Data: {}".format(list)) + if self.packer.size == 1: + return bytes(list) + b = bytes() for e in list: b += self.packer.pack(e) return b - def unpack(self, data, offset=0, result=None): + def unpack(self, data, offset=0, result=None, ntc=False): + total = 0 # Return a list of arguments if (len(data) - offset) % self.packer.size: - raise ValueError('Legacy Variable Length Array length mismatch.') + raise VPPSerializerValueError( + 'Legacy Variable Length Array length mismatch.') elements = int((len(data) - offset) / self.packer.size) r = [] - logger.debug("Legacy VLA: {} elements of size {}" - .format(elements, self.packer.size)) for e in range(elements): - x = self.packer.unpack(data, offset) + x, size = self.packer.unpack(data, offset, ntc=ntc) r.append(x) offset += self.packer.size - return r + total += size + return r, total + def __repr__(self): + return "VLAList_legacy(name=%s, field_type=%s)" % ( + self.name, self.field_type + ) -class VPPEnumType(): - def __init__(self, name, msgdef): + +# Will change to IntEnum after 21.04 release +class VPPEnumType(Packer): + output_class = IntFlag + + def __init__(self, name, msgdef, options=None): self.size = types['u32'].size + self.name = name + self.enumtype = 'u32' + self.msgdef = msgdef e_hash = {} for f in msgdef: if type(f) is dict and 'enumtype' in f: if f['enumtype'] != 'u32': - raise NotImplementedError + self.size = types[f['enumtype']].size + self.enumtype = f['enumtype'] continue ename, evalue = f e_hash[ename] = evalue - self.enum = IntEnum(name, e_hash) + self.enum = self.output_class(name, e_hash) types[name] = self - logger.debug('Adding enum {}'.format(name)) + class_types[name] = self.__class__ + self.options = options def __getattr__(self, name): return self.enum[name] + def __bool__(self): + return True + def pack(self, data, kwargs=None): - logger.debug("Data: {}".format(data)) - return types['u32'].pack(data, kwargs) + if data is None: # Default to zero if not specified + if self.options and 'default' in self.options: + data = self.options['default'] + else: + data = 0 + + return types[self.enumtype].pack(data) - def unpack(self, data, offset=0, result=None): - x = types['u32'].unpack(data, offset) - return self.enum(x) + def unpack(self, data, offset=0, result=None, ntc=False): + x, size = types[self.enumtype].unpack(data, offset) + return self.enum(x), size + @classmethod + def _get_packer_with_options(cls, f_type, options): + return cls(f_type, types[f_type].msgdef, options=options) -class VPPUnionType(): + def __repr__(self): + return "%s(name=%s, msgdef=%s, options=%s)" % ( + self.__class__.__name__, self.name, self.msgdef, self.options + ) + + +class VPPEnumFlagType(VPPEnumType): + output_class = IntFlag + + def __init__(self, name, msgdef, options=None): + super(VPPEnumFlagType, self).__init__(name, msgdef, options) + + +class VPPUnionType(Packer): def __init__(self, name, msgdef): self.name = name + self.msgdef = msgdef self.size = 0 self.maxindex = 0 fields = [] @@ -227,7 +430,8 @@ class VPPUnionType(): f_type, f_name = f if f_type not in types: logger.debug('Unknown union type {}'.format(f_type)) - raise ValueError('Unknown message type {}'.format(f_type)) + raise VPPSerializerValueError( + 'Unknown message type {}'.format(f_type)) fields.append(f_name) size = types[f_type].size self.packers[f_name] = types[f_type] @@ -237,28 +441,94 @@ class VPPUnionType(): types[name] = self self.tuple = collections.namedtuple(name, fields, rename=True) - logger.debug('Adding union {}'.format(name)) + # Union of variable length? def pack(self, data, kwargs=None): - logger.debug("Data: {}".format(data)) + if not data: + return b'\x00' * self.size + for k, v in data.items(): logger.debug("Key: {} Value: {}".format(k, v)) b = self.packers[k].pack(v, kwargs) - offset = self.size - self.packers[k].size break r = bytearray(self.size) - r[offset:] = b + r[:len(b)] = b return r - def unpack(self, data, offset=0, result=None): + def unpack(self, data, offset=0, result=None, ntc=False): r = [] + maxsize = 0 for k, p in self.packers.items(): - union_offset = self.size - p.size - r.append(p.unpack(data, offset + union_offset)) - return self.tuple._make(r) + x, size = p.unpack(data, offset, ntc=ntc) + if size > maxsize: + maxsize = size + r.append(x) + return self.tuple._make(r), maxsize + + def __repr__(self): + return"VPPUnionType(name=%s, msgdef=%r)" % (self.name, self.msgdef) + + +class VPPTypeAlias(Packer): + def __init__(self, name, msgdef, options=None): + self.name = name + self.msgdef = msgdef + t = vpp_get_type(msgdef['type']) + if not t: + raise ValueError('No such type: {}'.format(msgdef['type'])) + if 'length' in msgdef: + if msgdef['length'] == 0: + raise ValueError() + if msgdef['type'] == 'u8': + self.packer = FixedList_u8(name, msgdef['type'], + msgdef['length']) + self.size = self.packer.size + else: + self.packer = FixedList(name, msgdef['type'], msgdef['length']) + else: + self.packer = t + self.size = t.size + types[name] = self + self.toplevelconversion = False + self.options = options -class VPPType(): + def pack(self, data, kwargs=None): + if data and conversion_required(data, self.name): + try: + return conversion_packer(data, self.name) + # Python 2 and 3 raises different exceptions from inet_pton + except(OSError, socket.error, TypeError): + pass + if data is None: # Default to zero if not specified + if self.options and 'default' in self.options: + data = self.options['default'] + else: + data = 0 + + return self.packer.pack(data, kwargs) + + @staticmethod + def _get_packer_with_options(f_type, options): + return VPPTypeAlias(f_type, types[f_type].msgdef, options=options) + + def unpack(self, data, offset=0, result=None, ntc=False): + if ntc is False and self.name in vpp_format.conversion_unpacker_table: + # Disable type conversion for dependent types + ntc = True + self.toplevelconversion = True + t, size = self.packer.unpack(data, offset, result, ntc=ntc) + if self.toplevelconversion: + self.toplevelconversion = False + return conversion_unpacker(t, self.name), size + return t, size + + def __repr__(self): + return "VPPTypeAlias(name=%s, msgdef=%s, options=%s)" % ( + self.name, self.msgdef, self.options) + + +class VPPType(Packer): # Set everything up to be able to pack / unpack def __init__(self, name, msgdef): self.name = name @@ -278,55 +548,112 @@ class VPPType(): self.fieldtypes.append(f_type) if f_type not in types: logger.debug('Unknown type {}'.format(f_type)) - raise ValueError('Unknown message type {}'.format(f_type)) - if len(f) == 3: # list + raise VPPSerializerValueError( + 'Unknown message type {}'.format(f_type)) + + fieldlen = len(f) + options = [x for x in f if type(x) is dict] + if len(options): + self.options = options[0] + fieldlen -= 1 + else: + self.options = {} + if fieldlen == 3: # list list_elements = f[2] if list_elements == 0: - p = VLAList_legacy(f_name, f_type) + if f_type == 'string': + p = String(f_name, 0, self.options) + else: + p = VLAList_legacy(f_name, f_type) self.packers.append(p) elif f_type == 'u8': p = FixedList_u8(f_name, f_type, list_elements) self.packers.append(p) size += p.size + elif f_type == 'string': + p = String(f_name, list_elements, self.options) + self.packers.append(p) + size += p.size else: p = FixedList(f_name, f_type, list_elements) self.packers.append(p) size += p.size - elif len(f) == 4: # Variable length list - # Find index of length field - length_index = self.fields.index(f[3]) - p = VLAList(f_name, f_type, f[3], length_index) - self.packers.append(p) + elif fieldlen == 4: # Variable length list + length_index = self.fields.index(f[3]) + p = VLAList(f_name, f_type, f[3], length_index) + self.packers.append(p) else: - self.packers.append(types[f_type]) - size += types[f_type].size + # default support for types that decay to basetype + if 'default' in self.options: + p = self.get_packer_with_options(f_type, self.options) + else: + p = types[f_type] + + self.packers.append(p) + size += p.size self.size = size self.tuple = collections.namedtuple(name, self.fields, rename=True) types[name] = self - logger.debug('Adding type {}'.format(name)) + self.toplevelconversion = False def pack(self, data, kwargs=None): if not kwargs: kwargs = data - logger.debug("Data: {}".format(data)) b = bytes() + + # Try one of the format functions + if data and conversion_required(data, self.name): + return conversion_packer(data, self.name) + for i, a in enumerate(self.fields): - if a not in data: - logger.debug("Argument {} not given, defaulting to 0" - .format(a)) - b += b'\x00' * self.packers[i].size - continue - b += self.packers[i].pack(data[a], kwargs) + if data and type(data) is not dict and a not in data: + raise VPPSerializerValueError( + "Invalid argument: {} expected {}.{}". + format(data, self.name, a)) + + # Defaulting to zero. + if not data or a not in data: # Default to 0 + arg = None + kwarg = None # No default for VLA + else: + arg = data[a] + kwarg = kwargs[a] if a in kwargs else None + if isinstance(self.packers[i], VPPType): + b += self.packers[i].pack(arg, kwarg) + else: + b += self.packers[i].pack(arg, kwargs) + return b - def unpack(self, data, offset=0, result=None): + def unpack(self, data, offset=0, result=None, ntc=False): # Return a list of arguments result = [] + total = 0 + if ntc is False and self.name in vpp_format.conversion_unpacker_table: + # Disable type conversion for dependent types + ntc = True + self.toplevelconversion = True + for p in self.packers: - x = p.unpack(data, offset, result) + x, size = p.unpack(data, offset, result, ntc) if type(x) is tuple and len(x) == 1: x = x[0] result.append(x) - offset += p.size - return self.tuple._make(result) + offset += size + total += size + t = self.tuple._make(result) + + if self.toplevelconversion: + self.toplevelconversion = False + t = conversion_unpacker(t, self.name) + return t, total + + def __repr__(self): + return "%s(name=%s, msgdef=%s)" % ( + self.__class__.__name__, self.name, self.msgdef + ) + + +class VPPMessage(VPPType): + pass