# See the License for the specific language governing permissions and
# limitations under the License.
#
-
-import struct
import collections
+import logging
+import socket
+import struct
import sys
if sys.version_info <= (3, 4):
- from aenum import IntEnum
+ from aenum import IntEnum # noqa: F401
else:
- from enum import IntEnum
+ from enum import IntEnum # noqa: F401
if sys.version_info <= (3, 6):
- from aenum import IntFlag
+ from aenum import IntFlag # noqa: F401
else:
- from enum import IntFlag
-import logging
-from . import vpp_format
-import ipaddress
+ from enum import IntFlag # noqa: F401
-import socket
+from . import vpp_format # noqa: E402
#
# Set log-level in application by doing e.g.:
return vpp_format.conversion_unpacker_table[field_type](data)
-class BaseTypes(object):
- def __init__(self, type, elements=0):
+# TODO: post 20.01, remove inherit from object.
+class Packer(object):
+ options = {}
+
+ def pack(self, data, kwargs):
+ raise NotImplementedError
+
+ def unpack(self, data, offset, result=None, ntc=False):
+ raise NotImplementedError
+
+ # override as appropriate in subclasses
+ @staticmethod
+ def _get_packer_with_options(f_type, options):
+ return types[f_type]
+
+ def get_packer_with_options(self, f_type, options):
+ if options is not None:
+ try:
+ c = types[f_type].__class__
+ return c._get_packer_with_options(f_type, options)
+ except IndexError:
+ raise VPPSerializerValueError(
+ "Options not supported for {}{} ({})".
+ format(f_type, types[f_type].__class__,
+ options))
+
+
+class BaseTypes(Packer):
+ def __init__(self, type, elements=0, options=None):
+ self._type = type
+ self._elements = elements
base_types = {'u8': '>B',
+ 'i8': '>b',
'string': '>s',
'u16': '>H',
+ 'i16': '>h',
'u32': '>I',
'i32': '>i',
'u64': '>Q',
- 'f64': '>d',
+ 'i64': '>q',
+ 'f64': '=d',
'bool': '>?',
'header': '>HI'}
else:
self.packer = struct.Struct(base_types[type])
self.size = self.packer.size
+ self.options = options
def pack(self, data, kwargs=None):
- if not data: # Default to zero if not specified
- data = 0
+ if data is None: # Default to zero if not specified
+ if self.options and 'default' in self.options:
+ data = self.options['default']
+ else:
+ data = 0
return self.packer.pack(data)
def unpack(self, data, offset, result=None, ntc=False):
return self.packer.unpack_from(data, offset)[0], self.packer.size
+ @staticmethod
+ def _get_packer_with_options(f_type, options):
+ return BaseTypes(f_type, options=options)
-class String(object):
- def __init__(self):
- self.name = 'string'
+ def __repr__(self):
+ return "BaseTypes(type=%s, elements=%s, options=%s)" % (self._type,
+ self._elements,
+ self.options)
+
+
+class String(Packer):
+ def __init__(self, name, num, options):
+ self.name = name
+ self.num = num
self.size = 1
self.length_field_packer = BaseTypes('u32')
+ self.limit = options['limit'] if 'limit' in options else num
+ self.fixed = True if num else False
+ if self.fixed and not self.limit:
+ raise VPPSerializerValueError(
+ "Invalid argument length for: {}, {} maximum {}".
+ format(list, len(list), self.limit))
def pack(self, list, kwargs=None):
if not list:
+ if self.fixed:
+ return b"\x00" * self.limit
return self.length_field_packer.pack(0) + b""
- return self.length_field_packer.pack(len(list)) + list.encode('utf8')
+ if self.limit and len(list) > self.limit - 1:
+ raise VPPSerializerValueError(
+ "Invalid argument length for: {}, {} maximum {}".
+ format(list, len(list), self.limit - 1))
+ if self.fixed:
+ return list.encode('ascii').ljust(self.limit, b'\x00')
+ return self.length_field_packer.pack(len(list)) + list.encode('ascii')
def unpack(self, data, offset=0, result=None, ntc=False):
+ if self.fixed:
+ p = BaseTypes('u8', self.num)
+ s = p.unpack(data, offset)
+ s2 = s[0].split(b'\0', 1)[0]
+ return (s2.decode('ascii'), self.num)
+
length, length_field_size = self.length_field_packer.unpack(data,
offset)
if length == 0:
- return b'', 0
+ return '', 0
p = BaseTypes('u8', length)
x, size = p.unpack(data, offset + length_field_size)
- x2 = x.split(b'\0', 1)[0]
- return (x2.decode('utf8'), size + length_field_size)
+ return (x.decode('ascii', errors='replace'), size + length_field_size)
-types = {'u8': BaseTypes('u8'), 'u16': BaseTypes('u16'),
+types = {'u8': BaseTypes('u8'), 'i8': BaseTypes('i8'),
+ 'u16': BaseTypes('u16'), 'i16': BaseTypes('i16'),
'u32': BaseTypes('u32'), 'i32': BaseTypes('i32'),
- 'u64': BaseTypes('u64'), 'f64': BaseTypes('f64'),
- 'bool': BaseTypes('bool'), 'string': String()}
+ 'u64': BaseTypes('u64'), 'i64': BaseTypes('i64'),
+ 'f64': BaseTypes('f64'),
+ 'bool': BaseTypes('bool'), 'string': String}
+
+class_types = {}
def vpp_get_type(name):
pass
-class FixedList_u8(object):
+class FixedList_u8(Packer):
def __init__(self, name, field_type, num):
self.name = name
self.num = num
' expected: {}'
.format(self.name, len(data), self.num))
- return self.packer.pack(data)
+ try:
+ return self.packer.pack(data)
+ except struct.error:
+ raise VPPSerializerValueError(
+ 'Packing failed for "{}" {}'
+ .format(self.name, kwargs))
def unpack(self, data, offset=0, result=None, ntc=False):
if len(data[offset:]) < self.num:
'Invalid array length for "{}" got {}'
' expected {}'
.format(self.name, len(data[offset:]), self.num))
- if self.field_type == 'string':
- s = self.packer.unpack(data, offset)
- s2 = s[0].split(b'\0', 1)[0]
- return (s2.decode('utf-8'), self.num)
return self.packer.unpack(data, offset)
+ def __repr__(self):
+ return "FixedList_u8(name=%s, field_type=%s, num=%s)" % (
+ self.name, self.field_type, self.num
+ )
-class FixedList(object):
+
+class FixedList(Packer):
def __init__(self, name, field_type, num):
self.num = num
self.packer = types[field_type]
total += size
return result, total
+ def __repr__(self):
+ return "FixedList(name=%s, field_type=%s, num=%s)" % (
+ self.name, self.field_type, self.num)
+
-class VLAList(object):
+class VLAList(Packer):
def __init__(self, name, field_type, len_field_name, index):
self.name = name
self.field_type = field_type
self.size = self.packer.size
self.length_field = len_field_name
- def pack(self, list, kwargs=None):
- if not list:
+ def pack(self, lst, kwargs=None):
+ if not lst:
return b""
- if len(list) != kwargs[self.length_field]:
+ if len(lst) != kwargs[self.length_field]:
raise VPPSerializerValueError(
'Variable length error, got: {} expected: {}'
- .format(len(list), kwargs[self.length_field]))
- b = bytes()
+ .format(len(lst), kwargs[self.length_field]))
# u8 array
-
if self.packer.size == 1:
- return bytearray(list)
+ if isinstance(lst, list):
+ return b''.join(lst)
+ return bytes(lst)
- for e in list:
+ b = bytes()
+ for e in lst:
b += self.packer.pack(e)
return b
total += size
return r, total
+ def __repr__(self):
+ return "VLAList(name=%s, field_type=%s, " \
+ "len_field_name=%s, index=%s)" % (
+ self.name, self.field_type, self.length_field, self.index
+ )
+
-class VLAList_legacy():
+class VLAList_legacy(Packer):
def __init__(self, name, field_type):
+ self.name = name
+ self.field_type = field_type
self.packer = types[field_type]
self.size = self.packer.size
total += size
return r, total
+ def __repr__(self):
+ return "VLAList_legacy(name=%s, field_type=%s)" % (
+ self.name, self.field_type
+ )
-class VPPEnumType(object):
- def __init__(self, name, msgdef):
+
+class VPPEnumType(Packer):
+ def __init__(self, name, msgdef, options=None):
self.size = types['u32'].size
+ self.name = name
+ self.enumtype = 'u32'
+ self.msgdef = msgdef
e_hash = {}
for f in msgdef:
if type(f) is dict and 'enumtype' in f:
if f['enumtype'] != 'u32':
- raise NotImplementedError
+ self.size = types[f['enumtype']].size
+ self.enumtype = f['enumtype']
continue
ename, evalue = f
e_hash[ename] = evalue
self.enum = IntFlag(name, e_hash)
types[name] = self
+ class_types[name] = VPPEnumType
+ self.options = options
def __getattr__(self, name):
return self.enum[name]
- def __nonzero__(self):
+ def __bool__(self):
return True
+ # TODO: Remove post 20.01.
+ if sys.version[0] == '2':
+ __nonzero__ = __bool__
+
def pack(self, data, kwargs=None):
- return types['u32'].pack(data)
+ if data is None: # Default to zero if not specified
+ if self.options and 'default' in self.options:
+ data = self.options['default']
+ else:
+ data = 0
+
+ return types[self.enumtype].pack(data)
def unpack(self, data, offset=0, result=None, ntc=False):
- x, size = types['u32'].unpack(data, offset)
+ x, size = types[self.enumtype].unpack(data, offset)
return self.enum(x), size
+ @staticmethod
+ def _get_packer_with_options(f_type, options):
+ return VPPEnumType(f_type, types[f_type].msgdef, options=options)
+
+ def __repr__(self):
+ return "VPPEnumType(name=%s, msgdef=%s, options=%s)" % (
+ self.name, self.msgdef, self.options
+ )
-class VPPUnionType(object):
+
+class VPPUnionType(Packer):
def __init__(self, name, msgdef):
self.name = name
+ self.msgdef = msgdef
self.size = 0
self.maxindex = 0
fields = []
r.append(x)
return self.tuple._make(r), maxsize
+ def __repr__(self):
+ return"VPPUnionType(name=%s, msgdef=%r)" % (self.name, self.msgdef)
-class VPPTypeAlias(object):
- def __init__(self, name, msgdef):
+
+class VPPTypeAlias(Packer):
+ def __init__(self, name, msgdef, options=None):
self.name = name
+ self.msgdef = msgdef
t = vpp_get_type(msgdef['type'])
if not t:
- raise ValueError()
+ raise ValueError('No such type: {}'.format(msgdef['type']))
if 'length' in msgdef:
if msgdef['length'] == 0:
raise ValueError()
self.size = t.size
types[name] = self
+ self.toplevelconversion = False
+ self.options = options
def pack(self, data, kwargs=None):
if data and conversion_required(data, self.name):
# Python 2 and 3 raises different exceptions from inet_pton
except(OSError, socket.error, TypeError):
pass
+ if data is None: # Default to zero if not specified
+ if self.options and 'default' in self.options:
+ data = self.options['default']
+ else:
+ data = 0
return self.packer.pack(data, kwargs)
+ @staticmethod
+ def _get_packer_with_options(f_type, options):
+ return VPPTypeAlias(f_type, types[f_type].msgdef, options=options)
+
def unpack(self, data, offset=0, result=None, ntc=False):
+ if ntc is False and self.name in vpp_format.conversion_unpacker_table:
+ # Disable type conversion for dependent types
+ ntc = True
+ self.toplevelconversion = True
t, size = self.packer.unpack(data, offset, result, ntc=ntc)
- if not ntc:
+ if self.toplevelconversion:
+ self.toplevelconversion = False
return conversion_unpacker(t, self.name), size
return t, size
+ def __repr__(self):
+ return "VPPTypeAlias(name=%s, msgdef=%s, options=%s)" % (
+ self.name, self.msgdef, self.options)
+
-class VPPType(object):
+class VPPType(Packer):
# Set everything up to be able to pack / unpack
def __init__(self, name, msgdef):
self.name = name
logger.debug('Unknown type {}'.format(f_type))
raise VPPSerializerValueError(
'Unknown message type {}'.format(f_type))
- if len(f) == 3: # list
+
+ fieldlen = len(f)
+ options = [x for x in f if type(x) is dict]
+ if len(options):
+ self.options = options[0]
+ fieldlen -= 1
+ else:
+ self.options = {}
+ if fieldlen == 3: # list
list_elements = f[2]
if list_elements == 0:
- p = VLAList_legacy(f_name, f_type)
+ if f_type == 'string':
+ p = String(f_name, 0, self.options)
+ else:
+ p = VLAList_legacy(f_name, f_type)
self.packers.append(p)
- elif f_type == 'u8' or f_type == 'string':
+ elif f_type == 'u8':
p = FixedList_u8(f_name, f_type, list_elements)
self.packers.append(p)
size += p.size
+ elif f_type == 'string':
+ p = String(f_name, list_elements, self.options)
+ self.packers.append(p)
+ size += p.size
else:
p = FixedList(f_name, f_type, list_elements)
self.packers.append(p)
size += p.size
- elif len(f) == 4: # Variable length list
+ elif fieldlen == 4: # Variable length list
length_index = self.fields.index(f[3])
p = VLAList(f_name, f_type, f[3], length_index)
self.packers.append(p)
else:
- self.packers.append(types[f_type])
- size += types[f_type].size
+ # default support for types that decay to basetype
+ if 'default' in self.options:
+ p = self.get_packer_with_options(f_type, self.options)
+ else:
+ p = types[f_type]
+
+ self.packers.append(p)
+ size += p.size
self.size = size
self.tuple = collections.namedtuple(name, self.fields, rename=True)
types[name] = self
+ self.toplevelconversion = False
def pack(self, data, kwargs=None):
if not kwargs:
# Return a list of arguments
result = []
total = 0
+ if ntc is False and self.name in vpp_format.conversion_unpacker_table:
+ # Disable type conversion for dependent types
+ ntc = True
+ self.toplevelconversion = True
+
for p in self.packers:
x, size = p.unpack(data, offset, result, ntc)
if type(x) is tuple and len(x) == 1:
offset += size
total += size
t = self.tuple._make(result)
- if not ntc:
+
+ if self.toplevelconversion:
+ self.toplevelconversion = False
t = conversion_unpacker(t, self.name)
return t, total
+ def __repr__(self):
+ return "%s(name=%s, msgdef=%s)" % (
+ self.__class__.__name__, self.name, self.msgdef
+ )
+
class VPPMessage(VPPType):
pass