ipsec: User can choose the UDP source port
[vpp.git] / test / vpp_ipsec.py
1 from vpp_object import VppObject
2 from ipaddress import ip_address
3 from vpp_papi import VppEnum
4
# Python 2/3 compatibility: 'text_type' is the interpreter's text (unicode)
# string type. It is used below to coerce addresses before they are handed
# to ip_address().
try:
    unicode
except NameError:
    # no 'unicode' builtin (python3): 'str' is the text type
    text_type = str
else:
    text_type = unicode
9
10
def mk_counter():
    """Return a new counter dictionary with 'packets' and 'bytes' at zero."""
    return dict(packets=0, bytes=0)
13
14
class VppIpsecSpd(VppObject):
    """
    VPP SPD DB

    Test-side handle for one SPD, identified by its numeric id.
    """

    def __init__(self, test, id):
        """
        :param test: test case providing the vapi, registry and logger
        :param id: SPD identifier passed to the API
        """
        self.id = id
        self.test = test

    def add_vpp_config(self):
        """Create the SPD in VPP and register this object with the test."""
        test = self.test
        test.vapi.ipsec_spd_add_del(self.id)
        test.registry.register(self, test.logger)

    def remove_vpp_config(self):
        """Delete the SPD from VPP."""
        self.test.vapi.ipsec_spd_add_del(self.id, is_add=0)

    def object_id(self):
        """Return the name under which this object is known."""
        return "ipsec-spd-%d" % self.id

    def query_vpp_config(self):
        """Return True if an SPD with this id appears in the SPD dump."""
        dump = self.test.vapi.ipsec_spds_dump()
        return any(details.spd_id == self.id for details in dump)
40
41
class VppIpsecSpdItfBinding(VppObject):
    """
    VPP SPD DB to interface binding
    (i.e. this SPD is used on this interface)
    """

    def __init__(self, test, spd, itf):
        """
        :param test: test case providing the vapi, registry and logger
        :param spd: SPD object; its 'id' is used in the API calls
        :param itf: interface object; its 'sw_if_index' is used in the
                    API calls
        """
        self.itf = itf
        self.spd = spd
        self.test = test

    def add_vpp_config(self):
        """Bind the SPD to the interface and register this object."""
        test = self.test
        test.vapi.ipsec_interface_add_del_spd(self.spd.id,
                                              self.itf.sw_if_index)
        test.registry.register(self, test.logger)

    def remove_vpp_config(self):
        """Remove the SPD binding from the interface."""
        self.test.vapi.ipsec_interface_add_del_spd(self.spd.id,
                                                   self.itf.sw_if_index,
                                                   is_add=0)

    def object_id(self):
        """Return the name under which this object is known."""
        return "bind-%s-to-%s" % (self.spd.id, self.itf)

    def query_vpp_config(self):
        """
        Return True if the dump holds a binding for this interface.

        Only the interface index is compared; the SPD is not checked.
        """
        bindings = self.test.vapi.ipsec_spd_interface_dump()
        return any(binding.sw_if_index == self.itf.sw_if_index
                   for binding in bindings)
72
73
class VppIpsecSpdEntry(VppObject):
    """
    VPP SPD DB Entry

    One policy entry of an SPD, described by local/remote address
    ranges, local/remote port ranges, a protocol, a priority and the
    policy action to apply.
    """

    def __init__(self, test, spd, sa_id,
                 local_start, local_stop,
                 remote_start, remote_stop,
                 proto,
                 priority=100,
                 policy=None,
                 is_outbound=1,
                 remote_port_start=0,
                 remote_port_stop=65535,
                 local_port_start=0,
                 local_port_stop=65535):
        """
        :param test: test case providing vapi, registry, logger, statistics
        :param spd: SPD object this entry belongs to (its 'id' is used)
        :param sa_id: id of the SA referenced by this entry
        :param local_start: first local address (anything ip_address()
                            accepts once converted to text)
        :param local_stop: last local address
        :param remote_start: first remote address
        :param remote_stop: last remote address
        :param proto: protocol value passed to the API as 'protocol'
        :param priority: entry priority
        :param policy: SPD action; any false value selects BYPASS
        :param is_outbound: passed to the API as 'is_outbound'
        :param remote_port_start: first remote port
        :param remote_port_stop: last remote port
        :param local_port_start: first local port
        :param local_port_stop: last local port
        """
        self.test = test
        self.spd = spd
        self.sa_id = sa_id
        self.local_start = ip_address(text_type(local_start))
        self.local_stop = ip_address(text_type(local_stop))
        self.remote_start = ip_address(text_type(remote_start))
        self.remote_stop = ip_address(text_type(remote_stop))
        self.proto = proto
        self.is_outbound = is_outbound
        self.priority = priority
        if not policy:
            self.policy = (VppEnum.vl_api_ipsec_spd_action_t.
                           IPSEC_API_SPD_ACTION_BYPASS)
        else:
            self.policy = policy
        # the address family is derived from the local start address only
        self.is_ipv6 = (0 if self.local_start.version == 4 else 1)
        self.local_port_start = local_port_start
        self.local_port_stop = local_port_stop
        self.remote_port_start = remote_port_start
        self.remote_port_stop = remote_port_stop

    def _entry_add_del(self, **extra):
        """
        Issue ipsec_spd_entry_add_del with this entry's parameters.

        Add and remove must describe the entry identically, so both go
        through here; 'extra' carries the per-call keywords (is_add).
        """
        return self.test.vapi.ipsec_spd_entry_add_del(
            self.spd.id,
            self.sa_id,
            self.local_start,
            self.local_stop,
            self.remote_start,
            self.remote_stop,
            protocol=self.proto,
            is_ipv6=self.is_ipv6,
            is_outbound=self.is_outbound,
            priority=self.priority,
            policy=self.policy,
            local_port_start=self.local_port_start,
            local_port_stop=self.local_port_stop,
            remote_port_start=self.remote_port_start,
            remote_port_stop=self.remote_port_stop,
            **extra)

    def add_vpp_config(self):
        """
        Add the entry to VPP, remember the stat index from the reply
        and register this object with the test.
        """
        # is_add is deliberately not passed: rely on the API wrapper default
        rv = self._entry_add_del()
        self.stat_index = rv.stat_index
        self.test.registry.register(self, self.test.logger)

    def remove_vpp_config(self):
        """Remove the entry from VPP."""
        self._entry_add_del(is_add=0)

    def object_id(self):
        """Return the name under which this object is known."""
        return "spd-entry-%d-%d-%d-%d-%d-%d" % (self.spd.id,
                                                self.priority,
                                                self.policy,
                                                self.is_outbound,
                                                self.is_ipv6,
                                                self.remote_port_start)

    def query_vpp_config(self):
        """
        Return True if the dump of this entry's SPD holds an entry with
        the same SA id, direction, priority, policy, remote start
        address and remote start port.
        """
        ss = self.test.vapi.ipsec_spd_dump(self.spd.id)
        for s in ss:
            if s.entry.sa_id == self.sa_id and \
               s.entry.is_outbound == self.is_outbound and \
               s.entry.priority == self.priority and \
               s.entry.policy == self.policy and \
               s.entry.remote_address_start == self.remote_start and \
               s.entry.remote_port_start == self.remote_port_start:
                return True
        return False

    def get_stats(self, worker=None):
        """
        Return the "/net/ipsec/policy" counter of this entry.

        Only valid after add_vpp_config(), which sets 'stat_index'.

        :param worker: None to get the packet and byte totals summed
                       over every thread, otherwise a worker index
                       (worker 0 is the first thread after main)
        :returns: a counter with 'packets' and 'bytes'
        """
        c = self.test.statistics.get_counter("/net/ipsec/policy")
        if worker is None:
            total = mk_counter()
            for t in c:
                # accumulate both fields; summing only 'packets' left
                # 'bytes' at zero in the aggregate
                total['packets'] += t[self.stat_index]['packets']
                total['bytes'] += t[self.stat_index]['bytes']
            return total
        else:
            # +1 to skip main thread
            return c[worker+1][self.stat_index]
180
181
class VppIpsecSA(VppObject):
    """
    VPP SAD Entry
    """

    # port expected in the SA dump when UDP encap is enabled and the
    # corresponding port was not given explicitly
    DEFAULT_UDP_PORT = 4500

    def __init__(self, test, id, spi,
                 integ_alg, integ_key,
                 crypto_alg, crypto_key,
                 proto,
                 tun_src=None, tun_dst=None,
                 flags=None, salt=0, udp_src=None,
                 udp_dst=None):
        """
        :param test: test case providing vapi, registry, logger, statistics
        :param id: SAD entry id
        :param spi: SPI of the SA
        :param integ_alg: integrity algorithm
        :param integ_key: integrity key (its len() is sent as the length)
        :param crypto_alg: crypto algorithm
        :param crypto_key: crypto key (its len() is sent as the length)
        :param proto: protocol value passed to the API as 'protocol'
        :param tun_src: tunnel source address; when given the IS_TUNNEL
                        flag is set (and IS_TUNNEL_V6 for a v6 address)
        :param tun_dst: tunnel destination address
        :param flags: SAD flags; any false value selects FLAG_NONE
        :param salt: salt value passed to the API
        :param udp_src: UDP source port, sent only when set
        :param udp_dst: UDP destination port, sent only when set
        """
        e = VppEnum.vl_api_ipsec_sad_flags_t
        self.test = test
        self.id = id
        self.spi = spi
        self.integ_alg = integ_alg
        self.integ_key = integ_key
        self.crypto_alg = crypto_alg
        self.crypto_key = crypto_key
        self.proto = proto
        self.salt = salt

        self.tun_src = tun_src
        self.tun_dst = tun_dst
        if not flags:
            self.flags = e.IPSEC_API_SAD_FLAG_NONE
        else:
            self.flags = flags
        if (tun_src):
            self.tun_src = ip_address(text_type(tun_src))
            self.flags = self.flags | e.IPSEC_API_SAD_FLAG_IS_TUNNEL
            if (self.tun_src.version == 6):
                self.flags = self.flags | e.IPSEC_API_SAD_FLAG_IS_TUNNEL_V6
        if (tun_dst):
            self.tun_dst = ip_address(text_type(tun_dst))
        self.udp_src = udp_src
        self.udp_dst = udp_dst

    def _mk_entry(self):
        """
        Build the 'entry' argument shared by add and remove.

        The UDP ports are not included; add_vpp_config() adds them
        only when they were explicitly configured.
        """
        return {
            'sad_id': self.id,
            'spi': self.spi,
            'integrity_algorithm': self.integ_alg,
            'integrity_key': {
                'length': len(self.integ_key),
                'data': self.integ_key,
            },
            'crypto_algorithm': self.crypto_alg,
            'crypto_key': {
                'data': self.crypto_key,
                'length': len(self.crypto_key),
            },
            'protocol': self.proto,
            'tunnel_src': (self.tun_src if self.tun_src else []),
            'tunnel_dst': (self.tun_dst if self.tun_dst else []),
            'flags': self.flags,
            'salt': self.salt
        }

    def add_vpp_config(self):
        """
        Add the SA to VPP, remember the stat index from the reply and
        register this object with the test.
        """
        entry = self._mk_entry()
        # don't explicitly send the defaults, let papi fill them in
        if self.udp_src:
            entry['udp_src_port'] = self.udp_src
        if self.udp_dst:
            entry['udp_dst_port'] = self.udp_dst
        r = self.test.vapi.ipsec_sad_entry_add_del(is_add=1, entry=entry)
        self.stat_index = r.stat_index
        self.test.registry.register(self, self.test.logger)

    def remove_vpp_config(self):
        """Remove the SA from VPP."""
        self.test.vapi.ipsec_sad_entry_add_del(is_add=0,
                                               entry=self._mk_entry())

    def object_id(self):
        """Return the name under which this object is known."""
        return "ipsec-sa-%d" % self.id

    def query_vpp_config(self):
        """
        Return True if the SA dump holds an entry with this SAD id.

        When this SA has the UDP_ENCAP flag, the dumped entry must have
        it too and its UDP ports must equal the configured ones, or
        DEFAULT_UDP_PORT where none was configured.
        """
        e = VppEnum.vl_api_ipsec_sad_flags_t

        for b in self.test.vapi.ipsec_sa_dump():
            if b.entry.sad_id != self.id:
                continue
            # if udp encap is configured then the ports should match
            # those configured or the default
            if (self.flags & e.IPSEC_API_SAD_FLAG_UDP_ENCAP):
                if not b.entry.flags & e.IPSEC_API_SAD_FLAG_UDP_ENCAP:
                    return False
                want_src = self.udp_src or self.DEFAULT_UDP_PORT
                if want_src != b.entry.udp_src_port:
                    return False
                want_dst = self.udp_dst or self.DEFAULT_UDP_PORT
                if want_dst != b.entry.udp_dst_port:
                    return False
            return True
        return False

    def get_stats(self, worker=None):
        """
        Return the "/net/ipsec/sa" counter of this SA.

        Only valid after add_vpp_config(), which sets 'stat_index'.

        :param worker: None to get the packet and byte totals summed
                       over every thread, otherwise a worker index
                       (worker 0 is the first thread after main)
        :returns: a counter with 'packets' and 'bytes'
        """
        c = self.test.statistics.get_counter("/net/ipsec/sa")
        if worker is None:
            total = mk_counter()
            for t in c:
                # accumulate both fields; summing only 'packets' left
                # 'bytes' at zero in the aggregate
                total['packets'] += t[self.stat_index]['packets']
                total['bytes'] += t[self.stat_index]['bytes']
            return total
        else:
            # +1 to skip main thread
            return c[worker+1][self.stat_index]
314
315
class VppIpsecTunProtect(VppObject):
    """
    VPP IPSEC tunnel protection

    Associates one outbound SA and a list of inbound SAs with an
    interface and a next-hop.
    """

    def __init__(self, test, itf, sa_out, sas_in, nh=None):
        """
        :param test: test case providing the vapi, registry and logger
        :param itf: the protected interface object
        :param sa_out: outbound SA object (only its 'id' is kept)
        :param sas_in: iterable of inbound SA objects (only ids are kept)
        :param nh: next-hop; any false value becomes "0.0.0.0"
        """
        self.test = test
        self.itf = itf
        self.sas_in = [sa.id for sa in sas_in]
        self.sa_out = sa_out.id
        self.nh = nh if nh else "0.0.0.0"

    def update_vpp_config(self, sa_out, sas_in):
        """Replace the SAs of this protection and push the update to VPP."""
        self.sas_in = [sa.id for sa in sas_in]
        self.sa_out = sa_out.id
        # NOTE(review): reads the private '_sw_if_index' while the
        # remove/query methods use the public 'sw_if_index' - confirm
        # the two are equivalent
        tunnel = {
            'sw_if_index': self.itf._sw_if_index,
            'n_sa_in': len(self.sas_in),
            'sa_out': self.sa_out,
            'sa_in': self.sas_in,
            'nh': self.nh,
        }
        self.test.vapi.ipsec_tunnel_protect_update(tunnel=tunnel)

    def object_id(self):
        """Return the name under which this object is known."""
        return "ipsec-tun-protect-%s-%s" % (self.itf, self.nh)

    def add_vpp_config(self):
        """Configure the protection in VPP and register this object."""
        # NOTE(review): reads the private '_sw_if_index' while the
        # remove/query methods use the public 'sw_if_index' - confirm
        # the two are equivalent
        tunnel = {
            'sw_if_index': self.itf._sw_if_index,
            'n_sa_in': len(self.sas_in),
            'sa_out': self.sa_out,
            'sa_in': self.sas_in,
            'nh': self.nh,
        }
        self.test.vapi.ipsec_tunnel_protect_update(tunnel=tunnel)
        self.test.registry.register(self, self.test.logger)

    def remove_vpp_config(self):
        """Delete the protection for this interface and next-hop."""
        self.test.vapi.ipsec_tunnel_protect_del(
            sw_if_index=self.itf.sw_if_index,
            nh=self.nh)

    def query_vpp_config(self):
        """
        Return True if the protection dump for this interface holds an
        entry whose interface index and next-hop match.
        """
        dump = self.test.vapi.ipsec_tunnel_protect_dump(
            sw_if_index=self.itf.sw_if_index)
        return any(details.tun.sw_if_index == self.itf.sw_if_index and
                   self.nh == str(details.tun.nh)
                   for details in dump)