2 # Copyright (c) 2018 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
16 from enum import IntFlag
22 from . import vpp_format
26 # Set log-level in application by doing e.g.:
# logger = logging.getLogger('vpp_papi.serializer')
28 # logger.setLevel(logging.DEBUG)
30 logger = logging.getLogger("vpp_papi.serializer")
34 return type(d) is dict or type(d) is bytes
def conversion_required(data, field_type):
    """Tell whether ``data`` must be run through a ``vpp_format`` converter.

    :param data: value supplied by the caller for a field of ``field_type``.
    :param field_type: API type name used as key into
        ``vpp_format.conversion_table``.
    :returns: ``True`` when a converter is registered for the Python type
        name of ``data`` under ``field_type``; ``False`` otherwise. The
        result is always a real ``bool``.
    """
    # Exact type match (not isinstance): values that are already a plain
    # dict or bytes object are passed to the packers untouched.
    if type(data) is dict or type(data) is bytes:
        return False
    try:
        converters = vpp_format.conversion_table[field_type]
    except KeyError:
        # No converter registered for this field type at all.
        return False
    return type(data).__name__ in converters
def conversion_packer(data, field_type):
    """Convert ``data`` with its registered formatter and pack the result.

    :param data: caller-supplied value; its Python type name selects the
        converter inside ``vpp_format.conversion_table[field_type]``.
    :param field_type: API type name; also selects the packer in ``types``.
    :returns: whatever the packer registered for ``field_type`` returns.
    :raises KeyError: if ``field_type`` or the type name is not registered.
    """
    # Resolve the packer first so lookup failures surface in the same order
    # as a single chained expression would produce them.
    pack = types[field_type].pack
    type_name = type(data).__name__
    convert = vpp_format.conversion_table[field_type][type_name]
    return pack(convert(data))
def conversion_unpacker(data, field_type):
    """Apply the registered post-unpack conversion for ``field_type``.

    :param data: value produced by a packer's ``unpack``.
    :param field_type: API type name used as key into
        ``vpp_format.conversion_unpacker_table``.
    :returns: the converted value, or ``data`` unchanged when no converter
        is registered for ``field_type``.
    """
    table = vpp_format.conversion_unpacker_table
    if field_type in table:
        return table[field_type](data)
    return data
class Packer(object):
    """Interface shared by every packer/unpacker in this module."""

    # Fallback for subclasses that never assign per-instance options.
    options = {}

    def pack(self, data, kwargs):
        """Serialise ``data``; concrete packers must override this."""
        raise NotImplementedError

    def unpack(self, data, offset, result=None, ntc=False):
        """Deserialise from ``data`` at ``offset``; must be overridden."""
        raise NotImplementedError

    # override as appropriate in subclasses
    @staticmethod
    def _get_packer_with_options(f_type, options):
        """Return the packer registered for ``f_type`` (options ignored)."""
        return types[f_type]

    def get_packer_with_options(self, f_type, options):
        """Build a packer for ``f_type`` that honours ``options``.

        Delegates to ``_get_packer_with_options`` on the class of the packer
        registered for ``f_type``.

        :returns: the option-aware packer, or ``None`` when ``options`` is
            ``None``.
        :raises VPPSerializerValueError: when the delegate raises
            ``IndexError``.
        """
        if options is None:
            return None
        try:
            packer_cls = types[f_type].__class__
            return packer_cls._get_packer_with_options(f_type, options)
        except IndexError:
            raise VPPSerializerValueError(
                "Options not supported for {}{} ({})".format(
                    f_type, types[f_type].__class__, options
                )
            )
class BaseTypes(Packer):
    """Packer for scalar wire types and fixed-size byte strings.

    Wraps a pre-compiled ``struct.Struct``; ``size`` is that struct's size.
    """

    # struct format per wire type. Built once at class level because
    # instances are created on hot paths (per-unpack in the list/string
    # packers). Everything is big-endian except f64, which uses native
    # byte order ("=").
    # NOTE(review): table reconstructed during review - confirm it matches
    # the module's existing format codes before merging.
    _FORMATS = {
        "u8": ">B",
        "i8": ">b",
        "string": ">s",
        "u16": ">H",
        "i16": ">h",
        "u32": ">I",
        "i32": ">i",
        "u64": ">Q",
        "i64": ">q",
        "f64": "=d",
        "bool": ">?",
        "header": ">HI",
    }

    def __init__(self, type, elements=0, options=None):
        """
        :param type: wire type name, a key of ``_FORMATS``.
        :param elements: for ``u8``/``string`` a positive value selects a
            fixed-length byte string of that many bytes.
        :param options: optional dict; ``options["default"]`` replaces a
            ``None`` value in :meth:`pack`.
        :raises KeyError: for an unknown ``type``.
        """
        self._type = type
        self._elements = elements
        if elements > 0 and type in ("u8", "string"):
            self.packer = struct.Struct(">%ss" % elements)
        else:
            self.packer = struct.Struct(self._FORMATS[type])
        self.size = self.packer.size
        self.options = options

    def pack(self, data, kwargs=None):
        """Pack ``data``; ``None`` becomes the configured default or zero."""
        if data is None:  # Default to zero if not specified
            if self.options and "default" in self.options:
                data = self.options["default"]
            else:
                data = 0
        return self.packer.pack(data)

    def unpack(self, data, offset, result=None, ntc=False):
        """Return ``(value, bytes_consumed)`` read from ``data`` at ``offset``."""
        return self.packer.unpack_from(data, offset)[0], self.packer.size

    @staticmethod
    def _get_packer_with_options(f_type, options):
        """Create a fresh scalar packer for ``f_type`` carrying ``options``."""
        return BaseTypes(f_type, options=options)

    def __repr__(self):
        return "BaseTypes(type=%s, elements=%s, options=%s)" % (
            self._type,
            self._elements,
            self.options,
        )
class String(Packer):
    """Packer for ASCII strings.

    Two layouts are supported:

    * fixed (``num`` > 0): exactly ``limit`` bytes on pack, NUL padded;
      ``num`` bytes are read on unpack and cut at the first NUL.
    * variable (``num`` == 0): a ``u32`` length followed by that many bytes.
    """

    def __init__(self, name, num, options):
        """
        :param name: field name, used in error messages.
        :param num: fixed width in bytes, or 0 for the length-prefixed form.
        :param options: dict; ``options["limit"]`` overrides ``num`` as the
            maximum size.
        :raises VPPSerializerValueError: fixed layout without a usable limit.
        """
        self.name = name
        self.num = num
        # NOTE(review): size stays 1 regardless of width - confirm callers
        # that sum packer sizes expect this.
        self.size = 1
        self.length_field_packer = BaseTypes("u32")
        self.limit = options["limit"] if "limit" in options else num
        self.fixed = bool(num)
        if self.fixed and not self.limit:
            raise VPPSerializerValueError(
                "Invalid combination for: {}, {} fixed:{} limit:{}".format(
                    name, options, self.fixed, self.limit
                )
            )
        # The fixed-width reader depends only on ``num``; build it once
        # instead of on every unpack() call.
        self._fixed_packer = BaseTypes("u8", num) if self.fixed else None

    def pack(self, list, kwargs=None):
        """Encode the string ``list`` (name kept for API compatibility).

        :raises VPPSerializerValueError: when a limit is set and the string
            does not leave room for the terminating NUL.
        """
        if not list:
            if self.fixed:
                return b"\x00" * self.limit
            return self.length_field_packer.pack(0) + b""
        if self.limit and len(list) > self.limit - 1:
            raise VPPSerializerValueError(
                "Invalid argument length for: {}, {} maximum {}".format(
                    list, len(list), self.limit - 1
                )
            )
        encoded = list.encode("ascii")
        if self.fixed:
            return encoded.ljust(self.limit, b"\x00")
        return self.length_field_packer.pack(len(list)) + encoded

    def unpack(self, data, offset=0, result=None, ntc=False):
        """Return ``(text, bytes_consumed)`` decoded from ``data``."""
        if self.fixed:
            raw, _ = self._fixed_packer.unpack(data, offset)
            text = raw.split(b"\0", 1)[0]
            return (text.decode("ascii"), self.num)

        length, length_field_size = self.length_field_packer.unpack(data, offset)
        if length == 0:
            # An empty string still occupies its length prefix on the wire.
            # NOTE(review): confirm no caller relies on a consumed size of 0
            # here.
            return "", length_field_size
        p = BaseTypes("u8", length)
        x, size = p.unpack(data, offset + length_field_size)
        return (x.decode("ascii", errors="replace"), size + length_field_size)
180 "u8": BaseTypes("u8"),
181 "i8": BaseTypes("i8"),
182 "u16": BaseTypes("u16"),
183 "i16": BaseTypes("i16"),
184 "u32": BaseTypes("u32"),
185 "i32": BaseTypes("i32"),
186 "u64": BaseTypes("u64"),
187 "i64": BaseTypes("i64"),
188 "f64": BaseTypes("f64"),
189 "bool": BaseTypes("bool"),
196 def vpp_get_type(name):
203 class VPPSerializerValueError(ValueError):
class FixedList_u8(Packer):
    """Packer for a fixed-length array of ``u8`` handled as one bytestring."""

    def __init__(self, name, field_type, num):
        """
        :param name: field name, used in error messages.
        :param field_type: element type name handed to ``BaseTypes``.
        :param num: number of bytes in the array.
        """
        self.name = name
        self.num = num
        self.packer = BaseTypes(field_type, num)
        self.size = self.packer.size
        self.field_type = field_type

    def pack(self, data, kwargs=None):
        """Pack a fixed length bytestring.

        Input shorter than the field is padded with trailing zero bytes
        (struct ``s`` format semantics); empty/``None`` input yields an
        all-zero field.

        :raises VPPSerializerValueError: if ``data`` is longer than the
            field or cannot be packed.
        """
        if not data:
            return b"\x00" * self.size

        if len(data) > self.num:
            raise VPPSerializerValueError(
                'Fixed list length error for "{}", got: {}'
                " expected: {}".format(self.name, len(data), self.num)
            )

        try:
            return self.packer.pack(data)
        except struct.error as err:
            # Keep the low-level reason reachable through __cause__.
            raise VPPSerializerValueError(
                'Packing failed for "{}" {}'.format(self.name, kwargs)
            ) from err

    def unpack(self, data, offset=0, result=None, ntc=False):
        """Return ``(bytes, bytes_consumed)`` read at ``offset``.

        :raises VPPSerializerValueError: if fewer than ``num`` bytes remain.
        """
        # Arithmetic instead of len(data[offset:]) so the buffer tail is
        # not copied just to be measured.
        remaining = max(len(data) - offset, 0)
        if remaining < self.num:
            raise VPPSerializerValueError(
                'Invalid array length for "{}" got {}'
                " expected {}".format(self.name, remaining, self.num)
            )
        return self.packer.unpack(data, offset)

    def __repr__(self):
        return "FixedList_u8(name=%s, field_type=%s, num=%s)" % (
            self.name,
            self.field_type,
            self.num,
        )
class FixedList(Packer):
    """Packer for a fixed-length array of a registered element type."""

    def __init__(self, name, field_type, num):
        """
        :param name: field name (used in ``repr``).
        :param field_type: element type name; must be a key of ``types``.
        :param num: exact number of elements.
        """
        self.num = num
        self.packer = types[field_type]
        self.size = self.packer.size * num
        self.name = name
        self.field_type = field_type

    def pack(self, list, kwargs=None):
        """Pack exactly ``num`` elements (argument name kept for callers).

        :raises VPPSerializerValueError: if the element count is wrong.
        """
        if len(list) != self.num:
            raise VPPSerializerValueError(
                "Fixed list length error, got: {} expected: {}".format(
                    len(list), self.num
                )
            )
        # join() is linear; repeated ``bytes +=`` re-copies the buffer on
        # every element.
        return b"".join(self.packer.pack(e) for e in list)

    def unpack(self, data, offset=0, result=None, ntc=False):
        """Return ``(list_of_values, bytes_consumed)``."""
        # Return a list of arguments
        values = []
        total = 0
        for _ in range(self.num):
            x, size = self.packer.unpack(data, offset + total, ntc=ntc)
            values.append(x)
            total += size
        return values, total

    def __repr__(self):
        return "FixedList(name=%s, field_type=%s, num=%s)" % (
            self.name,
            self.field_type,
            self.num,
        )
class VLAList(Packer):
    """Packer for a variable-length array whose count lives in a sibling field."""

    def __init__(self, name, field_type, len_field_name, index):
        """
        :param name: field name (used in ``repr``).
        :param field_type: element type name; must be a key of ``types``.
        :param len_field_name: name of the field holding the element count;
            looked up in ``kwargs`` when packing.
        :param index: position of that count in the ``result`` list handed
            to :meth:`unpack`.
        """
        self.name = name
        self.field_type = field_type
        self.index = index
        self.packer = types[field_type]
        self.size = self.packer.size
        self.length_field = len_field_name

    def pack(self, lst, kwargs=None):
        """Pack ``lst`` after checking it against the declared count.

        :raises VPPSerializerValueError: if ``len(lst)`` differs from
            ``kwargs[self.length_field]``.
        """
        if not lst:
            return b""
        if len(lst) != kwargs[self.length_field]:
            raise VPPSerializerValueError(
                "Variable length error, got: {} expected: {}".format(
                    len(lst), kwargs[self.length_field]
                )
            )

        # u8 array
        if self.packer.size == 1:
            if isinstance(lst, list):
                return b"".join(lst)
            return bytes(lst)

        # join() is linear; repeated ``bytes +=`` re-copies the buffer on
        # every element.
        return b"".join(self.packer.pack(e) for e in lst)

    def unpack(self, data, offset=0, result=None, ntc=False):
        """Return ``(values, bytes_consumed)``.

        The element count is read from ``result[self.index]``. Single-byte
        element types come back as one bytes object instead of a list.
        """
        # Return a list of arguments
        count = result[self.index]

        # u8 array
        if self.packer.size == 1:
            if count == 0:
                return b"", 0
            p = BaseTypes("u8", count)
            return p.unpack(data, offset, ntc=ntc)

        values = []
        total = 0
        for _ in range(count):
            x, size = self.packer.unpack(data, offset + total, ntc=ntc)
            values.append(x)
            total += size
        return values, total

    def __repr__(self):
        return "VLAList(name=%s, field_type=%s, " "len_field_name=%s, index=%s)" % (
            self.name,
            self.field_type,
            self.length_field,
            self.index,
        )
class VLAList_legacy(Packer):
    """Packer for a trailing array with no count field.

    On unpack, the number of elements is derived from the bytes remaining
    after ``offset``.
    """

    def __init__(self, name, field_type):
        """
        :param name: field name (used in ``repr``).
        :param field_type: element type name; must be a key of ``types``.
        """
        self.name = name
        self.field_type = field_type
        self.packer = types[field_type]
        self.size = self.packer.size

    def pack(self, list, kwargs=None):
        """Pack every element of ``list`` (argument name kept for callers)."""
        if self.packer.size == 1:
            return bytes(list)

        # join() is linear; repeated ``bytes +=`` re-copies the buffer on
        # every element.
        return b"".join(self.packer.pack(e) for e in list)

    def unpack(self, data, offset=0, result=None, ntc=False):
        """Return ``(values, bytes_consumed)`` for the rest of ``data``.

        :raises VPPSerializerValueError: if the remaining byte count is not
            a multiple of the element size.
        """
        total = 0
        # Return a list of arguments
        # Integer divmod: no float round trip for the element count.
        elements, leftover = divmod(len(data) - offset, self.packer.size)
        if leftover:
            raise VPPSerializerValueError(
                "Legacy Variable Length Array length mismatch."
            )
        r = []
        for _ in range(elements):
            x, size = self.packer.unpack(data, offset, ntc=ntc)
            r.append(x)
            # The cursor advances by the nominal element size while the
            # reported total uses what the element packer says it consumed.
            offset += self.packer.size
            total += size
        return r, total

    def __repr__(self):
        return "VLAList_legacy(name=%s, field_type=%s)" % (self.name, self.field_type)
# Will change to IntEnum after 21.04 release
class VPPEnumType(Packer):
    """Packer for an API enum.

    Builds a Python enum of ``output_class`` from the message definition and
    registers itself in ``types`` / ``class_types`` under ``name``. Members
    are reachable as attributes of the packer instance.
    """

    output_class = IntFlag

    def __init__(self, name, msgdef, options=None):
        """
        :param name: enum type name; also the registration key.
        :param msgdef: iterable of ``(member_name, value)`` pairs, plus an
            optional dict carrying ``"enumtype"`` (the underlying wire type,
            ``u32`` when absent).
        :param options: optional dict; ``options["default"]`` replaces a
            ``None`` value in :meth:`pack`.
        """
        self.name = name
        self.msgdef = msgdef
        self.enumtype = "u32"
        self.size = types["u32"].size
        members = {}
        for entry in msgdef:
            if type(entry) is dict and "enumtype" in entry:
                wire_type = entry["enumtype"]
                if wire_type != "u32":
                    self.size = types[wire_type].size
                    self.enumtype = wire_type
                continue
            member_name, member_value = entry
            members[member_name] = member_value
        self.enum = self.output_class(name, members)
        types[name] = self
        class_types[name] = self.__class__
        self.options = options

    def __getattr__(self, name):
        # Only reached for names that are not regular attributes: resolve
        # them as enum members.
        return self.enum[name]

    def __bool__(self):
        return True

    def pack(self, data, kwargs=None):
        """Pack ``data`` using the underlying wire type's packer."""
        if data is None:  # Default to zero if not specified
            has_default = self.options and "default" in self.options
            data = self.options["default"] if has_default else 0

        return types[self.enumtype].pack(data)

    def unpack(self, data, offset=0, result=None, ntc=False):
        """Return ``(enum_member, bytes_consumed)``."""
        raw, size = types[self.enumtype].unpack(data, offset)
        return self.enum(raw), size

    @classmethod
    def _get_packer_with_options(cls, f_type, options):
        """Re-create the packer registered as ``f_type`` with ``options``."""
        return cls(f_type, types[f_type].msgdef, options=options)

    def __repr__(self):
        return "%s(name=%s, msgdef=%s, options=%s)" % (
            self.__class__.__name__,
            self.name,
            self.msgdef,
            self.options,
        )
class VPPEnumFlagType(VPPEnumType):
    """Enum packer whose decoded values are ``IntFlag`` members."""

    output_class = IntFlag

    def __init__(self, name, msgdef, options=None):
        # Plain pass-through to the enum packer's constructor.
        super().__init__(name, msgdef, options)
class VPPUnionType(Packer):
    """Packer for an API union: all members share the same bytes.

    ``size`` is the size of the largest member; the instance registers
    itself in ``types`` under ``name``.
    """

    def __init__(self, name, msgdef):
        """
        :param name: union type name; also the registration key.
        :param msgdef: iterable of ``(type_name, member_name)`` pairs, plus
            an optional dict carrying ``"crc"``.
        :raises VPPSerializerValueError: if a member type is not registered.
        """
        self.name = name
        self.msgdef = msgdef
        self.size = 0
        self.maxindex = 0
        member_names = []
        self.packers = collections.OrderedDict()
        for position, member in enumerate(msgdef):
            if type(member) is dict and "crc" in member:
                self.crc = member["crc"]
                continue
            f_type, f_name = member
            if f_type not in types:
                logger.debug("Unknown union type {}".format(f_type))
                raise VPPSerializerValueError("Unknown message type {}".format(f_type))
            member_names.append(f_name)
            member_packer = types[f_type]
            self.packers[f_name] = member_packer
            # Track the widest member and where it was declared.
            if member_packer.size > self.size:
                self.size = member_packer.size
                self.maxindex = position

        types[name] = self
        self.tuple = collections.namedtuple(name, member_names, rename=True)

    # Union of variable length?
    def pack(self, data, kwargs=None):
        """Pack the first member present in ``data``, zero-filled to ``size``.

        Empty or ``None`` input yields an all-zero union.
        """
        if not data:
            return b"\x00" * self.size

        # Only the first key/value pair is used.
        for key, value in data.items():
            logger.debug("Key: {} Value: {}".format(key, value))
            packed = self.packers[key].pack(value, kwargs)
            break
        buf = bytearray(self.size)
        buf[: len(packed)] = packed
        return buf

    def unpack(self, data, offset=0, result=None, ntc=False):
        """Decode every member from the same offset.

        :returns: ``(namedtuple_of_members, largest_size_consumed)``.
        """
        decoded = []
        maxsize = 0
        for member_packer in self.packers.values():
            value, size = member_packer.unpack(data, offset, ntc=ntc)
            maxsize = max(maxsize, size)
            decoded.append(value)
        return self.tuple._make(decoded), maxsize

    def __repr__(self):
        return "VPPUnionType(name=%s, msgdef=%r)" % (self.name, self.msgdef)
class VPPTypeAlias(Packer):
    """Packer for a named alias of another type or of a fixed-length array.

    The instance registers itself in ``types`` under ``name``.
    """

    def __init__(self, name, msgdef, options=None):
        """
        :param name: alias name; also the registration key and the key used
            for ``vpp_format`` conversions.
        :param msgdef: dict with ``"type"`` (target type name) and optional
            ``"length"`` (fixed array length).
        :param options: optional dict; ``options["default"]`` replaces a
            ``None`` value in :meth:`pack`.
        :raises ValueError: unknown target type, or a ``"length"`` of 0.
        """
        self.name = name
        self.msgdef = msgdef
        t = vpp_get_type(msgdef["type"])
        if not t:
            raise ValueError("No such type: {}".format(msgdef["type"]))
        if "length" in msgdef:
            if msgdef["length"] == 0:
                raise ValueError(
                    "Zero-length alias is not supported: {}".format(name)
                )
            if msgdef["type"] == "u8":
                self.packer = FixedList_u8(name, msgdef["type"], msgdef["length"])
            else:
                self.packer = FixedList(name, msgdef["type"], msgdef["length"])
        else:
            self.packer = t
        # Every branch gets a size; previously only the u8 array and the
        # plain alias did, leaving array aliases of other types without one.
        self.size = self.packer.size

        types[name] = self
        # Retained for compatibility; unpack() keeps this state in a local.
        self.toplevelconversion = False
        self.options = options

    def pack(self, data, kwargs=None):
        """Pack ``data``, first trying the registered format conversion.

        If the conversion raises ``OSError``/``socket.error``/``TypeError``
        the value is handed to the underlying packer unconverted.
        """
        if data and conversion_required(data, self.name):
            try:
                return conversion_packer(data, self.name)
            # Python 2 and 3 raises different exceptions from inet_pton
            except (OSError, socket.error, TypeError):
                pass
        if data is None:  # Default to zero if not specified
            if self.options and "default" in self.options:
                data = self.options["default"]
            else:
                data = 0

        return self.packer.pack(data, kwargs)

    @staticmethod
    def _get_packer_with_options(f_type, options):
        """Re-create the alias registered as ``f_type`` with ``options``."""
        return VPPTypeAlias(f_type, types[f_type].msgdef, options=options)

    def unpack(self, data, offset=0, result=None, ntc=False):
        """Return ``(value, bytes_consumed)``.

        When ``ntc`` is ``False`` and a converter is registered for this
        alias, nested packers are called with conversion disabled and the
        converter is applied once to the final value.
        """
        # Local flag instead of instance state: an exception in the nested
        # unpack can no longer leave conversion switched on for the next
        # call.
        convert = ntc is False and self.name in vpp_format.conversion_unpacker_table
        if convert:
            # Disable type conversion for dependent types
            ntc = True
        t, size = self.packer.unpack(data, offset, result, ntc=ntc)
        if convert:
            return conversion_unpacker(t, self.name), size
        return t, size

    def __repr__(self):
        return "VPPTypeAlias(name=%s, msgdef=%s, options=%s)" % (
            self.name,
            self.msgdef,
            self.options,
        )
class VPPType(Packer):
    """Packer for a composite API type: an ordered list of named fields.

    Each field gets its own packer, chosen from the shape of its definition;
    the instance registers itself in ``types`` under ``name``.
    """

    # Set everything up to be able to pack / unpack
    def __init__(self, name, msgdef):
        """
        :param name: type name; also the registration key and the key used
            for ``vpp_format`` conversions.
        :param msgdef: iterable of field definitions
            ``[type, name]``, ``[type, name, count]`` or
            ``[type, name, count, length_field]``, each optionally followed
            by an options dict, plus an optional dict carrying ``"crc"``.
        :raises VPPSerializerValueError: if a field type is not registered.
        """
        self.name = name
        self.msgdef = msgdef
        self.packers = []
        self.fields = []
        self.fieldtypes = []
        self.field_by_name = {}
        size = 0
        for f in msgdef:
            if type(f) is dict and "crc" in f:
                self.crc = f["crc"]
                continue
            f_type, f_name = f[:2]
            self.fields.append(f_name)
            self.field_by_name[f_name] = None
            self.fieldtypes.append(f_type)
            if f_type not in types:
                logger.debug("Unknown type {}".format(f_type))
                raise VPPSerializerValueError("Unknown message type {}".format(f_type))

            # A trailing dict is per-field options, not part of the shape.
            fieldlen = len(f)
            options = [x for x in f if type(x) is dict]
            if options:
                self.options = options[0]
                fieldlen -= 1
            else:
                self.options = {}

            if fieldlen == 3:  # list
                list_elements = f[2]
                if list_elements == 0:
                    # Unsized trailing data does not add to the fixed size.
                    if f_type == "string":
                        p = String(f_name, 0, self.options)
                    else:
                        p = VLAList_legacy(f_name, f_type)
                    self.packers.append(p)
                elif f_type == "u8":
                    p = FixedList_u8(f_name, f_type, list_elements)
                    self.packers.append(p)
                    size += p.size
                elif f_type == "string":
                    p = String(f_name, list_elements, self.options)
                    self.packers.append(p)
                    size += p.size
                else:
                    p = FixedList(f_name, f_type, list_elements)
                    self.packers.append(p)
                    size += p.size
            elif fieldlen == 4:  # Variable length list
                length_index = self.fields.index(f[3])
                p = VLAList(f_name, f_type, f[3], length_index)
                self.packers.append(p)
            else:
                # default support for types that decay to basetype
                if "default" in self.options:
                    p = self.get_packer_with_options(f_type, self.options)
                else:
                    p = types[f_type]

                self.packers.append(p)
                size += p.size

        self.size = size
        self.tuple = collections.namedtuple(name, self.fields, rename=True)
        types[name] = self
        # Retained for compatibility; unpack() keeps this state in a local.
        self.toplevelconversion = False

    def pack(self, data, kwargs=None):
        """Pack ``data`` field by field, in declaration order.

        :param data: mapping of field name to value; missing fields are
            packed as ``None`` so each packer applies its own default. A
            value with a registered ``vpp_format`` converter is converted
            and packed as a whole instead.
        :param kwargs: sibling values made available to field packers;
            defaults to ``data``.
        :raises VPPSerializerValueError: if a non-dict ``data`` lacks a field.
        """
        if not kwargs:
            kwargs = data

        # Try one of the format functions
        if data and conversion_required(data, self.name):
            return conversion_packer(data, self.name)

        parts = []
        for packer, a in zip(self.packers, self.fields):
            if data and type(data) is not dict and a not in data:
                raise VPPSerializerValueError(
                    "Invalid argument: {} expected {}.{}".format(data, self.name, a)
                )

            # Defaulting to zero.
            if not data or a not in data:  # Default to 0
                arg = None
                kwarg = None  # No default for VLA
            else:
                arg = data[a]
                kwarg = kwargs[a] if a in kwargs else None
            # Nested composite types only see their own sub-dict; every
            # other packer sees the whole sibling set.
            if isinstance(packer, VPPType):
                parts.append(packer.pack(arg, kwarg))
            else:
                parts.append(packer.pack(arg, kwargs))

        # join() is linear; repeated ``bytes +=`` re-copies the buffer on
        # every field.
        return b"".join(parts)

    def unpack(self, data, offset=0, result=None, ntc=False):
        """Return ``(namedtuple_or_converted_value, bytes_consumed)``.

        When ``ntc`` is ``False`` and a converter is registered for this
        type, nested packers are called with conversion disabled and the
        converter is applied once to the assembled tuple.
        """
        # Return a list of arguments
        result = []
        total = 0
        # Local flag instead of instance state: an exception in a field
        # unpack can no longer leave conversion switched on for the next
        # call.
        convert = ntc is False and self.name in vpp_format.conversion_unpacker_table
        if convert:
            # Disable type conversion for dependent types
            ntc = True

        for p in self.packers:
            # ``result`` so far is passed along so length-prefixed arrays
            # can read their count from an earlier field.
            x, size = p.unpack(data, offset, result, ntc)
            if type(x) is tuple and len(x) == 1:
                x = x[0]
            result.append(x)
            offset += size
            total += size
        t = self.tuple._make(result)

        if convert:
            t = conversion_unpacker(t, self.name)
        return t, total

    def __repr__(self):
        return "%s(name=%s, msgdef=%s)" % (
            self.__class__.__name__,
            self.name,
            self.msgdef,
        )
687 class VPPMessage(VPPType):