# See the License for the specific language governing permissions and
# limitations under the License.
#
-
-import struct
import collections
-from enum import IntEnum
+from enum import IntFlag
import logging
-from . import vpp_format
-import ipaddress
-import sys
import socket
+import struct
+import sys
+
+from . import vpp_format
+
#
# Set log-level in application by doing e.g.:
# logger = logging.getLogger('vpp_serializer')
# logger.setLevel(logging.DEBUG)
#
-logger = logging.getLogger(__name__)
+logger = logging.getLogger("vpp_papi.serializer")
+
+
+def check(d):
+ return type(d) is dict or type(d) is bytes
-if sys.version[0] == '2':
- check = lambda d: type(d) is dict
-else:
- check = lambda d: type(d) is dict or type(d) is bytes
def conversion_required(data, field_type):
if check(data):
def conversion_packer(data, field_type):
t = type(data).__name__
- return types[field_type].pack(vpp_format.
- conversion_table[field_type][t](data))
+ return types[field_type].pack(vpp_format.conversion_table[field_type][t](data))
def conversion_unpacker(data, field_type):
return vpp_format.conversion_unpacker_table[field_type](data)
-class BaseTypes(object):
- def __init__(self, type, elements=0):
- base_types = {'u8': '>B',
- 'string': '>s',
- 'u16': '>H',
- 'u32': '>I',
- 'i32': '>i',
- 'u64': '>Q',
- 'f64': '>d',
- 'bool': '>?',
- 'header': '>HI'}
-
- if elements > 0 and (type == 'u8' or type == 'string'):
- self.packer = struct.Struct('>%ss' % elements)
+class Packer:
+ options = {}
+
+ def pack(self, data, kwargs):
+ raise NotImplementedError
+
+ def unpack(self, data, offset, result=None, ntc=False):
+ raise NotImplementedError
+
+ # override as appropriate in subclasses
+ @staticmethod
+ def _get_packer_with_options(f_type, options):
+ return types[f_type]
+
+ def get_packer_with_options(self, f_type, options):
+ if options is not None:
+ try:
+ c = types[f_type].__class__
+ return c._get_packer_with_options(f_type, options)
+ except IndexError:
+ raise VPPSerializerValueError(
+ "Options not supported for {}{} ({})".format(
+ f_type, types[f_type].__class__, options
+ )
+ )
+
+
+class BaseTypes(Packer):
+ def __init__(self, type, elements=0, options=None):
+ self._type = type
+ self._elements = elements
+ base_types = {
+ "u8": ">B",
+ "i8": ">b",
+ "string": ">s",
+ "u16": ">H",
+ "i16": ">h",
+ "u32": ">I",
+ "i32": ">i",
+ "u64": ">Q",
+ "i64": ">q",
+ "f64": "=d",
+ "bool": ">?",
+ "header": ">HI",
+ }
+
+ if elements > 0 and (type == "u8" or type == "string"):
+ self.packer = struct.Struct(">%ss" % elements)
else:
self.packer = struct.Struct(base_types[type])
self.size = self.packer.size
+ self.options = options
def pack(self, data, kwargs=None):
- if not data: # Default to zero if not specified
- data = 0
+ if data is None: # Default to zero if not specified
+ if self.options and "default" in self.options:
+ data = self.options["default"]
+ else:
+ data = 0
return self.packer.pack(data)
def unpack(self, data, offset, result=None, ntc=False):
return self.packer.unpack_from(data, offset)[0], self.packer.size
+ @staticmethod
+ def _get_packer_with_options(f_type, options):
+ return BaseTypes(f_type, options=options)
+
+ def __repr__(self):
+ return "BaseTypes(type=%s, elements=%s, options=%s)" % (
+ self._type,
+ self._elements,
+ self.options,
+ )
+
-class String(object):
- def __init__(self):
- self.name = 'string'
+class String(Packer):
+ def __init__(self, name, num, options):
+ self.name = name
+ self.num = num
self.size = 1
- self.length_field_packer = BaseTypes('u32')
+ self.length_field_packer = BaseTypes("u32")
+ self.limit = options["limit"] if "limit" in options else num
+ self.fixed = True if num else False
+ if self.fixed and not self.limit:
+ raise VPPSerializerValueError(
+ "Invalid combination for: {}, {} fixed:{} limit:{}".format(
+ name, options, self.fixed, self.limit
+ )
+ )
def pack(self, list, kwargs=None):
if not list:
+ if self.fixed:
+ return b"\x00" * self.limit
return self.length_field_packer.pack(0) + b""
- return self.length_field_packer.pack(len(list)) + list.encode('utf8')
+ if self.limit and len(list) > self.limit - 1:
+ raise VPPSerializerValueError(
+ "Invalid argument length for: {}, {} maximum {}".format(
+ list, len(list), self.limit - 1
+ )
+ )
+ if self.fixed:
+ return list.encode("ascii").ljust(self.limit, b"\x00")
+ return self.length_field_packer.pack(len(list)) + list.encode("ascii")
def unpack(self, data, offset=0, result=None, ntc=False):
- length, length_field_size = self.length_field_packer.unpack(data,
- offset)
+ if self.fixed:
+ p = BaseTypes("u8", self.num)
+ s = p.unpack(data, offset)
+ s2 = s[0].split(b"\0", 1)[0]
+ return (s2.decode("ascii"), self.num)
+
+ length, length_field_size = self.length_field_packer.unpack(data, offset)
if length == 0:
- return b'', 0
- p = BaseTypes('u8', length)
+ return "", 0
+ p = BaseTypes("u8", length)
x, size = p.unpack(data, offset + length_field_size)
- x2 = x.split(b'\0',1)[0]
- return (x2.decode('utf8'), size + length_field_size)
+ return (x.decode("ascii", errors="replace"), size + length_field_size)
+
+types = {
+ "u8": BaseTypes("u8"),
+ "i8": BaseTypes("i8"),
+ "u16": BaseTypes("u16"),
+ "i16": BaseTypes("i16"),
+ "u32": BaseTypes("u32"),
+ "i32": BaseTypes("i32"),
+ "u64": BaseTypes("u64"),
+ "i64": BaseTypes("i64"),
+ "f64": BaseTypes("f64"),
+ "bool": BaseTypes("bool"),
+ "string": String,
+}
-types = {'u8': BaseTypes('u8'), 'u16': BaseTypes('u16'),
- 'u32': BaseTypes('u32'), 'i32': BaseTypes('i32'),
- 'u64': BaseTypes('u64'), 'f64': BaseTypes('f64'),
- 'bool': BaseTypes('bool'), 'string': String()}
+class_types = {}
def vpp_get_type(name):
pass
-class FixedList_u8(object):
+class FixedList_u8(Packer):
def __init__(self, name, field_type, num):
self.name = name
self.num = num
"""Packs a fixed length bytestring. Left-pads with zeros
if input data is too short."""
if not data:
- return b'\x00' * self.size
+ return b"\x00" * self.size
if len(data) > self.num:
raise VPPSerializerValueError(
'Fixed list length error for "{}", got: {}'
- ' expected: {}'
- .format(self.name, len(data), self.num))
+ " expected: {}".format(self.name, len(data), self.num)
+ )
- return self.packer.pack(data)
+ try:
+ return self.packer.pack(data)
+ except struct.error:
+ raise VPPSerializerValueError(
+ 'Packing failed for "{}" {}'.format(self.name, kwargs)
+ )
def unpack(self, data, offset=0, result=None, ntc=False):
if len(data[offset:]) < self.num:
raise VPPSerializerValueError(
'Invalid array length for "{}" got {}'
- ' expected {}'
- .format(self.name, len(data[offset:]), self.num))
- if self.field_type == 'string':
- s = self.packer.unpack(data, offset)
- s2 = s[0].split(b'\0', 1)[0]
- return (s2.decode('utf-8'), self.num)
+ " expected {}".format(self.name, len(data[offset:]), self.num)
+ )
return self.packer.unpack(data, offset)
+ def __repr__(self):
+ return "FixedList_u8(name=%s, field_type=%s, num=%s)" % (
+ self.name,
+ self.field_type,
+ self.num,
+ )
-class FixedList(object):
+
+class FixedList(Packer):
def __init__(self, name, field_type, num):
self.num = num
self.packer = types[field_type]
def pack(self, list, kwargs):
if len(list) != self.num:
raise VPPSerializerValueError(
- 'Fixed list length error, got: {} expected: {}'
- .format(len(list), self.num))
+ "Fixed list length error, got: {} expected: {}".format(
+ len(list), self.num
+ )
+ )
b = bytes()
for e in list:
b += self.packer.pack(e)
total += size
return result, total
+ def __repr__(self):
+ return "FixedList(name=%s, field_type=%s, num=%s)" % (
+ self.name,
+ self.field_type,
+ self.num,
+ )
+
-class VLAList(object):
+class VLAList(Packer):
def __init__(self, name, field_type, len_field_name, index):
self.name = name
self.field_type = field_type
self.size = self.packer.size
self.length_field = len_field_name
- def pack(self, list, kwargs=None):
- if not list:
+ def pack(self, lst, kwargs=None):
+ if not lst:
return b""
- if len(list) != kwargs[self.length_field]:
+ if len(lst) != kwargs[self.length_field]:
raise VPPSerializerValueError(
- 'Variable length error, got: {} expected: {}'
- .format(len(list), kwargs[self.length_field]))
- b = bytes()
+ "Variable length error, got: {} expected: {}".format(
+ len(lst), kwargs[self.length_field]
+ )
+ )
# u8 array
-
if self.packer.size == 1:
- return bytearray(list)
+ if isinstance(lst, list):
+ return b"".join(lst)
+ return bytes(lst)
- for e in list:
+ b = bytes()
+ for e in lst:
b += self.packer.pack(e)
return b
# u8 array
if self.packer.size == 1:
if result[self.index] == 0:
- return b'', 0
- p = BaseTypes('u8', result[self.index])
+ return b"", 0
+ p = BaseTypes("u8", result[self.index])
return p.unpack(data, offset, ntc=ntc)
r = []
total += size
return r, total
+ def __repr__(self):
+ return "VLAList(name=%s, field_type=%s, " "len_field_name=%s, index=%s)" % (
+ self.name,
+ self.field_type,
+ self.length_field,
+ self.index,
+ )
+
-class VLAList_legacy():
+class VLAList_legacy(Packer):
def __init__(self, name, field_type):
+ self.name = name
+ self.field_type = field_type
self.packer = types[field_type]
self.size = self.packer.size
# Return a list of arguments
if (len(data) - offset) % self.packer.size:
raise VPPSerializerValueError(
- 'Legacy Variable Length Array length mismatch.')
+ "Legacy Variable Length Array length mismatch."
+ )
elements = int((len(data) - offset) / self.packer.size)
r = []
for e in range(elements):
total += size
return r, total
+ def __repr__(self):
+ return "VLAList_legacy(name=%s, field_type=%s)" % (self.name, self.field_type)
-class VPPEnumType(object):
- def __init__(self, name, msgdef):
- self.size = types['u32'].size
+
+# Will change to IntEnum after 21.04 release
+class VPPEnumType(Packer):
+ output_class = IntFlag
+
+ def __init__(self, name, msgdef, options=None):
+ self.size = types["u32"].size
+ self.name = name
+ self.enumtype = "u32"
+ self.msgdef = msgdef
e_hash = {}
for f in msgdef:
- if type(f) is dict and 'enumtype' in f:
- if f['enumtype'] != 'u32':
- raise NotImplementedError
+ if type(f) is dict and "enumtype" in f:
+ if f["enumtype"] != "u32":
+ self.size = types[f["enumtype"]].size
+ self.enumtype = f["enumtype"]
continue
ename, evalue = f
e_hash[ename] = evalue
- self.enum = IntEnum(name, e_hash)
+ self.enum = self.output_class(name, e_hash)
types[name] = self
+ class_types[name] = self.__class__
+ self.options = options
def __getattr__(self, name):
return self.enum[name]
- def __nonzero__(self):
+ def __bool__(self):
return True
def pack(self, data, kwargs=None):
- return types['u32'].pack(data)
+ if data is None: # Default to zero if not specified
+ if self.options and "default" in self.options:
+ data = self.options["default"]
+ else:
+ data = 0
+
+ return types[self.enumtype].pack(data)
def unpack(self, data, offset=0, result=None, ntc=False):
- x, size = types['u32'].unpack(data, offset)
+ x, size = types[self.enumtype].unpack(data, offset)
return self.enum(x), size
+ @classmethod
+ def _get_packer_with_options(cls, f_type, options):
+ return cls(f_type, types[f_type].msgdef, options=options)
+
+ def __repr__(self):
+ return "%s(name=%s, msgdef=%s, options=%s)" % (
+ self.__class__.__name__,
+ self.name,
+ self.msgdef,
+ self.options,
+ )
+
-class VPPUnionType(object):
+class VPPEnumFlagType(VPPEnumType):
+ output_class = IntFlag
+
+ def __init__(self, name, msgdef, options=None):
+ super(VPPEnumFlagType, self).__init__(name, msgdef, options)
+
+
+class VPPUnionType(Packer):
def __init__(self, name, msgdef):
self.name = name
+ self.msgdef = msgdef
self.size = 0
self.maxindex = 0
fields = []
self.packers = collections.OrderedDict()
for i, f in enumerate(msgdef):
- if type(f) is dict and 'crc' in f:
- self.crc = f['crc']
+ if type(f) is dict and "crc" in f:
+ self.crc = f["crc"]
continue
f_type, f_name = f
if f_type not in types:
- logger.debug('Unknown union type {}'.format(f_type))
- raise VPPSerializerValueError(
- 'Unknown message type {}'.format(f_type))
+ logger.debug("Unknown union type {}".format(f_type))
+ raise VPPSerializerValueError("Unknown message type {}".format(f_type))
fields.append(f_name)
size = types[f_type].size
self.packers[f_name] = types[f_type]
# Union of variable length?
def pack(self, data, kwargs=None):
if not data:
- return b'\x00' * self.size
+ return b"\x00" * self.size
for k, v in data.items():
logger.debug("Key: {} Value: {}".format(k, v))
b = self.packers[k].pack(v, kwargs)
break
r = bytearray(self.size)
- r[:len(b)] = b
+ r[: len(b)] = b
return r
def unpack(self, data, offset=0, result=None, ntc=False):
r.append(x)
return self.tuple._make(r), maxsize
+ def __repr__(self):
+ return "VPPUnionType(name=%s, msgdef=%r)" % (self.name, self.msgdef)
-class VPPTypeAlias(object):
- def __init__(self, name, msgdef):
+
+class VPPTypeAlias(Packer):
+ def __init__(self, name, msgdef, options=None):
self.name = name
- t = vpp_get_type(msgdef['type'])
+ self.msgdef = msgdef
+ t = vpp_get_type(msgdef["type"])
if not t:
- raise ValueError()
- if 'length' in msgdef:
- if msgdef['length'] == 0:
+ raise ValueError("No such type: {}".format(msgdef["type"]))
+ if "length" in msgdef:
+ if msgdef["length"] == 0:
raise ValueError()
- if msgdef['type'] == 'u8':
- self.packer = FixedList_u8(name, msgdef['type'],
- msgdef['length'])
+ if msgdef["type"] == "u8":
+ self.packer = FixedList_u8(name, msgdef["type"], msgdef["length"])
self.size = self.packer.size
else:
- self.packer = FixedList(name, msgdef['type'], msgdef['length'])
+ self.packer = FixedList(name, msgdef["type"], msgdef["length"])
else:
self.packer = t
self.size = t.size
types[name] = self
+ self.toplevelconversion = False
+ self.options = options
def pack(self, data, kwargs=None):
if data and conversion_required(data, self.name):
try:
return conversion_packer(data, self.name)
# Python 2 and 3 raises different exceptions from inet_pton
- except(OSError, socket.error, TypeError):
+ except (OSError, socket.error, TypeError):
pass
+ if data is None: # Default to zero if not specified
+ if self.options and "default" in self.options:
+ data = self.options["default"]
+ else:
+ data = 0
return self.packer.pack(data, kwargs)
+ @staticmethod
+ def _get_packer_with_options(f_type, options):
+ return VPPTypeAlias(f_type, types[f_type].msgdef, options=options)
+
def unpack(self, data, offset=0, result=None, ntc=False):
+ if ntc is False and self.name in vpp_format.conversion_unpacker_table:
+ # Disable type conversion for dependent types
+ ntc = True
+ self.toplevelconversion = True
t, size = self.packer.unpack(data, offset, result, ntc=ntc)
- if not ntc:
+ if self.toplevelconversion:
+ self.toplevelconversion = False
return conversion_unpacker(t, self.name), size
return t, size
+ def __repr__(self):
+ return "VPPTypeAlias(name=%s, msgdef=%s, options=%s)" % (
+ self.name,
+ self.msgdef,
+ self.options,
+ )
+
-class VPPType(object):
+class VPPType(Packer):
# Set everything up to be able to pack / unpack
def __init__(self, name, msgdef):
self.name = name
self.field_by_name = {}
size = 0
for i, f in enumerate(msgdef):
- if type(f) is dict and 'crc' in f:
- self.crc = f['crc']
+ if type(f) is dict and "crc" in f:
+ self.crc = f["crc"]
continue
f_type, f_name = f[:2]
self.fields.append(f_name)
self.field_by_name[f_name] = None
self.fieldtypes.append(f_type)
if f_type not in types:
- logger.debug('Unknown type {}'.format(f_type))
- raise VPPSerializerValueError(
- 'Unknown message type {}'.format(f_type))
- if len(f) == 3: # list
+ logger.debug("Unknown type {}".format(f_type))
+ raise VPPSerializerValueError("Unknown message type {}".format(f_type))
+
+ fieldlen = len(f)
+ options = [x for x in f if type(x) is dict]
+ if len(options):
+ self.options = options[0]
+ fieldlen -= 1
+ else:
+ self.options = {}
+ if fieldlen == 3: # list
list_elements = f[2]
if list_elements == 0:
- p = VLAList_legacy(f_name, f_type)
+ if f_type == "string":
+ p = String(f_name, 0, self.options)
+ else:
+ p = VLAList_legacy(f_name, f_type)
self.packers.append(p)
- elif f_type == 'u8' or f_type == 'string':
+ elif f_type == "u8":
p = FixedList_u8(f_name, f_type, list_elements)
self.packers.append(p)
size += p.size
+ elif f_type == "string":
+ p = String(f_name, list_elements, self.options)
+ self.packers.append(p)
+ size += p.size
else:
p = FixedList(f_name, f_type, list_elements)
self.packers.append(p)
size += p.size
- elif len(f) == 4: # Variable length list
+ elif fieldlen == 4: # Variable length list
length_index = self.fields.index(f[3])
p = VLAList(f_name, f_type, f[3], length_index)
self.packers.append(p)
else:
- self.packers.append(types[f_type])
- size += types[f_type].size
+ # default support for types that decay to basetype
+ if "default" in self.options:
+ p = self.get_packer_with_options(f_type, self.options)
+ else:
+ p = types[f_type]
+
+ self.packers.append(p)
+ size += p.size
self.size = size
self.tuple = collections.namedtuple(name, self.fields, rename=True)
types[name] = self
+ self.toplevelconversion = False
def pack(self, data, kwargs=None):
if not kwargs:
for i, a in enumerate(self.fields):
if data and type(data) is not dict and a not in data:
raise VPPSerializerValueError(
- "Invalid argument: {} expected {}.{}".
- format(data, self.name, a))
+ "Invalid argument: {} expected {}.{}".format(data, self.name, a)
+ )
# Defaulting to zero.
if not data or a not in data: # Default to 0
# Return a list of arguments
result = []
total = 0
+ if ntc is False and self.name in vpp_format.conversion_unpacker_table:
+ # Disable type conversion for dependent types
+ ntc = True
+ self.toplevelconversion = True
+
for p in self.packers:
x, size = p.unpack(data, offset, result, ntc)
if type(x) is tuple and len(x) == 1:
offset += size
total += size
t = self.tuple._make(result)
- if not ntc:
+
+ if self.toplevelconversion:
+ self.toplevelconversion = False
t = conversion_unpacker(t, self.name)
return t, total
+ def __repr__(self):
+ return "%s(name=%s, msgdef=%s)" % (
+ self.__class__.__name__,
+ self.name,
+ self.msgdef,
+ )
+
class VPPMessage(VPPType):
pass