2 # Copyright (c) 2018 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
21 if sys.version_info <= (3, 4):
22 from aenum import IntEnum # noqa: F401
24 from enum import IntEnum # noqa: F401
26 if sys.version_info <= (3, 6):
27 from aenum import IntFlag # noqa: F401
30 from enum import IntFlag # noqa: F401
32 from . import vpp_format # noqa: E402
# Set log-level in application by doing e.g. (the logger is registered
# under this module's __name__, i.e. its full dotted import path):
# logger = logging.getLogger('<package>.vpp_serializer')
# logger.setLevel(logging.DEBUG)
39 logger = logging.getLogger(__name__)
# NOTE: the previous one-line bodies evaluated the type test but never
# returned it, so check() always produced None (falsy) for every input.
if sys.version[0] == '2':
    def check(d):
        """Return True when *d* is exactly a ``dict``.

        The comparison is on the exact type, so dict subclasses are
        not accepted.
        """
        return type(d) is dict
else:
    def check(d):
        """Return True when *d* is exactly a ``dict`` or ``bytes`` object.

        The comparison is on the exact type, so subclasses of either
        are not accepted.
        """
        return type(d) is dict or type(d) is bytes
47 def conversion_required(data, field_type):
51 if type(data).__name__ in vpp_format.conversion_table[field_type]:
def conversion_packer(data, field_type):
    """Convert *data* for *field_type* and pack the converted value.

    The converter is looked up in ``vpp_format.conversion_table`` by
    field type and then by the Python type name of *data*; its result
    is handed to the ``pack`` method of ``types[field_type]``.

    :param data: value to convert and serialise.
    :param field_type: key into both ``types`` and the conversion table.
    :returns: whatever ``types[field_type].pack`` returns.
    :raises KeyError: if *field_type* or the type name of *data* has no
        entry in the tables being indexed.
    """
    type_name = type(data).__name__
    # Resolve the packer first, then the converter, mirroring the order
    # in which a single nested call expression would evaluate them.
    pack = types[field_type].pack
    convert = vpp_format.conversion_table[field_type][type_name]
    return pack(convert(data))
63 def conversion_unpacker(data, field_type):
64 if field_type not in vpp_format.conversion_unpacker_table:
66 return vpp_format.conversion_unpacker_table[field_type](data)
69 class BaseTypes(object):
70 def __init__(self, type, elements=0, options=None):
71 base_types = {'u8': '>B',
81 if elements > 0 and (type == 'u8' or type == 'string'):
82 self.packer = struct.Struct('>%ss' % elements)
84 self.packer = struct.Struct(base_types[type])
85 self.size = self.packer.size
86 self.options = options
88 def __call__(self, args):
92 def pack(self, data, kwargs=None):
93 if not data: # Default to zero if not specified
94 if self.options and 'default' in self.options:
95 data = self.options['default']
98 return self.packer.pack(data)
100 def unpack(self, data, offset, result=None, ntc=False):
101 return self.packer.unpack_from(data, offset)[0], self.packer.size
104 class String(object):
105 def __init__(self, options):
108 self.length_field_packer = BaseTypes('u32')
109 self.limit = options['limit'] if 'limit' in options else None
111 def pack(self, list, kwargs=None):
113 return self.length_field_packer.pack(0) + b""
114 if self.limit and len(list) > self.limit:
115 raise VPPSerializerValueError(
116 "Invalid argument length for: {}, {} maximum {}".
117 format(list, len(list), self.limit))
119 return self.length_field_packer.pack(len(list)) + list.encode('utf8')
121 def unpack(self, data, offset=0, result=None, ntc=False):
122 length, length_field_size = self.length_field_packer.unpack(data,
126 p = BaseTypes('u8', length)
127 x, size = p.unpack(data, offset + length_field_size)
128 x2 = x.split(b'\0', 1)[0]
129 return (x2.decode('utf8'), size + length_field_size)
132 types = {'u8': BaseTypes('u8'), 'u16': BaseTypes('u16'),
133 'u32': BaseTypes('u32'), 'i32': BaseTypes('i32'),
134 'u64': BaseTypes('u64'), 'f64': BaseTypes('f64'),
135 'bool': BaseTypes('bool'), 'string': String}
138 def vpp_get_type(name):
145 class VPPSerializerValueError(ValueError):
149 class FixedList_u8(object):
150 def __init__(self, name, field_type, num):
153 self.packer = BaseTypes(field_type, num)
154 self.size = self.packer.size
155 self.field_type = field_type
157 def __call__(self, args):
161 def pack(self, data, kwargs=None):
        """Packs a fixed length bytestring. Right-pads with zero bytes
        if input data is too short."""
165 return b'\x00' * self.size
167 if len(data) > self.num:
168 raise VPPSerializerValueError(
169 'Fixed list length error for "{}", got: {}'
171 .format(self.name, len(data), self.num))
173 return self.packer.pack(data)
175 def unpack(self, data, offset=0, result=None, ntc=False):
176 if len(data[offset:]) < self.num:
177 raise VPPSerializerValueError(
178 'Invalid array length for "{}" got {}'
180 .format(self.name, len(data[offset:]), self.num))
181 if self.field_type == 'string':
182 s = self.packer.unpack(data, offset)
183 s2 = s[0].split(b'\0', 1)[0]
184 return (s2.decode('utf-8'), self.num)
185 return self.packer.unpack(data, offset)
188 class FixedList(object):
189 def __init__(self, name, field_type, num):
191 self.packer = types[field_type]
192 self.size = self.packer.size * num
194 self.field_type = field_type
196 def __call__(self, args):
200 def pack(self, list, kwargs):
201 if len(list) != self.num:
202 raise VPPSerializerValueError(
203 'Fixed list length error, got: {} expected: {}'
204 .format(len(list), self.num))
207 b += self.packer.pack(e)
210 def unpack(self, data, offset=0, result=None, ntc=False):
211 # Return a list of arguments
214 for e in range(self.num):
215 x, size = self.packer.unpack(data, offset, ntc=ntc)
222 class VLAList(object):
223 def __init__(self, name, field_type, len_field_name, index):
225 self.field_type = field_type
227 self.packer = types[field_type]
228 self.size = self.packer.size
229 self.length_field = len_field_name
231 def __call__(self, args):
235 def pack(self, list, kwargs=None):
238 if len(list) != kwargs[self.length_field]:
239 raise VPPSerializerValueError(
240 'Variable length error, got: {} expected: {}'
241 .format(len(list), kwargs[self.length_field]))
246 if self.packer.size == 1:
247 return bytearray(list)
250 b += self.packer.pack(e)
253 def unpack(self, data, offset=0, result=None, ntc=False):
254 # Return a list of arguments
258 if self.packer.size == 1:
259 if result[self.index] == 0:
261 p = BaseTypes('u8', result[self.index])
262 return p.unpack(data, offset, ntc=ntc)
265 for e in range(result[self.index]):
266 x, size = self.packer.unpack(data, offset, ntc=ntc)
273 class VLAList_legacy():
274 def __init__(self, name, field_type):
275 self.packer = types[field_type]
276 self.size = self.packer.size
278 def __call__(self, args):
282 def pack(self, list, kwargs=None):
283 if self.packer.size == 1:
288 b += self.packer.pack(e)
291 def unpack(self, data, offset=0, result=None, ntc=False):
293 # Return a list of arguments
294 if (len(data) - offset) % self.packer.size:
295 raise VPPSerializerValueError(
296 'Legacy Variable Length Array length mismatch.')
297 elements = int((len(data) - offset) / self.packer.size)
299 for e in range(elements):
300 x, size = self.packer.unpack(data, offset, ntc=ntc)
302 offset += self.packer.size
307 class VPPEnumType(object):
308 def __init__(self, name, msgdef):
309 self.size = types['u32'].size
310 self.enumtype = 'u32'
313 if type(f) is dict and 'enumtype' in f:
314 if f['enumtype'] != 'u32':
315 self.size = types[f['enumtype']].size
316 self.enumtype = f['enumtype']
319 e_hash[ename] = evalue
320 self.enum = IntFlag(name, e_hash)
323 def __call__(self, args):
327 def __getattr__(self, name):
328 return self.enum[name]
333 if sys.version[0] == '2':
334 __nonzero__ = __bool__
336 def pack(self, data, kwargs=None):
337 return types[self.enumtype].pack(data)
339 def unpack(self, data, offset=0, result=None, ntc=False):
340 x, size = types[self.enumtype].unpack(data, offset)
341 return self.enum(x), size
344 class VPPUnionType(object):
345 def __init__(self, name, msgdef):
350 self.packers = collections.OrderedDict()
351 for i, f in enumerate(msgdef):
352 if type(f) is dict and 'crc' in f:
356 if f_type not in types:
357 logger.debug('Unknown union type {}'.format(f_type))
358 raise VPPSerializerValueError(
359 'Unknown message type {}'.format(f_type))
360 fields.append(f_name)
361 size = types[f_type].size
362 self.packers[f_name] = types[f_type]
368 self.tuple = collections.namedtuple(name, fields, rename=True)
370 def __call__(self, args):
374 # Union of variable length?
375 def pack(self, data, kwargs=None):
377 return b'\x00' * self.size
379 for k, v in data.items():
380 logger.debug("Key: {} Value: {}".format(k, v))
381 b = self.packers[k].pack(v, kwargs)
383 r = bytearray(self.size)
387 def unpack(self, data, offset=0, result=None, ntc=False):
390 for k, p in self.packers.items():
391 x, size = p.unpack(data, offset, ntc=ntc)
395 return self.tuple._make(r), maxsize
398 class VPPTypeAlias(object):
399 def __init__(self, name, msgdef):
401 t = vpp_get_type(msgdef['type'])
404 if 'length' in msgdef:
405 if msgdef['length'] == 0:
407 if msgdef['type'] == 'u8':
408 self.packer = FixedList_u8(name, msgdef['type'],
410 self.size = self.packer.size
412 self.packer = FixedList(name, msgdef['type'], msgdef['length'])
419 def __call__(self, args):
423 def pack(self, data, kwargs=None):
424 if data and conversion_required(data, self.name):
426 return conversion_packer(data, self.name)
427 # Python 2 and 3 raises different exceptions from inet_pton
428 except(OSError, socket.error, TypeError):
431 return self.packer.pack(data, kwargs)
433 def unpack(self, data, offset=0, result=None, ntc=False):
434 t, size = self.packer.unpack(data, offset, result, ntc=ntc)
436 return conversion_unpacker(t, self.name), size
440 class VPPType(object):
441 # Set everything up to be able to pack / unpack
442 def __init__(self, name, msgdef):
448 self.field_by_name = {}
450 for i, f in enumerate(msgdef):
451 if type(f) is dict and 'crc' in f:
454 f_type, f_name = f[:2]
455 self.fields.append(f_name)
456 self.field_by_name[f_name] = None
457 self.fieldtypes.append(f_type)
458 if f_type not in types:
459 logger.debug('Unknown type {}'.format(f_type))
460 raise VPPSerializerValueError(
461 'Unknown message type {}'.format(f_type))
464 options = [x for x in f if type(x) is dict]
466 self.options = options[0]
470 if fieldlen == 3: # list
472 if list_elements == 0:
473 p = VLAList_legacy(f_name, f_type)
474 self.packers.append(p)
475 elif f_type == 'u8' or f_type == 'string':
476 p = FixedList_u8(f_name, f_type, list_elements)
477 self.packers.append(p)
480 p = FixedList(f_name, f_type, list_elements)
481 self.packers.append(p)
483 elif fieldlen == 4: # Variable length list
484 length_index = self.fields.index(f[3])
485 p = VLAList(f_name, f_type, f[3], length_index)
486 self.packers.append(p)
488 p = types[f_type](self.options)
489 self.packers.append(p)
493 self.tuple = collections.namedtuple(name, self.fields, rename=True)
496 def __call__(self, args):
500 def pack(self, data, kwargs=None):
505 # Try one of the format functions
506 if data and conversion_required(data, self.name):
507 return conversion_packer(data, self.name)
509 for i, a in enumerate(self.fields):
510 if data and type(data) is not dict and a not in data:
511 raise VPPSerializerValueError(
512 "Invalid argument: {} expected {}.{}".
513 format(data, self.name, a))
515 # Defaulting to zero.
516 if not data or a not in data: # Default to 0
518 kwarg = None # No default for VLA
521 kwarg = kwargs[a] if a in kwargs else None
522 if isinstance(self.packers[i], VPPType):
523 b += self.packers[i].pack(arg, kwarg)
525 b += self.packers[i].pack(arg, kwargs)
529 def unpack(self, data, offset=0, result=None, ntc=False):
530 # Return a list of arguments
533 for p in self.packers:
534 x, size = p.unpack(data, offset, result, ntc)
535 if type(x) is tuple and len(x) == 1:
540 t = self.tuple._make(result)
542 t = conversion_unpacker(t, self.name)
546 class VPPMessage(VPPType):