2 # Copyright (c) 2018 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
18 from enum import IntEnum
20 from . import vpp_format
26 # Set log-level in application by doing e.g.:
27 # logger = logging.getLogger('vpp_serializer')
28 # logger.setLevel(logging.DEBUG)
30 logger = logging.getLogger(__name__)
32 if sys.version[0] == '2':
33 check = lambda d: type(d) is dict
35 check = lambda d: type(d) is dict or type(d) is bytes
37 def conversion_required(data, field_type):
41 if type(data).__name__ in vpp_format.conversion_table[field_type]:
47 def conversion_packer(data, field_type):
48 t = type(data).__name__
49 return types[field_type].pack(vpp_format.
50 conversion_table[field_type][t](data))
53 def conversion_unpacker(data, field_type):
54 if field_type not in vpp_format.conversion_unpacker_table:
56 return vpp_format.conversion_unpacker_table[field_type](data)
59 class BaseTypes(object):
60 def __init__(self, type, elements=0):
61 base_types = {'u8': '>B',
70 if elements > 0 and type == 'u8':
71 self.packer = struct.Struct('>%ss' % elements)
73 self.packer = struct.Struct(base_types[type])
74 self.size = self.packer.size
75 logger.debug('Adding {} with format: {}'
76 .format(type, base_types[type]))
78 def pack(self, data, kwargs=None):
79 if not data: # Default to zero if not specified
81 return self.packer.pack(data)
83 def unpack(self, data, offset, result=None, ntc=False):
84 return self.packer.unpack_from(data, offset)[0], self.packer.size
87 types = {'u8': BaseTypes('u8'), 'u16': BaseTypes('u16'),
88 'u32': BaseTypes('u32'), 'i32': BaseTypes('i32'),
89 'u64': BaseTypes('u64'), 'f64': BaseTypes('f64'),
90 'bool': BaseTypes('bool')}
93 def vpp_get_type(name):
100 class VPPSerializerValueError(ValueError):
104 class FixedList_u8(object):
105 def __init__(self, name, field_type, num):
108 self.packer = BaseTypes(field_type, num)
109 self.size = self.packer.size
111 def pack(self, data, kwargs=None):
112 """Packs a fixed length bytestring. Left-pads with zeros
113 if input data is too short."""
115 return b'\x00' * self.size
117 if len(data) > self.num:
118 raise VPPSerializerValueError(
119 'Fixed list length error for "{}", got: {}'
121 .format(self.name, len(data), self.num))
123 return self.packer.pack(data)
125 def unpack(self, data, offset=0, result=None, ntc=False):
126 if len(data[offset:]) < self.num:
127 raise VPPSerializerValueError(
128 'Invalid array length for "{}" got {}'
130 .format(self.name, len(data[offset:]), self.num))
131 return self.packer.unpack(data, offset)
134 class FixedList(object):
135 def __init__(self, name, field_type, num):
137 self.packer = types[field_type]
138 self.size = self.packer.size * num
140 self.field_type = field_type
142 def pack(self, list, kwargs):
143 if len(list) != self.num:
144 raise VPPSerializerValueError(
145 'Fixed list length error, got: {} expected: {}'
146 .format(len(list), self.num))
149 b += self.packer.pack(e)
152 def unpack(self, data, offset=0, result=None, ntc=False):
153 # Return a list of arguments
156 for e in range(self.num):
157 x, size = self.packer.unpack(data, offset, ntc=ntc)
164 class VLAList(object):
165 def __init__(self, name, field_type, len_field_name, index):
168 self.packer = types[field_type]
169 self.size = self.packer.size
170 self.length_field = len_field_name
172 def pack(self, list, kwargs=None):
175 if len(list) != kwargs[self.length_field]:
176 raise VPPSerializerValueError(
177 'Variable length error, got: {} expected: {}'
178 .format(len(list), kwargs[self.length_field]))
182 if self.packer.size == 1:
183 return bytearray(list)
186 b += self.packer.pack(e)
189 def unpack(self, data, offset=0, result=None, ntc=False):
190 # Return a list of arguments
194 if self.packer.size == 1:
195 if result[self.index] == 0:
197 p = BaseTypes('u8', result[self.index])
198 return p.unpack(data, offset, ntc=ntc)
201 for e in range(result[self.index]):
202 x, size = self.packer.unpack(data, offset, ntc=ntc)
209 class VLAList_legacy():
210 def __init__(self, name, field_type):
211 self.packer = types[field_type]
212 self.size = self.packer.size
214 def pack(self, list, kwargs=None):
215 if self.packer.size == 1:
220 b += self.packer.pack(e)
223 def unpack(self, data, offset=0, result=None, ntc=False):
225 # Return a list of arguments
226 if (len(data) - offset) % self.packer.size:
227 raise VPPSerializerValueError(
228 'Legacy Variable Length Array length mismatch.')
229 elements = int((len(data) - offset) / self.packer.size)
231 for e in range(elements):
232 x, size = self.packer.unpack(data, offset, ntc=ntc)
234 offset += self.packer.size
239 class VPPEnumType(object):
240 def __init__(self, name, msgdef):
241 self.size = types['u32'].size
244 if type(f) is dict and 'enumtype' in f:
245 if f['enumtype'] != 'u32':
246 raise NotImplementedError
249 e_hash[ename] = evalue
250 self.enum = IntEnum(name, e_hash)
252 logger.debug('Adding enum {}'.format(name))
254 def __getattr__(self, name):
255 return self.enum[name]
257 def __nonzero__(self):
260 def pack(self, data, kwargs=None):
261 return types['u32'].pack(data)
263 def unpack(self, data, offset=0, result=None, ntc=False):
264 x, size = types['u32'].unpack(data, offset)
265 return self.enum(x), size
268 class VPPUnionType(object):
269 def __init__(self, name, msgdef):
274 self.packers = collections.OrderedDict()
275 for i, f in enumerate(msgdef):
276 if type(f) is dict and 'crc' in f:
280 if f_type not in types:
281 logger.debug('Unknown union type {}'.format(f_type))
282 raise VPPSerializerValueError(
283 'Unknown message type {}'.format(f_type))
284 fields.append(f_name)
285 size = types[f_type].size
286 self.packers[f_name] = types[f_type]
292 self.tuple = collections.namedtuple(name, fields, rename=True)
293 logger.debug('Adding union {}'.format(name))
295 # Union of variable length?
296 def pack(self, data, kwargs=None):
298 return b'\x00' * self.size
300 for k, v in data.items():
301 logger.debug("Key: {} Value: {}".format(k, v))
302 b = self.packers[k].pack(v, kwargs)
304 r = bytearray(self.size)
308 def unpack(self, data, offset=0, result=None, ntc=False):
311 for k, p in self.packers.items():
312 x, size = p.unpack(data, offset, ntc=ntc)
316 return self.tuple._make(r), maxsize
319 class VPPTypeAlias(object):
320 def __init__(self, name, msgdef):
322 t = vpp_get_type(msgdef['type'])
325 if 'length' in msgdef:
326 if msgdef['length'] == 0:
328 if msgdef['type'] == 'u8':
329 self.packer = FixedList_u8(name, msgdef['type'],
331 self.size = self.packer.size
333 self.packer = FixedList(name, msgdef['type'], msgdef['length'])
340 def pack(self, data, kwargs=None):
341 if data and conversion_required(data, self.name):
343 return conversion_packer(data, self.name)
344 # Python 2 and 3 raises different exceptions from inet_pton
345 except(OSError, socket.error, TypeError):
348 return self.packer.pack(data, kwargs)
350 def unpack(self, data, offset=0, result=None, ntc=False):
351 t, size = self.packer.unpack(data, offset, result, ntc=ntc)
353 return conversion_unpacker(t, self.name), size
357 class VPPType(object):
358 # Set everything up to be able to pack / unpack
359 def __init__(self, name, msgdef):
365 self.field_by_name = {}
367 for i, f in enumerate(msgdef):
368 if type(f) is dict and 'crc' in f:
371 f_type, f_name = f[:2]
372 self.fields.append(f_name)
373 self.field_by_name[f_name] = None
374 self.fieldtypes.append(f_type)
375 if f_type not in types:
376 logger.debug('Unknown type {}'.format(f_type))
377 raise VPPSerializerValueError(
378 'Unknown message type {}'.format(f_type))
379 if len(f) == 3: # list
381 if list_elements == 0:
382 p = VLAList_legacy(f_name, f_type)
383 self.packers.append(p)
385 p = FixedList_u8(f_name, f_type, list_elements)
386 self.packers.append(p)
389 p = FixedList(f_name, f_type, list_elements)
390 self.packers.append(p)
392 elif len(f) == 4: # Variable length list
393 # Find index of length field
394 length_index = self.fields.index(f[3])
395 p = VLAList(f_name, f_type, f[3], length_index)
396 self.packers.append(p)
398 self.packers.append(types[f_type])
399 size += types[f_type].size
402 self.tuple = collections.namedtuple(name, self.fields, rename=True)
404 logger.debug('Adding type {}'.format(name))
406 def pack(self, data, kwargs=None):
411 # Try one of the format functions
412 if data and conversion_required(data, self.name):
413 return conversion_packer(data, self.name)
415 for i, a in enumerate(self.fields):
416 if data and type(data) is not dict and a not in data:
417 raise VPPSerializerValueError(
418 "Invalid argument: {} expected {}.{}".
419 format(data, self.name, a))
421 # Defaulting to zero.
422 if not data or a not in data: # Default to 0
424 kwarg = None # No default for VLA
427 kwarg = kwargs[a] if a in kwargs else None
428 if isinstance(self.packers[i], VPPType):
429 b += self.packers[i].pack(arg, kwarg)
431 b += self.packers[i].pack(arg, kwargs)
435 def unpack(self, data, offset=0, result=None, ntc=False):
436 # Return a list of arguments
439 for p in self.packers:
440 x, size = p.unpack(data, offset, result, ntc)
441 if type(x) is tuple and len(x) == 1:
446 t = self.tuple._make(result)
448 t = conversion_unpacker(t, self.name)
452 class VPPMessage(VPPType):