2 # Copyright (c) 2018 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
20 from . import vpp_format
24 if sys.version_info <= (3, 4):
25 from aenum import IntEnum
27 from enum import IntEnum
29 if sys.version_info <= (3, 6):
30 from aenum import IntFlag
32 from enum import IntFlag
36 # Set log-level in application by doing e.g.:
37 # logger = logging.getLogger('vpp_serializer')
38 # logger.setLevel(logging.DEBUG)
40 logger = logging.getLogger(__name__)
if sys.version[0] == '2':
    def check(d):
        """Return True when *d* is exactly a dict.

        On Python 2 ``bytes`` is ``str``, so byte strings are deliberately
        not matched here.
        """
        # The original one-liner evaluated this expression but never
        # returned it, so the function always yielded None.
        return type(d) is dict
else:
    def check(d):
        """Return True when *d* is exactly a dict or a bytes object."""
        # Exact type tests (not isinstance) are kept on purpose so that
        # subclasses are treated the same way as before.
        return type(d) is dict or type(d) is bytes
# Report whether `data` needs a vpp_format conversion before it can be packed
# as `field_type`.
# NOTE(review): only the table-membership test is visible in this listing; any
# guard before it and the statements producing the return value are elided, so
# confirm the exact result (and the behaviour for unknown field types) against
# the full file.
48 def conversion_required(data, field_type):
# The lookup key is the Python type *name* of `data`, checked against the
# per-field-type entry of vpp_format.conversion_table.
52 if type(data).__name__ in vpp_format.conversion_table[field_type]:
def conversion_packer(data, field_type):
    """Convert *data* with its registered formatter, then pack the result.

    The formatter is chosen from ``vpp_format.conversion_table`` by
    *field_type* and by the Python type name of *data*; the converted value
    is handed to the packer registered for *field_type* in ``types``.
    A missing table entry surfaces as ``KeyError``.
    """
    pack = types[field_type].pack
    formatters = vpp_format.conversion_table[field_type]
    convert = formatters[type(data).__name__]
    return pack(convert(data))
# Post-process an unpacked value with the converter registered for
# `field_type` in vpp_format.conversion_unpacker_table.
64 def conversion_unpacker(data, field_type):
# Field types without a registered converter take this early branch.
# NOTE(review): the branch body is elided in this listing; presumably it
# returns `data` unchanged -- confirm.
65 if field_type not in vpp_format.conversion_unpacker_table:
67 return vpp_format.conversion_unpacker_table[field_type](data)
# Packer/unpacker for one primitive wire type, backed by a struct.Struct.
70 class BaseTypes(object):
# `type` is the primitive's name and the key into base_types (note that it
# shadows the builtin inside this method). `elements` > 0 combined with 'u8'
# or 'string' selects a single byte-string format of that many bytes.
# `options` is stored as-is and consulted by pack() for a 'default' entry.
71 def __init__(self, type, elements=0, options=None):
# struct format string per primitive name; the remaining entries of this table
# are elided in this listing.
72 base_types = {'u8': '>B',
82 if elements > 0 and (type == 'u8' or type == 'string'):
83 self.packer = struct.Struct('>%ss' % elements)
# NOTE(review): presumably the `else` arm of the test above (the `else:` line
# is not visible here).
85 self.packer = struct.Struct(base_types[type])
86 self.size = self.packer.size
87 self.options = options
# NOTE(review): body elided in this listing. VPPType.__init__ calls
# types[f_type](self.options), so this is presumably where per-field options
# are attached -- confirm.
89 def __call__(self, args):
# Pack a single value. Falsy `data` (None, 0, empty) is treated as
# "not specified" and replaced by options['default'] when one is configured.
# NOTE(review): the fallback used when no default is configured is elided
# here; the inline comment suggests zero.
# `kwargs` is accepted for interface parity with the other packers and is not
# used in the visible lines.
93 def pack(self, data, kwargs=None):
94 if not data: # Default to zero if not specified
95 if self.options and 'default' in self.options:
96 data = self.options['default']
99 return self.packer.pack(data)
# Returns (value, number of bytes consumed). `result` and `ntc` are accepted
# but not used here.
101 def unpack(self, data, offset, result=None, ntc=False):
102 return self.packer.unpack_from(data, offset)[0], self.packer.size
# Variable-length string serializer: a u32 length field followed by the text
# encoded as UTF-8.
105 class String(object):
# `options` must support the `in` operator (e.g. a dict); an optional 'limit'
# entry caps the accepted length, otherwise no limit is enforced.
106 def __init__(self, options):
109 self.length_field_packer = BaseTypes('u32')
110 self.limit = options['limit'] if 'limit' in options else None
# Pack `list` -- the text to send, despite the parameter name, which also
# shadows the builtin -- as length field + UTF-8 bytes.
# Raises VPPSerializerValueError when a limit is set and len(list) exceeds it.
112 def pack(self, list, kwargs=None):
# Zero length field with no payload.
# NOTE(review): the condition guarding this return is elided in this listing.
114 return self.length_field_packer.pack(0) + b""
115 if self.limit and len(list) > self.limit:
116 raise VPPSerializerValueError(
117 "Invalid argument length for: {}, {} maximum {}".
118 format(list, len(list), self.limit))
# NOTE(review): the length field is len(list) taken *before* encoding; for
# non-ASCII text the UTF-8 payload is longer than the advertised length --
# confirm this is intended.
120 return self.length_field_packer.pack(len(list)) + list.encode('utf8')
# Returns (text, total bytes consumed including the length field).
# NOTE(review): the continuation of the first call and the lines up to the
# BaseTypes('u8', length) construction are elided here (possibly a
# zero-length special case) -- confirm.
122 def unpack(self, data, offset=0, result=None, ntc=False):
123 length, length_field_size = self.length_field_packer.unpack(data,
127 p = BaseTypes('u8', length)
128 x, size = p.unpack(data, offset + length_field_size)
# Keep only the bytes before the first NUL, then decode them.
129 x2 = x.split(b'\0', 1)[0]
130 return (x2.decode('utf8'), size + length_field_size)
# Registry of serializers keyed by wire type name. Each primitive maps to a
# ready-made BaseTypes instance; 'string' maps to the String class itself
# (not an instance) and is instantiated per field with that field's options.
types = {prim: BaseTypes(prim)
         for prim in ('u8', 'u16', 'u32', 'i32', 'u64', 'f64', 'bool')}
types['string'] = String
# Resolve a type by name; VPPTypeAlias.__init__ calls this with msgdef['type'].
# NOTE(review): the body is elided in this listing; presumably it looks `name`
# up in the module-level `types` registry -- confirm, including what happens
# for an unknown name.
139 def vpp_get_type(name):
# Raised by the packers/unpackers in this module for unknown types and for
# length or argument mismatches. Subclasses ValueError, so callers may catch
# either.
146 class VPPSerializerValueError(ValueError):
# Fixed-length u8 array / fixed-length string, handled as one byte string via
# BaseTypes(field_type, num).
150 class FixedList_u8(object):
# NOTE(review): the assignments of self.name and self.num (both used below)
# are elided in this listing; presumably they store `name` and `num`.
151 def __init__(self, name, field_type, num):
154 self.packer = BaseTypes(field_type, num)
155 self.size = self.packer.size
156 self.field_type = field_type
# NOTE(review): body elided in this listing.
158 def __call__(self, args):
# Raises VPPSerializerValueError when `data` is longer than self.num.
# NOTE(review): struct's 's' format pads short input with NUL bytes at the
# *end*, so "Left-pads" in the docstring below looks inaccurate -- confirm and
# reword.
162 def pack(self, data, kwargs=None):
163 """Packs a fixed length bytestring. Left-pads with zeros
164 if input data is too short."""
# All-zero field of the full size.
# NOTE(review): the condition guarding this return is elided here.
166 return b'\x00' * self.size
168 if len(data) > self.num:
169 raise VPPSerializerValueError(
170 'Fixed list length error for "{}", got: {}'
172 .format(self.name, len(data), self.num))
174 return self.packer.pack(data)
# Returns (value, size). Raises VPPSerializerValueError when fewer than
# self.num bytes remain after `offset`.
176 def unpack(self, data, offset=0, result=None, ntc=False):
177 if len(data[offset:]) < self.num:
178 raise VPPSerializerValueError(
179 'Invalid array length for "{}" got {}'
181 .format(self.name, len(data[offset:]), self.num))
# 'string' fields: keep the bytes before the first NUL and decode as UTF-8;
# the reported size is always the full field width.
182 if self.field_type == 'string':
183 s = self.packer.unpack(data, offset)
184 s2 = s[0].split(b'\0', 1)[0]
185 return (s2.decode('utf-8'), self.num)
# Other field types: the packer's (bytes, size) tuple is returned unchanged.
186 return self.packer.unpack(data, offset)
# Fixed-length array whose elements are each handled by types[field_type].
189 class FixedList(object):
# NOTE(review): the assignment of self.num (used below) is elided in this
# listing; presumably it stores `num`.
190 def __init__(self, name, field_type, num):
192 self.packer = types[field_type]
# Total wire size = element size * element count.
193 self.size = self.packer.size * num
195 self.field_type = field_type
# NOTE(review): body elided in this listing.
197 def __call__(self, args):
# Pack `list`, which must contain exactly self.num elements; any other length
# raises VPPSerializerValueError.
# Unlike the sibling classes, `kwargs` has no default here, so callers must
# always pass it.
201 def pack(self, list, kwargs):
202 if len(list) != self.num:
203 raise VPPSerializerValueError(
204 'Fixed list length error, got: {} expected: {}'
205 .format(len(list), self.num))
# Each element is packed on its own and appended to the buffer `b` (its
# initialisation, the loop header and the final return are elided here).
208 b += self.packer.pack(e)
# Unpack self.num consecutive elements starting at `offset`.
# NOTE(review): the lines that collect `x`, advance `offset` and build the
# return value are elided in this listing.
211 def unpack(self, data, offset=0, result=None, ntc=False):
212 # Return a list of arguments
215 for e in range(self.num):
216 x, size = self.packer.unpack(data, offset, ntc=ntc)
# Variable-length array whose element count lives in another field of the
# same message.
223 class VLAList(object):
# `len_field_name` is the name of the field holding the element count (used
# when packing); `index` is presumably stored as self.index, the position of
# that field in the already-unpacked results (used when unpacking) -- the
# assignment itself is elided in this listing.
224 def __init__(self, name, field_type, len_field_name, index):
226 self.field_type = field_type
228 self.packer = types[field_type]
229 self.size = self.packer.size
230 self.length_field = len_field_name
# NOTE(review): body elided in this listing.
232 def __call__(self, args):
# Pack `list`; its length must equal kwargs[self.length_field], otherwise
# VPPSerializerValueError is raised. `kwargs` is indexed here, so despite the
# None default a mapping containing the length field is required on this path.
236 def pack(self, list, kwargs=None):
239 if len(list) != kwargs[self.length_field]:
240 raise VPPSerializerValueError(
241 'Variable length error, got: {} expected: {}'
242 .format(len(list), kwargs[self.length_field]))
# One-byte elements are emitted in a single step via bytearray().
247 if self.packer.size == 1:
248 return bytearray(list)
# Wider elements are packed one at a time and appended to `b` (buffer
# initialisation, loop header and return are elided here).
251 b += self.packer.pack(e)
# Unpack result[self.index] elements; `result` must therefore already hold
# the decoded length field.
254 def unpack(self, data, offset=0, result=None, ntc=False):
255 # Return a list of arguments
# One-byte elements: the whole array is read as a single byte string.
# NOTE(review): the branch taken for a zero count is elided in this listing.
259 if self.packer.size == 1:
260 if result[self.index] == 0:
262 p = BaseTypes('u8', result[self.index])
263 return p.unpack(data, offset, ntc=ntc)
# Wider elements are unpacked one by one (collection of `x`, offset advance
# and return are elided here).
266 for e in range(result[self.index]):
267 x, size = self.packer.unpack(data, offset, ntc=ntc)
# Variable-length array without an explicit length field: when unpacking, the
# element count is derived from the number of bytes left after `offset`.
274 class VLAList_legacy():
275 def __init__(self, name, field_type):
276 self.packer = types[field_type]
277 self.size = self.packer.size
# NOTE(review): body elided in this listing.
279 def __call__(self, args):
# Pack every element of `list`.
# NOTE(review): the body of the one-byte special case, the loop header and
# the return are elided in this listing.
283 def pack(self, list, kwargs=None):
284 if self.packer.size == 1:
289 b += self.packer.pack(e)
# Consume the rest of the buffer. Raises VPPSerializerValueError when the
# remaining byte count is not a multiple of the element size.
292 def unpack(self, data, offset=0, result=None, ntc=False):
294 # Return a list of arguments
295 if (len(data) - offset) % self.packer.size:
296 raise VPPSerializerValueError(
297 'Legacy Variable Length Array length mismatch.')
# Exact division is guaranteed by the modulo check above; int() only turns
# the true-division result back into an integer.
298 elements = int((len(data) - offset) / self.packer.size)
300 for e in range(elements):
301 x, size = self.packer.unpack(data, offset, ntc=ntc)
303 offset += self.packer.size
# Enum type built from a message definition. Values travel on the wire as u32
# unless the definition names a different 'enumtype'.
308 class VPPEnumType(object):
309 def __init__(self, name, msgdef):
# Defaults; overridden below when the definition carries an 'enumtype' entry.
310 self.size = types['u32'].size
311 self.enumtype = 'u32'
# A dict item in msgdef with an 'enumtype' key selects a non-default base
# type (the loop over msgdef that binds `f` is elided in this listing).
314 if type(f) is dict and 'enumtype' in f:
315 if f['enumtype'] != 'u32':
316 self.size = types[f['enumtype']].size
317 self.enumtype = f['enumtype']
# Collect member name -> value pairs (initialisation of e_hash and the
# unpacking of ename/evalue are elided here).
320 e_hash[ename] = evalue
# The Python-side enum is an IntFlag built with the functional API.
321 self.enum = IntFlag(name, e_hash)
# NOTE(review): body elided in this listing.
324 def __call__(self, args):
# Exposes enum members as attributes of this object; Python only calls this
# for names that normal attribute lookup did not find.
# NOTE(review): an unknown name raises KeyError from the enum lookup instead
# of AttributeError, which can confuse hasattr()/getattr() with a default --
# confirm whether that matters to callers.
328 def __getattr__(self, name):
329 return self.enum[name]
# NOTE(review): body elided in this listing. __nonzero__ is the Python 2
# truthiness hook; Python 3 looks for __bool__ instead.
331 def __nonzero__(self):
# Pack through the serializer of the underlying integer type.
334 def pack(self, data, kwargs=None):
335 return types[self.enumtype].pack(data)
# Returns (enum value, bytes consumed); the raw integer is wrapped in
# self.enum.
337 def unpack(self, data, offset=0, result=None, ntc=False):
338 x, size = types[self.enumtype].unpack(data, offset)
339 return self.enum(x), size
# Union type: several named members, each with its own packer taken from the
# `types` registry.
342 class VPPUnionType(object):
343 def __init__(self, name, msgdef):
# Member name -> packer, kept in definition order.
348 self.packers = collections.OrderedDict()
349 for i, f in enumerate(msgdef):
# A dict item carrying 'crc' is metadata rather than a member.
# NOTE(review): the handling of that item, and the unpacking of
# f_type/f_name for ordinary members, are elided in this listing.
350 if type(f) is dict and 'crc' in f:
# Every member type must already be registered in `types`.
354 if f_type not in types:
355 logger.debug('Unknown union type {}'.format(f_type))
356 raise VPPSerializerValueError(
357 'Unknown message type {}'.format(f_type))
358 fields.append(f_name)
359 size = types[f_type].size
360 self.packers[f_name] = types[f_type]
# NOTE(review): the computation of self.size from the member sizes is elided
# in this listing.
# rename=True makes namedtuple replace invalid or duplicate field names with
# positional names instead of raising.
366 self.tuple = collections.namedtuple(name, fields, rename=True)
# NOTE(review): body elided in this listing.
368 def __call__(self, args):
372 # Union of variable length?
# `data` maps member name -> value; the result occupies self.size bytes.
373 def pack(self, data, kwargs=None):
# All-zero union.
# NOTE(review): the condition guarding this return is elided here.
375 return b'\x00' * self.size
377 for k, v in data.items():
378 logger.debug("Key: {} Value: {}".format(k, v))
379 b = self.packers[k].pack(v, kwargs)
# Zero-filled buffer of the full union size.
# NOTE(review): how `b` is copied into `r`, and the return, are elided here.
381 r = bytearray(self.size)
# Decode the buffer once per member and return (namedtuple, size).
# NOTE(review): the accumulation of `r` and `maxsize` is elided here; in the
# visible line every member is decoded from the same `offset`.
385 def unpack(self, data, offset=0, result=None, ntc=False):
388 for k, p in self.packers.items():
389 x, size = p.unpack(data, offset, ntc=ntc)
393 return self.tuple._make(r), maxsize
# Alias of an existing type, optionally as a fixed-length array of it, with
# vpp_format conversions applied on pack and unpack.
396 class VPPTypeAlias(object):
# `msgdef` is a mapping with a 'type' entry and an optional 'length' entry.
# NOTE(review): the assignment of self.name (used below) and several
# branches are elided in this listing.
397 def __init__(self, name, msgdef):
399 t = vpp_get_type(msgdef['type'])
402 if 'length' in msgdef:
# NOTE(review): the handling of a zero length is elided in this listing.
403 if msgdef['length'] == 0:
# u8 arrays use the byte-string based FixedList_u8; other element types use
# the element-wise FixedList.
405 if msgdef['type'] == 'u8':
406 self.packer = FixedList_u8(name, msgdef['type'],
408 self.size = self.packer.size
410 self.packer = FixedList(name, msgdef['type'], msgdef['length'])
# NOTE(review): body elided in this listing.
417 def __call__(self, args):
# Pack `data`, first trying a vpp_format conversion keyed by this alias' name.
# NOTE(review): the `try:` line and the body of the except clause are elided
# here, so what happens after a failed conversion is not visible -- confirm.
421 def pack(self, data, kwargs=None):
422 if data and conversion_required(data, self.name):
424 return conversion_packer(data, self.name)
425 # Python 2 and 3 raises different exceptions from inet_pton
426 except(OSError, socket.error, TypeError):
# No conversion applies: delegate to the underlying packer.
429 return self.packer.pack(data, kwargs)
# Unpack with the underlying packer, then pass the value through
# conversion_unpacker; returns (value, size).
# NOTE(review): a line between these two statements is elided in this
# listing and may short-circuit the conversion -- confirm.
431 def unpack(self, data, offset=0, result=None, ntc=False):
432 t, size = self.packer.unpack(data, offset, result, ntc=ntc)
434 return conversion_unpacker(t, self.name), size
# Composite (struct-like) type: an ordered list of named fields, each with
# its own packer.
438 class VPPType(object):
439 # Set everything up to be able to pack / unpack
# `msgdef` is iterated item by item; each field item is a sequence whose
# first two entries are the field's type name and field name.
# NOTE(review): the initialisation of self.name, self.fields, self.packers,
# self.fieldtypes and self.size is elided in this listing.
440 def __init__(self, name, msgdef):
446 self.field_by_name = {}
448 for i, f in enumerate(msgdef):
# A dict item carrying 'crc' is metadata rather than a field.
# NOTE(review): its handling is elided in this listing.
449 if type(f) is dict and 'crc' in f:
452 f_type, f_name = f[:2]
453 self.fields.append(f_name)
454 self.field_by_name[f_name] = None
455 self.fieldtypes.append(f_type)
# Every field type must already be registered in `types`.
456 if f_type not in types:
457 logger.debug('Unknown type {}'.format(f_type))
458 raise VPPSerializerValueError(
459 'Unknown message type {}'.format(f_type))
# Dict entries inside a field definition are per-field options; the first one
# found is kept.
# NOTE(review): the guard around this assignment, and the value used when a
# field has no options, are elided in this listing.
462 options = [x for x in f if type(x) is dict]
464 self.options = options[0]
# NOTE(review): the computation of `fieldlen` and `list_elements` is elided
# in this listing.
# Three-entry field: an array with an element count. A count of 0 selects the
# legacy VLA, u8/string arrays are handled as one byte string, and anything
# else is packed element by element.
468 if fieldlen == 3: # list
470 if list_elements == 0:
471 p = VLAList_legacy(f_name, f_type)
472 self.packers.append(p)
473 elif f_type == 'u8' or f_type == 'string':
474 p = FixedList_u8(f_name, f_type, list_elements)
475 self.packers.append(p)
478 p = FixedList(f_name, f_type, list_elements)
479 self.packers.append(p)
# Four-entry field: f[3] names the field holding the element count. That
# field must already have been appended to self.fields, otherwise .index()
# raises ValueError.
481 elif fieldlen == 4: # Variable length list
482 length_index = self.fields.index(f[3])
483 p = VLAList(f_name, f_type, f[3], length_index)
484 self.packers.append(p)
# Plain field: the registry entry is called with the field options to obtain
# the packer.
486 p = types[f_type](self.options)
487 self.packers.append(p)
# rename=True makes namedtuple replace invalid or duplicate field names with
# positional names instead of raising.
491 self.tuple = collections.namedtuple(name, self.fields, rename=True)
# NOTE(review): body elided in this listing.
494 def __call__(self, args):
# Pack `data` (normally a mapping of field name -> value) field by field, in
# definition order.
# NOTE(review): the initialisation of `b`, the normalisation of `kwargs`, the
# assignment of `arg` and the final return are elided in this listing.
498 def pack(self, data, kwargs=None):
503 # Try one of the format functions
504 if data and conversion_required(data, self.name):
505 return conversion_packer(data, self.name)
507 for i, a in enumerate(self.fields):
# Non-dict data that does not contain the field name is rejected.
508 if data and type(data) is not dict and a not in data:
509 raise VPPSerializerValueError(
510 "Invalid argument: {} expected {}.{}".
511 format(data, self.name, a))
513 # Defaulting to zero.
514 if not data or a not in data: # Default to 0
516 kwarg = None # No default for VLA
519 kwarg = kwargs[a] if a in kwargs else None
# Nested VPPType packers receive only their own sub-entry of kwargs; every
# other packer gets the full kwargs (VLAList, for one, reads its length field
# from it).
520 if isinstance(self.packers[i], VPPType):
521 b += self.packers[i].pack(arg, kwarg)
523 b += self.packers[i].pack(arg, kwargs)
# Unpack all fields in order and build the namedtuple for this type.
# NOTE(review): the setup of `result`, the offset/size bookkeeping, the
# handling of 1-tuples, the condition around the conversion and the return
# statement are elided in this listing.
527 def unpack(self, data, offset=0, result=None, ntc=False):
528 # Return a list of arguments
531 for p in self.packers:
# `result` is handed to each packer so that VLAList can look up the length
# field that was decoded earlier.
532 x, size = p.unpack(data, offset, result, ntc)
533 if type(x) is tuple and len(x) == 1:
538 t = self.tuple._make(result)
540 t = conversion_unpacker(t, self.name)
544 class VPPMessage(VPPType):