2 # Copyright (c) 2018 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
19 if sys.version[0] == '2':
20 from aenum import IntEnum, IntFlag
22 from enum import IntEnum, IntFlag
24 from . import vpp_format
30 # Set log-level in application by doing e.g.:
31 # logger = logging.getLogger('vpp_serializer')
32 # logger.setLevel(logging.DEBUG)
34 logger = logging.getLogger(__name__)
36 if sys.version[0] == '2':
37 def check(d): type(d) is dict
39 def check(d): type(d) is dict or type(d) is bytes
42 def conversion_required(data, field_type):
46 if type(data).__name__ in vpp_format.conversion_table[field_type]:
52 def conversion_packer(data, field_type):
53 t = type(data).__name__
54 return types[field_type].pack(vpp_format.
55 conversion_table[field_type][t](data))
58 def conversion_unpacker(data, field_type):
59 if field_type not in vpp_format.conversion_unpacker_table:
61 return vpp_format.conversion_unpacker_table[field_type](data)
64 class BaseTypes(object):
65 def __init__(self, type, elements=0):
66 base_types = {'u8': '>B',
76 if elements > 0 and (type == 'u8' or type == 'string'):
77 self.packer = struct.Struct('>%ss' % elements)
79 self.packer = struct.Struct(base_types[type])
80 self.size = self.packer.size
82 def pack(self, data, kwargs=None):
83 if not data: # Default to zero if not specified
85 return self.packer.pack(data)
87 def unpack(self, data, offset, result=None, ntc=False):
88 return self.packer.unpack_from(data, offset)[0], self.packer.size
95 self.length_field_packer = BaseTypes('u32')
97 def pack(self, list, kwargs=None):
99 return self.length_field_packer.pack(0) + b""
100 return self.length_field_packer.pack(len(list)) + list.encode('utf8')
102 def unpack(self, data, offset=0, result=None, ntc=False):
103 length, length_field_size = self.length_field_packer.unpack(data,
107 p = BaseTypes('u8', length)
108 x, size = p.unpack(data, offset + length_field_size)
109 x2 = x.split(b'\0', 1)[0]
110 return (x2.decode('utf8'), size + length_field_size)
113 types = {'u8': BaseTypes('u8'), 'u16': BaseTypes('u16'),
114 'u32': BaseTypes('u32'), 'i32': BaseTypes('i32'),
115 'u64': BaseTypes('u64'), 'f64': BaseTypes('f64'),
116 'bool': BaseTypes('bool'), 'string': String()}
119 def vpp_get_type(name):
126 class VPPSerializerValueError(ValueError):
130 class FixedList_u8(object):
131 def __init__(self, name, field_type, num):
134 self.packer = BaseTypes(field_type, num)
135 self.size = self.packer.size
136 self.field_type = field_type
138 def pack(self, data, kwargs=None):
139 """Packs a fixed length bytestring. Left-pads with zeros
140 if input data is too short."""
142 return b'\x00' * self.size
144 if len(data) > self.num:
145 raise VPPSerializerValueError(
146 'Fixed list length error for "{}", got: {}'
148 .format(self.name, len(data), self.num))
150 return self.packer.pack(data)
152 def unpack(self, data, offset=0, result=None, ntc=False):
153 if len(data[offset:]) < self.num:
154 raise VPPSerializerValueError(
155 'Invalid array length for "{}" got {}'
157 .format(self.name, len(data[offset:]), self.num))
158 if self.field_type == 'string':
159 s = self.packer.unpack(data, offset)
160 s2 = s[0].split(b'\0', 1)[0]
161 return (s2.decode('utf-8'), self.num)
162 return self.packer.unpack(data, offset)
165 class FixedList(object):
166 def __init__(self, name, field_type, num):
168 self.packer = types[field_type]
169 self.size = self.packer.size * num
171 self.field_type = field_type
173 def pack(self, list, kwargs):
174 if len(list) != self.num:
175 raise VPPSerializerValueError(
176 'Fixed list length error, got: {} expected: {}'
177 .format(len(list), self.num))
180 b += self.packer.pack(e)
183 def unpack(self, data, offset=0, result=None, ntc=False):
184 # Return a list of arguments
187 for e in range(self.num):
188 x, size = self.packer.unpack(data, offset, ntc=ntc)
195 class VLAList(object):
196 def __init__(self, name, field_type, len_field_name, index):
198 self.field_type = field_type
200 self.packer = types[field_type]
201 self.size = self.packer.size
202 self.length_field = len_field_name
204 def pack(self, list, kwargs=None):
207 if len(list) != kwargs[self.length_field]:
208 raise VPPSerializerValueError(
209 'Variable length error, got: {} expected: {}'
210 .format(len(list), kwargs[self.length_field]))
215 if self.packer.size == 1:
216 return bytearray(list)
219 b += self.packer.pack(e)
222 def unpack(self, data, offset=0, result=None, ntc=False):
223 # Return a list of arguments
227 if self.packer.size == 1:
228 if result[self.index] == 0:
230 p = BaseTypes('u8', result[self.index])
231 return p.unpack(data, offset, ntc=ntc)
234 for e in range(result[self.index]):
235 x, size = self.packer.unpack(data, offset, ntc=ntc)
242 class VLAList_legacy():
243 def __init__(self, name, field_type):
244 self.packer = types[field_type]
245 self.size = self.packer.size
247 def pack(self, list, kwargs=None):
248 if self.packer.size == 1:
253 b += self.packer.pack(e)
256 def unpack(self, data, offset=0, result=None, ntc=False):
258 # Return a list of arguments
259 if (len(data) - offset) % self.packer.size:
260 raise VPPSerializerValueError(
261 'Legacy Variable Length Array length mismatch.')
262 elements = int((len(data) - offset) / self.packer.size)
264 for e in range(elements):
265 x, size = self.packer.unpack(data, offset, ntc=ntc)
267 offset += self.packer.size
272 class VPPEnumType(object):
273 def __init__(self, name, msgdef):
274 self.size = types['u32'].size
277 if type(f) is dict and 'enumtype' in f:
278 if f['enumtype'] != 'u32':
279 raise NotImplementedError
282 e_hash[ename] = evalue
283 self.enum = IntFlag(name, e_hash)
286 def __getattr__(self, name):
287 return self.enum[name]
289 def __nonzero__(self):
292 def pack(self, data, kwargs=None):
293 return types['u32'].pack(data)
295 def unpack(self, data, offset=0, result=None, ntc=False):
296 x, size = types['u32'].unpack(data, offset)
297 return self.enum(x), size
300 class VPPUnionType(object):
301 def __init__(self, name, msgdef):
306 self.packers = collections.OrderedDict()
307 for i, f in enumerate(msgdef):
308 if type(f) is dict and 'crc' in f:
312 if f_type not in types:
313 logger.debug('Unknown union type {}'.format(f_type))
314 raise VPPSerializerValueError(
315 'Unknown message type {}'.format(f_type))
316 fields.append(f_name)
317 size = types[f_type].size
318 self.packers[f_name] = types[f_type]
324 self.tuple = collections.namedtuple(name, fields, rename=True)
326 # Union of variable length?
327 def pack(self, data, kwargs=None):
329 return b'\x00' * self.size
331 for k, v in data.items():
332 logger.debug("Key: {} Value: {}".format(k, v))
333 b = self.packers[k].pack(v, kwargs)
335 r = bytearray(self.size)
339 def unpack(self, data, offset=0, result=None, ntc=False):
342 for k, p in self.packers.items():
343 x, size = p.unpack(data, offset, ntc=ntc)
347 return self.tuple._make(r), maxsize
350 class VPPTypeAlias(object):
351 def __init__(self, name, msgdef):
353 t = vpp_get_type(msgdef['type'])
356 if 'length' in msgdef:
357 if msgdef['length'] == 0:
359 if msgdef['type'] == 'u8':
360 self.packer = FixedList_u8(name, msgdef['type'],
362 self.size = self.packer.size
364 self.packer = FixedList(name, msgdef['type'], msgdef['length'])
371 def pack(self, data, kwargs=None):
372 if data and conversion_required(data, self.name):
374 return conversion_packer(data, self.name)
375 # Python 2 and 3 raises different exceptions from inet_pton
376 except(OSError, socket.error, TypeError):
379 return self.packer.pack(data, kwargs)
381 def unpack(self, data, offset=0, result=None, ntc=False):
382 t, size = self.packer.unpack(data, offset, result, ntc=ntc)
384 return conversion_unpacker(t, self.name), size
388 class VPPType(object):
389 # Set everything up to be able to pack / unpack
390 def __init__(self, name, msgdef):
396 self.field_by_name = {}
398 for i, f in enumerate(msgdef):
399 if type(f) is dict and 'crc' in f:
402 f_type, f_name = f[:2]
403 self.fields.append(f_name)
404 self.field_by_name[f_name] = None
405 self.fieldtypes.append(f_type)
406 if f_type not in types:
407 logger.debug('Unknown type {}'.format(f_type))
408 raise VPPSerializerValueError(
409 'Unknown message type {}'.format(f_type))
410 if len(f) == 3: # list
412 if list_elements == 0:
413 p = VLAList_legacy(f_name, f_type)
414 self.packers.append(p)
415 elif f_type == 'u8' or f_type == 'string':
416 p = FixedList_u8(f_name, f_type, list_elements)
417 self.packers.append(p)
420 p = FixedList(f_name, f_type, list_elements)
421 self.packers.append(p)
423 elif len(f) == 4: # Variable length list
424 length_index = self.fields.index(f[3])
425 p = VLAList(f_name, f_type, f[3], length_index)
426 self.packers.append(p)
428 self.packers.append(types[f_type])
429 size += types[f_type].size
432 self.tuple = collections.namedtuple(name, self.fields, rename=True)
435 def pack(self, data, kwargs=None):
440 # Try one of the format functions
441 if data and conversion_required(data, self.name):
442 return conversion_packer(data, self.name)
444 for i, a in enumerate(self.fields):
445 if data and type(data) is not dict and a not in data:
446 raise VPPSerializerValueError(
447 "Invalid argument: {} expected {}.{}".
448 format(data, self.name, a))
450 # Defaulting to zero.
451 if not data or a not in data: # Default to 0
453 kwarg = None # No default for VLA
456 kwarg = kwargs[a] if a in kwargs else None
457 if isinstance(self.packers[i], VPPType):
458 b += self.packers[i].pack(arg, kwarg)
460 b += self.packers[i].pack(arg, kwargs)
464 def unpack(self, data, offset=0, result=None, ntc=False):
465 # Return a list of arguments
468 for p in self.packers:
469 x, size = p.unpack(data, offset, result, ntc)
470 if type(x) is tuple and len(x) == 1:
475 t = self.tuple._make(result)
477 t = conversion_unpacker(t, self.name)
481 class VPPMessage(VPPType):