2 # Copyright (c) 2018 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
16 from enum import IntFlag
22 from . import vpp_format
26 # Set log-level in application by doing e.g.:
# logger = logging.getLogger('vpp_papi.serializer')
28 # logger.setLevel(logging.DEBUG)
30 logger = logging.getLogger("vpp_papi.serializer")
34 return type(d) is dict or type(d) is bytes
def conversion_required(data, field_type):
    """Report whether ``data`` must be converted before it can be packed.

    :param data: value supplied by the caller for a field.
    :param field_type: API type name, used as key into
        ``vpp_format.conversion_table``.
    :returns: ``True`` when the table entry for ``field_type`` has a converter
        registered under the class name of ``data``; ``False`` otherwise
        (always a bool, never ``None``).
    """
    # dict and bytes values are never routed through a converter.
    if type(data) is dict or type(data) is bytes:
        return False
    try:
        return type(data).__name__ in vpp_format.conversion_table[field_type]
    except KeyError:
        # Nothing is registered for this field type at all.
        return False
def conversion_packer(data, field_type):
    """Convert ``data`` with its registered format function, then pack it.

    The converter is looked up in ``vpp_format.conversion_table`` by
    ``field_type`` and by the class name of ``data``; the converted value is
    packed by the packer registered for ``field_type``.
    """
    packer = types[field_type]
    source_kind = type(data).__name__
    convert = vpp_format.conversion_table[field_type][source_kind]
    return packer.pack(convert(data))
def conversion_unpacker(data, field_type):
    """Apply the unpack-side converter for ``field_type``, if there is one.

    :returns: the converted value, or ``data`` unchanged when
        ``vpp_format.conversion_unpacker_table`` has no entry for the type.
    """
    if field_type in vpp_format.conversion_unpacker_table:
        return vpp_format.conversion_unpacker_table[field_type](data)
    return data
class Packer:
    """Common base of the packers in this module.

    Concrete packers implement ``pack`` and ``unpack``; the two option hooks
    let a field definition that carries options obtain a packer built with
    those options.
    """

    # Fallback for instances that never assign their own ``options``.
    options = {}

    def pack(self, data, kwargs):
        """Serialize ``data``; must be overridden."""
        raise NotImplementedError

    def unpack(self, data, offset, result=None, ntc=False):
        """Deserialize from ``data`` at ``offset``; must be overridden."""
        raise NotImplementedError

    # override as appropriate in subclasses
    @staticmethod
    def _get_packer_with_options(f_type, options):
        """Default hook: ignore ``options`` and return the registered packer."""
        return types[f_type]

    def get_packer_with_options(self, f_type, options):
        """Return a packer for ``f_type`` that honours ``options``.

        The work is delegated to the ``_get_packer_with_options`` hook of the
        class of the packer registered for ``f_type``.

        :returns: the packer, or ``None`` when ``options`` is ``None``.
        :raises VPPSerializerValueError: if the lookup raises ``IndexError``.
        """
        if options is None:
            return None
        try:
            packer_class = types[f_type].__class__
            return packer_class._get_packer_with_options(f_type, options)
        except IndexError:
            raise VPPSerializerValueError(
                "Options not supported for {}{} ({})".format(
                    f_type, types[f_type].__class__, options
                )
            )
class BaseTypes(Packer):
    """Packer for scalar types and fixed-size byte strings, backed by ``struct``."""

    def __init__(self, type, elements=0, options=None):
        """Select the ``struct`` layout for ``type``.

        :param type: scalar type name (a key of the format table below).
        :param elements: when positive and ``type`` is ``u8`` or ``string``,
            the field is packed as one byte string of that many bytes.
        :param options: optional field options; ``"default"`` is used by
            ``pack`` when no value is given.
        """
        struct_formats = {
            "u8": ">B",
            "i8": ">b",
            "string": ">s",
            "u16": ">H",
            "i16": ">h",
            "u32": ">I",
            "i32": ">i",
            "u64": ">Q",
            "i64": ">q",
            "f64": "=d",
            "bool": ">?",
            "header": ">HI",
        }
        self._type = type
        self._elements = elements
        self.options = options

        as_bytestring = elements > 0 and type in ("u8", "string")
        layout = ">%ss" % elements if as_bytestring else struct_formats[type]
        self.packer = struct.Struct(layout)
        self.size = self.packer.size

    def pack(self, data, kwargs=None):
        """Pack ``data``; ``None`` becomes the ``"default"`` option, else zero."""
        if data is None:  # Default to zero if not specified
            has_default = self.options and "default" in self.options
            data = self.options["default"] if has_default else 0
        return self.packer.pack(data)

    def unpack(self, data, offset, result=None, ntc=False):
        """Return ``(value, bytes_consumed)`` read from ``data`` at ``offset``."""
        (value,) = self.packer.unpack_from(data, offset)[:1]
        return value, self.packer.size

    @staticmethod
    def _get_packer_with_options(f_type, options):
        """Build a fresh scalar packer for ``f_type`` carrying ``options``."""
        return BaseTypes(f_type, options=options)

    def __repr__(self):
        return "BaseTypes(type=%s, elements=%s, options=%s)" % (
            self._type,
            self._elements,
            self.options,
        )
class String(Packer):
    """Packer for ASCII strings.

    A non-zero ``num`` gives a fixed-size, NUL-padded field; ``num == 0``
    gives a string preceded by a ``u32`` length.
    """

    def __init__(self, name, num, options):
        """
        :param name: field name, used in error messages.
        :param num: fixed field size in bytes, or 0 for variable length.
        :param options: field options; ``"limit"`` overrides ``num`` as the
            maximum size.
        :raises VPPSerializerValueError: for a fixed string whose limit is 0.
        """
        self.name = name
        self.num = num
        self.size = 1
        self.length_field_packer = BaseTypes("u32")
        self.limit = options.get("limit", num)
        self.fixed = bool(num)
        if self.fixed and not self.limit:
            raise VPPSerializerValueError(
                "Invalid combination for: {}, {} fixed:{} limit:{}".format(
                    name, options, self.fixed, self.limit
                )
            )

    def pack(self, list, kwargs=None):
        """Encode the string ``list`` as ASCII.

        Fixed strings are NUL-padded to ``limit`` bytes; variable strings are
        prefixed with their length. An empty value packs as all zeros (fixed)
        or as a zero length (variable).

        :raises VPPSerializerValueError: if the value exceeds ``limit - 1``.
        """
        if not list:
            if self.fixed:
                return b"\x00" * self.limit
            return self.length_field_packer.pack(0) + b""

        longest = self.limit - 1
        if self.limit and len(list) > longest:
            raise VPPSerializerValueError(
                "Invalid argument length for: {}, {} maximum {}".format(
                    list, len(list), self.limit - 1
                )
            )

        encoded = list.encode("ascii")
        if self.fixed:
            return encoded.ljust(self.limit, b"\x00")
        return self.length_field_packer.pack(len(list)) + encoded

    def unpack(self, data, offset=0, result=None, ntc=False):
        """Return ``(text, bytes_consumed)`` decoded from ``data`` at ``offset``."""
        if self.fixed:
            raw, _ = BaseTypes("u8", self.num).unpack(data, offset)
            # Everything from the first NUL onwards is padding.
            text = raw.split(b"\0", 1)[0]
            return (text.decode("ascii"), self.num)

        length, length_field_size = self.length_field_packer.unpack(data, offset)
        if length == 0:
            return "", 0
        payload, size = BaseTypes("u8", length).unpack(
            data, offset + length_field_size
        )
        return (payload.decode("ascii", errors="replace"), size + length_field_size)
# Registry of packers keyed by type name, pre-seeded with the scalar types.
types = {
    scalar: BaseTypes(scalar)
    for scalar in (
        "u8",
        "i8",
        "u16",
        "i16",
        "u32",
        "i32",
        "u64",
        "i64",
        "f64",
        "bool",
    )
}
def vpp_get_type(name):
    """Return the packer registered as ``name``, or ``None`` if there is none."""
    return types.get(name)
class VPPSerializerValueError(ValueError):
    """Raised by the packers in this module for values they cannot (un)pack."""
class FixedList_u8(Packer):
    """Packer for a fixed-length ``u8`` array handled as one byte string."""

    def __init__(self, name, field_type, num):
        """
        :param name: field name, used in error messages.
        :param field_type: element type name handed to ``BaseTypes``.
        :param num: number of bytes in the field.
        """
        self.name = name
        self.num = num
        self.packer = BaseTypes(field_type, num)
        self.size = self.packer.size
        self.field_type = field_type

    def pack(self, data, kwargs=None):
        """Pack a fixed-length byte string.

        Input shorter than the field is padded with trailing zero bytes
        (``struct``'s ``s`` format pads on the right); empty or missing
        input yields an all-zero field.

        :raises VPPSerializerValueError: if ``data`` is longer than the field
            or ``struct`` rejects it.
        """
        if not data:
            return b"\x00" * self.size

        if len(data) > self.num:
            raise VPPSerializerValueError(
                'Fixed list length error for "{}", got: {}'
                " expected: {}".format(self.name, len(data), self.num)
            )

        try:
            return self.packer.pack(data)
        except struct.error as err:
            # Keep the struct failure attached as the explicit cause.
            raise VPPSerializerValueError(
                'Packing failed for "{}" {}'.format(self.name, kwargs)
            ) from err

    def unpack(self, data, offset=0, result=None, ntc=False):
        """Return ``(bytes, bytes_consumed)`` read from ``data`` at ``offset``.

        :raises VPPSerializerValueError: if fewer than ``num`` bytes remain.
        """
        available = len(data[offset:])
        if available < self.num:
            raise VPPSerializerValueError(
                'Invalid array length for "{}" got {}'
                " expected {}".format(self.name, available, self.num)
            )
        return self.packer.unpack(data, offset)

    def __repr__(self):
        return "FixedList_u8(name=%s, field_type=%s, num=%s)" % (
            self.name,
            self.field_type,
            self.num,
        )
class FixedList(Packer):
    """Packer for an array of exactly ``num`` elements of one registered type."""

    def __init__(self, name, field_type, num):
        """
        :param name: field name.
        :param field_type: element type name; must already be in ``types``.
        :param num: required number of elements.
        """
        self.name = name
        self.num = num
        self.field_type = field_type
        self.packer = types[field_type]
        self.size = self.packer.size * num

    def pack(self, list, kwargs):
        """Concatenate the packed form of every element of ``list``.

        :raises VPPSerializerValueError: if ``list`` does not hold exactly
            ``num`` elements.
        """
        if len(list) != self.num:
            raise VPPSerializerValueError(
                "Fixed list length error, got: {} expected: {}".format(
                    len(list), self.num
                )
            )
        return b"".join(self.packer.pack(element) for element in list)

    def unpack(self, data, offset=0, result=None, ntc=False):
        """Return ``(elements, bytes_consumed)`` for ``num`` consecutive elements."""
        # Return a list of arguments
        elements = []
        consumed = 0
        for _ in range(self.num):
            value, size = self.packer.unpack(data, offset, ntc=ntc)
            elements.append(value)
            offset += size
            consumed += size
        return elements, consumed

    def __repr__(self):
        return "FixedList(name=%s, field_type=%s, num=%s)" % (
            self.name,
            self.field_type,
            self.num,
        )
class VLAList(Packer):
    """Packer for an array whose element count is held in another field."""

    def __init__(self, name, field_type, len_field_name, index):
        """
        :param name: field name.
        :param field_type: element type name; must already be in ``types``.
        :param len_field_name: name of the field carrying the element count.
        :param index: position of that count in the unpack ``result`` list.
        """
        self.name = name
        self.field_type = field_type
        self.index = index
        self.length_field = len_field_name
        self.packer = types[field_type]
        self.size = self.packer.size

    def _is_byte_array(self):
        # One-byte ``u8`` elements are moved as a single byte string.
        return self.packer.size == 1 and self.field_type == "u8"

    def pack(self, lst, kwargs=None):
        """Pack ``lst``; its length must equal ``kwargs[length_field]``.

        :raises VPPSerializerValueError: on a length mismatch.
        """
        if not lst:
            return b""
        expected = kwargs[self.length_field]
        if len(lst) != expected:
            raise VPPSerializerValueError(
                "Variable length error, got: {} expected: {}".format(
                    len(lst), kwargs[self.length_field]
                )
            )

        if self._is_byte_array():
            return b"".join(lst) if isinstance(lst, list) else bytes(lst)

        return b"".join(self.packer.pack(element) for element in lst)

    def unpack(self, data, offset=0, result=None, ntc=False):
        """Return ``(elements, bytes_consumed)``; the count is ``result[index]``."""
        # Return a list of arguments
        count = result[self.index]

        if self._is_byte_array():
            if count == 0:
                return b"", 0
            return BaseTypes("u8", count).unpack(data, offset, ntc=ntc)

        elements = []
        consumed = 0
        for _ in range(count):
            value, size = self.packer.unpack(data, offset, ntc=ntc)
            elements.append(value)
            offset += size
            consumed += size
        return elements, consumed

    def __repr__(self):
        return "VLAList(name=%s, field_type=%s, " "len_field_name=%s, index=%s)" % (
            self.name,
            self.field_type,
            self.length_field,
            self.index,
        )
class VLAList_legacy(Packer):
    """Packer for a trailing array with no length field.

    On unpack the element count is derived from the number of bytes left in
    the buffer.
    """

    def __init__(self, name, field_type):
        """
        :param name: field name.
        :param field_type: element type name; must already be in ``types``.
        """
        self.name = name
        self.field_type = field_type
        self.packer = types[field_type]
        self.size = self.packer.size

    def pack(self, list, kwargs=None):
        """Pack every element of ``list``; one-byte elements go out as raw bytes."""
        if self.packer.size == 1:
            return bytes(list)
        return b"".join(self.packer.pack(element) for element in list)

    def unpack(self, data, offset=0, result=None, ntc=False):
        """Return ``(elements, bytes_consumed)`` covering the rest of ``data``.

        :raises VPPSerializerValueError: if the remaining byte count is not a
            multiple of the element size.
        """
        # Return a list of arguments
        remaining = len(data) - offset
        if remaining % self.packer.size:
            raise VPPSerializerValueError(
                "Legacy Variable Length Array length mismatch."
            )
        # Exact integer division; the remainder was checked above.
        elements = remaining // self.packer.size

        values = []
        total = 0
        for _ in range(elements):
            x, size = self.packer.unpack(data, offset, ntc=ntc)
            values.append(x)
            offset += self.packer.size
            total += size
        return values, total

    def __repr__(self):
        return "VLAList_legacy(name=%s, field_type=%s)" % (self.name, self.field_type)
# Will change to IntEnum after 21.04 release
class VPPEnumType(Packer):
    """Packer for an API enum, exposed as a generated ``output_class`` type."""

    output_class = IntFlag

    def __init__(self, name, msgdef, options=None):
        """Build the enum class from ``msgdef`` and register this packer.

        :param name: type name; also the name of the generated enum class.
        :param msgdef: ``(member_name, value)`` pairs, plus an optional dict
            with an ``"enumtype"`` key selecting the underlying integer type
            (``u32`` when absent).
        :param options: optional field options (``"default"``).
        """
        self.name = name
        self.msgdef = msgdef
        self.options = options
        self.enumtype = "u32"
        self.size = types["u32"].size

        members = {}
        for entry in msgdef:
            if type(entry) is dict and "enumtype" in entry:
                base = entry["enumtype"]
                if base != "u32":
                    self.size = types[base].size
                    self.enumtype = base
                continue
            member_name, member_value = entry
            members[member_name] = member_value

        self.enum = self.output_class(name, members)
        types[name] = self
        class_types[name] = self.__class__

    def __getattr__(self, name):
        # Unknown attributes are looked up as enum member names.
        return self.enum[name]

    def __bool__(self):
        return True

    def pack(self, data, kwargs=None):
        """Pack ``data`` as the underlying integer type.

        ``None`` becomes the ``"default"`` option, else zero.
        """
        if data is None:  # Default to zero if not specified
            has_default = self.options and "default" in self.options
            data = self.options["default"] if has_default else 0

        return types[self.enumtype].pack(data)

    def unpack(self, data, offset=0, result=None, ntc=False):
        """Return ``(enum_value, bytes_consumed)`` read from ``data``."""
        raw, size = types[self.enumtype].unpack(data, offset)
        return self.enum(raw), size

    @classmethod
    def _get_packer_with_options(cls, f_type, options):
        """Rebuild the packer registered as ``f_type`` with ``options`` applied."""
        return cls(f_type, types[f_type].msgdef, options=options)

    def __repr__(self):
        return "%s(name=%s, msgdef=%s, options=%s)" % (
            self.__class__.__name__,
            self.name,
            self.msgdef,
            self.options,
        )
class VPPEnumFlagType(VPPEnumType):
    """Enum packer whose generated class is an ``IntFlag``."""

    output_class = IntFlag

    def __init__(self, name, msgdef, options=None):
        """Same construction as ``VPPEnumType``."""
        super().__init__(name, msgdef, options)
class VPPUnionType(Packer):
    """Packer for a union: all members share storage sized for the largest."""

    def __init__(self, name, msgdef):
        """Collect the member packers and register the union as ``name``.

        :param name: union type name.
        :param msgdef: ``(type, member_name)`` pairs, plus an optional dict
            holding the ``"crc"``.
        :raises VPPSerializerValueError: if a member type is not registered.
        """
        self.name = name
        self.msgdef = msgdef
        self.size = 0
        self.maxindex = 0
        self.packers = collections.OrderedDict()
        member_names = []
        for position, entry in enumerate(msgdef):
            if type(entry) is dict and "crc" in entry:
                self.crc = entry["crc"]
                continue
            f_type, f_name = entry
            if f_type not in types:
                logger.debug("Unknown union type {}".format(f_type))
                raise VPPSerializerValueError("Unknown message type {}".format(f_type))
            member_names.append(f_name)
            member = types[f_type]
            self.packers[f_name] = member
            # Remember the widest member seen so far.
            if member.size > self.size:
                self.size = member.size
                self.maxindex = position

        types[name] = self
        self.tuple = collections.namedtuple(name, member_names, rename=True)

    # Union of variable length?
    def pack(self, data, kwargs=None):
        """Pack the first member found in the mapping ``data``.

        The result is a ``bytearray`` zero-filled to the union size; an empty
        or missing ``data`` gives all zeros.
        """
        if not data:
            return b"\x00" * self.size

        # Only the first member of the mapping is packed.
        member, value = next(iter(data.items()))
        logger.debug("Key: {} Value: {}".format(member, value))
        packed = self.packers[member].pack(value, kwargs)

        buffer = bytearray(self.size)
        buffer[: len(packed)] = packed
        return buffer

    def unpack(self, data, offset=0, result=None, ntc=False):
        """Decode every member from the same offset.

        :returns: ``(namedtuple_of_members, largest_member_size)``.
        """
        values = []
        maxsize = 0
        for member_packer in self.packers.values():
            value, size = member_packer.unpack(data, offset, ntc=ntc)
            maxsize = max(maxsize, size)
            values.append(value)
        return self.tuple._make(values), maxsize

    def __repr__(self):
        return "VPPUnionType(name=%s, msgdef=%r)" % (self.name, self.msgdef)
class VPPTypeAlias(Packer):
    """Packer for a named alias of another type, or of a fixed array of it."""

    def __init__(self, name, msgdef, options=None):
        """Resolve the aliased type and register the alias as ``name``.

        :param name: alias type name.
        :param msgdef: mapping with ``"type"`` and, for arrays, ``"length"``.
        :param options: optional field options (``"default"``).
        :raises ValueError: if the aliased type is unknown or the length is 0.
        """
        self.name = name
        self.msgdef = msgdef
        t = vpp_get_type(msgdef["type"])
        if not t:
            raise ValueError("No such type: {}".format(msgdef["type"]))
        if "length" in msgdef:
            if msgdef["length"] == 0:
                raise ValueError("Invalid zero length for alias: {}".format(name))
            list_packer = FixedList_u8 if msgdef["type"] == "u8" else FixedList
            self.packer = list_packer(name, msgdef["type"], msgdef["length"])
        else:
            self.packer = t
        # Set for every variant, including non-u8 fixed lists.
        self.size = self.packer.size

        types[name] = self
        # Kept for attribute compatibility; unpack() tracks this locally.
        self.toplevelconversion = False
        self.options = options

    def pack(self, data, kwargs=None):
        """Pack ``data``, trying a registered format conversion first.

        If the conversion raises ``OSError``, ``socket.error`` or
        ``TypeError`` the value is packed as-is. ``None`` becomes the
        ``"default"`` option, else zero.
        """
        if data and conversion_required(data, self.name):
            try:
                return conversion_packer(data, self.name)
            # Python 2 and 3 raises different exceptions from inet_pton
            except (OSError, socket.error, TypeError):
                pass
        if data is None:  # Default to zero if not specified
            if self.options and "default" in self.options:
                data = self.options["default"]
            else:
                data = 0

        return self.packer.pack(data, kwargs)

    @staticmethod
    def _get_packer_with_options(f_type, options):
        """Rebuild the alias registered as ``f_type`` with ``options`` applied."""
        return VPPTypeAlias(f_type, types[f_type].msgdef, options=options)

    def unpack(self, data, offset=0, result=None, ntc=False):
        """Return ``(value, bytes_consumed)`` read from ``data``.

        When ``ntc`` is ``False`` and the alias has an entry in
        ``vpp_format.conversion_unpacker_table``, the decoded value is passed
        through that converter and nested packers run with ``ntc=True``.
        """
        # A local flag cannot be left set by an exception in the nested unpack.
        convert = ntc is False and self.name in vpp_format.conversion_unpacker_table
        if convert:
            # Disable type conversion for dependent types
            ntc = True
        t, size = self.packer.unpack(data, offset, result, ntc=ntc)
        if convert:
            return conversion_unpacker(t, self.name), size
        return t, size

    def __repr__(self):
        return "VPPTypeAlias(name=%s, msgdef=%s, options=%s)" % (
            self.name,
            self.msgdef,
            self.options,
        )
class VPPType(Packer):
    """Packer for a composite type made of named fields."""

    # Set everything up to be able to pack / unpack
    def __init__(self, name, msgdef):
        """Create one packer per field and register the type as ``name``.

        :param name: type name.
        :param msgdef: field definitions ``(type, name[, length[, length_field]])``
            with an optional trailing options dict, plus an optional dict
            holding the ``"crc"``.
        :raises VPPSerializerValueError: if a field type is not registered.
        """
        self.name = name
        self.msgdef = msgdef
        self.packers = []
        self.fields = []
        self.fieldtypes = []
        self.field_by_name = {}
        size = 0
        for f in msgdef:
            if type(f) is dict and "crc" in f:
                self.crc = f["crc"]
                continue
            f_type, f_name = f[:2]
            self.fields.append(f_name)
            self.field_by_name[f_name] = None
            self.fieldtypes.append(f_type)
            if f_type not in types:
                logger.debug("Unknown type {}".format(f_type))
                raise VPPSerializerValueError("Unknown message type {}".format(f_type))

            # A trailing dict holds options and is not part of the field shape.
            fieldlen = len(f)
            options = [x for x in f if type(x) is dict]
            if options:
                self.options = options[0]
                fieldlen -= 1
            else:
                self.options = {}

            # Variable-length fields do not add to the fixed size.
            fixed_size = True
            if fieldlen == 3:  # list
                list_elements = f[2]
                if list_elements == 0:
                    if f_type == "string":
                        p = String(f_name, 0, self.options)
                    else:
                        p = VLAList_legacy(f_name, f_type)
                    fixed_size = False
                elif f_type == "u8":
                    p = FixedList_u8(f_name, f_type, list_elements)
                elif f_type == "string":
                    p = String(f_name, list_elements, self.options)
                else:
                    p = FixedList(f_name, f_type, list_elements)
            elif fieldlen == 4:  # Variable length list
                length_index = self.fields.index(f[3])
                p = VLAList(f_name, f_type, f[3], length_index)
                fixed_size = False
            else:
                # default support for types that decay to basetype
                if "default" in self.options:
                    p = self.get_packer_with_options(f_type, self.options)
                else:
                    p = types[f_type]

            self.packers.append(p)
            if fixed_size:
                size += p.size

        self.size = size
        self.tuple = collections.namedtuple(name, self.fields, rename=True)
        types[name] = self
        # Kept for attribute compatibility; unpack() tracks this locally.
        self.toplevelconversion = False

    def pack(self, data, kwargs=None):
        """Pack the fields of ``data`` in declaration order.

        :param data: mapping of field name to value; fields that are missing
            are packed from ``None`` so each packer applies its default.
        :param kwargs: enclosing argument mapping; falls back to ``data``.
        :raises VPPSerializerValueError: if ``data`` is truthy, not a dict,
            and lacks a field.
        """
        if not kwargs:
            kwargs = data

        # Try one of the format functions
        if data and conversion_required(data, self.name):
            return conversion_packer(data, self.name)

        parts = []
        for i, a in enumerate(self.fields):
            if data and type(data) is not dict and a not in data:
                raise VPPSerializerValueError(
                    "Invalid argument: {} expected {}.{}".format(data, self.name, a)
                )

            # Defaulting to zero.
            if not data or a not in data:  # Default to 0
                arg = None
                kwarg = None  # No default for VLA
            else:
                arg = data[a]
                kwarg = kwargs[a] if a in kwargs else None

            packer = self.packers[i]
            # Nested composite types receive only their own sub-mapping.
            if isinstance(packer, VPPType):
                parts.append(packer.pack(arg, kwarg))
            else:
                parts.append(packer.pack(arg, kwargs))

        return b"".join(parts)

    def unpack(self, data, offset=0, result=None, ntc=False):
        """Return ``(namedtuple_of_fields, bytes_consumed)`` read from ``data``.

        When ``ntc`` is ``False`` and the type has an entry in
        ``vpp_format.conversion_unpacker_table``, the tuple is passed through
        that converter and nested packers run with ``ntc=True``.
        """
        # A local flag cannot be left set by an exception in a field unpack.
        convert = ntc is False and self.name in vpp_format.conversion_unpacker_table
        if convert:
            # Disable type conversion for dependent types
            ntc = True

        # Return a list of arguments
        result = []
        total = 0
        for p in self.packers:
            x, size = p.unpack(data, offset, result, ntc)
            if type(x) is tuple and len(x) == 1:
                x = x[0]
            result.append(x)
            offset += size
            total += size
        t = self.tuple._make(result)

        if convert:
            t = conversion_unpacker(t, self.name)
        return t, total

    def __repr__(self):
        return "%s(name=%s, msgdef=%s)" % (
            self.__class__.__name__,
            self.name,
            self.msgdef,
        )
685 class VPPMessage(VPPType):