2 # Copyright (c) 2018 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
21 if sys.version_info <= (3, 4):
22 from aenum import IntEnum # noqa: F401
24 from enum import IntEnum # noqa: F401
26 if sys.version_info <= (3, 6):
27 from aenum import IntFlag # noqa: F401
30 from enum import IntFlag # noqa: F401
32 from . import vpp_format # noqa: E402
35 # Set log-level in application by doing e.g.:
36 # logger = logging.getLogger('vpp_serializer')
37 # logger.setLevel(logging.DEBUG)
39 logger = logging.getLogger(__name__)
41 if sys.version[0] == '2':
42 def check(d): type(d) is dict
44 def check(d): type(d) is dict or type(d) is bytes
47 def conversion_required(data, field_type):
51 if type(data).__name__ in vpp_format.conversion_table[field_type]:
57 def conversion_packer(data, field_type):
58 t = type(data).__name__
59 return types[field_type].pack(vpp_format.
60 conversion_table[field_type][t](data))
63 def conversion_unpacker(data, field_type):
64 if field_type not in vpp_format.conversion_unpacker_table:
66 return vpp_format.conversion_unpacker_table[field_type](data)
69 class BaseTypes(object):
70 def __init__(self, type, elements=0, options=None):
71 base_types = {'u8': '>B',
84 if elements > 0 and (type == 'u8' or type == 'string'):
85 self.packer = struct.Struct('>%ss' % elements)
87 self.packer = struct.Struct(base_types[type])
88 self.size = self.packer.size
89 self.options = options
91 def pack(self, data, kwargs=None):
92 if data is None: # Default to zero if not specified
93 if self.options and 'default' in self.options:
94 data = self.options['default']
97 return self.packer.pack(data)
99 def unpack(self, data, offset, result=None, ntc=False):
100 return self.packer.unpack_from(data, offset)[0], self.packer.size
103 class String(object):
104 def __init__(self, name, num, options):
108 self.length_field_packer = BaseTypes('u32')
109 self.limit = options['limit'] if 'limit' in options else num
110 self.fixed = True if num else False
111 if self.fixed and not self.limit:
112 raise VPPSerializerValueError(
113 "Invalid argument length for: {}, {} maximum {}".
114 format(list, len(list), self.limit))
116 def pack(self, list, kwargs=None):
119 return b"\x00" * self.limit
120 return self.length_field_packer.pack(0) + b""
121 if self.limit and len(list) > self.limit - 1:
122 raise VPPSerializerValueError(
123 "Invalid argument length for: {}, {} maximum {}".
124 format(list, len(list), self.limit - 1))
126 return list.encode('ascii').ljust(self.limit, b'\x00')
127 return self.length_field_packer.pack(len(list)) + list.encode('ascii')
129 def unpack(self, data, offset=0, result=None, ntc=False):
131 p = BaseTypes('u8', self.num)
132 s = p.unpack(data, offset)
133 s2 = s[0].split(b'\0', 1)[0]
134 return (s2.decode('ascii'), self.num)
136 length, length_field_size = self.length_field_packer.unpack(data,
140 p = BaseTypes('u8', length)
141 x, size = p.unpack(data, offset + length_field_size)
142 #x2 = x.split(b'\0', 1)[0]
143 return (x.decode('ascii', errors='replace'), size + length_field_size)
146 types = {'u8': BaseTypes('u8'), 'u16': BaseTypes('u16'),
147 'u32': BaseTypes('u32'), 'i32': BaseTypes('i32'),
148 'u64': BaseTypes('u64'), 'f64': BaseTypes('f64'),
149 'bool': BaseTypes('bool'), 'string': String}
153 def vpp_get_type(name):
160 class VPPSerializerValueError(ValueError):
164 class FixedList_u8(object):
165 def __init__(self, name, field_type, num):
168 self.packer = BaseTypes(field_type, num)
169 self.size = self.packer.size
170 self.field_type = field_type
172 def pack(self, data, kwargs=None):
173 """Packs a fixed length bytestring. Left-pads with zeros
174 if input data is too short."""
176 return b'\x00' * self.size
178 if len(data) > self.num:
179 raise VPPSerializerValueError(
180 'Fixed list length error for "{}", got: {}'
182 .format(self.name, len(data), self.num))
185 return self.packer.pack(data)
187 raise VPPSerializerValueError(
188 'Packing failed for "{}" {}'
189 .format(self.name, kwargs))
190 def unpack(self, data, offset=0, result=None, ntc=False):
191 if len(data[offset:]) < self.num:
192 raise VPPSerializerValueError(
193 'Invalid array length for "{}" got {}'
195 .format(self.name, len(data[offset:]), self.num))
196 return self.packer.unpack(data, offset)
199 class FixedList(object):
200 def __init__(self, name, field_type, num):
202 self.packer = types[field_type]
203 self.size = self.packer.size * num
205 self.field_type = field_type
207 def pack(self, list, kwargs):
208 if len(list) != self.num:
209 raise VPPSerializerValueError(
210 'Fixed list length error, got: {} expected: {}'
211 .format(len(list), self.num))
214 b += self.packer.pack(e)
217 def unpack(self, data, offset=0, result=None, ntc=False):
218 # Return a list of arguments
221 for e in range(self.num):
222 x, size = self.packer.unpack(data, offset, ntc=ntc)
229 class VLAList(object):
230 def __init__(self, name, field_type, len_field_name, index):
232 self.field_type = field_type
234 self.packer = types[field_type]
235 self.size = self.packer.size
236 self.length_field = len_field_name
238 def pack(self, lst, kwargs=None):
241 if len(lst) != kwargs[self.length_field]:
242 raise VPPSerializerValueError(
243 'Variable length error, got: {} expected: {}'
244 .format(len(lst), kwargs[self.length_field]))
247 if self.packer.size == 1:
248 if isinstance(lst, list):
254 b += self.packer.pack(e)
257 def unpack(self, data, offset=0, result=None, ntc=False):
258 # Return a list of arguments
262 if self.packer.size == 1:
263 if result[self.index] == 0:
265 p = BaseTypes('u8', result[self.index])
266 return p.unpack(data, offset, ntc=ntc)
269 for e in range(result[self.index]):
270 x, size = self.packer.unpack(data, offset, ntc=ntc)
277 class VLAList_legacy():
278 def __init__(self, name, field_type):
279 self.packer = types[field_type]
280 self.size = self.packer.size
282 def pack(self, list, kwargs=None):
283 if self.packer.size == 1:
288 b += self.packer.pack(e)
291 def unpack(self, data, offset=0, result=None, ntc=False):
293 # Return a list of arguments
294 if (len(data) - offset) % self.packer.size:
295 raise VPPSerializerValueError(
296 'Legacy Variable Length Array length mismatch.')
297 elements = int((len(data) - offset) / self.packer.size)
299 for e in range(elements):
300 x, size = self.packer.unpack(data, offset, ntc=ntc)
302 offset += self.packer.size
307 class VPPEnumType(object):
308 def __init__(self, name, msgdef, options=None):
309 self.size = types['u32'].size
311 self.enumtype = 'u32'
315 if type(f) is dict and 'enumtype' in f:
316 if f['enumtype'] != 'u32':
317 self.size = types[f['enumtype']].size
318 self.enumtype = f['enumtype']
321 e_hash[ename] = evalue
322 self.enum = IntFlag(name, e_hash)
324 class_types[name] = VPPEnumType
325 self.options = options
327 def __getattr__(self, name):
328 return self.enum[name]
333 if sys.version[0] == '2':
334 __nonzero__ = __bool__
336 def pack(self, data, kwargs=None):
337 if data is None: # Default to zero if not specified
338 if self.options and 'default' in self.options:
339 data = self.options['default']
343 return types[self.enumtype].pack(data)
345 def unpack(self, data, offset=0, result=None, ntc=False):
346 x, size = types[self.enumtype].unpack(data, offset)
347 return self.enum(x), size
350 class VPPUnionType(object):
351 def __init__(self, name, msgdef):
356 self.packers = collections.OrderedDict()
357 for i, f in enumerate(msgdef):
358 if type(f) is dict and 'crc' in f:
362 if f_type not in types:
363 logger.debug('Unknown union type {}'.format(f_type))
364 raise VPPSerializerValueError(
365 'Unknown message type {}'.format(f_type))
366 fields.append(f_name)
367 size = types[f_type].size
368 self.packers[f_name] = types[f_type]
374 self.tuple = collections.namedtuple(name, fields, rename=True)
376 # Union of variable length?
377 def pack(self, data, kwargs=None):
379 return b'\x00' * self.size
381 for k, v in data.items():
382 logger.debug("Key: {} Value: {}".format(k, v))
383 b = self.packers[k].pack(v, kwargs)
385 r = bytearray(self.size)
389 def unpack(self, data, offset=0, result=None, ntc=False):
392 for k, p in self.packers.items():
393 x, size = p.unpack(data, offset, ntc=ntc)
397 return self.tuple._make(r), maxsize
400 class VPPTypeAlias(object):
401 def __init__(self, name, msgdef):
403 t = vpp_get_type(msgdef['type'])
405 raise ValueError('No such type: {}'.format(msgdef['type']))
406 if 'length' in msgdef:
407 if msgdef['length'] == 0:
409 if msgdef['type'] == 'u8':
410 self.packer = FixedList_u8(name, msgdef['type'],
412 self.size = self.packer.size
414 self.packer = FixedList(name, msgdef['type'], msgdef['length'])
420 self.toplevelconversion = False
422 def pack(self, data, kwargs=None):
423 if data and conversion_required(data, self.name):
425 return conversion_packer(data, self.name)
426 # Python 2 and 3 raises different exceptions from inet_pton
427 except(OSError, socket.error, TypeError):
430 return self.packer.pack(data, kwargs)
432 def unpack(self, data, offset=0, result=None, ntc=False):
433 if ntc == False and self.name in vpp_format.conversion_unpacker_table:
434 # Disable type conversion for dependent types
436 self.toplevelconversion = True
437 t, size = self.packer.unpack(data, offset, result, ntc=ntc)
438 if self.toplevelconversion:
439 self.toplevelconversion = False
440 return conversion_unpacker(t, self.name), size
444 class VPPType(object):
445 # Set everything up to be able to pack / unpack
446 def __init__(self, name, msgdef):
452 self.field_by_name = {}
454 for i, f in enumerate(msgdef):
455 if type(f) is dict and 'crc' in f:
458 f_type, f_name = f[:2]
459 self.fields.append(f_name)
460 self.field_by_name[f_name] = None
461 self.fieldtypes.append(f_type)
462 if f_type not in types:
463 logger.debug('Unknown type {}'.format(f_type))
464 raise VPPSerializerValueError(
465 'Unknown message type {}'.format(f_type))
468 options = [x for x in f if type(x) is dict]
470 self.options = options[0]
474 if fieldlen == 3: # list
476 if list_elements == 0:
477 if f_type == 'string':
478 p = String(f_name, 0, self.options)
480 p = VLAList_legacy(f_name, f_type)
481 self.packers.append(p)
483 p = FixedList_u8(f_name, f_type, list_elements)
484 self.packers.append(p)
486 elif f_type == 'string':
487 p = String(f_name, list_elements, self.options)
488 self.packers.append(p)
491 p = FixedList(f_name, f_type, list_elements)
492 self.packers.append(p)
494 elif fieldlen == 4: # Variable length list
495 length_index = self.fields.index(f[3])
496 p = VLAList(f_name, f_type, f[3], length_index)
497 self.packers.append(p)
499 # Support default for basetypes and enums
500 if 'default' in self.options:
502 p = BaseTypes(f_type, 0, self.options)
504 p = class_types[f_type](f_name, types[f_type].msgdef, self.options)
507 self.packers.append(p)
511 self.tuple = collections.namedtuple(name, self.fields, rename=True)
513 self.toplevelconversion = False
515 def pack(self, data, kwargs=None):
520 # Try one of the format functions
521 if data and conversion_required(data, self.name):
522 return conversion_packer(data, self.name)
524 for i, a in enumerate(self.fields):
525 if data and type(data) is not dict and a not in data:
526 raise VPPSerializerValueError(
527 "Invalid argument: {} expected {}.{}".
528 format(data, self.name, a))
530 # Defaulting to zero.
531 if not data or a not in data: # Default to 0
533 kwarg = None # No default for VLA
536 kwarg = kwargs[a] if a in kwargs else None
537 if isinstance(self.packers[i], VPPType):
538 b += self.packers[i].pack(arg, kwarg)
540 b += self.packers[i].pack(arg, kwargs)
544 def unpack(self, data, offset=0, result=None, ntc=False):
545 # Return a list of arguments
548 if ntc == False and self.name in vpp_format.conversion_unpacker_table:
549 # Disable type conversion for dependent types
551 self.toplevelconversion = True
553 for p in self.packers:
554 x, size = p.unpack(data, offset, result, ntc)
555 if type(x) is tuple and len(x) == 1:
560 t = self.tuple._make(result)
562 if self.toplevelconversion:
563 self.toplevelconversion = False
564 t = conversion_unpacker(t, self.name)
568 class VPPMessage(VPPType):