- def __struct(self, t, n=None, e=-1, vl=None):
- """Create a packing structure for a message."""
- base_types = {'u8': 'B',
- 'u16': 'H',
- 'u32': 'I',
- 'i32': 'i',
- 'u64': 'Q',
- 'f64': 'd', }
-
- if t in base_types:
- if not vl:
- if e > 0 and t == 'u8':
- # Fixed byte array
- s = struct.Struct('>' + str(e) + 's')
- return s.size, s
- if e > 0:
- # Fixed array of base type
- s = struct.Struct('>' + base_types[t])
- return s.size, [e, s]
- elif e == 0:
- # Old style variable array
- s = struct.Struct('>' + base_types[t])
- return s.size, [-1, s]
- else:
- # Variable length array
- if t == 'u8':
- s = struct.Struct('>s')
- return s.size, [vl, s]
- else:
- s = struct.Struct('>' + base_types[t])
- return s.size, [vl, s]
-
- s = struct.Struct('>' + base_types[t])
- return s.size, s
-
- if t in self.messages:
- size = self.messages[t]['sizes'][0]
-
- # Return a list in case of array
- if e > 0 and not vl:
- return size, [e, lambda self, encode, buf, offset, args: (
- self.__struct_type(encode, self.messages[t], buf, offset,
- args))]
- if vl:
- return size, [vl, lambda self, encode, buf, offset, args: (
- self.__struct_type(encode, self.messages[t], buf, offset,
- args))]
- elif e == 0:
- # Old style VLA
- raise NotImplementedError(1,
- 'No support for compound types ' + t)
- return size, lambda self, encode, buf, offset, args: (
- self.__struct_type(encode, self.messages[t], buf, offset, args)
- )
-
- raise ValueError(1, 'Invalid message type: ' + t)
-
- def __struct_type(self, encode, msgdef, buf, offset, kwargs):
- """Get a message packer or unpacker."""
- if encode:
- return self.__struct_type_encode(msgdef, buf, offset, kwargs)
- else:
- return self.__struct_type_decode(msgdef, buf, offset)
-
- def __struct_type_encode(self, msgdef, buf, offset, kwargs):
- off = offset
- size = 0
-
- for k in kwargs:
- if k not in msgdef['args']:
- raise ValueError(1, 'Non existing argument [' + k + ']' +
- ' used in call to: ' +
- self.id_names[kwargs['_vl_msg_id']] + '()')
-
- for k, v in vpp_iterator(msgdef['args']):
- off += size
- if k in kwargs:
- if type(v) is list:
- if callable(v[1]):
- e = kwargs[v[0]] if v[0] in kwargs else v[0]
- if e != len(kwargs[k]):
- raise (ValueError(1,
- 'Input list length mismatch: '
- '%s (%s != %s)' %
- (k, e, len(kwargs[k]))))
- size = 0
- for i in range(e):
- size += v[1](self, True, buf, off + size,
- kwargs[k][i])
- else:
- if v[0] in kwargs:
- kwargslen = kwargs[v[0]]
- if kwargslen != len(kwargs[k]):
- raise ValueError(1,
- 'Input list length mismatch:'
- ' %s (%s != %s)' %
- (k, kwargslen,
- len(kwargs[k])))
- else:
- kwargslen = len(kwargs[k])
- if v[1].size == 1:
- buf[off:off + kwargslen] = bytearray(kwargs[k])
- size = kwargslen
- else:
- size = 0
- for i in kwargs[k]:
- v[1].pack_into(buf, off + size, i)
- size += v[1].size
- else:
- if callable(v):
- size = v(self, True, buf, off, kwargs[k])
- else:
- if type(kwargs[k]) is str and v.size < len(kwargs[k]):
- raise ValueError(1,
- 'Input list length mismatch: '
- '%s (%s < %s)' %
- (k, v.size, len(kwargs[k])))
- v.pack_into(buf, off, kwargs[k])
- size = v.size
- else:
- size = v.size if not type(v) is list else 0
-
- return off + size - offset
-
- def __getitem__(self, name):
- if name in self.messages:
- return self.messages[name]
- return None
-
- def get_size(self, sizes, kwargs):
- total_size = sizes[0]
- for e in sizes[1]:
- if e in kwargs and type(kwargs[e]) is list:
- total_size += len(kwargs[e]) * sizes[1][e]
- return total_size
-
- def encode(self, msgdef, kwargs):
- # Make suitably large buffer
- size = self.get_size(msgdef['sizes'], kwargs)
- buf = bytearray(size)
- offset = 0
- size = self.__struct_type(True, msgdef, buf, offset, kwargs)
- return buf[:offset + size]
-
- def decode(self, msgdef, buf):
- return self.__struct_type(False, msgdef, buf, 0, None)[1]
-
- def __struct_type_decode(self, msgdef, buf, offset):
- res = []
- off = offset
- size = 0
- for k, v in vpp_iterator(msgdef['args']):
- off += size
- if type(v) is list:
- lst = []
- if callable(v[1]): # compound type
- size = 0
- if v[0] in msgdef['args']: # vla
- e = res[v[2]]
- else: # fixed array
- e = v[0]
- res.append(lst)
- for i in range(e):
- (s, l) = v[1](self, False, buf, off + size, None)
- lst.append(l)
- size += s
- continue
- if v[1].size == 1:
- if type(v[0]) is int:
- size = len(buf) - off
- else:
- size = res[v[2]]
- res.append(buf[off:off + size])
- else:
- e = v[0] if type(v[0]) is int else res[v[2]]
- if e == -1:
- e = (len(buf) - off) / v[1].size
- lst = []
- res.append(lst)
- size = 0
- for i in range(e):
- lst.append(v[1].unpack_from(buf, off + size)[0])
- size += v[1].size
- else:
- if callable(v):
- size = 0
- (s, l) = v(self, False, buf, off, None)
- res.append(l)
- size += s
- else:
- res.append(v.unpack_from(buf, off)[0])
- size = v.size
-
- return off + size - offset, msgdef['return_tuple']._make(res)
-
- def ret_tup(self, name):
- if name in self.messages and 'return_tuple' in self.messages[name]:
- return self.messages[name]['return_tuple']
- return None