2 # Copyright (c) 2018 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
21 if sys.version_info <= (3, 4):
22 from aenum import IntEnum # noqa: F401
24 from enum import IntEnum # noqa: F401
26 if sys.version_info <= (3, 6):
27 from aenum import IntFlag # noqa: F401
30 from enum import IntFlag # noqa: F401
32 from . import vpp_format # noqa: E402
35 # Set log-level in application by doing e.g.:
36 # logger = logging.getLogger('vpp_serializer')
37 # logger.setLevel(logging.DEBUG)
39 logger = logging.getLogger(__name__)
if sys.version[0] == '2':
    def check(d):
        """Return True when ``d`` is exactly a dict (Python 2 variant)."""
        return type(d) is dict
else:
    def check(d):
        """Return True when ``d`` is exactly a dict or a bytes object.

        The comparison has to be returned explicitly: a body that only
        evaluates the expression makes every call yield None, which is
        always falsy for callers.  Exact type identity is used, so
        subclasses of dict/bytes are deliberately not matched.
        """
        return type(d) is dict or type(d) is bytes
47 def conversion_required(data, field_type):
51 if type(data).__name__ in vpp_format.conversion_table[field_type]:
def conversion_packer(data, field_type):
    """Convert ``data`` for ``field_type`` and pack the converted value.

    The converter is looked up in ``vpp_format.conversion_table`` first by
    ``field_type`` and then by the class name of ``data``; its result is
    handed to the packer registered for ``field_type`` in ``types``.
    """
    source_type_name = type(data).__name__
    convert = vpp_format.conversion_table[field_type][source_type_name]
    return types[field_type].pack(convert(data))
def conversion_unpacker(data, field_type):
    """Run the unpack-side converter registered for ``field_type``.

    ``data`` is passed through untouched when ``field_type`` has no entry
    in ``vpp_format.conversion_unpacker_table``.
    """
    table = vpp_format.conversion_unpacker_table
    if field_type in table:
        return table[field_type](data)
    return data
69 # TODO: post 20.01, remove inherit from object.
def pack(self, data, kwargs):
    """Serialise ``data``; every concrete packer has to override this."""
    raise NotImplementedError()
def unpack(self, data, offset, result=None, ntc=False):
    """Deserialise from ``data`` at ``offset``; subclasses must override."""
    raise NotImplementedError()
79 # override as appropriate in subclasses
81 def _get_packer_with_options(f_type, options):
84 def get_packer_with_options(self, f_type, options):
85 if options is not None:
87 c = types[f_type].__class__
88 return c._get_packer_with_options(f_type, options)
90 raise VPPSerializerValueError(
91 "Options not supported for {}{} ({})".
92 format(f_type, types[f_type].__class__,
96 class BaseTypes(Packer):
97 def __init__(self, type, elements=0, options=None):
99 self._elements = elements
100 base_types = {'u8': '>B',
113 if elements > 0 and (type == 'u8' or type == 'string'):
114 self.packer = struct.Struct('>%ss' % elements)
116 self.packer = struct.Struct(base_types[type])
117 self.size = self.packer.size
118 self.options = options
def pack(self, data, kwargs=None):
    """Pack one value with this type's struct format.

    When ``data`` is None the 'default' entry of ``self.options`` is used
    if there is one, otherwise zero.  ``kwargs`` is accepted so that all
    packers share a call signature; it is not used here.
    """
    if data is None:
        has_default = bool(self.options) and 'default' in self.options
        data = self.options['default'] if has_default else 0
    return self.packer.pack(data)
def unpack(self, data, offset, result=None, ntc=False):
    """Decode one value from ``data`` starting at ``offset``.

    Returns a ``(value, size)`` tuple: the first item produced by the
    struct format and the number of bytes that format occupies.
    ``result`` and ``ntc`` are not used by this packer.
    """
    codec = self.packer
    decoded = codec.unpack_from(data, offset)
    return decoded[0], codec.size
132 def _get_packer_with_options(f_type, options):
133 return BaseTypes(f_type, options=options)
136 return "BaseTypes(type=%s, elements=%s, options=%s)" % (self._type,
141 class String(Packer):
def __init__(self, name, num, options):
    """Set up a packer for a string field.

    :param name: field name.
    :param num: declared length; a non-zero value makes the field
        fixed-size.
    :param options: dict of field options; a 'limit' entry takes
        precedence over ``num`` as the length limit.
    :raises VPPSerializerValueError: for a fixed-size field whose limit
        is zero or otherwise falsy.
    """
    self.name = name
    self.num = num
    self.size = 1
    self.length_field_packer = BaseTypes('u32')
    self.limit = options['limit'] if 'limit' in options else num
    self.fixed = bool(num)
    if self.fixed and not self.limit:
        # There is no payload at construction time, so report the
        # constructor arguments instead of trying to measure one
        # (formatting len() of the builtin ``list`` raises TypeError and
        # hides the real problem).
        raise VPPSerializerValueError(
            "Invalid combination for: {}, {} fixed:{} limit:{}".
            format(name, options, self.fixed, self.limit))
def pack(self, list, kwargs=None):
    """Encode a text value as ASCII for the wire.

    Fixed-size fields are null-padded to ``self.limit`` bytes; variable
    fields are prefixed with their length, packed by
    ``self.length_field_packer``.  An empty or missing value produces an
    all-zero fixed field or a zero length prefix.

    :raises VPPSerializerValueError: if a limit is set and the value is
        longer than ``self.limit - 1`` characters.
    """
    # The parameter name shadows the builtin but is part of the interface.
    text = list
    if text:
        if self.limit and len(text) > self.limit - 1:
            raise VPPSerializerValueError(
                "Invalid argument length for: {}, {} maximum {}".
                format(text, len(text), self.limit - 1))
        if self.fixed:
            return text.encode('ascii').ljust(self.limit, b'\x00')
        length_prefix = self.length_field_packer.pack(len(text))
        return length_prefix + text.encode('ascii')
    if self.fixed:
        return b"\x00" * self.limit
    return self.length_field_packer.pack(0) + b""
167 def unpack(self, data, offset=0, result=None, ntc=False):
169 p = BaseTypes('u8', self.num)
170 s = p.unpack(data, offset)
171 s2 = s[0].split(b'\0', 1)[0]
172 return (s2.decode('ascii'), self.num)
174 length, length_field_size = self.length_field_packer.unpack(data,
178 p = BaseTypes('u8', length)
179 x, size = p.unpack(data, offset + length_field_size)
180 return (x.decode('ascii', errors='replace'), size + length_field_size)
# Registry of packers keyed by API type name.  Scalar entries are shared
# BaseTypes instances; 'string' maps to the String class itself rather
# than to an instance.
types = {_scalar: BaseTypes(_scalar)
         for _scalar in ('u8', 'i8', 'u16', 'i16', 'u32', 'i32',
                         'u64', 'i64', 'f64', 'bool')}
types['string'] = String
193 def vpp_get_type(name):
class VPPSerializerValueError(ValueError):
    """Error raised by this module's packers for invalid values, lengths
    or unknown types."""
204 class FixedList_u8(Packer):
205 def __init__(self, name, field_type, num):
208 self.packer = BaseTypes(field_type, num)
209 self.size = self.packer.size
210 self.field_type = field_type
212 def pack(self, data, kwargs=None):
        """Packs a fixed length bytestring. Pads with trailing zero
        bytes if input data is too short."""
216 return b'\x00' * self.size
218 if len(data) > self.num:
219 raise VPPSerializerValueError(
220 'Fixed list length error for "{}", got: {}'
222 .format(self.name, len(data), self.num))
225 return self.packer.pack(data)
227 raise VPPSerializerValueError(
228 'Packing failed for "{}" {}'
229 .format(self.name, kwargs))
231 def unpack(self, data, offset=0, result=None, ntc=False):
232 if len(data[offset:]) < self.num:
233 raise VPPSerializerValueError(
234 'Invalid array length for "{}" got {}'
236 .format(self.name, len(data[offset:]), self.num))
237 return self.packer.unpack(data, offset)
240 return "FixedList_u8(name=%s, field_type=%s, num=%s)" % (
241 self.name, self.field_type, self.num
245 class FixedList(Packer):
246 def __init__(self, name, field_type, num):
248 self.packer = types[field_type]
249 self.size = self.packer.size * num
251 self.field_type = field_type
def pack(self, list, kwargs=None):
    """Pack exactly ``self.num`` elements with the element packer.

    :param list: sequence of element values, or None to emit an all-zero
        field of ``self.size`` bytes (the same fallback the byte-string
        and union packers use for missing data).
    :param kwargs: unused; optional so this method can be called like
        the other packers' ``pack``.
    :returns: the packed elements, concatenated in order.
    :raises VPPSerializerValueError: if the number of elements differs
        from ``self.num``.
    """
    if list is None:
        # A defaulted field arrives as None; len(None) would raise
        # TypeError instead of producing the zero default.
        return b'\x00' * self.size
    if len(list) != self.num:
        raise VPPSerializerValueError(
            'Fixed list length error, got: {} expected: {}'
            .format(len(list), self.num))
    # join() avoids re-copying the accumulated bytes for every element.
    return b''.join(self.packer.pack(e) for e in list)
263 def unpack(self, data, offset=0, result=None, ntc=False):
264 # Return a list of arguments
267 for e in range(self.num):
268 x, size = self.packer.unpack(data, offset, ntc=ntc)
275 return "FixedList(name=%s, field_type=%s, num=%s)" % (
276 self.name, self.field_type, self.num)
279 class VLAList(Packer):
280 def __init__(self, name, field_type, len_field_name, index):
282 self.field_type = field_type
284 self.packer = types[field_type]
285 self.size = self.packer.size
286 self.length_field = len_field_name
288 def pack(self, lst, kwargs=None):
291 if len(lst) != kwargs[self.length_field]:
292 raise VPPSerializerValueError(
293 'Variable length error, got: {} expected: {}'
294 .format(len(lst), kwargs[self.length_field]))
297 if self.packer.size == 1:
298 if isinstance(lst, list):
304 b += self.packer.pack(e)
307 def unpack(self, data, offset=0, result=None, ntc=False):
308 # Return a list of arguments
312 if self.packer.size == 1:
313 if result[self.index] == 0:
315 p = BaseTypes('u8', result[self.index])
316 return p.unpack(data, offset, ntc=ntc)
319 for e in range(result[self.index]):
320 x, size = self.packer.unpack(data, offset, ntc=ntc)
327 return "VLAList(name=%s, field_type=%s, " \
328 "len_field_name=%s, index=%s)" % (
329 self.name, self.field_type, self.length_field, self.index
333 class VLAList_legacy(Packer):
334 def __init__(self, name, field_type):
336 self.field_type = field_type
337 self.packer = types[field_type]
338 self.size = self.packer.size
340 def pack(self, list, kwargs=None):
341 if self.packer.size == 1:
346 b += self.packer.pack(e)
349 def unpack(self, data, offset=0, result=None, ntc=False):
351 # Return a list of arguments
352 if (len(data) - offset) % self.packer.size:
353 raise VPPSerializerValueError(
354 'Legacy Variable Length Array length mismatch.')
355 elements = int((len(data) - offset) / self.packer.size)
357 for e in range(elements):
358 x, size = self.packer.unpack(data, offset, ntc=ntc)
360 offset += self.packer.size
365 return "VLAList_legacy(name=%s, field_type=%s)" % (
366 self.name, self.field_type
370 class VPPEnumType(Packer):
371 def __init__(self, name, msgdef, options=None):
372 self.size = types['u32'].size
374 self.enumtype = 'u32'
378 if type(f) is dict and 'enumtype' in f:
379 if f['enumtype'] != 'u32':
380 self.size = types[f['enumtype']].size
381 self.enumtype = f['enumtype']
384 e_hash[ename] = evalue
385 self.enum = IntFlag(name, e_hash)
387 class_types[name] = VPPEnumType
388 self.options = options
390 def __getattr__(self, name):
391 return self.enum[name]
396 # TODO: Remove post 20.01.
397 if sys.version[0] == '2':
398 __nonzero__ = __bool__
def pack(self, data, kwargs=None):
    """Pack an enum value using the enum's underlying integer type.

    When ``data`` is None the 'default' entry of ``self.options`` is used
    if there is one, otherwise zero.  ``kwargs`` is not used.
    """
    if data is None:
        has_default = bool(self.options) and 'default' in self.options
        data = self.options['default'] if has_default else 0
    wire_packer = types[self.enumtype]
    return wire_packer.pack(data)
def unpack(self, data, offset=0, result=None, ntc=False):
    """Decode an enum member from ``data`` at ``offset``.

    The raw integer is read with the packer for ``self.enumtype`` and
    wrapped in ``self.enum``.  Returns ``(member, size)``.
    """
    raw, consumed = types[self.enumtype].unpack(data, offset)
    member = self.enum(raw)
    return member, consumed
414 def _get_packer_with_options(f_type, options):
415 return VPPEnumType(f_type, types[f_type].msgdef, options=options)
418 return "VPPEnumType(name=%s, msgdef=%s, options=%s)" % (
419 self.name, self.msgdef, self.options
423 class VPPUnionType(Packer):
424 def __init__(self, name, msgdef):
430 self.packers = collections.OrderedDict()
431 for i, f in enumerate(msgdef):
432 if type(f) is dict and 'crc' in f:
436 if f_type not in types:
437 logger.debug('Unknown union type {}'.format(f_type))
438 raise VPPSerializerValueError(
439 'Unknown message type {}'.format(f_type))
440 fields.append(f_name)
441 size = types[f_type].size
442 self.packers[f_name] = types[f_type]
448 self.tuple = collections.namedtuple(name, fields, rename=True)
450 # Union of variable length?
451 def pack(self, data, kwargs=None):
453 return b'\x00' * self.size
455 for k, v in data.items():
456 logger.debug("Key: {} Value: {}".format(k, v))
457 b = self.packers[k].pack(v, kwargs)
459 r = bytearray(self.size)
463 def unpack(self, data, offset=0, result=None, ntc=False):
466 for k, p in self.packers.items():
467 x, size = p.unpack(data, offset, ntc=ntc)
471 return self.tuple._make(r), maxsize
474 return"VPPUnionType(name=%s, msgdef=%r)" % (self.name, self.msgdef)
477 class VPPTypeAlias(Packer):
478 def __init__(self, name, msgdef, options=None):
481 t = vpp_get_type(msgdef['type'])
483 raise ValueError('No such type: {}'.format(msgdef['type']))
484 if 'length' in msgdef:
485 if msgdef['length'] == 0:
487 if msgdef['type'] == 'u8':
488 self.packer = FixedList_u8(name, msgdef['type'],
490 self.size = self.packer.size
492 self.packer = FixedList(name, msgdef['type'], msgdef['length'])
498 self.toplevelconversion = False
499 self.options = options
501 def pack(self, data, kwargs=None):
502 if data and conversion_required(data, self.name):
504 return conversion_packer(data, self.name)
505 # Python 2 and 3 raises different exceptions from inet_pton
506 except(OSError, socket.error, TypeError):
508 if data is None: # Default to zero if not specified
509 if self.options and 'default' in self.options:
510 data = self.options['default']
514 return self.packer.pack(data, kwargs)
517 def _get_packer_with_options(f_type, options):
518 return VPPTypeAlias(f_type, types[f_type].msgdef, options=options)
def unpack(self, data, offset=0, result=None, ntc=False):
    """Unpack the aliased value, converting it only at the top level.

    :param data: buffer to read from.
    :param offset: position in ``data`` where the value starts.
    :param result: passed through to the wrapped packer.
    :param ntc: "no type conversion"; when True the raw unpacked value
        is returned even if a converter exists for this alias.
    :returns: ``(value, size)`` as produced by the wrapped packer, with
        ``value`` run through ``conversion_unpacker`` when this call is
        the top-level one and a converter is registered for
        ``self.name``.
    """
    # Keep the decision in a local rather than on the instance: if the
    # wrapped unpack raises, an instance flag would stay set and a later
    # call with ntc=True would convert when it must not.
    convert = (ntc is False and
               self.name in vpp_format.conversion_unpacker_table)
    if convert:
        # Disable type conversion for dependent types
        ntc = True
    t, size = self.packer.unpack(data, offset, result, ntc=ntc)
    if convert:
        return conversion_unpacker(t, self.name), size
    return t, size
532 return "VPPTypeAlias(name=%s, msgdef=%s, options=%s)" % (
533 self.name, self.msgdef, self.options)
536 class VPPType(Packer):
537 # Set everything up to be able to pack / unpack
538 def __init__(self, name, msgdef):
544 self.field_by_name = {}
546 for i, f in enumerate(msgdef):
547 if type(f) is dict and 'crc' in f:
550 f_type, f_name = f[:2]
551 self.fields.append(f_name)
552 self.field_by_name[f_name] = None
553 self.fieldtypes.append(f_type)
554 if f_type not in types:
555 logger.debug('Unknown type {}'.format(f_type))
556 raise VPPSerializerValueError(
557 'Unknown message type {}'.format(f_type))
560 options = [x for x in f if type(x) is dict]
562 self.options = options[0]
566 if fieldlen == 3: # list
568 if list_elements == 0:
569 if f_type == 'string':
570 p = String(f_name, 0, self.options)
572 p = VLAList_legacy(f_name, f_type)
573 self.packers.append(p)
575 p = FixedList_u8(f_name, f_type, list_elements)
576 self.packers.append(p)
578 elif f_type == 'string':
579 p = String(f_name, list_elements, self.options)
580 self.packers.append(p)
583 p = FixedList(f_name, f_type, list_elements)
584 self.packers.append(p)
586 elif fieldlen == 4: # Variable length list
587 length_index = self.fields.index(f[3])
588 p = VLAList(f_name, f_type, f[3], length_index)
589 self.packers.append(p)
591 # default support for types that decay to basetype
592 if 'default' in self.options:
593 p = self.get_packer_with_options(f_type, self.options)
597 self.packers.append(p)
601 self.tuple = collections.namedtuple(name, self.fields, rename=True)
603 self.toplevelconversion = False
605 def pack(self, data, kwargs=None):
610 # Try one of the format functions
611 if data and conversion_required(data, self.name):
612 return conversion_packer(data, self.name)
614 for i, a in enumerate(self.fields):
615 if data and type(data) is not dict and a not in data:
616 raise VPPSerializerValueError(
617 "Invalid argument: {} expected {}.{}".
618 format(data, self.name, a))
620 # Defaulting to zero.
621 if not data or a not in data: # Default to 0
623 kwarg = None # No default for VLA
626 kwarg = kwargs[a] if a in kwargs else None
627 if isinstance(self.packers[i], VPPType):
628 b += self.packers[i].pack(arg, kwarg)
630 b += self.packers[i].pack(arg, kwargs)
634 def unpack(self, data, offset=0, result=None, ntc=False):
635 # Return a list of arguments
638 if ntc is False and self.name in vpp_format.conversion_unpacker_table:
639 # Disable type conversion for dependent types
641 self.toplevelconversion = True
643 for p in self.packers:
644 x, size = p.unpack(data, offset, result, ntc)
645 if type(x) is tuple and len(x) == 1:
650 t = self.tuple._make(result)
652 if self.toplevelconversion:
653 self.toplevelconversion = False
654 t = conversion_unpacker(t, self.name)
658 return "%s(name=%s, msgdef=%s)" % (
659 self.__class__.__name__, self.name, self.msgdef
663 class VPPMessage(VPPType):