ipsec: IPSec protection for multi-point tunnel interfaces
[vpp.git] / test / vpp_ipsec.py
1 from vpp_object import VppObject
2 from ipaddress import ip_address
3 from vpp_papi import VppEnum
4
5 try:
6     text_type = unicode
7 except NameError:
8     text_type = str
9
10
11 def mk_counter():
12     return {'packets': 0, 'bytes': 0}
13
14
15 class VppIpsecSpd(VppObject):
16     """
17     VPP SPD DB
18     """
19
20     def __init__(self, test, id):
21         self.test = test
22         self.id = id
23
24     def add_vpp_config(self):
25         self.test.vapi.ipsec_spd_add_del(self.id)
26         self.test.registry.register(self, self.test.logger)
27
28     def remove_vpp_config(self):
29         self.test.vapi.ipsec_spd_add_del(self.id, is_add=0)
30
31     def object_id(self):
32         return "ipsec-spd-%d" % self.id
33
34     def query_vpp_config(self):
35         spds = self.test.vapi.ipsec_spds_dump()
36         for spd in spds:
37             if spd.spd_id == self.id:
38                 return True
39         return False
40
41
42 class VppIpsecSpdItfBinding(VppObject):
43     """
44     VPP SPD DB to interface binding
45     (i.e. this SPD is used on this interface)
46     """
47
48     def __init__(self, test, spd, itf):
49         self.test = test
50         self.spd = spd
51         self.itf = itf
52
53     def add_vpp_config(self):
54         self.test.vapi.ipsec_interface_add_del_spd(self.spd.id,
55                                                    self.itf.sw_if_index)
56         self.test.registry.register(self, self.test.logger)
57
58     def remove_vpp_config(self):
59         self.test.vapi.ipsec_interface_add_del_spd(self.spd.id,
60                                                    self.itf.sw_if_index,
61                                                    is_add=0)
62
63     def object_id(self):
64         return "bind-%s-to-%s" % (self.spd.id, self.itf)
65
66     def query_vpp_config(self):
67         bs = self.test.vapi.ipsec_spd_interface_dump()
68         for b in bs:
69             if b.sw_if_index == self.itf.sw_if_index:
70                 return True
71         return False
72
73
74 class VppIpsecSpdEntry(VppObject):
75     """
76     VPP SPD DB Entry
77     """
78
79     def __init__(self, test, spd, sa_id,
80                  local_start, local_stop,
81                  remote_start, remote_stop,
82                  proto,
83                  priority=100,
84                  policy=None,
85                  is_outbound=1,
86                  remote_port_start=0,
87                  remote_port_stop=65535,
88                  local_port_start=0,
89                  local_port_stop=65535):
90         self.test = test
91         self.spd = spd
92         self.sa_id = sa_id
93         self.local_start = ip_address(text_type(local_start))
94         self.local_stop = ip_address(text_type(local_stop))
95         self.remote_start = ip_address(text_type(remote_start))
96         self.remote_stop = ip_address(text_type(remote_stop))
97         self.proto = proto
98         self.is_outbound = is_outbound
99         self.priority = priority
100         if not policy:
101             self.policy = (VppEnum.vl_api_ipsec_spd_action_t.
102                            IPSEC_API_SPD_ACTION_BYPASS)
103         else:
104             self.policy = policy
105         self.is_ipv6 = (0 if self.local_start.version == 4 else 1)
106         self.local_port_start = local_port_start
107         self.local_port_stop = local_port_stop
108         self.remote_port_start = remote_port_start
109         self.remote_port_stop = remote_port_stop
110
111     def add_vpp_config(self):
112         rv = self.test.vapi.ipsec_spd_entry_add_del(
113             self.spd.id,
114             self.sa_id,
115             self.local_start,
116             self.local_stop,
117             self.remote_start,
118             self.remote_stop,
119             protocol=self.proto,
120             is_ipv6=self.is_ipv6,
121             is_outbound=self.is_outbound,
122             priority=self.priority,
123             policy=self.policy,
124             local_port_start=self.local_port_start,
125             local_port_stop=self.local_port_stop,
126             remote_port_start=self.remote_port_start,
127             remote_port_stop=self.remote_port_stop)
128         self.stat_index = rv.stat_index
129         self.test.registry.register(self, self.test.logger)
130
131     def remove_vpp_config(self):
132         self.test.vapi.ipsec_spd_entry_add_del(
133             self.spd.id,
134             self.sa_id,
135             self.local_start,
136             self.local_stop,
137             self.remote_start,
138             self.remote_stop,
139             protocol=self.proto,
140             is_ipv6=self.is_ipv6,
141             is_outbound=self.is_outbound,
142             priority=self.priority,
143             policy=self.policy,
144             local_port_start=self.local_port_start,
145             local_port_stop=self.local_port_stop,
146             remote_port_start=self.remote_port_start,
147             remote_port_stop=self.remote_port_stop,
148             is_add=0)
149
150     def object_id(self):
151         return "spd-entry-%d-%d-%d-%d-%d-%d" % (self.spd.id,
152                                                 self.priority,
153                                                 self.policy,
154                                                 self.is_outbound,
155                                                 self.is_ipv6,
156                                                 self.remote_port_start)
157
158     def query_vpp_config(self):
159         ss = self.test.vapi.ipsec_spd_dump(self.spd.id)
160         for s in ss:
161             if s.entry.sa_id == self.sa_id and \
162                s.entry.is_outbound == self.is_outbound and \
163                s.entry.priority == self.priority and \
164                s.entry.policy == self.policy and \
165                s.entry.remote_address_start == self.remote_start and \
166                s.entry.remote_port_start == self.remote_port_start:
167                 return True
168         return False
169
170     def get_stats(self, worker=None):
171         c = self.test.statistics.get_counter("/net/ipsec/policy")
172         if worker is None:
173             total = mk_counter()
174             for t in c:
175                 total['packets'] += t[self.stat_index]['packets']
176             return total
177         else:
178             # +1 to skip main thread
179             return c[worker+1][self.stat_index]
180
181
182 class VppIpsecSA(VppObject):
183     """
184     VPP SAD Entry
185     """
186
187     def __init__(self, test, id, spi,
188                  integ_alg, integ_key,
189                  crypto_alg, crypto_key,
190                  proto,
191                  tun_src=None, tun_dst=None,
192                  flags=None, salt=0):
193         e = VppEnum.vl_api_ipsec_sad_flags_t
194         self.test = test
195         self.id = id
196         self.spi = spi
197         self.integ_alg = integ_alg
198         self.integ_key = integ_key
199         self.crypto_alg = crypto_alg
200         self.crypto_key = crypto_key
201         self.proto = proto
202         self.salt = salt
203
204         self.tun_src = tun_src
205         self.tun_dst = tun_dst
206         if not flags:
207             self.flags = e.IPSEC_API_SAD_FLAG_NONE
208         else:
209             self.flags = flags
210         if (tun_src):
211             self.tun_src = ip_address(text_type(tun_src))
212             self.flags = self.flags | e.IPSEC_API_SAD_FLAG_IS_TUNNEL
213             if (self.tun_src.version == 6):
214                 self.flags = self.flags | e.IPSEC_API_SAD_FLAG_IS_TUNNEL_V6
215         if (tun_dst):
216             self.tun_dst = ip_address(text_type(tun_dst))
217
218     def add_vpp_config(self):
219         r = self.test.vapi.ipsec_sad_entry_add_del(
220             self.id,
221             self.spi,
222             self.integ_alg,
223             self.integ_key,
224             self.crypto_alg,
225             self.crypto_key,
226             self.proto,
227             (self.tun_src if self.tun_src else []),
228             (self.tun_dst if self.tun_dst else []),
229             flags=self.flags,
230             salt=self.salt)
231         self.stat_index = r.stat_index
232         self.test.registry.register(self, self.test.logger)
233
234     def remove_vpp_config(self):
235         self.test.vapi.ipsec_sad_entry_add_del(
236             self.id,
237             self.spi,
238             self.integ_alg,
239             self.integ_key,
240             self.crypto_alg,
241             self.crypto_key,
242             self.proto,
243             (self.tun_src if self.tun_src else []),
244             (self.tun_dst if self.tun_dst else []),
245             flags=self.flags,
246             is_add=0)
247
248     def object_id(self):
249         return "ipsec-sa-%d" % self.id
250
251     def query_vpp_config(self):
252         bs = self.test.vapi.ipsec_sa_dump()
253         for b in bs:
254             if b.entry.sad_id == self.id:
255                 return True
256         return False
257
258     def get_stats(self, worker=None):
259         c = self.test.statistics.get_counter("/net/ipsec/sa")
260         if worker is None:
261             total = mk_counter()
262             for t in c:
263                 total['packets'] += t[self.stat_index]['packets']
264             return total
265         else:
266             # +1 to skip main thread
267             return c[worker+1][self.stat_index]
268
269
270 class VppIpsecTunProtect(VppObject):
271     """
272     VPP IPSEC tunnel protection
273     """
274
275     def __init__(self, test, itf, sa_out, sas_in, nh=None):
276         self.test = test
277         self.itf = itf
278         self.sas_in = []
279         for sa in sas_in:
280             self.sas_in.append(sa.id)
281         self.sa_out = sa_out.id
282         self.nh = nh
283         if not self.nh:
284             self.nh = "0.0.0.0"
285
286     def update_vpp_config(self, sa_out, sas_in):
287         self.sas_in = []
288         for sa in sas_in:
289             self.sas_in.append(sa.id)
290         self.sa_out = sa_out.id
291         self.test.vapi.ipsec_tunnel_protect_update(
292             tunnel={
293                 'sw_if_index': self.itf._sw_if_index,
294                 'n_sa_in': len(self.sas_in),
295                 'sa_out': self.sa_out,
296                 'sa_in': self.sas_in,
297                 'nh': self.nh})
298
299     def object_id(self):
300         return "ipsec-tun-protect-%s-%s" % (self.itf, self.nh)
301
302     def add_vpp_config(self):
303         self.test.vapi.ipsec_tunnel_protect_update(
304             tunnel={
305                 'sw_if_index': self.itf._sw_if_index,
306                 'n_sa_in': len(self.sas_in),
307                 'sa_out': self.sa_out,
308                 'sa_in': self.sas_in,
309                 'nh': self.nh})
310         self.test.registry.register(self, self.test.logger)
311
312     def remove_vpp_config(self):
313         self.test.vapi.ipsec_tunnel_protect_del(
314             sw_if_index=self.itf.sw_if_index,
315             nh=self.nh)
316
317     def query_vpp_config(self):
318         bs = self.test.vapi.ipsec_tunnel_protect_dump(
319             sw_if_index=self.itf.sw_if_index)
320         for b in bs:
321             if b.tun.sw_if_index == self.itf.sw_if_index and \
322                self.nh == str(b.tun.nh):
323                 return True
324         return False