2 # Copyright (c) 2018 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
18 from enum import IntEnum
20 from .vpp_format import VPPFormat
23 # Set log-level in application by doing e.g.:
24 # logger = logging.getLogger('vpp_serializer')
25 # logger.setLevel(logging.DEBUG)
27 logger = logging.getLogger(__name__)
30 class BaseTypes(object):
31 def __init__(self, type, elements=0):
32 base_types = {'u8': '>B',
41 if elements > 0 and type == 'u8':
42 self.packer = struct.Struct('>%ss' % elements)
44 self.packer = struct.Struct(base_types[type])
45 self.size = self.packer.size
46 logger.debug('Adding {} with format: {}'
47 .format(type, base_types[type]))
49 def pack(self, data, kwargs=None):
50 if not data: # Default to zero if not specified
52 return self.packer.pack(data)
54 def unpack(self, data, offset, result=None):
55 return self.packer.unpack_from(data, offset)[0], self.packer.size
58 types = {'u8': BaseTypes('u8'), 'u16': BaseTypes('u16'),
59 'u32': BaseTypes('u32'), 'i32': BaseTypes('i32'),
60 'u64': BaseTypes('u64'), 'f64': BaseTypes('f64'),
61 'bool': BaseTypes('bool')}
64 def vpp_get_type(name):
71 class VPPSerializerValueError(ValueError):
75 class FixedList_u8(object):
76 def __init__(self, name, field_type, num):
79 self.packer = BaseTypes(field_type, num)
80 self.size = self.packer.size
82 def pack(self, list, kwargs=None):
83 """Packs a fixed length bytestring. Left-pads with zeros
84 if input data is too short."""
86 return b'\x00' * self.size
87 if len(list) > self.num:
88 raise VPPSerializerValueError(
89 'Fixed list length error for "{}", got: {}'
91 .format(self.name, len(list), self.num))
92 return self.packer.pack(list)
94 def unpack(self, data, offset=0, result=None):
95 if len(data[offset:]) < self.num:
96 raise VPPSerializerValueError(
97 'Invalid array length for "{}" got {}'
99 .format(self.name, len(data[offset:]), self.num))
100 return self.packer.unpack(data, offset)
103 class FixedList(object):
104 def __init__(self, name, field_type, num):
106 self.packer = types[field_type]
107 self.size = self.packer.size * num
109 def pack(self, list, kwargs):
110 if len(list) != self.num:
111 raise VPPSerializerValueError(
112 'Fixed list length error, got: {} expected: {}'
113 .format(len(list), self.num))
116 b += self.packer.pack(e)
119 def unpack(self, data, offset=0, result=None):
120 # Return a list of arguments
123 for e in range(self.num):
124 x, size = self.packer.unpack(data, offset)
131 class VLAList(object):
132 def __init__(self, name, field_type, len_field_name, index):
135 self.packer = types[field_type]
136 self.size = self.packer.size
137 self.length_field = len_field_name
139 def pack(self, list, kwargs=None):
142 if len(list) != kwargs[self.length_field]:
143 raise VPPSerializerValueError(
144 'Variable length error, got: {} expected: {}'
145 .format(len(list), kwargs[self.length_field]))
149 if self.packer.size == 1:
150 return bytearray(list)
153 b += self.packer.pack(e)
156 def unpack(self, data, offset=0, result=None):
157 # Return a list of arguments
161 if self.packer.size == 1:
162 if result[self.index] == 0:
164 p = BaseTypes('u8', result[self.index])
165 return p.unpack(data, offset)
168 for e in range(result[self.index]):
169 x, size = self.packer.unpack(data, offset)
176 class VLAList_legacy():
177 def __init__(self, name, field_type):
178 self.packer = types[field_type]
179 self.size = self.packer.size
181 def pack(self, list, kwargs=None):
182 if self.packer.size == 1:
187 b += self.packer.pack(e)
190 def unpack(self, data, offset=0, result=None):
192 # Return a list of arguments
193 if (len(data) - offset) % self.packer.size:
194 raise VPPSerializerValueError(
195 'Legacy Variable Length Array length mismatch.')
196 elements = int((len(data) - offset) / self.packer.size)
198 for e in range(elements):
199 x, size = self.packer.unpack(data, offset)
201 offset += self.packer.size
206 class VPPEnumType(object):
207 def __init__(self, name, msgdef):
208 self.size = types['u32'].size
211 if type(f) is dict and 'enumtype' in f:
212 if f['enumtype'] != 'u32':
213 raise NotImplementedError
216 e_hash[ename] = evalue
217 self.enum = IntEnum(name, e_hash)
219 logger.debug('Adding enum {}'.format(name))
221 def __getattr__(self, name):
222 return self.enum[name]
224 def __nonzero__(self):
227 def pack(self, data, kwargs=None):
228 return types['u32'].pack(data)
230 def unpack(self, data, offset=0, result=None):
231 x, size = types['u32'].unpack(data, offset)
232 return self.enum(x), size
235 class VPPUnionType(object):
236 def __init__(self, name, msgdef):
241 self.packers = collections.OrderedDict()
242 for i, f in enumerate(msgdef):
243 if type(f) is dict and 'crc' in f:
247 if f_type not in types:
248 logger.debug('Unknown union type {}'.format(f_type))
249 raise VPPSerializerValueError(
250 'Unknown message type {}'.format(f_type))
251 fields.append(f_name)
252 size = types[f_type].size
253 self.packers[f_name] = types[f_type]
259 self.tuple = collections.namedtuple(name, fields, rename=True)
260 logger.debug('Adding union {}'.format(name))
262 # Union of variable length?
263 def pack(self, data, kwargs=None):
265 return b'\x00' * self.size
267 for k, v in data.items():
268 logger.debug("Key: {} Value: {}".format(k, v))
269 b = self.packers[k].pack(v, kwargs)
271 r = bytearray(self.size)
275 def unpack(self, data, offset=0, result=None):
278 for k, p in self.packers.items():
279 x, size = p.unpack(data, offset)
283 return self.tuple._make(r), maxsize
286 def VPPTypeAlias(name, msgdef):
287 t = vpp_get_type(msgdef['type'])
290 if 'length' in msgdef:
291 if msgdef['length'] == 0:
293 if msgdef['type'] == 'u8':
294 types[name] = FixedList_u8(name, msgdef['type'],
297 types[name] = FixedList(name, msgdef['type'], msgdef['length'])
302 class VPPType(object):
303 # Set everything up to be able to pack / unpack
304 def __init__(self, name, msgdef):
310 self.field_by_name = {}
312 for i, f in enumerate(msgdef):
313 if type(f) is dict and 'crc' in f:
316 f_type, f_name = f[:2]
317 self.fields.append(f_name)
318 self.field_by_name[f_name] = None
319 self.fieldtypes.append(f_type)
320 if f_type not in types:
321 logger.debug('Unknown type {}'.format(f_type))
322 raise VPPSerializerValueError(
323 'Unknown message type {}'.format(f_type))
324 if len(f) == 3: # list
326 if list_elements == 0:
327 p = VLAList_legacy(f_name, f_type)
328 self.packers.append(p)
330 p = FixedList_u8(f_name, f_type, list_elements)
331 self.packers.append(p)
334 p = FixedList(f_name, f_type, list_elements)
335 self.packers.append(p)
337 elif len(f) == 4: # Variable length list
338 # Find index of length field
339 length_index = self.fields.index(f[3])
340 p = VLAList(f_name, f_type, f[3], length_index)
341 self.packers.append(p)
343 self.packers.append(types[f_type])
344 size += types[f_type].size
347 self.tuple = collections.namedtuple(name, self.fields, rename=True)
349 logger.debug('Adding type {}'.format(name))
351 def pack(self, data, kwargs=None):
355 for i, a in enumerate(self.fields):
357 # Try one of the format functions
358 if data and type(data) is not dict and a not in data:
359 raise VPPSerializerValueError(
360 "Invalid argument: {} expected {}.{}".
361 format(data, self.name, a))
363 # Defaulting to zero.
364 if not data or a not in data: # Default to 0
366 kwarg = None # No default for VLA
369 kwarg = kwargs[a] if a in kwargs else None
371 if isinstance(self.packers[i], VPPType):
373 b += self.packers[i].pack(arg, kwarg)
375 # Invalid argument, can we convert it?
376 arg = VPPFormat.format(self.packers[i].name, data[a])
379 b += self.packers[i].pack(arg, kwarg)
381 b += self.packers[i].pack(arg, kwargs)
385 def unpack(self, data, offset=0, result=None):
386 # Return a list of arguments
389 for p in self.packers:
390 x, size = p.unpack(data, offset, result)
391 if type(x) is tuple and len(x) == 1:
396 t = self.tuple._make(result)
400 class VPPMessage(VPPType):