PAPI: Expose API enums to tests / applications
[vpp.git] / src / vpp-api / python / vpp_papi / vpp_serializer.py
1 #
2 # Copyright (c) 2018 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 #
15
16 import struct
17 import collections
18 from enum import IntEnum
19 import logging
20
21 #
22 # Set log-level in application by doing e.g.:
23 # logger = logging.getLogger('vpp_serializer')
24 # logger.setLevel(logging.DEBUG)
25 #
26 logger = logging.getLogger(__name__)
27
28
29 class BaseTypes():
30     def __init__(self, type, elements=0):
31         base_types = {'u8': '>B',
32                       'u16': '>H',
33                       'u32': '>I',
34                       'i32': '>i',
35                       'u64': '>Q',
36                       'f64': '>d',
37                       'bool': '>?',
38                       'header': '>HI'}
39
40         if elements > 0 and type == 'u8':
41             self.packer = struct.Struct('>%ss' % elements)
42         else:
43             self.packer = struct.Struct(base_types[type])
44         self.size = self.packer.size
45         logger.debug('Adding {} with format: {}'
46                      .format(type, base_types[type]))
47
48     def pack(self, data, kwargs=None):
49         return self.packer.pack(data)
50
51     def unpack(self, data, offset, result=None):
52         return self.packer.unpack_from(data, offset)[0], self.packer.size
53
54
55 types = {}
56 types['u8'] = BaseTypes('u8')
57 types['u16'] = BaseTypes('u16')
58 types['u32'] = BaseTypes('u32')
59 types['i32'] = BaseTypes('i32')
60 types['u64'] = BaseTypes('u64')
61 types['f64'] = BaseTypes('f64')
62 types['bool'] = BaseTypes('bool')
63
64
65 def vpp_get_type(name):
66     try:
67         return types[name]
68     except KeyError:
69         return None
70
71
72 class FixedList_u8():
73     def __init__(self, name, field_type, num):
74         self.name = name
75         self.num = num
76         self.packer = BaseTypes(field_type, num)
77         self.size = self.packer.size
78
79     def pack(self, list, kwargs):
80         """Packs a fixed length bytestring. Left-pads with zeros
81         if input data is too short."""
82         if len(list) > self.num:
83             raise ValueError('Fixed list length error for "{}", got: {}'
84                              ' expected: {}'
85                              .format(self.name, len(list), self.num))
86         return self.packer.pack(list)
87
88     def unpack(self, data, offset=0, result=None):
89         if len(data[offset:]) < self.num:
90             raise ValueError('Invalid array length for "{}" got {}'
91                              ' expected {}'
92                              .format(self.name, len(data[offset:]), self.num))
93         return self.packer.unpack(data, offset)
94
95
96 class FixedList():
97     def __init__(self, name, field_type, num):
98         self.num = num
99         self.packer = types[field_type]
100         self.size = self.packer.size * num
101
102     def pack(self, list, kwargs):
103         if len(list) != self.num:
104             raise ValueError('Fixed list length error, got: {} expected: {}'
105                              .format(len(list), self.num))
106         b = bytes()
107         for e in list:
108             b += self.packer.pack(e)
109         return b
110
111     def unpack(self, data, offset=0, result=None):
112         # Return a list of arguments
113         result = []
114         total = 0
115         for e in range(self.num):
116             x, size = self.packer.unpack(data, offset)
117             result.append(x)
118             offset += size
119             total += size
120         return result, total
121
122
123 class VLAList():
124     def __init__(self, name, field_type, len_field_name, index):
125         self.name = name
126         self.index = index
127         self.packer = types[field_type]
128         self.size = self.packer.size
129         self.length_field = len_field_name
130
131     def pack(self, list, kwargs=None):
132         if len(list) != kwargs[self.length_field]:
133             raise ValueError('Variable length error, got: {} expected: {}'
134                              .format(len(list), kwargs[self.length_field]))
135         b = bytes()
136
137         # u8 array
138         if self.packer.size == 1:
139             return bytearray(list)
140
141         for e in list:
142             b += self.packer.pack(e)
143         return b
144
145     def unpack(self, data, offset=0, result=None):
146         # Return a list of arguments
147         total = 0
148
149         # u8 array
150         if self.packer.size == 1:
151             if result[self.index] == 0:
152                 return b'', 0
153             p = BaseTypes('u8', result[self.index])
154             return p.unpack(data, offset)
155
156         r = []
157         for e in range(result[self.index]):
158             x, size = self.packer.unpack(data, offset)
159             r.append(x)
160             offset += size
161             total += size
162         return r, total
163
164
165 class VLAList_legacy():
166     def __init__(self, name, field_type):
167         self.packer = types[field_type]
168         self.size = self.packer.size
169
170     def pack(self, list, kwargs=None):
171         if self.packer.size == 1:
172             return bytes(list)
173
174         b = bytes()
175         for e in list:
176             b += self.packer.pack(e)
177         return b
178
179     def unpack(self, data, offset=0, result=None):
180         total = 0
181         # Return a list of arguments
182         if (len(data) - offset) % self.packer.size:
183             raise ValueError('Legacy Variable Length Array length mismatch.')
184         elements = int((len(data) - offset) / self.packer.size)
185         r = []
186         for e in range(elements):
187             x, size = self.packer.unpack(data, offset)
188             r.append(x)
189             offset += self.packer.size
190             total += size
191         return r, total
192
193
194 class VPPEnumType():
195     def __init__(self, name, msgdef):
196         self.size = types['u32'].size
197         e_hash = {}
198         for f in msgdef:
199             if type(f) is dict and 'enumtype' in f:
200                 if f['enumtype'] != 'u32':
201                     raise NotImplementedError
202                 continue
203             ename, evalue = f
204             e_hash[ename] = evalue
205         self.enum = IntEnum(name, e_hash)
206         types[name] = self
207         logger.debug('Adding enum {}'.format(name))
208
209     def __getattr__(self, name):
210         return self.enum[name]
211
212     def __nonzero__(self):
213         return True
214
215     def pack(self, data, kwargs=None):
216         return types['u32'].pack(data, kwargs)
217
218     def unpack(self, data, offset=0, result=None):
219         x, size = types['u32'].unpack(data, offset)
220         return self.enum(x), size
221
222
223 class VPPUnionType():
224     def __init__(self, name, msgdef):
225         self.name = name
226         self.size = 0
227         self.maxindex = 0
228         fields = []
229         self.packers = collections.OrderedDict()
230         for i, f in enumerate(msgdef):
231             if type(f) is dict and 'crc' in f:
232                 self.crc = f['crc']
233                 continue
234             f_type, f_name = f
235             if f_type not in types:
236                 logger.debug('Unknown union type {}'.format(f_type))
237                 raise ValueError('Unknown message type {}'.format(f_type))
238             fields.append(f_name)
239             size = types[f_type].size
240             self.packers[f_name] = types[f_type]
241             if size > self.size:
242                 self.size = size
243                 self.maxindex = i
244
245         types[name] = self
246         self.tuple = collections.namedtuple(name, fields, rename=True)
247         logger.debug('Adding union {}'.format(name))
248
249     def pack(self, data, kwargs=None):
250         for k, v in data.items():
251             logger.debug("Key: {} Value: {}".format(k, v))
252             b = self.packers[k].pack(v, kwargs)
253             break
254         r = bytearray(self.size)
255         r[:len(b)] = b
256         return r
257
258     def unpack(self, data, offset=0, result=None):
259         r = []
260         maxsize = 0
261         for k, p in self.packers.items():
262             x, size = p.unpack(data, offset)
263             if size > maxsize:
264                 maxsize = size
265             r.append(x)
266         return self.tuple._make(r), maxsize
267
268
269 class VPPType():
270     # Set everything up to be able to pack / unpack
271     def __init__(self, name, msgdef):
272         self.name = name
273         self.msgdef = msgdef
274         self.packers = []
275         self.fields = []
276         self.fieldtypes = []
277         self.field_by_name = {}
278         size = 0
279         for i, f in enumerate(msgdef):
280             if type(f) is dict and 'crc' in f:
281                 self.crc = f['crc']
282                 continue
283             f_type, f_name = f[:2]
284             self.fields.append(f_name)
285             self.field_by_name[f_name] = None
286             self.fieldtypes.append(f_type)
287             if f_type not in types:
288                 logger.debug('Unknown type {}'.format(f_type))
289                 raise ValueError('Unknown message type {}'.format(f_type))
290             if len(f) == 3:  # list
291                 list_elements = f[2]
292                 if list_elements == 0:
293                     p = VLAList_legacy(f_name, f_type)
294                     self.packers.append(p)
295                 elif f_type == 'u8':
296                     p = FixedList_u8(f_name, f_type, list_elements)
297                     self.packers.append(p)
298                     size += p.size
299                 else:
300                     p = FixedList(f_name, f_type, list_elements)
301                     self.packers.append(p)
302                     size += p.size
303             elif len(f) == 4:  # Variable length list
304                     # Find index of length field
305                     length_index = self.fields.index(f[3])
306                     p = VLAList(f_name, f_type, f[3], length_index)
307                     self.packers.append(p)
308             else:
309                 self.packers.append(types[f_type])
310                 size += types[f_type].size
311
312         self.size = size
313         self.tuple = collections.namedtuple(name, self.fields, rename=True)
314         types[name] = self
315         logger.debug('Adding type {}'.format(name))
316
317     def pack(self, data, kwargs=None):
318         if not kwargs:
319             kwargs = data
320         b = bytes()
321         for i, a in enumerate(self.fields):
322             if a not in data:
323                 b += b'\x00' * self.packers[i].size
324                 continue
325
326             if isinstance(self.packers[i], VPPType):
327                 b += self.packers[i].pack(data[a], kwargs[a])
328             else:
329                 b += self.packers[i].pack(data[a], kwargs)
330         return b
331
332     def unpack(self, data, offset=0, result=None):
333         # Return a list of arguments
334         result = []
335         total = 0
336         for p in self.packers:
337             x, size = p.unpack(data, offset, result)
338             if type(x) is tuple and len(x) == 1:
339                 x = x[0]
340             result.append(x)
341             offset += size
342             total += size
343         t = self.tuple._make(result)
344         return t, total
345
346
347 class VPPMessage(VPPType):
348     pass