# See the License for the specific language governing permissions and
# limitations under the License.
#
-
-import struct
import collections
+import logging
+import socket
+import struct
import sys
if sys.version_info <= (3, 4):
- from aenum import IntEnum
+ from aenum import IntEnum # noqa: F401
else:
- from enum import IntEnum
+ from enum import IntEnum # noqa: F401
if sys.version_info <= (3, 6):
- from aenum import IntFlag
+ from aenum import IntFlag # noqa: F401
else:
- from enum import IntFlag
-import logging
-from . import vpp_format
-import ipaddress
+ from enum import IntFlag # noqa: F401
-import socket
+from . import vpp_format # noqa: E402
#
# Set log-level in application by doing e.g.:
class BaseTypes(object):
- def __init__(self, type, elements=0):
+ def __init__(self, type, elements=0, options=None):
base_types = {'u8': '>B',
+ 'i8': '>b',
'string': '>s',
'u16': '>H',
+ 'i16': '>h',
'u32': '>I',
'i32': '>i',
'u64': '>Q',
- 'f64': '>d',
+ 'i64': '>q',
+ 'f64': '=d',
'bool': '>?',
'header': '>HI'}
else:
self.packer = struct.Struct(base_types[type])
self.size = self.packer.size
+ self.options = options
+
+ def __call__(self, args):
+ self.options = args
+ return self
def pack(self, data, kwargs=None):
if not data: # Default to zero if not specified
- data = 0
+ if self.options and 'default' in self.options:
+ data = self.options['default']
+ else:
+ data = 0
return self.packer.pack(data)
def unpack(self, data, offset, result=None, ntc=False):
class String(object):
- def __init__(self):
- self.name = 'string'
+ def __init__(self, name, num, options):
+ self.name = name
+ self.num = num
self.size = 1
self.length_field_packer = BaseTypes('u32')
+ self.limit = options['limit'] if 'limit' in options else num
+ self.fixed = True if num else False
+ if self.fixed and not self.limit:
+ raise VPPSerializerValueError(
+ "Invalid argument length for: {}, {} maximum {}".
+ format(list, len(list), self.limit))
def pack(self, list, kwargs=None):
if not list:
+ if self.fixed:
+ return b"\x00" * self.limit
return self.length_field_packer.pack(0) + b""
- return self.length_field_packer.pack(len(list)) + list.encode('utf8')
+ if self.limit and len(list) > self.limit - 1:
+ raise VPPSerializerValueError(
+ "Invalid argument length for: {}, {} maximum {}".
+ format(list, len(list), self.limit - 1))
+ if self.fixed:
+ return list.encode('ascii').ljust(self.limit, b'\x00')
+ return self.length_field_packer.pack(len(list)) + list.encode('ascii')
def unpack(self, data, offset=0, result=None, ntc=False):
+ if self.fixed:
+ p = BaseTypes('u8', self.num)
+ s = p.unpack(data, offset)
+ s2 = s[0].split(b'\0', 1)[0]
+ return (s2.decode('ascii'), self.num)
+
length, length_field_size = self.length_field_packer.unpack(data,
offset)
if length == 0:
- return b'', 0
+ return '', 0
p = BaseTypes('u8', length)
x, size = p.unpack(data, offset + length_field_size)
- x2 = x.split(b'\0', 1)[0]
- return (x2.decode('utf8'), size + length_field_size)
+ #x2 = x.split(b'\0', 1)[0]
+ return (x.decode('ascii', errors='replace'), size + length_field_size)
types = {'u8': BaseTypes('u8'), 'u16': BaseTypes('u16'),
'u32': BaseTypes('u32'), 'i32': BaseTypes('i32'),
'u64': BaseTypes('u64'), 'f64': BaseTypes('f64'),
- 'bool': BaseTypes('bool'), 'string': String()}
+ 'bool': BaseTypes('bool'), 'string': String}
def vpp_get_type(name):
self.size = self.packer.size
self.field_type = field_type
+ def __call__(self, args):
+ self.options = args
+ return self
+
def pack(self, data, kwargs=None):
"""Packs a fixed length bytestring. Left-pads with zeros
if input data is too short."""
'Invalid array length for "{}" got {}'
' expected {}'
.format(self.name, len(data[offset:]), self.num))
- if self.field_type == 'string':
- s = self.packer.unpack(data, offset)
- s2 = s[0].split(b'\0', 1)[0]
- return (s2.decode('utf-8'), self.num)
return self.packer.unpack(data, offset)
self.name = name
self.field_type = field_type
+ def __call__(self, args):
+ self.options = args
+ return self
+
def pack(self, list, kwargs):
if len(list) != self.num:
raise VPPSerializerValueError(
self.size = self.packer.size
self.length_field = len_field_name
+ def __call__(self, args):
+ self.options = args
+ return self
+
def pack(self, list, kwargs=None):
if not list:
return b""
self.packer = types[field_type]
self.size = self.packer.size
+ def __call__(self, args):
+ self.options = args
+ return self
+
def pack(self, list, kwargs=None):
if self.packer.size == 1:
return bytes(list)
class VPPEnumType(object):
def __init__(self, name, msgdef):
self.size = types['u32'].size
+ self.enumtype = 'u32'
e_hash = {}
for f in msgdef:
if type(f) is dict and 'enumtype' in f:
if f['enumtype'] != 'u32':
- raise NotImplementedError
+ self.size = types[f['enumtype']].size
+ self.enumtype = f['enumtype']
continue
ename, evalue = f
e_hash[ename] = evalue
self.enum = IntFlag(name, e_hash)
types[name] = self
+ def __call__(self, args):
+ self.options = args
+ return self
+
def __getattr__(self, name):
return self.enum[name]
- def __nonzero__(self):
+ def __bool__(self):
return True
+ if sys.version[0] == '2':
+ __nonzero__ = __bool__
+
def pack(self, data, kwargs=None):
- return types['u32'].pack(data)
+ return types[self.enumtype].pack(data)
def unpack(self, data, offset=0, result=None, ntc=False):
- x, size = types['u32'].unpack(data, offset)
+ x, size = types[self.enumtype].unpack(data, offset)
return self.enum(x), size
types[name] = self
self.tuple = collections.namedtuple(name, fields, rename=True)
+ def __call__(self, args):
+ self.options = args
+ return self
+
# Union of variable length?
def pack(self, data, kwargs=None):
if not data:
types[name] = self
+ def __call__(self, args):
+ self.options = args
+ return self
+
def pack(self, data, kwargs=None):
if data and conversion_required(data, self.name):
try:
logger.debug('Unknown type {}'.format(f_type))
raise VPPSerializerValueError(
'Unknown message type {}'.format(f_type))
- if len(f) == 3: # list
+
+ fieldlen = len(f)
+ options = [x for x in f if type(x) is dict]
+ if len(options):
+ self.options = options[0]
+ fieldlen -= 1
+ else:
+ self.options = {}
+ if fieldlen == 3: # list
list_elements = f[2]
if list_elements == 0:
- p = VLAList_legacy(f_name, f_type)
+ if f_type == 'string':
+ p = String(f_name, 0, self.options)
+ else:
+ p = VLAList_legacy(f_name, f_type)
self.packers.append(p)
- elif f_type == 'u8' or f_type == 'string':
+ elif f_type == 'u8':
p = FixedList_u8(f_name, f_type, list_elements)
self.packers.append(p)
size += p.size
+ elif f_type == 'string':
+ p = String(f_name, list_elements, self.options)
+ self.packers.append(p)
+ size += p.size
else:
p = FixedList(f_name, f_type, list_elements)
self.packers.append(p)
size += p.size
- elif len(f) == 4: # Variable length list
+ elif fieldlen == 4: # Variable length list
length_index = self.fields.index(f[3])
p = VLAList(f_name, f_type, f[3], length_index)
self.packers.append(p)
else:
- self.packers.append(types[f_type])
- size += types[f_type].size
+ p = types[f_type](self.options)
+ self.packers.append(p)
+ size += p.size
self.size = size
self.tuple = collections.namedtuple(name, self.fields, rename=True)
types[name] = self
+ def __call__(self, args):
+ self.options = args
+ return self
+
def pack(self, data, kwargs=None):
if not kwargs:
kwargs = data