2 # Copyright (c) 2018 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
20 if sys.version_info <= (3, 4):
21 from aenum import IntEnum
23 from enum import IntEnum
25 if sys.version_info <= (3, 6):
26 from aenum import IntFlag
28 from enum import IntFlag
31 from . import vpp_format
# Set log-level in application by doing e.g.:
# logger = logging.getLogger('<package>.vpp_serializer')  # this module's __name__
# logger.setLevel(logging.DEBUG)
41 logger = logging.getLogger(__name__)
# check(d) reports whether ``d`` is a raw container rather than a value
# that could be run through a format conversion.  The exact-type test
# (``type(d) is ...``) is kept on purpose so subclasses are not matched.
# Each variant must ``return`` the result of the test: a bare expression
# statement is evaluated and discarded, which makes the function return
# None for every input.
if sys.version[0] == '2':
    def check(d):
        """Return True when *d* is exactly a ``dict`` (Python 2 variant)."""
        return type(d) is dict
else:
    def check(d):
        """Return True when *d* is exactly a ``dict`` or a ``bytes`` object."""
        return type(d) is dict or type(d) is bytes
49 def conversion_required(data, field_type):
53 if type(data).__name__ in vpp_format.conversion_table[field_type]:
def conversion_packer(data, field_type):
    """Convert *data* with its registered converter and pack the result.

    The converter is looked up in ``vpp_format.conversion_table`` first by
    *field_type* and then by the class name of *data*.  Its return value is
    passed to the ``pack`` method of ``types[field_type]``.

    :param data: value to convert and serialize.
    :param field_type: type name used as key in both ``types`` and
        ``vpp_format.conversion_table``.
    :returns: whatever ``types[field_type].pack`` returns.

    No lookup is guarded here: an unknown *field_type* or an unregistered
    data class propagates the error raised by the failing lookup.
    """
    # Resolve the packer before the converter, matching the order in which
    # a single nested call expression would evaluate them.
    pack = types[field_type].pack
    convert = vpp_format.conversion_table[field_type][type(data).__name__]
    return pack(convert(data))
65 def conversion_unpacker(data, field_type):
66 if field_type not in vpp_format.conversion_unpacker_table:
68 return vpp_format.conversion_unpacker_table[field_type](data)
71 class BaseTypes(object):
72 def __init__(self, type, elements=0):
73 base_types = {'u8': '>B',
83 if elements > 0 and (type == 'u8' or type == 'string'):
84 self.packer = struct.Struct('>%ss' % elements)
86 self.packer = struct.Struct(base_types[type])
87 self.size = self.packer.size
89 def pack(self, data, kwargs=None):
90 if not data: # Default to zero if not specified
92 return self.packer.pack(data)
94 def unpack(self, data, offset, result=None, ntc=False):
95 return self.packer.unpack_from(data, offset)[0], self.packer.size
99 def __init__(self, options):
102 self.length_field_packer = BaseTypes('u32')
103 self.limit = options['limit'] if 'limit' in options else None
105 def pack(self, list, kwargs=None):
107 return self.length_field_packer.pack(0) + b""
108 if self.limit and len(list) > self.limit:
109 raise VPPSerializerValueError(
110 "Invalid argument length for: {}, {} maximum {}".
111 format(list, len(list), self.limit))
113 return self.length_field_packer.pack(len(list)) + list.encode('utf8')
115 def unpack(self, data, offset=0, result=None, ntc=False):
116 length, length_field_size = self.length_field_packer.unpack(data,
120 p = BaseTypes('u8', length)
121 x, size = p.unpack(data, offset + length_field_size)
122 x2 = x.split(b'\0', 1)[0]
123 return (x2.decode('utf8'), size + length_field_size)
# Registry mapping a type name to the object that packs/unpacks it.
# Each scalar name maps to one shared BaseTypes instance built from that
# same name; 'string' maps to the String class itself (not an instance).
types = {name: BaseTypes(name)
         for name in ('u8', 'u16', 'u32', 'i32', 'u64', 'f64', 'bool')}
types['string'] = String
132 def vpp_get_type(name):
class VPPSerializerValueError(ValueError):
    """Raised by the serializer classes in this module for invalid input.

    The raise sites in this module cover string and list length
    mismatches, unknown field types, and unexpected argument names.
    Because it derives from ``ValueError``, callers that catch
    ``ValueError`` also catch this exception.
    """
143 class FixedList_u8(object):
144 def __init__(self, name, field_type, num):
147 self.packer = BaseTypes(field_type, num)
148 self.size = self.packer.size
149 self.field_type = field_type
151 def pack(self, data, kwargs=None):
152 """Packs a fixed length bytestring. Left-pads with zeros
153 if input data is too short."""
155 return b'\x00' * self.size
157 if len(data) > self.num:
158 raise VPPSerializerValueError(
159 'Fixed list length error for "{}", got: {}'
161 .format(self.name, len(data), self.num))
163 return self.packer.pack(data)
165 def unpack(self, data, offset=0, result=None, ntc=False):
166 if len(data[offset:]) < self.num:
167 raise VPPSerializerValueError(
168 'Invalid array length for "{}" got {}'
170 .format(self.name, len(data[offset:]), self.num))
171 if self.field_type == 'string':
172 s = self.packer.unpack(data, offset)
173 s2 = s[0].split(b'\0', 1)[0]
174 return (s2.decode('utf-8'), self.num)
175 return self.packer.unpack(data, offset)
178 class FixedList(object):
179 def __init__(self, name, field_type, num):
181 self.packer = types[field_type]
182 self.size = self.packer.size * num
184 self.field_type = field_type
186 def pack(self, list, kwargs):
187 if len(list) != self.num:
188 raise VPPSerializerValueError(
189 'Fixed list length error, got: {} expected: {}'
190 .format(len(list), self.num))
193 b += self.packer.pack(e)
196 def unpack(self, data, offset=0, result=None, ntc=False):
197 # Return a list of arguments
200 for e in range(self.num):
201 x, size = self.packer.unpack(data, offset, ntc=ntc)
208 class VLAList(object):
209 def __init__(self, name, field_type, len_field_name, index):
211 self.field_type = field_type
213 self.packer = types[field_type]
214 self.size = self.packer.size
215 self.length_field = len_field_name
217 def pack(self, list, kwargs=None):
220 if len(list) != kwargs[self.length_field]:
221 raise VPPSerializerValueError(
222 'Variable length error, got: {} expected: {}'
223 .format(len(list), kwargs[self.length_field]))
228 if self.packer.size == 1:
229 return bytearray(list)
232 b += self.packer.pack(e)
235 def unpack(self, data, offset=0, result=None, ntc=False):
236 # Return a list of arguments
240 if self.packer.size == 1:
241 if result[self.index] == 0:
243 p = BaseTypes('u8', result[self.index])
244 return p.unpack(data, offset, ntc=ntc)
247 for e in range(result[self.index]):
248 x, size = self.packer.unpack(data, offset, ntc=ntc)
255 class VLAList_legacy():
256 def __init__(self, name, field_type):
257 self.packer = types[field_type]
258 self.size = self.packer.size
260 def pack(self, list, kwargs=None):
261 if self.packer.size == 1:
266 b += self.packer.pack(e)
269 def unpack(self, data, offset=0, result=None, ntc=False):
271 # Return a list of arguments
272 if (len(data) - offset) % self.packer.size:
273 raise VPPSerializerValueError(
274 'Legacy Variable Length Array length mismatch.')
275 elements = int((len(data) - offset) / self.packer.size)
277 for e in range(elements):
278 x, size = self.packer.unpack(data, offset, ntc=ntc)
280 offset += self.packer.size
285 class VPPEnumType(object):
286 def __init__(self, name, msgdef):
287 self.size = types['u32'].size
288 self.enumtype = 'u32'
291 if type(f) is dict and 'enumtype' in f:
292 if f['enumtype'] != 'u32':
293 self.size = types[f['enumtype']].size
294 self.enumtype = f['enumtype']
297 e_hash[ename] = evalue
298 self.enum = IntFlag(name, e_hash)
301 def __getattr__(self, name):
302 return self.enum[name]
304 def __nonzero__(self):
307 def pack(self, data, kwargs=None):
308 return types[self.enumtype].pack(data)
310 def unpack(self, data, offset=0, result=None, ntc=False):
311 x, size = types[self.enumtype].unpack(data, offset)
312 return self.enum(x), size
315 class VPPUnionType(object):
316 def __init__(self, name, msgdef):
321 self.packers = collections.OrderedDict()
322 for i, f in enumerate(msgdef):
323 if type(f) is dict and 'crc' in f:
327 if f_type not in types:
328 logger.debug('Unknown union type {}'.format(f_type))
329 raise VPPSerializerValueError(
330 'Unknown message type {}'.format(f_type))
331 fields.append(f_name)
332 size = types[f_type].size
333 self.packers[f_name] = types[f_type]
339 self.tuple = collections.namedtuple(name, fields, rename=True)
341 # Union of variable length?
342 def pack(self, data, kwargs=None):
344 return b'\x00' * self.size
346 for k, v in data.items():
347 logger.debug("Key: {} Value: {}".format(k, v))
348 b = self.packers[k].pack(v, kwargs)
350 r = bytearray(self.size)
354 def unpack(self, data, offset=0, result=None, ntc=False):
357 for k, p in self.packers.items():
358 x, size = p.unpack(data, offset, ntc=ntc)
362 return self.tuple._make(r), maxsize
365 class VPPTypeAlias(object):
366 def __init__(self, name, msgdef):
368 t = vpp_get_type(msgdef['type'])
371 if 'length' in msgdef:
372 if msgdef['length'] == 0:
374 if msgdef['type'] == 'u8':
375 self.packer = FixedList_u8(name, msgdef['type'],
377 self.size = self.packer.size
379 self.packer = FixedList(name, msgdef['type'], msgdef['length'])
386 def pack(self, data, kwargs=None):
387 if data and conversion_required(data, self.name):
389 return conversion_packer(data, self.name)
390 # Python 2 and 3 raises different exceptions from inet_pton
391 except(OSError, socket.error, TypeError):
394 return self.packer.pack(data, kwargs)
396 def unpack(self, data, offset=0, result=None, ntc=False):
397 t, size = self.packer.unpack(data, offset, result, ntc=ntc)
399 return conversion_unpacker(t, self.name), size
403 class VPPType(object):
404 # Set everything up to be able to pack / unpack
405 def __init__(self, name, msgdef):
411 self.field_by_name = {}
413 for i, f in enumerate(msgdef):
414 if type(f) is dict and 'crc' in f:
417 f_type, f_name = f[:2]
418 self.fields.append(f_name)
419 self.field_by_name[f_name] = None
420 self.fieldtypes.append(f_type)
421 if f_type not in types:
422 logger.debug('Unknown type {}'.format(f_type))
423 raise VPPSerializerValueError(
424 'Unknown message type {}'.format(f_type))
427 options = [x for x in f if type(x) is dict]
429 self.options = options[0]
435 if list_elements == 0:
436 p = VLAList_legacy(f_name, f_type)
437 self.packers.append(p)
438 elif f_type == 'u8' or f_type == 'string':
439 p = FixedList_u8(f_name, f_type, list_elements)
440 self.packers.append(p)
443 p = FixedList(f_name, f_type, list_elements)
444 self.packers.append(p)
446 elif l == 4: # Variable length list
447 length_index = self.fields.index(f[3])
448 p = VLAList(f_name, f_type, f[3], length_index)
449 self.packers.append(p)
451 if f_type == 'string':
452 p = types[f_type](self.options)
455 self.packers.append(p)
459 self.tuple = collections.namedtuple(name, self.fields, rename=True)
462 def pack(self, data, kwargs=None):
467 # Try one of the format functions
468 if data and conversion_required(data, self.name):
469 return conversion_packer(data, self.name)
471 for i, a in enumerate(self.fields):
472 if data and type(data) is not dict and a not in data:
473 raise VPPSerializerValueError(
474 "Invalid argument: {} expected {}.{}".
475 format(data, self.name, a))
477 # Defaulting to zero.
478 if not data or a not in data: # Default to 0
480 kwarg = None # No default for VLA
483 kwarg = kwargs[a] if a in kwargs else None
484 if isinstance(self.packers[i], VPPType):
485 b += self.packers[i].pack(arg, kwarg)
487 b += self.packers[i].pack(arg, kwargs)
491 def unpack(self, data, offset=0, result=None, ntc=False):
492 # Return a list of arguments
495 for p in self.packers:
496 x, size = p.unpack(data, offset, result, ntc)
497 if type(x) is tuple and len(x) == 1:
502 t = self.tuple._make(result)
504 t = conversion_unpacker(t, self.name)
508 class VPPMessage(VPPType):