2 # Copyright (c) 2018 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
16 from enum import IntFlag
22 from . import vpp_format
26 # Set log-level in application by doing e.g.:
27 # logger = logging.getLogger('vpp_papi.serializer')
28 # logger.setLevel(logging.DEBUG)
30 logger = logging.getLogger('vpp_papi.serializer')
34 return type(d) is dict or type(d) is bytes
# Decide whether ``data`` has to go through a vpp_format converter before it
# can be packed as ``field_type``.
37 def conversion_required(data, field_type):
# Converters are registered per target field type and keyed by the Python
# type name of the value being packed.
41 if type(data).__name__ in vpp_format.conversion_table[field_type]:
def conversion_packer(data, field_type):
    """Convert ``data`` with its registered formatter, then pack the result.

    The formatter is found in ``vpp_format.conversion_table`` under the
    target ``field_type`` and the Python type name of ``data``. Its output is
    passed to the packer registered for ``field_type`` in ``types``.
    """
    # Resolve the bound pack method first, matching the original lookup order.
    pack = types[field_type].pack
    type_name = type(data).__name__
    convert = vpp_format.conversion_table[field_type][type_name]
    return pack(convert(data))
# Post-process an unpacked value with the converter registered for
# ``field_type`` in vpp_format.conversion_unpacker_table.
53 def conversion_unpacker(data, field_type):
# Field types without a registered unpack converter take this branch.
54 if field_type not in vpp_format.conversion_unpacker_table:
56 return vpp_format.conversion_unpacker_table[field_type](data)
def pack(self, data, kwargs):
    """Serialize ``data``; concrete packers must override this method.

    Raises:
        NotImplementedError: always, in this base implementation.
    """
    raise NotImplementedError
def unpack(self, data, offset, result=None, ntc=False):
    """Deserialize from ``data`` at ``offset``; concrete packers override this.

    Raises:
        NotImplementedError: always, in this base implementation.
    """
    raise NotImplementedError
68 # override as appropriate in subclasses
70 def _get_packer_with_options(f_type, options):
# Return the packer to use for ``f_type`` when per-field ``options`` are given.
73 def get_packer_with_options(self, f_type, options):
74 if options is not None:
# Delegate to the class of the packer registered for ``f_type``; packer
# classes provide their own _get_packer_with_options.
76 c = types[f_type].__class__
77 return c._get_packer_with_options(f_type, options)
# NOTE(review): the condition that leads to this error is not visible in this
# part of the file.
79 raise VPPSerializerValueError(
80 "Options not supported for {}{} ({})".
81 format(f_type, types[f_type].__class__,
# Packer for the scalar base types, backed by a precompiled struct.Struct.
85 class BaseTypes(Packer):
86 def __init__(self, type, elements=0, options=None):
88 self._elements = elements
# Maps a base type name to its struct format string.
89 base_types = {'u8': '>B',
# A positive element count on 'u8' or 'string' selects one fixed-width bytes
# field of that many bytes instead of the scalar format.
102 if elements > 0 and (type == 'u8' or type == 'string'):
103 self.packer = struct.Struct('>%ss' % elements)
105 self.packer = struct.Struct(base_types[type])
106 self.size = self.packer.size
107 self.options = options
# Pack one value; a None value picks up the 'default' option when one is set.
109 def pack(self, data, kwargs=None):
110 if data is None: # Default to zero if not specified
111 if self.options and 'default' in self.options:
112 data = self.options['default']
115 return self.packer.pack(data)
# Returns (value, size of the struct); result and ntc are not used here.
117 def unpack(self, data, offset, result=None, ntc=False):
118 return self.packer.unpack_from(data, offset)[0], self.packer.size
# Builds a new BaseTypes for ``f_type`` that carries ``options``.
121 def _get_packer_with_options(f_type, options):
122 return BaseTypes(f_type, options=options)
125 return "BaseTypes(type=%s, elements=%s, options=%s)" % (self._type,
# Packer for 'string' fields. Packed output is either a NUL-padded buffer of
# ``limit`` bytes or a u32 length field followed by the ASCII-encoded text.
130 class String(Packer):
131 def __init__(self, name, num, options):
135 self.length_field_packer = BaseTypes('u32')
# An explicit 'limit' option takes precedence over the declared size.
136 self.limit = options['limit'] if 'limit' in options else num
# A non-zero ``num`` marks the field as fixed.
137 self.fixed = True if num else False
138 if self.fixed and not self.limit:
139 raise VPPSerializerValueError(
140 "Invalid combination for: {}, {} fixed:{} limit:{}".
141 format(name, options, self.fixed, self.limit))
# NOTE(review): the parameter name ``list`` shadows the builtin.
143 def pack(self, list, kwargs=None):
146 return b"\x00" * self.limit
147 return self.length_field_packer.pack(0) + b""
# When a limit is set, at most limit - 1 characters are accepted.
148 if self.limit and len(list) > self.limit - 1:
149 raise VPPSerializerValueError(
150 "Invalid argument length for: {}, {} maximum {}".
151 format(list, len(list), self.limit - 1))
# ASCII-encoded text, padded with NUL bytes up to ``limit``.
153 return list.encode('ascii').ljust(self.limit, b'\x00')
# u32 length field followed by the ASCII-encoded text.
154 return self.length_field_packer.pack(len(list)) + list.encode('ascii')
156 def unpack(self, data, offset=0, result=None, ntc=False):
# Read ``num`` raw bytes and keep only what precedes the first NUL.
158 p = BaseTypes('u8', self.num)
159 s = p.unpack(data, offset)
160 s2 = s[0].split(b'\0', 1)[0]
161 return (s2.decode('ascii'), self.num)
# Read the length field first; the payload starts right after it.
163 length, length_field_size = self.length_field_packer.unpack(data,
167 p = BaseTypes('u8', length)
168 x, size = p.unpack(data, offset + length_field_size)
# NOTE(review): this decode uses errors='replace' while the decode above is
# strict and raises on non-ASCII bytes -- confirm the difference is intended.
169 return (x.decode('ascii', errors='replace'), size + length_field_size)
# Registry of packers keyed by API type name. The scalar base types map to
# ready-made BaseTypes instances; 'string' maps to the String class itself,
# since String packers are constructed per field.
types = {
    base: BaseTypes(base)
    for base in ('u8', 'i8', 'u16', 'i16', 'u32', 'i32', 'u64', 'i64',
                 'f64', 'bool')
}
types['string'] = String
182 def vpp_get_type(name):
189 class VPPSerializerValueError(ValueError):
# Packer for a fixed-length array that is packed and unpacked as one unit by
# a single BaseTypes packer created with an element count.
193 class FixedList_u8(Packer):
194 def __init__(self, name, field_type, num):
197 self.packer = BaseTypes(field_type, num)
198 self.size = self.packer.size
199 self.field_type = field_type
# NOTE(review): the docstring below says "Left-pads"; when the underlying
# packer uses a struct 's' format, short input is padded with NUL bytes at
# the end -- confirm and reword.
201 def pack(self, data, kwargs=None):
202 """Packs a fixed length bytestring. Left-pads with zeros
203 if input data is too short."""
205 return b'\x00' * self.size
# Input longer than the declared element count is rejected.
207 if len(data) > self.num:
208 raise VPPSerializerValueError(
209 'Fixed list length error for "{}", got: {}'
211 .format(self.name, len(data), self.num))
214 return self.packer.pack(data)
216 raise VPPSerializerValueError(
217 'Packing failed for "{}" {}'
218 .format(self.name, kwargs))
220 def unpack(self, data, offset=0, result=None, ntc=False):
# NOTE(review): data[offset:] copies the tail of the buffer only to measure
# it; for offsets inside the buffer len(data) - offset is the same number.
221 if len(data[offset:]) < self.num:
222 raise VPPSerializerValueError(
223 'Invalid array length for "{}" got {}'
225 .format(self.name, len(data[offset:]), self.num))
226 return self.packer.unpack(data, offset)
229 return "FixedList_u8(name=%s, field_type=%s, num=%s)" % (
230 self.name, self.field_type, self.num
# Packer for a fixed-length array whose elements are packed one at a time
# with the packer registered for the element type.
234 class FixedList(Packer):
235 def __init__(self, name, field_type, num):
237 self.packer = types[field_type]
# Total size is the element size times the element count.
238 self.size = self.packer.size * num
240 self.field_type = field_type
# NOTE(review): ``list`` shadows the builtin, and ``kwargs`` has no default
# here although most sibling packers declare kwargs=None.
242 def pack(self, list, kwargs):
# The number of supplied elements must match the declared length exactly.
243 if len(list) != self.num:
244 raise VPPSerializerValueError(
245 'Fixed list length error, got: {} expected: {}'
246 .format(len(list), self.num))
249 b += self.packer.pack(e)
252 def unpack(self, data, offset=0, result=None, ntc=False):
253 # Return a list of arguments
256 for e in range(self.num):
257 x, size = self.packer.unpack(data, offset, ntc=ntc)
264 return "FixedList(name=%s, field_type=%s, num=%s)" % (
265 self.name, self.field_type, self.num)
# Packer for a variable-length array whose element count is carried by
# another, named field.
268 class VLAList(Packer):
269 def __init__(self, name, field_type, len_field_name, index):
271 self.field_type = field_type
273 self.packer = types[field_type]
274 self.size = self.packer.size
275 self.length_field = len_field_name
277 def pack(self, lst, kwargs=None):
# The list length must agree with the value supplied for the length field.
280 if len(lst) != kwargs[self.length_field]:
281 raise VPPSerializerValueError(
282 'Variable length error, got: {} expected: {}'
283 .format(len(lst), kwargs[self.length_field]))
# Element types of size 1 are handled separately from the per-element loop.
286 if self.packer.size == 1:
287 if isinstance(lst, list):
293 b += self.packer.pack(e)
296 def unpack(self, data, offset=0, result=None, ntc=False):
297 # Return a list of arguments
301 if self.packer.size == 1:
# result[self.index] is the already-unpacked element count.
302 if result[self.index] == 0:
304 p = BaseTypes('u8', result[self.index])
305 return p.unpack(data, offset, ntc=ntc)
308 for e in range(result[self.index]):
309 x, size = self.packer.unpack(data, offset, ntc=ntc)
316 return "VLAList(name=%s, field_type=%s, " \
317 "len_field_name=%s, index=%s)" % (
318 self.name, self.field_type, self.length_field, self.index
# Packer for a variable-length array with no length field; on unpack the
# element count is derived from the bytes remaining after ``offset``.
322 class VLAList_legacy(Packer):
323 def __init__(self, name, field_type):
325 self.field_type = field_type
326 self.packer = types[field_type]
327 self.size = self.packer.size
# NOTE(review): the parameter name ``list`` shadows the builtin.
329 def pack(self, list, kwargs=None):
330 if self.packer.size == 1:
335 b += self.packer.pack(e)
338 def unpack(self, data, offset=0, result=None, ntc=False):
340 # Return a list of arguments
# The remaining bytes must be a whole multiple of the element size.
341 if (len(data) - offset) % self.packer.size:
342 raise VPPSerializerValueError(
343 'Legacy Variable Length Array length mismatch.')
# NOTE(review): true division followed by int(); // would stay in integers.
344 elements = int((len(data) - offset) / self.packer.size)
346 for e in range(elements):
347 x, size = self.packer.unpack(data, offset, ntc=ntc)
349 offset += self.packer.size
354 return "VLAList_legacy(name=%s, field_type=%s)" % (
355 self.name, self.field_type
359 # Will change to IntEnum after 21.04 release
# Packer for an API enum: builds a Python enum class from the definition and
# packs/unpacks through the packer of the enum's underlying integer type.
360 class VPPEnumType(Packer):
361 output_class = IntFlag
363 def __init__(self, name, msgdef, options=None):
# u32 is the underlying type unless an 'enumtype' entry says otherwise.
364 self.size = types['u32'].size
366 self.enumtype = 'u32'
370 if type(f) is dict and 'enumtype' in f:
371 if f['enumtype'] != 'u32':
372 self.size = types[f['enumtype']].size
373 self.enumtype = f['enumtype']
376 e_hash[ename] = evalue
# Functional enum API: members come from the collected name -> value mapping.
377 self.enum = self.output_class(name, e_hash)
379 class_types[name] = self.__class__
380 self.options = options
# Attributes not found normally are looked up as enum member names.
# NOTE(review): a missing member surfaces as KeyError rather than
# AttributeError, which hasattr() and getattr(obj, name, default) do not
# catch -- confirm callers do not rely on those.
382 def __getattr__(self, name):
383 return self.enum[name]
388 def pack(self, data, kwargs=None):
389 if data is None: # Default to zero if not specified
390 if self.options and 'default' in self.options:
391 data = self.options['default']
395 return types[self.enumtype].pack(data)
397 def unpack(self, data, offset=0, result=None, ntc=False):
398 x, size = types[self.enumtype].unpack(data, offset)
# The raw integer is wrapped in the generated enum class.
399 return self.enum(x), size
# Builds a new packer of the same class from the registered packer's msgdef;
# this runs __init__ again, including its class_types registration.
402 def _get_packer_with_options(cls, f_type, options):
403 return cls(f_type, types[f_type].msgdef, options=options)
406 return "%s(name=%s, msgdef=%s, options=%s)" % (
407 self.__class__.__name__, self.name, self.msgdef, self.options
class VPPEnumFlagType(VPPEnumType):
    """Enum packer whose generated Python type is an ``IntFlag``."""

    output_class = IntFlag

    def __init__(self, name, msgdef, options=None):
        """Forward all arguments unchanged to ``VPPEnumType.__init__``."""
        super().__init__(name, msgdef, options)
# Packer for a union type; holds one packer per member, keyed by member name.
418 class VPPUnionType(Packer):
419 def __init__(self, name, msgdef):
# Member packers, kept in definition order.
425 self.packers = collections.OrderedDict()
426 for i, f in enumerate(msgdef):
# presumably the CRC record rather than a member -- its handling is not
# visible here.
427 if type(f) is dict and 'crc' in f:
431 if f_type not in types:
432 logger.debug('Unknown union type {}'.format(f_type))
433 raise VPPSerializerValueError(
434 'Unknown message type {}'.format(f_type))
435 fields.append(f_name)
436 size = types[f_type].size
437 self.packers[f_name] = types[f_type]
# rename=True makes namedtuple replace member names that are not valid
# identifiers with positional names.
443 self.tuple = collections.namedtuple(name, fields, rename=True)
445 # Union of variable length?
446 def pack(self, data, kwargs=None):
448 return b'\x00' * self.size
# Each supplied member is packed with the packer registered under its name.
450 for k, v in data.items():
451 logger.debug("Key: {} Value: {}".format(k, v))
452 b = self.packers[k].pack(v, kwargs)
# Zero-filled buffer of the union's full size.
454 r = bytearray(self.size)
458 def unpack(self, data, offset=0, result=None, ntc=False):
# Every member packer is run against the data at ``offset``.
461 for k, p in self.packers.items():
462 x, size = p.unpack(data, offset, ntc=ntc)
466 return self.tuple._make(r), maxsize
469 return"VPPUnionType(name=%s, msgdef=%r)" % (self.name, self.msgdef)
# Packer for a type alias; an alias that declares a length is backed by one
# of the fixed-length list packers.
472 class VPPTypeAlias(Packer):
473 def __init__(self, name, msgdef, options=None):
476 t = vpp_get_type(msgdef['type'])
478 raise ValueError('No such type: {}'.format(msgdef['type']))
479 if 'length' in msgdef:
480 if msgdef['length'] == 0:
# u8 arrays get FixedList_u8; FixedList is the other list packer used below.
482 if msgdef['type'] == 'u8':
483 self.packer = FixedList_u8(name, msgdef['type'],
485 self.size = self.packer.size
487 self.packer = FixedList(name, msgdef['type'], msgdef['length'])
493 self.toplevelconversion = False
494 self.options = options
496 def pack(self, data, kwargs=None):
# Non-empty values with a registered converter for this alias are converted
# and packed by conversion_packer.
497 if data and conversion_required(data, self.name):
499 return conversion_packer(data, self.name)
500 # Python 2 and 3 raises different exceptions from inet_pton
501 except(OSError, socket.error, TypeError):
503 if data is None: # Default to zero if not specified
504 if self.options and 'default' in self.options:
505 data = self.options['default']
509 return self.packer.pack(data, kwargs)
# Builds a new alias packer from the registered packer's msgdef plus options.
512 def _get_packer_with_options(f_type, options):
513 return VPPTypeAlias(f_type, types[f_type].msgdef, options=options)
515 def unpack(self, data, offset=0, result=None, ntc=False):
516 if ntc is False and self.name in vpp_format.conversion_unpacker_table:
517 # Disable type conversion for dependent types
519 self.toplevelconversion = True
520 t, size = self.packer.unpack(data, offset, result, ntc=ntc)
# NOTE(review): toplevelconversion is per-call state kept on the packer
# instance, so overlapping unpack() calls on one instance could interfere --
# confirm instances are not shared across threads.
521 if self.toplevelconversion:
522 self.toplevelconversion = False
523 return conversion_unpacker(t, self.name), size
527 return "VPPTypeAlias(name=%s, msgdef=%s, options=%s)" % (
528 self.name, self.msgdef, self.options)
# Packer for a compound type: keeps one packer per field and applies them in
# field order.
531 class VPPType(Packer):
532 # Set everything up to be able to pack / unpack
533 def __init__(self, name, msgdef):
539 self.field_by_name = {}
541 for i, f in enumerate(msgdef):
542 if type(f) is dict and 'crc' in f:
# The first two items of a field definition are its type and its name.
545 f_type, f_name = f[:2]
546 self.fields.append(f_name)
547 self.field_by_name[f_name] = None
548 self.fieldtypes.append(f_type)
549 if f_type not in types:
550 logger.debug('Unknown type {}'.format(f_type))
551 raise VPPSerializerValueError(
552 'Unknown message type {}'.format(f_type))
# Dict items inside the field definition are collected as its options.
555 options = [x for x in f if type(x) is dict]
557 self.options = options[0]
561 if fieldlen == 3: # list
563 if list_elements == 0:
564 if f_type == 'string':
565 p = String(f_name, 0, self.options)
567 p = VLAList_legacy(f_name, f_type)
568 self.packers.append(p)
570 p = FixedList_u8(f_name, f_type, list_elements)
571 self.packers.append(p)
573 elif f_type == 'string':
574 p = String(f_name, list_elements, self.options)
575 self.packers.append(p)
578 p = FixedList(f_name, f_type, list_elements)
579 self.packers.append(p)
581 elif fieldlen == 4: # Variable length list
# The fourth item names the field that carries the element count; its
# position among the fields seen so far is handed to VLAList.
582 length_index = self.fields.index(f[3])
583 p = VLAList(f_name, f_type, f[3], length_index)
584 self.packers.append(p)
586 # default support for types that decay to basetype
587 if 'default' in self.options:
588 p = self.get_packer_with_options(f_type, self.options)
592 self.packers.append(p)
596 self.tuple = collections.namedtuple(name, self.fields, rename=True)
598 self.toplevelconversion = False
600 def pack(self, data, kwargs=None):
605 # Try one of the format functions
606 if data and conversion_required(data, self.name):
607 return conversion_packer(data, self.name)
609 for i, a in enumerate(self.fields):
# NOTE(review): for non-dict input ``a not in data`` is a membership test on
# whatever type was passed -- confirm that is the intended check.
610 if data and type(data) is not dict and a not in data:
611 raise VPPSerializerValueError(
612 "Invalid argument: {} expected {}.{}".
613 format(data, self.name, a))
615 # Defaulting to zero.
616 if not data or a not in data: # Default to 0
618 kwarg = None # No default for VLA
621 kwarg = kwargs[a] if a in kwargs else None
# Nested VPPType packers receive the kwargs entry stored under the field's
# own name; every other packer gets the full kwargs.
622 if isinstance(self.packers[i], VPPType):
623 b += self.packers[i].pack(arg, kwarg)
625 b += self.packers[i].pack(arg, kwargs)
629 def unpack(self, data, offset=0, result=None, ntc=False):
630 # Return a list of arguments
633 if ntc is False and self.name in vpp_format.conversion_unpacker_table:
634 # Disable type conversion for dependent types
636 self.toplevelconversion = True
# Each field packer is given ``result`` along with the data and offset.
638 for p in self.packers:
639 x, size = p.unpack(data, offset, result, ntc)
640 if type(x) is tuple and len(x) == 1:
645 t = self.tuple._make(result)
# Conversion is applied only when this call set the flag above.
647 if self.toplevelconversion:
648 self.toplevelconversion = False
649 t = conversion_unpacker(t, self.name)
653 return "%s(name=%s, msgdef=%s)" % (
654 self.__class__.__name__, self.name, self.msgdef
658 class VPPMessage(VPPType):