2 # Copyright (c) 2018 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
18 from enum import IntEnum
20 from . import vpp_format
26 # Set log-level in application by doing e.g.:
27 # logger = logging.getLogger('vpp_serializer')
28 # logger.setLevel(logging.DEBUG)
30 logger = logging.getLogger(__name__)
# ``check(d)`` reports whether ``d`` is exactly a dict (Python 2) or exactly a
# dict or bytes object (Python 3).  Subclasses are deliberately not matched,
# mirroring the original ``type(d) is ...`` comparisons.
#
# The original one-line bodies evaluated the comparison but never returned
# it, so ``check`` always produced ``None``; the explicit ``return`` below
# fixes that.
if sys.version_info[0] == 2:
    def check(d):
        """Return True when ``d`` is exactly a ``dict``."""
        return type(d) is dict
else:
    def check(d):
        """Return True when ``d`` is exactly a ``dict`` or ``bytes``."""
        return type(d) is dict or type(d) is bytes
38 def conversion_required(data, field_type):
42 if type(data).__name__ in vpp_format.conversion_table[field_type]:
def conversion_packer(data, field_type):
    """Convert ``data`` via the registered format function, then pack it.

    The converter is selected from ``vpp_format.conversion_table`` using the
    target ``field_type`` and the Python type name of ``data``.  The converted
    value is passed to the packer registered under ``field_type`` in
    ``types`` and the packer's result is returned.
    """
    type_name = type(data).__name__
    # Resolve the packer before the converter so lookup failures surface in
    # the same order as they always have.
    pack = types[field_type].pack
    convert = vpp_format.conversion_table[field_type][type_name]
    return pack(convert(data))
54 def conversion_unpacker(data, field_type):
55 if field_type not in vpp_format.conversion_unpacker_table:
57 return vpp_format.conversion_unpacker_table[field_type](data)
60 class BaseTypes(object):
61 def __init__(self, type, elements=0):
62 base_types = {'u8': '>B',
72 if elements > 0 and (type == 'u8' or type == 'string'):
73 self.packer = struct.Struct('>%ss' % elements)
75 self.packer = struct.Struct(base_types[type])
76 self.size = self.packer.size
78 def pack(self, data, kwargs=None):
79 if not data: # Default to zero if not specified
81 return self.packer.pack(data)
def unpack(self, data, offset, result=None, ntc=False):
    """Decode a single value from ``data`` starting at ``offset``.

    ``result`` and ``ntc`` are accepted but not used by this method.

    Returns a ``(value, size)`` tuple: the first field decoded by the
    instance's struct packer and the byte size of that packer.
    """
    codec = self.packer
    decoded = codec.unpack_from(data, offset)
    return decoded[0], codec.size
91 self.length_field_packer = BaseTypes('u32')
93 def pack(self, list, kwargs=None):
95 return self.length_field_packer.pack(0) + b""
96 return self.length_field_packer.pack(len(list)) + list.encode('utf8')
98 def unpack(self, data, offset=0, result=None, ntc=False):
99 length, length_field_size = self.length_field_packer.unpack(data,
103 p = BaseTypes('u8', length)
104 x, size = p.unpack(data, offset + length_field_size)
105 x2 = x.split(b'\0', 1)[0]
106 return (x2.decode('utf8'), size + length_field_size)
# Registry of packers keyed by wire type name.  Every scalar type gets a
# BaseTypes packer built from its own name; 'string' uses the String packer.
types = {type_name: BaseTypes(type_name)
         for type_name in ('u8', 'u16', 'u32', 'i32', 'u64', 'f64', 'bool')}
types['string'] = String()
115 def vpp_get_type(name):
122 class VPPSerializerValueError(ValueError):
126 class FixedList_u8(object):
127 def __init__(self, name, field_type, num):
130 self.packer = BaseTypes(field_type, num)
131 self.size = self.packer.size
132 self.field_type = field_type
134 def pack(self, data, kwargs=None):
135 """Packs a fixed length bytestring. Left-pads with zeros
136 if input data is too short."""
138 return b'\x00' * self.size
140 if len(data) > self.num:
141 raise VPPSerializerValueError(
142 'Fixed list length error for "{}", got: {}'
144 .format(self.name, len(data), self.num))
146 return self.packer.pack(data)
148 def unpack(self, data, offset=0, result=None, ntc=False):
149 if len(data[offset:]) < self.num:
150 raise VPPSerializerValueError(
151 'Invalid array length for "{}" got {}'
153 .format(self.name, len(data[offset:]), self.num))
154 if self.field_type == 'string':
155 s = self.packer.unpack(data, offset)
156 s2 = s[0].split(b'\0', 1)[0]
157 return (s2.decode('utf-8'), self.num)
158 return self.packer.unpack(data, offset)
161 class FixedList(object):
162 def __init__(self, name, field_type, num):
164 self.packer = types[field_type]
165 self.size = self.packer.size * num
167 self.field_type = field_type
169 def pack(self, list, kwargs):
170 if len(list) != self.num:
171 raise VPPSerializerValueError(
172 'Fixed list length error, got: {} expected: {}'
173 .format(len(list), self.num))
176 b += self.packer.pack(e)
179 def unpack(self, data, offset=0, result=None, ntc=False):
180 # Return a list of arguments
183 for e in range(self.num):
184 x, size = self.packer.unpack(data, offset, ntc=ntc)
191 class VLAList(object):
192 def __init__(self, name, field_type, len_field_name, index):
194 self.field_type = field_type
196 self.packer = types[field_type]
197 self.size = self.packer.size
198 self.length_field = len_field_name
200 def pack(self, list, kwargs=None):
203 if len(list) != kwargs[self.length_field]:
204 raise VPPSerializerValueError(
205 'Variable length error, got: {} expected: {}'
206 .format(len(list), kwargs[self.length_field]))
211 if self.packer.size == 1:
212 return bytearray(list)
215 b += self.packer.pack(e)
218 def unpack(self, data, offset=0, result=None, ntc=False):
219 # Return a list of arguments
223 if self.packer.size == 1:
224 if result[self.index] == 0:
226 p = BaseTypes('u8', result[self.index])
227 return p.unpack(data, offset, ntc=ntc)
230 for e in range(result[self.index]):
231 x, size = self.packer.unpack(data, offset, ntc=ntc)
238 class VLAList_legacy():
239 def __init__(self, name, field_type):
240 self.packer = types[field_type]
241 self.size = self.packer.size
243 def pack(self, list, kwargs=None):
244 if self.packer.size == 1:
249 b += self.packer.pack(e)
252 def unpack(self, data, offset=0, result=None, ntc=False):
254 # Return a list of arguments
255 if (len(data) - offset) % self.packer.size:
256 raise VPPSerializerValueError(
257 'Legacy Variable Length Array length mismatch.')
258 elements = int((len(data) - offset) / self.packer.size)
260 for e in range(elements):
261 x, size = self.packer.unpack(data, offset, ntc=ntc)
263 offset += self.packer.size
268 class VPPEnumType(object):
269 def __init__(self, name, msgdef):
270 self.size = types['u32'].size
273 if type(f) is dict and 'enumtype' in f:
274 if f['enumtype'] != 'u32':
275 raise NotImplementedError
278 e_hash[ename] = evalue
279 self.enum = IntEnum(name, e_hash)
282 def __getattr__(self, name):
283 return self.enum[name]
285 def __nonzero__(self):
def pack(self, data, kwargs=None):
    """Serialise ``data`` with the 'u32' packer from ``types``.

    ``kwargs`` is accepted but not used by this method.
    """
    u32_packer = types['u32']
    return u32_packer.pack(data)
def unpack(self, data, offset=0, result=None, ntc=False):
    """Decode a 'u32' at ``offset`` and map it to a member of ``self.enum``.

    ``result`` and ``ntc`` are accepted but not used by this method.

    Returns a ``(member, size)`` tuple, where ``size`` is whatever the 'u32'
    packer reports as consumed.
    """
    raw_value, consumed = types['u32'].unpack(data, offset)
    member = self.enum(raw_value)
    return member, consumed
296 class VPPUnionType(object):
297 def __init__(self, name, msgdef):
302 self.packers = collections.OrderedDict()
303 for i, f in enumerate(msgdef):
304 if type(f) is dict and 'crc' in f:
308 if f_type not in types:
309 logger.debug('Unknown union type {}'.format(f_type))
310 raise VPPSerializerValueError(
311 'Unknown message type {}'.format(f_type))
312 fields.append(f_name)
313 size = types[f_type].size
314 self.packers[f_name] = types[f_type]
320 self.tuple = collections.namedtuple(name, fields, rename=True)
322 # Union of variable length?
323 def pack(self, data, kwargs=None):
325 return b'\x00' * self.size
327 for k, v in data.items():
328 logger.debug("Key: {} Value: {}".format(k, v))
329 b = self.packers[k].pack(v, kwargs)
331 r = bytearray(self.size)
335 def unpack(self, data, offset=0, result=None, ntc=False):
338 for k, p in self.packers.items():
339 x, size = p.unpack(data, offset, ntc=ntc)
343 return self.tuple._make(r), maxsize
346 class VPPTypeAlias(object):
347 def __init__(self, name, msgdef):
349 t = vpp_get_type(msgdef['type'])
352 if 'length' in msgdef:
353 if msgdef['length'] == 0:
355 if msgdef['type'] == 'u8':
356 self.packer = FixedList_u8(name, msgdef['type'],
358 self.size = self.packer.size
360 self.packer = FixedList(name, msgdef['type'], msgdef['length'])
367 def pack(self, data, kwargs=None):
368 if data and conversion_required(data, self.name):
370 return conversion_packer(data, self.name)
371 # Python 2 and 3 raises different exceptions from inet_pton
372 except(OSError, socket.error, TypeError):
375 return self.packer.pack(data, kwargs)
377 def unpack(self, data, offset=0, result=None, ntc=False):
378 t, size = self.packer.unpack(data, offset, result, ntc=ntc)
380 return conversion_unpacker(t, self.name), size
384 class VPPType(object):
385 # Set everything up to be able to pack / unpack
386 def __init__(self, name, msgdef):
392 self.field_by_name = {}
394 for i, f in enumerate(msgdef):
395 if type(f) is dict and 'crc' in f:
398 f_type, f_name = f[:2]
399 self.fields.append(f_name)
400 self.field_by_name[f_name] = None
401 self.fieldtypes.append(f_type)
402 if f_type not in types:
403 logger.debug('Unknown type {}'.format(f_type))
404 raise VPPSerializerValueError(
405 'Unknown message type {}'.format(f_type))
406 if len(f) == 3: # list
408 if list_elements == 0:
409 p = VLAList_legacy(f_name, f_type)
410 self.packers.append(p)
411 elif f_type == 'u8' or f_type == 'string':
412 p = FixedList_u8(f_name, f_type, list_elements)
413 self.packers.append(p)
416 p = FixedList(f_name, f_type, list_elements)
417 self.packers.append(p)
419 elif len(f) == 4: # Variable length list
420 length_index = self.fields.index(f[3])
421 p = VLAList(f_name, f_type, f[3], length_index)
422 self.packers.append(p)
424 self.packers.append(types[f_type])
425 size += types[f_type].size
428 self.tuple = collections.namedtuple(name, self.fields, rename=True)
431 def pack(self, data, kwargs=None):
436 # Try one of the format functions
437 if data and conversion_required(data, self.name):
438 return conversion_packer(data, self.name)
440 for i, a in enumerate(self.fields):
441 if data and type(data) is not dict and a not in data:
442 raise VPPSerializerValueError(
443 "Invalid argument: {} expected {}.{}".
444 format(data, self.name, a))
446 # Defaulting to zero.
447 if not data or a not in data: # Default to 0
449 kwarg = None # No default for VLA
452 kwarg = kwargs[a] if a in kwargs else None
453 if isinstance(self.packers[i], VPPType):
454 b += self.packers[i].pack(arg, kwarg)
456 b += self.packers[i].pack(arg, kwargs)
460 def unpack(self, data, offset=0, result=None, ntc=False):
461 # Return a list of arguments
464 for p in self.packers:
465 x, size = p.unpack(data, offset, result, ntc)
466 if type(x) is tuple and len(x) == 1:
471 t = self.tuple._make(result)
473 t = conversion_unpacker(t, self.name)
477 class VPPMessage(VPPType):