Python API: Add enum and union support.
[vpp.git] / src / vpp-api / python / vpp_papi / vpp_serializer.py
1 #
2 # Copyright (c) 2018 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 #
15
16 import struct
17 import collections
18 from enum import IntEnum
19 import logging
20
21 #
22 # Set log-level in application by doing e.g.:
23 # logger = logging.getLogger('vpp_serializer')
24 # logger.setLevel(logging.DEBUG)
25 #
26 logger = logging.getLogger(__name__)
27
28
29 class BaseTypes():
30     def __init__(self, type, elements=0):
31         base_types = {'u8': '>B',
32                       'u16': '>H',
33                       'u32': '>I',
34                       'i32': '>i',
35                       'u64': '>Q',
36                       'f64': '>d',
37                       'header': '>HI'}
38
39         if elements > 0 and type == 'u8':
40             self.packer = struct.Struct('>%ss' % elements)
41         else:
42             self.packer = struct.Struct(base_types[type])
43         self.size = self.packer.size
44         logger.debug('Adding {} with format: {}'
45                      .format(type, base_types[type]))
46
47     def pack(self, data, kwargs=None):
48         logger.debug("Data: {} Format: {}".format(data, self.packer.format))
49         return self.packer.pack(data)
50
51     def unpack(self, data, offset, result=None):
52         logger.debug("@ {} Format: {}".format(offset, self.packer.format))
53         return self.packer.unpack_from(data, offset)[0]
54
55
56 types = {}
57 types['u8'] = BaseTypes('u8')
58 types['u16'] = BaseTypes('u16')
59 types['u32'] = BaseTypes('u32')
60 types['i32'] = BaseTypes('i32')
61 types['u64'] = BaseTypes('u64')
62 types['f64'] = BaseTypes('f64')
63
64
65 class FixedList_u8():
66     def __init__(self, name, field_type, num):
67         self.name = name
68         self.num = num
69         self.packer = BaseTypes(field_type, num)
70         self.size = self.packer.size
71
72     def pack(self, list, kwargs):
73         """Packs a fixed length bytestring. Left-pads with zeros
74         if input data is too short."""
75         logger.debug("Data: {}".format(list))
76
77         if len(list) > self.num:
78             raise ValueError('Fixed list length error for "{}", got: {}'
79                              ' expected: {}'
80                              .format(self.name, len(list), self.num))
81         return self.packer.pack(list)
82
83     def unpack(self, data, offset=0, result=None):
84         if len(data[offset:]) < self.num:
85             raise ValueError('Invalid array length for "{}" got {}'
86                              ' expected {}'
87                              .format(self.name, len(data), self.num))
88         return self.packer.unpack(data, offset)
89
90
91 class FixedList():
92     def __init__(self, name, field_type, num):
93         self.num = num
94         self.packer = types[field_type]
95         self.size = self.packer.size * num
96
97     def pack(self, list, kwargs):
98         logger.debug("Data: {}".format(list))
99
100         if len(list) != self.num:
101             raise ValueError('Fixed list length error, got: {} expected: {}'
102                              .format(len(list), self.num))
103         b = bytes()
104         for e in list:
105             b += self.packer.pack(e)
106         return b
107
108     def unpack(self, data, offset=0, result=None):
109         # Return a list of arguments
110         result = []
111         for e in range(self.num):
112             x = self.packer.unpack(data, offset)
113             result.append(x)
114             offset += self.packer.size
115         return result
116
117
118 class VLAList():
119     def __init__(self, name, field_type, len_field_name, index):
120         self.index = index
121         self.packer = types[field_type]
122         self.size = self.packer.size
123         self.length_field = len_field_name
124
125     def pack(self, list, kwargs=None):
126         logger.debug("Data: {}".format(list))
127         if len(list) != kwargs[self.length_field]:
128             raise ValueError('Variable length error, got: {} expected: {}'
129                              .format(len(list), kwargs[self.length_field]))
130         b = bytes()
131
132         # u8 array
133         if self.packer.size == 1:
134             return bytearray(list)
135
136         for e in list:
137             b += self.packer.pack(e)
138         return b
139
140     def unpack(self, data, offset=0, result=None):
141         logger.debug("Data: {} @ {} Result: {}"
142                      .format(list, offset, result[self.index]))
143         # Return a list of arguments
144
145         # u8 array
146         if self.packer.size == 1:
147             if result[self.index] == 0:
148                 return b''
149             p = BaseTypes('u8', result[self.index])
150             r = p.unpack(data, offset)
151             return r
152
153         r = []
154         for e in range(result[self.index]):
155             x = self.packer.unpack(data, offset)
156             r.append(x)
157             offset += self.packer.size
158         return r
159
160
161 class VLAList_legacy():
162     def __init__(self, name, field_type):
163         self.packer = types[field_type]
164         self.size = self.packer.size
165
166     def pack(self, list, kwargs=None):
167         logger.debug("Data: {}".format(list))
168
169         if self.packer.size == 1:
170             return bytes(list)
171
172         b = bytes()
173         for e in list:
174             b += self.packer.pack(e)
175         return b
176
177     def unpack(self, data, offset=0, result=None):
178         # Return a list of arguments
179         if (len(data) - offset) % self.packer.size:
180             raise ValueError('Legacy Variable Length Array length mismatch.')
181         elements = int((len(data) - offset) / self.packer.size)
182         r = []
183         logger.debug("Legacy VLA: {} elements of size {}"
184                      .format(elements, self.packer.size))
185         for e in range(elements):
186             x = self.packer.unpack(data, offset)
187             r.append(x)
188             offset += self.packer.size
189         return r
190
191
192 class VPPEnumType():
193     def __init__(self, name, msgdef):
194         self.size = types['u32'].size
195         e_hash = {}
196         for f in msgdef:
197             if type(f) is dict and 'enumtype' in f:
198                 if f['enumtype'] != 'u32':
199                     raise NotImplementedError
200                 continue
201             ename, evalue = f
202             e_hash[ename] = evalue
203         self.enum = IntEnum(name, e_hash)
204         types[name] = self
205         logger.debug('Adding enum {}'.format(name))
206
207     def __getattr__(self, name):
208         return self.enum[name]
209
210     def pack(self, data, kwargs=None):
211         logger.debug("Data: {}".format(data))
212         return types['u32'].pack(data, kwargs)
213
214     def unpack(self, data, offset=0, result=None):
215         x = types['u32'].unpack(data, offset)
216         return self.enum(x)
217
218
219 class VPPUnionType():
220     def __init__(self, name, msgdef):
221         self.name = name
222         self.size = 0
223         self.maxindex = 0
224         fields = []
225         self.packers = collections.OrderedDict()
226         for i, f in enumerate(msgdef):
227             if type(f) is dict and 'crc' in f:
228                 self.crc = f['crc']
229                 continue
230             f_type, f_name = f
231             if f_type not in types:
232                 logger.debug('Unknown union type {}'.format(f_type))
233                 raise ValueError('Unknown message type {}'.format(f_type))
234             fields.append(f_name)
235             size = types[f_type].size
236             self.packers[f_name] = types[f_type]
237             if size > self.size:
238                 self.size = size
239                 self.maxindex = i
240
241         types[name] = self
242         self.tuple = collections.namedtuple(name, fields, rename=True)
243         logger.debug('Adding union {}'.format(name))
244
245     def pack(self, data, kwargs=None):
246         logger.debug("Data: {}".format(data))
247         for k, v in data.items():
248             logger.debug("Key: {} Value: {}".format(k, v))
249             b = self.packers[k].pack(v, kwargs)
250             offset = self.size - self.packers[k].size
251             break
252         r = bytearray(self.size)
253         r[offset:] = b
254         return r
255
256     def unpack(self, data, offset=0, result=None):
257         r = []
258         for k, p in self.packers.items():
259             union_offset = self.size - p.size
260             r.append(p.unpack(data, offset + union_offset))
261         return self.tuple._make(r)
262
263
264 class VPPType():
265     # Set everything up to be able to pack / unpack
266     def __init__(self, name, msgdef):
267         self.name = name
268         self.msgdef = msgdef
269         self.packers = []
270         self.fields = []
271         self.fieldtypes = []
272         self.field_by_name = {}
273         size = 0
274         for i, f in enumerate(msgdef):
275             if type(f) is dict and 'crc' in f:
276                 self.crc = f['crc']
277                 continue
278             f_type, f_name = f[:2]
279             self.fields.append(f_name)
280             self.field_by_name[f_name] = None
281             self.fieldtypes.append(f_type)
282             if f_type not in types:
283                 logger.debug('Unknown type {}'.format(f_type))
284                 raise ValueError('Unknown message type {}'.format(f_type))
285             if len(f) == 3:  # list
286                 list_elements = f[2]
287                 if list_elements == 0:
288                     p = VLAList_legacy(f_name, f_type)
289                     self.packers.append(p)
290                 elif f_type == 'u8':
291                     p = FixedList_u8(f_name, f_type, list_elements)
292                     self.packers.append(p)
293                     size += p.size
294                 else:
295                     p = FixedList(f_name, f_type, list_elements)
296                     self.packers.append(p)
297                     size += p.size
298             elif len(f) == 4:  # Variable length list
299                     # Find index of length field
300                     length_index = self.fields.index(f[3])
301                     p = VLAList(f_name, f_type, f[3], length_index)
302                     self.packers.append(p)
303             else:
304                 self.packers.append(types[f_type])
305                 size += types[f_type].size
306
307         self.size = size
308         self.tuple = collections.namedtuple(name, self.fields, rename=True)
309         types[name] = self
310         logger.debug('Adding type {}'.format(name))
311
312     def pack(self, data, kwargs=None):
313         if not kwargs:
314             kwargs = data
315         logger.debug("Data: {}".format(data))
316         b = bytes()
317         for i, a in enumerate(self.fields):
318             if a not in data:
319                 logger.debug("Argument {} not given, defaulting to 0"
320                              .format(a))
321                 b += b'\x00' * self.packers[i].size
322                 continue
323
324             if isinstance(self.packers[i], VPPType):
325                 b += self.packers[i].pack(data[a], kwargs[a])
326             else:
327                 b += self.packers[i].pack(data[a], kwargs)
328         return b
329
330     def unpack(self, data, offset=0, result=None):
331         # Return a list of arguments
332         result = []
333         for p in self.packers:
334             x = p.unpack(data, offset, result)
335             if type(x) is tuple and len(x) == 1:
336                 x = x[0]
337             result.append(x)
338             offset += p.size
339         return self.tuple._make(result)
340
341
342 class VPPMessage(VPPType):
343     pass