2 # Copyright (c) 2018 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
21 if sys.version_info <= (3, 4):
22 from aenum import IntEnum # noqa: F401
24 from enum import IntEnum # noqa: F401
26 if sys.version_info <= (3, 6):
27 from aenum import IntFlag # noqa: F401
30 from enum import IntFlag # noqa: F401
32 from . import vpp_format # noqa: E402
35 # Set log-level in application by doing e.g.:
36 # logger = logging.getLogger('vpp_serializer')
37 # logger.setLevel(logging.DEBUG)
39 logger = logging.getLogger(__name__)
41 if sys.version[0] == '2':
42 def check(d): type(d) is dict
44 def check(d): type(d) is dict or type(d) is bytes
47 def conversion_required(data, field_type):
51 if type(data).__name__ in vpp_format.conversion_table[field_type]:
57 def conversion_packer(data, field_type):
58 t = type(data).__name__
59 return types[field_type].pack(vpp_format.
60 conversion_table[field_type][t](data))
63 def conversion_unpacker(data, field_type):
64 if field_type not in vpp_format.conversion_unpacker_table:
66 return vpp_format.conversion_unpacker_table[field_type](data)
69 class BaseTypes(object):
70 def __init__(self, type, elements=0, options=None):
71 base_types = {'u8': '>B',
84 if elements > 0 and (type == 'u8' or type == 'string'):
85 self.packer = struct.Struct('>%ss' % elements)
87 self.packer = struct.Struct(base_types[type])
88 self.size = self.packer.size
89 self.options = options
91 def __call__(self, args):
95 def pack(self, data, kwargs=None):
96 if not data: # Default to zero if not specified
97 if self.options and 'default' in self.options:
98 data = self.options['default']
101 return self.packer.pack(data)
103 def unpack(self, data, offset, result=None, ntc=False):
104 return self.packer.unpack_from(data, offset)[0], self.packer.size
107 class String(object):
108 def __init__(self, name, num, options):
112 self.length_field_packer = BaseTypes('u32')
113 self.limit = options['limit'] if 'limit' in options else num
114 self.fixed = True if num else False
115 if self.fixed and not self.limit:
116 raise VPPSerializerValueError(
117 "Invalid argument length for: {}, {} maximum {}".
118 format(list, len(list), self.limit))
120 def pack(self, list, kwargs=None):
123 return b"\x00" * self.limit
124 return self.length_field_packer.pack(0) + b""
125 if self.limit and len(list) > self.limit - 1:
126 raise VPPSerializerValueError(
127 "Invalid argument length for: {}, {} maximum {}".
128 format(list, len(list), self.limit - 1))
130 return list.encode('ascii').ljust(self.limit, b'\x00')
131 return self.length_field_packer.pack(len(list)) + list.encode('ascii')
133 def unpack(self, data, offset=0, result=None, ntc=False):
135 p = BaseTypes('u8', self.num)
136 s = p.unpack(data, offset)
137 s2 = s[0].split(b'\0', 1)[0]
138 return (s2.decode('ascii'), self.num)
140 length, length_field_size = self.length_field_packer.unpack(data,
144 p = BaseTypes('u8', length)
145 x, size = p.unpack(data, offset + length_field_size)
146 #x2 = x.split(b'\0', 1)[0]
147 return (x.decode('ascii', errors='replace'), size + length_field_size)
150 types = {'u8': BaseTypes('u8'), 'u16': BaseTypes('u16'),
151 'u32': BaseTypes('u32'), 'i32': BaseTypes('i32'),
152 'u64': BaseTypes('u64'), 'f64': BaseTypes('f64'),
153 'bool': BaseTypes('bool'), 'string': String}
156 def vpp_get_type(name):
163 class VPPSerializerValueError(ValueError):
167 class FixedList_u8(object):
168 def __init__(self, name, field_type, num):
171 self.packer = BaseTypes(field_type, num)
172 self.size = self.packer.size
173 self.field_type = field_type
175 def __call__(self, args):
179 def pack(self, data, kwargs=None):
180 """Packs a fixed length bytestring. Left-pads with zeros
181 if input data is too short."""
183 return b'\x00' * self.size
185 if len(data) > self.num:
186 raise VPPSerializerValueError(
187 'Fixed list length error for "{}", got: {}'
189 .format(self.name, len(data), self.num))
192 return self.packer.pack(data)
194 raise VPPSerializerValueError(
195 'Packing failed for "{}" {}'
196 .format(self.name, kwargs))
197 def unpack(self, data, offset=0, result=None, ntc=False):
198 if len(data[offset:]) < self.num:
199 raise VPPSerializerValueError(
200 'Invalid array length for "{}" got {}'
202 .format(self.name, len(data[offset:]), self.num))
203 return self.packer.unpack(data, offset)
206 class FixedList(object):
207 def __init__(self, name, field_type, num):
209 self.packer = types[field_type]
210 self.size = self.packer.size * num
212 self.field_type = field_type
214 def __call__(self, args):
218 def pack(self, list, kwargs):
219 if len(list) != self.num:
220 raise VPPSerializerValueError(
221 'Fixed list length error, got: {} expected: {}'
222 .format(len(list), self.num))
225 b += self.packer.pack(e)
228 def unpack(self, data, offset=0, result=None, ntc=False):
229 # Return a list of arguments
232 for e in range(self.num):
233 x, size = self.packer.unpack(data, offset, ntc=ntc)
240 class VLAList(object):
241 def __init__(self, name, field_type, len_field_name, index):
243 self.field_type = field_type
245 self.packer = types[field_type]
246 self.size = self.packer.size
247 self.length_field = len_field_name
249 def __call__(self, args):
253 def pack(self, lst, kwargs=None):
256 if len(lst) != kwargs[self.length_field]:
257 raise VPPSerializerValueError(
258 'Variable length error, got: {} expected: {}'
259 .format(len(lst), kwargs[self.length_field]))
262 if self.packer.size == 1:
263 if isinstance(lst, list):
269 b += self.packer.pack(e)
272 def unpack(self, data, offset=0, result=None, ntc=False):
273 # Return a list of arguments
277 if self.packer.size == 1:
278 if result[self.index] == 0:
280 p = BaseTypes('u8', result[self.index])
281 return p.unpack(data, offset, ntc=ntc)
284 for e in range(result[self.index]):
285 x, size = self.packer.unpack(data, offset, ntc=ntc)
292 class VLAList_legacy():
293 def __init__(self, name, field_type):
294 self.packer = types[field_type]
295 self.size = self.packer.size
297 def __call__(self, args):
301 def pack(self, list, kwargs=None):
302 if self.packer.size == 1:
307 b += self.packer.pack(e)
310 def unpack(self, data, offset=0, result=None, ntc=False):
312 # Return a list of arguments
313 if (len(data) - offset) % self.packer.size:
314 raise VPPSerializerValueError(
315 'Legacy Variable Length Array length mismatch.')
316 elements = int((len(data) - offset) / self.packer.size)
318 for e in range(elements):
319 x, size = self.packer.unpack(data, offset, ntc=ntc)
321 offset += self.packer.size
326 class VPPEnumType(object):
327 def __init__(self, name, msgdef):
328 self.size = types['u32'].size
329 self.enumtype = 'u32'
332 if type(f) is dict and 'enumtype' in f:
333 if f['enumtype'] != 'u32':
334 self.size = types[f['enumtype']].size
335 self.enumtype = f['enumtype']
338 e_hash[ename] = evalue
339 self.enum = IntFlag(name, e_hash)
342 def __call__(self, args):
346 def __getattr__(self, name):
347 return self.enum[name]
352 if sys.version[0] == '2':
353 __nonzero__ = __bool__
355 def pack(self, data, kwargs=None):
356 return types[self.enumtype].pack(data)
358 def unpack(self, data, offset=0, result=None, ntc=False):
359 x, size = types[self.enumtype].unpack(data, offset)
360 return self.enum(x), size
363 class VPPUnionType(object):
364 def __init__(self, name, msgdef):
369 self.packers = collections.OrderedDict()
370 for i, f in enumerate(msgdef):
371 if type(f) is dict and 'crc' in f:
375 if f_type not in types:
376 logger.debug('Unknown union type {}'.format(f_type))
377 raise VPPSerializerValueError(
378 'Unknown message type {}'.format(f_type))
379 fields.append(f_name)
380 size = types[f_type].size
381 self.packers[f_name] = types[f_type]
387 self.tuple = collections.namedtuple(name, fields, rename=True)
389 def __call__(self, args):
393 # Union of variable length?
394 def pack(self, data, kwargs=None):
396 return b'\x00' * self.size
398 for k, v in data.items():
399 logger.debug("Key: {} Value: {}".format(k, v))
400 b = self.packers[k].pack(v, kwargs)
402 r = bytearray(self.size)
406 def unpack(self, data, offset=0, result=None, ntc=False):
409 for k, p in self.packers.items():
410 x, size = p.unpack(data, offset, ntc=ntc)
414 return self.tuple._make(r), maxsize
417 class VPPTypeAlias(object):
418 def __init__(self, name, msgdef):
420 t = vpp_get_type(msgdef['type'])
422 raise ValueError('No such type: {}'.format(msgdef['type']))
423 if 'length' in msgdef:
424 if msgdef['length'] == 0:
426 if msgdef['type'] == 'u8':
427 self.packer = FixedList_u8(name, msgdef['type'],
429 self.size = self.packer.size
431 self.packer = FixedList(name, msgdef['type'], msgdef['length'])
437 self.toplevelconversion = False
439 def __call__(self, args):
443 def pack(self, data, kwargs=None):
444 if data and conversion_required(data, self.name):
446 return conversion_packer(data, self.name)
447 # Python 2 and 3 raises different exceptions from inet_pton
448 except(OSError, socket.error, TypeError):
451 return self.packer.pack(data, kwargs)
453 def unpack(self, data, offset=0, result=None, ntc=False):
454 if ntc == False and self.name in vpp_format.conversion_unpacker_table:
455 # Disable type conversion for dependent types
457 self.toplevelconversion = True
458 t, size = self.packer.unpack(data, offset, result, ntc=ntc)
459 if self.toplevelconversion:
460 self.toplevelconversion = False
461 return conversion_unpacker(t, self.name), size
465 class VPPType(object):
466 # Set everything up to be able to pack / unpack
467 def __init__(self, name, msgdef):
473 self.field_by_name = {}
475 for i, f in enumerate(msgdef):
476 if type(f) is dict and 'crc' in f:
479 f_type, f_name = f[:2]
480 self.fields.append(f_name)
481 self.field_by_name[f_name] = None
482 self.fieldtypes.append(f_type)
483 if f_type not in types:
484 logger.debug('Unknown type {}'.format(f_type))
485 raise VPPSerializerValueError(
486 'Unknown message type {}'.format(f_type))
489 options = [x for x in f if type(x) is dict]
491 self.options = options[0]
495 if fieldlen == 3: # list
497 if list_elements == 0:
498 if f_type == 'string':
499 p = String(f_name, 0, self.options)
501 p = VLAList_legacy(f_name, f_type)
502 self.packers.append(p)
504 p = FixedList_u8(f_name, f_type, list_elements)
505 self.packers.append(p)
507 elif f_type == 'string':
508 p = String(f_name, list_elements, self.options)
509 self.packers.append(p)
512 p = FixedList(f_name, f_type, list_elements)
513 self.packers.append(p)
515 elif fieldlen == 4: # Variable length list
516 length_index = self.fields.index(f[3])
517 p = VLAList(f_name, f_type, f[3], length_index)
518 self.packers.append(p)
520 p = types[f_type](self.options)
521 self.packers.append(p)
525 self.tuple = collections.namedtuple(name, self.fields, rename=True)
527 self.toplevelconversion = False
529 def __call__(self, args):
533 def pack(self, data, kwargs=None):
538 # Try one of the format functions
539 if data and conversion_required(data, self.name):
540 return conversion_packer(data, self.name)
542 for i, a in enumerate(self.fields):
543 if data and type(data) is not dict and a not in data:
544 raise VPPSerializerValueError(
545 "Invalid argument: {} expected {}.{}".
546 format(data, self.name, a))
548 # Defaulting to zero.
549 if not data or a not in data: # Default to 0
551 kwarg = None # No default for VLA
554 kwarg = kwargs[a] if a in kwargs else None
555 if isinstance(self.packers[i], VPPType):
556 b += self.packers[i].pack(arg, kwarg)
558 b += self.packers[i].pack(arg, kwargs)
562 def unpack(self, data, offset=0, result=None, ntc=False):
563 # Return a list of arguments
566 if ntc == False and self.name in vpp_format.conversion_unpacker_table:
567 # Disable type conversion for dependent types
569 self.toplevelconversion = True
571 for p in self.packers:
572 x, size = p.unpack(data, offset, result, ntc)
573 if type(x) is tuple and len(x) == 1:
578 t = self.tuple._make(result)
580 if self.toplevelconversion:
581 self.toplevelconversion = False
582 t = conversion_unpacker(t, self.name)
586 class VPPMessage(VPPType):