+class String(Packer):
+ def __init__(self, name, num, options):
+ self.name = name
+ self.num = num
+ self.size = 1
+ self.length_field_packer = BaseTypes("u32")
+ self.limit = options["limit"] if "limit" in options else num
+ self.fixed = True if num else False
+ if self.fixed and not self.limit:
+ raise VPPSerializerValueError(
+ "Invalid combination for: {}, {} fixed:{} limit:{}".format(
+ name, options, self.fixed, self.limit
+ )
+ )
+
+ def pack(self, list, kwargs=None):
+ if not list:
+ if self.fixed:
+ return b"\x00" * self.limit
+ return self.length_field_packer.pack(0) + b""
+ if self.limit and len(list) > self.limit - 1:
+ raise VPPSerializerValueError(
+ "Invalid argument length for: {}, {} maximum {}".format(
+ list, len(list), self.limit - 1
+ )
+ )
+ if self.fixed:
+ return list.encode("ascii").ljust(self.limit, b"\x00")
+ return self.length_field_packer.pack(len(list)) + list.encode("ascii")
+
+ def unpack(self, data, offset=0, result=None, ntc=False):
+ if self.fixed:
+ p = BaseTypes("u8", self.num)
+ s = p.unpack(data, offset)
+ s2 = s[0].split(b"\0", 1)[0]
+ return (s2.decode("ascii"), self.num)
+
+ length, length_field_size = self.length_field_packer.unpack(data, offset)
+ if length == 0:
+ return "", 0
+ p = BaseTypes("u8", length)
+ x, size = p.unpack(data, offset + length_field_size)
+ return (x.decode("ascii", errors="replace"), size + length_field_size)
+
+
+types = {
+ "u8": BaseTypes("u8"),
+ "i8": BaseTypes("i8"),
+ "u16": BaseTypes("u16"),
+ "i16": BaseTypes("i16"),
+ "u32": BaseTypes("u32"),
+ "i32": BaseTypes("i32"),
+ "u64": BaseTypes("u64"),
+ "i64": BaseTypes("i64"),
+ "f64": BaseTypes("f64"),
+ "bool": BaseTypes("bool"),
+ "string": String,
+}
+
+class_types = {}
+
+
+def vpp_get_type(name):
+ try:
+ return types[name]
+ except KeyError:
+ return None
+
+
+class VPPSerializerValueError(ValueError):
+ pass
+
+
+class FixedList_u8(Packer):