2 # Copyright (c) 2018 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
16 from enum import IntFlag
22 from . import vpp_format
26 # Set log-level in application by doing e.g.:
27 # logger = logging.getLogger('vpp_serializer')
28 # logger.setLevel(logging.DEBUG)
30 logger = logging.getLogger('vpp_papi.serializer')
32 if sys.version[0] == '2':
33 def check(d): type(d) is dict
35 def check(d): type(d) is dict or type(d) is bytes
38 def conversion_required(data, field_type):
42 if type(data).__name__ in vpp_format.conversion_table[field_type]:
48 def conversion_packer(data, field_type):
49 t = type(data).__name__
50 return types[field_type].pack(vpp_format.
51 conversion_table[field_type][t](data))
54 def conversion_unpacker(data, field_type):
55 if field_type not in vpp_format.conversion_unpacker_table:
57 return vpp_format.conversion_unpacker_table[field_type](data)
60 # TODO: post 20.01, remove inherit from object.
64 def pack(self, data, kwargs):
65 raise NotImplementedError
67 def unpack(self, data, offset, result=None, ntc=False):
68 raise NotImplementedError
70 # override as appropriate in subclasses
72 def _get_packer_with_options(f_type, options):
75 def get_packer_with_options(self, f_type, options):
76 if options is not None:
78 c = types[f_type].__class__
79 return c._get_packer_with_options(f_type, options)
81 raise VPPSerializerValueError(
82 "Options not supported for {}{} ({})".
83 format(f_type, types[f_type].__class__,
87 class BaseTypes(Packer):
88 def __init__(self, type, elements=0, options=None):
90 self._elements = elements
91 base_types = {'u8': '>B',
104 if elements > 0 and (type == 'u8' or type == 'string'):
105 self.packer = struct.Struct('>%ss' % elements)
107 self.packer = struct.Struct(base_types[type])
108 self.size = self.packer.size
109 self.options = options
111 def pack(self, data, kwargs=None):
112 if data is None: # Default to zero if not specified
113 if self.options and 'default' in self.options:
114 data = self.options['default']
117 return self.packer.pack(data)
119 def unpack(self, data, offset, result=None, ntc=False):
120 return self.packer.unpack_from(data, offset)[0], self.packer.size
123 def _get_packer_with_options(f_type, options):
124 return BaseTypes(f_type, options=options)
127 return "BaseTypes(type=%s, elements=%s, options=%s)" % (self._type,
132 class String(Packer):
133 def __init__(self, name, num, options):
137 self.length_field_packer = BaseTypes('u32')
138 self.limit = options['limit'] if 'limit' in options else num
139 self.fixed = True if num else False
140 if self.fixed and not self.limit:
141 raise VPPSerializerValueError(
142 "Invalid combination for: {}, {} fixed:{} limit:{}".
143 format(name, options, self.fixed, self.limit))
145 def pack(self, list, kwargs=None):
148 return b"\x00" * self.limit
149 return self.length_field_packer.pack(0) + b""
150 if self.limit and len(list) > self.limit - 1:
151 raise VPPSerializerValueError(
152 "Invalid argument length for: {}, {} maximum {}".
153 format(list, len(list), self.limit - 1))
155 return list.encode('ascii').ljust(self.limit, b'\x00')
156 return self.length_field_packer.pack(len(list)) + list.encode('ascii')
158 def unpack(self, data, offset=0, result=None, ntc=False):
160 p = BaseTypes('u8', self.num)
161 s = p.unpack(data, offset)
162 s2 = s[0].split(b'\0', 1)[0]
163 return (s2.decode('ascii'), self.num)
165 length, length_field_size = self.length_field_packer.unpack(data,
169 p = BaseTypes('u8', length)
170 x, size = p.unpack(data, offset + length_field_size)
171 return (x.decode('ascii', errors='replace'), size + length_field_size)
174 types = {'u8': BaseTypes('u8'), 'i8': BaseTypes('i8'),
175 'u16': BaseTypes('u16'), 'i16': BaseTypes('i16'),
176 'u32': BaseTypes('u32'), 'i32': BaseTypes('i32'),
177 'u64': BaseTypes('u64'), 'i64': BaseTypes('i64'),
178 'f64': BaseTypes('f64'),
179 'bool': BaseTypes('bool'), 'string': String}
184 def vpp_get_type(name):
191 class VPPSerializerValueError(ValueError):
195 class FixedList_u8(Packer):
196 def __init__(self, name, field_type, num):
199 self.packer = BaseTypes(field_type, num)
200 self.size = self.packer.size
201 self.field_type = field_type
203 def pack(self, data, kwargs=None):
204 """Packs a fixed length bytestring. Left-pads with zeros
205 if input data is too short."""
207 return b'\x00' * self.size
209 if len(data) > self.num:
210 raise VPPSerializerValueError(
211 'Fixed list length error for "{}", got: {}'
213 .format(self.name, len(data), self.num))
216 return self.packer.pack(data)
218 raise VPPSerializerValueError(
219 'Packing failed for "{}" {}'
220 .format(self.name, kwargs))
222 def unpack(self, data, offset=0, result=None, ntc=False):
223 if len(data[offset:]) < self.num:
224 raise VPPSerializerValueError(
225 'Invalid array length for "{}" got {}'
227 .format(self.name, len(data[offset:]), self.num))
228 return self.packer.unpack(data, offset)
231 return "FixedList_u8(name=%s, field_type=%s, num=%s)" % (
232 self.name, self.field_type, self.num
236 class FixedList(Packer):
237 def __init__(self, name, field_type, num):
239 self.packer = types[field_type]
240 self.size = self.packer.size * num
242 self.field_type = field_type
244 def pack(self, list, kwargs):
245 if len(list) != self.num:
246 raise VPPSerializerValueError(
247 'Fixed list length error, got: {} expected: {}'
248 .format(len(list), self.num))
251 b += self.packer.pack(e)
254 def unpack(self, data, offset=0, result=None, ntc=False):
255 # Return a list of arguments
258 for e in range(self.num):
259 x, size = self.packer.unpack(data, offset, ntc=ntc)
266 return "FixedList(name=%s, field_type=%s, num=%s)" % (
267 self.name, self.field_type, self.num)
270 class VLAList(Packer):
271 def __init__(self, name, field_type, len_field_name, index):
273 self.field_type = field_type
275 self.packer = types[field_type]
276 self.size = self.packer.size
277 self.length_field = len_field_name
279 def pack(self, lst, kwargs=None):
282 if len(lst) != kwargs[self.length_field]:
283 raise VPPSerializerValueError(
284 'Variable length error, got: {} expected: {}'
285 .format(len(lst), kwargs[self.length_field]))
288 if self.packer.size == 1:
289 if isinstance(lst, list):
295 b += self.packer.pack(e)
298 def unpack(self, data, offset=0, result=None, ntc=False):
299 # Return a list of arguments
303 if self.packer.size == 1:
304 if result[self.index] == 0:
306 p = BaseTypes('u8', result[self.index])
307 return p.unpack(data, offset, ntc=ntc)
310 for e in range(result[self.index]):
311 x, size = self.packer.unpack(data, offset, ntc=ntc)
318 return "VLAList(name=%s, field_type=%s, " \
319 "len_field_name=%s, index=%s)" % (
320 self.name, self.field_type, self.length_field, self.index
324 class VLAList_legacy(Packer):
325 def __init__(self, name, field_type):
327 self.field_type = field_type
328 self.packer = types[field_type]
329 self.size = self.packer.size
331 def pack(self, list, kwargs=None):
332 if self.packer.size == 1:
337 b += self.packer.pack(e)
340 def unpack(self, data, offset=0, result=None, ntc=False):
342 # Return a list of arguments
343 if (len(data) - offset) % self.packer.size:
344 raise VPPSerializerValueError(
345 'Legacy Variable Length Array length mismatch.')
346 elements = int((len(data) - offset) / self.packer.size)
348 for e in range(elements):
349 x, size = self.packer.unpack(data, offset, ntc=ntc)
351 offset += self.packer.size
356 return "VLAList_legacy(name=%s, field_type=%s)" % (
357 self.name, self.field_type
361 class VPPEnumType(Packer):
362 def __init__(self, name, msgdef, options=None):
363 self.size = types['u32'].size
365 self.enumtype = 'u32'
369 if type(f) is dict and 'enumtype' in f:
370 if f['enumtype'] != 'u32':
371 self.size = types[f['enumtype']].size
372 self.enumtype = f['enumtype']
375 e_hash[ename] = evalue
376 self.enum = IntFlag(name, e_hash)
378 class_types[name] = VPPEnumType
379 self.options = options
381 def __getattr__(self, name):
382 return self.enum[name]
387 # TODO: Remove post 20.01.
388 if sys.version[0] == '2':
389 __nonzero__ = __bool__
391 def pack(self, data, kwargs=None):
392 if data is None: # Default to zero if not specified
393 if self.options and 'default' in self.options:
394 data = self.options['default']
398 return types[self.enumtype].pack(data)
400 def unpack(self, data, offset=0, result=None, ntc=False):
401 x, size = types[self.enumtype].unpack(data, offset)
402 return self.enum(x), size
405 def _get_packer_with_options(f_type, options):
406 return VPPEnumType(f_type, types[f_type].msgdef, options=options)
409 return "VPPEnumType(name=%s, msgdef=%s, options=%s)" % (
410 self.name, self.msgdef, self.options
414 class VPPUnionType(Packer):
415 def __init__(self, name, msgdef):
421 self.packers = collections.OrderedDict()
422 for i, f in enumerate(msgdef):
423 if type(f) is dict and 'crc' in f:
427 if f_type not in types:
428 logger.debug('Unknown union type {}'.format(f_type))
429 raise VPPSerializerValueError(
430 'Unknown message type {}'.format(f_type))
431 fields.append(f_name)
432 size = types[f_type].size
433 self.packers[f_name] = types[f_type]
439 self.tuple = collections.namedtuple(name, fields, rename=True)
441 # Union of variable length?
442 def pack(self, data, kwargs=None):
444 return b'\x00' * self.size
446 for k, v in data.items():
447 logger.debug("Key: {} Value: {}".format(k, v))
448 b = self.packers[k].pack(v, kwargs)
450 r = bytearray(self.size)
454 def unpack(self, data, offset=0, result=None, ntc=False):
457 for k, p in self.packers.items():
458 x, size = p.unpack(data, offset, ntc=ntc)
462 return self.tuple._make(r), maxsize
465 return"VPPUnionType(name=%s, msgdef=%r)" % (self.name, self.msgdef)
468 class VPPTypeAlias(Packer):
469 def __init__(self, name, msgdef, options=None):
472 t = vpp_get_type(msgdef['type'])
474 raise ValueError('No such type: {}'.format(msgdef['type']))
475 if 'length' in msgdef:
476 if msgdef['length'] == 0:
478 if msgdef['type'] == 'u8':
479 self.packer = FixedList_u8(name, msgdef['type'],
481 self.size = self.packer.size
483 self.packer = FixedList(name, msgdef['type'], msgdef['length'])
489 self.toplevelconversion = False
490 self.options = options
492 def pack(self, data, kwargs=None):
493 if data and conversion_required(data, self.name):
495 return conversion_packer(data, self.name)
496 # Python 2 and 3 raises different exceptions from inet_pton
497 except(OSError, socket.error, TypeError):
499 if data is None: # Default to zero if not specified
500 if self.options and 'default' in self.options:
501 data = self.options['default']
505 return self.packer.pack(data, kwargs)
508 def _get_packer_with_options(f_type, options):
509 return VPPTypeAlias(f_type, types[f_type].msgdef, options=options)
511 def unpack(self, data, offset=0, result=None, ntc=False):
512 if ntc is False and self.name in vpp_format.conversion_unpacker_table:
513 # Disable type conversion for dependent types
515 self.toplevelconversion = True
516 t, size = self.packer.unpack(data, offset, result, ntc=ntc)
517 if self.toplevelconversion:
518 self.toplevelconversion = False
519 return conversion_unpacker(t, self.name), size
523 return "VPPTypeAlias(name=%s, msgdef=%s, options=%s)" % (
524 self.name, self.msgdef, self.options)
527 class VPPType(Packer):
528 # Set everything up to be able to pack / unpack
529 def __init__(self, name, msgdef):
535 self.field_by_name = {}
537 for i, f in enumerate(msgdef):
538 if type(f) is dict and 'crc' in f:
541 f_type, f_name = f[:2]
542 self.fields.append(f_name)
543 self.field_by_name[f_name] = None
544 self.fieldtypes.append(f_type)
545 if f_type not in types:
546 logger.debug('Unknown type {}'.format(f_type))
547 raise VPPSerializerValueError(
548 'Unknown message type {}'.format(f_type))
551 options = [x for x in f if type(x) is dict]
553 self.options = options[0]
557 if fieldlen == 3: # list
559 if list_elements == 0:
560 if f_type == 'string':
561 p = String(f_name, 0, self.options)
563 p = VLAList_legacy(f_name, f_type)
564 self.packers.append(p)
566 p = FixedList_u8(f_name, f_type, list_elements)
567 self.packers.append(p)
569 elif f_type == 'string':
570 p = String(f_name, list_elements, self.options)
571 self.packers.append(p)
574 p = FixedList(f_name, f_type, list_elements)
575 self.packers.append(p)
577 elif fieldlen == 4: # Variable length list
578 length_index = self.fields.index(f[3])
579 p = VLAList(f_name, f_type, f[3], length_index)
580 self.packers.append(p)
582 # default support for types that decay to basetype
583 if 'default' in self.options:
584 p = self.get_packer_with_options(f_type, self.options)
588 self.packers.append(p)
592 self.tuple = collections.namedtuple(name, self.fields, rename=True)
594 self.toplevelconversion = False
596 def pack(self, data, kwargs=None):
601 # Try one of the format functions
602 if data and conversion_required(data, self.name):
603 return conversion_packer(data, self.name)
605 for i, a in enumerate(self.fields):
606 if data and type(data) is not dict and a not in data:
607 raise VPPSerializerValueError(
608 "Invalid argument: {} expected {}.{}".
609 format(data, self.name, a))
611 # Defaulting to zero.
612 if not data or a not in data: # Default to 0
614 kwarg = None # No default for VLA
617 kwarg = kwargs[a] if a in kwargs else None
618 if isinstance(self.packers[i], VPPType):
619 b += self.packers[i].pack(arg, kwarg)
621 b += self.packers[i].pack(arg, kwargs)
625 def unpack(self, data, offset=0, result=None, ntc=False):
626 # Return a list of arguments
629 if ntc is False and self.name in vpp_format.conversion_unpacker_table:
630 # Disable type conversion for dependent types
632 self.toplevelconversion = True
634 for p in self.packers:
635 x, size = p.unpack(data, offset, result, ntc)
636 if type(x) is tuple and len(x) == 1:
641 t = self.tuple._make(result)
643 if self.toplevelconversion:
644 self.toplevelconversion = False
645 t = conversion_unpacker(t, self.name)
649 return "%s(name=%s, msgdef=%s)" % (
650 self.__class__.__name__, self.name, self.msgdef
654 class VPPMessage(VPPType):