import struct
import collections
import sys
-if sys.version[0] == '2':
- from aenum import IntEnum, IntFlag
-else:
- from enum import IntEnum, IntFlag
import logging
from . import vpp_format
import ipaddress
-
import socket
+if sys.version_info <= (3, 4):
+ from aenum import IntEnum
+else:
+ from enum import IntEnum
+
+if sys.version_info <= (3, 6):
+ from aenum import IntFlag
+else:
+ from enum import IntFlag
+
+
#
# Set log-level in application by doing e.g.:
# logger = logging.getLogger('vpp_serializer')
class BaseTypes(object):
- def __init__(self, type, elements=0):
+ def __init__(self, type, elements=0, options=None):
base_types = {'u8': '>B',
'string': '>s',
'u16': '>H',
else:
self.packer = struct.Struct(base_types[type])
self.size = self.packer.size
+ self.options = options
+
+ def __call__(self, args):
+ self.options = args
+ return self
def pack(self, data, kwargs=None):
if not data: # Default to zero if not specified
- data = 0
+ if self.options and 'default' in self.options:
+ data = self.options['default']
+ else:
+ data = 0
return self.packer.pack(data)
def unpack(self, data, offset, result=None, ntc=False):
class String(object):
- def __init__(self):
+ def __init__(self, options):
self.name = 'string'
self.size = 1
self.length_field_packer = BaseTypes('u32')
+ self.limit = options['limit'] if 'limit' in options else None
def pack(self, list, kwargs=None):
if not list:
return self.length_field_packer.pack(0) + b""
+ if self.limit and len(list) > self.limit:
+ raise VPPSerializerValueError(
+ "Invalid argument length for: {}, {} maximum {}".
+ format(list, len(list), self.limit))
+
return self.length_field_packer.pack(len(list)) + list.encode('utf8')
def unpack(self, data, offset=0, result=None, ntc=False):
types = {'u8': BaseTypes('u8'), 'u16': BaseTypes('u16'),
'u32': BaseTypes('u32'), 'i32': BaseTypes('i32'),
'u64': BaseTypes('u64'), 'f64': BaseTypes('f64'),
- 'bool': BaseTypes('bool'), 'string': String()}
+ 'bool': BaseTypes('bool'), 'string': String}
def vpp_get_type(name):
self.size = self.packer.size
self.field_type = field_type
+ def __call__(self, args):
+ self.options = args
+ return self
+
def pack(self, data, kwargs=None):
"""Packs a fixed length bytestring. Left-pads with zeros
if input data is too short."""
self.name = name
self.field_type = field_type
+ def __call__(self, args):
+ self.options = args
+ return self
+
def pack(self, list, kwargs):
if len(list) != self.num:
raise VPPSerializerValueError(
self.size = self.packer.size
self.length_field = len_field_name
+ def __call__(self, args):
+ self.options = args
+ return self
+
def pack(self, list, kwargs=None):
if not list:
return b""
self.packer = types[field_type]
self.size = self.packer.size
+ def __call__(self, args):
+ self.options = args
+ return self
+
def pack(self, list, kwargs=None):
if self.packer.size == 1:
return bytes(list)
class VPPEnumType(object):
def __init__(self, name, msgdef):
self.size = types['u32'].size
+ self.enumtype = 'u32'
e_hash = {}
for f in msgdef:
if type(f) is dict and 'enumtype' in f:
if f['enumtype'] != 'u32':
- raise NotImplementedError
+ self.size = types[f['enumtype']].size
+ self.enumtype = f['enumtype']
continue
ename, evalue = f
e_hash[ename] = evalue
self.enum = IntFlag(name, e_hash)
types[name] = self
+ def __call__(self, args):
+ self.options = args
+ return self
+
def __getattr__(self, name):
return self.enum[name]
return True
def pack(self, data, kwargs=None):
- return types['u32'].pack(data)
+ return types[self.enumtype].pack(data)
def unpack(self, data, offset=0, result=None, ntc=False):
- x, size = types['u32'].unpack(data, offset)
+ x, size = types[self.enumtype].unpack(data, offset)
return self.enum(x), size
types[name] = self
self.tuple = collections.namedtuple(name, fields, rename=True)
+ def __call__(self, args):
+ self.options = args
+ return self
+
# Union of variable length?
def pack(self, data, kwargs=None):
if not data:
types[name] = self
+ def __call__(self, args):
+ self.options = args
+ return self
+
def pack(self, data, kwargs=None):
if data and conversion_required(data, self.name):
try:
logger.debug('Unknown type {}'.format(f_type))
raise VPPSerializerValueError(
'Unknown message type {}'.format(f_type))
- if len(f) == 3: # list
+
+ fieldlen = len(f)
+ options = [x for x in f if type(x) is dict]
+ if len(options):
+ self.options = options[0]
+ fieldlen -= 1
+ else:
+ self.options = {}
+ if fieldlen == 3: # list
list_elements = f[2]
if list_elements == 0:
p = VLAList_legacy(f_name, f_type)
p = FixedList(f_name, f_type, list_elements)
self.packers.append(p)
size += p.size
- elif len(f) == 4: # Variable length list
+ elif fieldlen == 4: # Variable length list
length_index = self.fields.index(f[3])
p = VLAList(f_name, f_type, f[3], length_index)
self.packers.append(p)
else:
- self.packers.append(types[f_type])
- size += types[f_type].size
+ p = types[f_type](self.options)
+ self.packers.append(p)
+ size += p.size
self.size = size
self.tuple = collections.namedtuple(name, self.fields, rename=True)
types[name] = self
+ def __call__(self, args):
+ self.options = args
+ return self
+
def pack(self, data, kwargs=None):
if not kwargs:
kwargs = data