IPSEC: tests use object registry
[vpp.git] / test / vpp_ipsec.py
1 from vpp_object import *
2 from ipaddress import ip_address
3
# Pick the text string type for the running interpreter: ``unicode`` where
# that builtin exists, plain ``str`` otherwise.  It is used below to coerce
# address arguments before handing them to ip_address().
try:
    unicode
except NameError:
    text_type = str
else:
    text_type = unicode
8
9
class VppIpsecSpd(VppObject):
    """
    VPP IPsec SPD DB, identified by its numeric id
    """

    def __init__(self, test, id):
        self.id = id
        self.test = test

    def __str__(self):
        return self.object_id()

    def object_id(self):
        """Return the string that names this SPD ("ipsec-spd-<id>")."""
        return "ipsec-spd-%d" % self.id

    def add_vpp_config(self):
        """Create the SPD through the API, then register this object
        with the test's registry."""
        owner = self.test
        owner.vapi.ipsec_spd_add_del(self.id)
        owner.registry.register(self, owner.logger)

    def remove_vpp_config(self):
        """Delete the SPD through the API (same call with is_add=0)."""
        self.test.vapi.ipsec_spd_add_del(self.id, is_add=0)

    def query_vpp_config(self):
        """Return True if the SPD dump contains an entry with our id."""
        return any(found.spd_id == self.id
                   for found in self.test.vapi.ipsec_spds_dump())
38
39
class VppIpsecSpdItfBinding(VppObject):
    """
    Binding of a VPP SPD DB to an interface
    (i.e. this SPD is used on this interface)
    """

    def __init__(self, test, spd, itf):
        self.test = test
        self.itf = itf
        self.spd = spd

    def __str__(self):
        return self.object_id()

    def object_id(self):
        """Return "bind-<spd id>-to-<interface>" for this binding."""
        parts = (self.spd.id, self.itf)
        return "bind-%s-to-%s" % parts

    def add_vpp_config(self):
        """Bind the SPD to the interface, then register this object
        with the test's registry."""
        bind = self.test.vapi.ipsec_interface_add_del_spd
        bind(self.spd.id, self.itf.sw_if_index)
        self.test.registry.register(self, self.test.logger)

    def remove_vpp_config(self):
        """Undo the binding (same API call with is_add=0)."""
        unbind = self.test.vapi.ipsec_interface_add_del_spd
        unbind(self.spd.id, self.itf.sw_if_index, is_add=0)

    def query_vpp_config(self):
        """Return True if the interface shows up in the SPD/interface dump.

        Only the interface index is compared; which SPD the interface is
        bound to is not checked here.
        """
        return any(binding.sw_if_index == self.itf.sw_if_index
                   for binding in self.test.vapi.ipsec_spd_interface_dump())
73
74
class VppIpsecSpdEntry(VppObject):
    """
    A single entry in a VPP SPD DB
    """

    def __init__(self, test, spd, sa_id,
                 local_start, local_stop,
                 remote_start, remote_stop,
                 proto,
                 priority=100,
                 policy=0,
                 is_outbound=1,
                 remote_port_start=0,
                 remote_port_stop=65535,
                 local_port_start=0,
                 local_port_stop=65535):
        self.test = test
        self.spd = spd
        self.sa_id = sa_id
        self.proto = proto
        self.priority = priority
        self.policy = policy
        self.is_outbound = is_outbound

        # address range bounds are stored as ipaddress objects; the input
        # is coerced to text first so ip_address() accepts it
        self.local_start = ip_address(text_type(local_start))
        self.local_stop = ip_address(text_type(local_stop))
        self.remote_start = ip_address(text_type(remote_start))
        self.remote_stop = ip_address(text_type(remote_stop))

        # the address family flag is taken from the local range start only
        if self.local_start.version == 4:
            self.is_ipv6 = 0
        else:
            self.is_ipv6 = 1

        self.local_port_start = local_port_start
        self.local_port_stop = local_port_stop
        self.remote_port_start = remote_port_start
        self.remote_port_stop = remote_port_stop

    def _entry_call_args(self):
        """Build the (positional, keyword) arguments that the add and the
        remove call to ipsec_spd_add_del_entry have in common."""
        positional = (self.spd.id,
                      self.sa_id,
                      self.local_start.packed,
                      self.local_stop.packed,
                      self.remote_start.packed,
                      self.remote_stop.packed)
        keywords = {
            'protocol': self.proto,
            'is_ipv6': self.is_ipv6,
            'is_outbound': self.is_outbound,
            'priority': self.priority,
            'policy': self.policy,
            'local_port_start': self.local_port_start,
            'local_port_stop': self.local_port_stop,
            'remote_port_start': self.remote_port_start,
            'remote_port_stop': self.remote_port_stop,
        }
        return positional, keywords

    def add_vpp_config(self):
        """Add the entry through the API, then register this object
        with the test's registry."""
        positional, keywords = self._entry_call_args()
        # is_add is deliberately not passed: the API wrapper's default
        # applies on the add path
        self.test.vapi.ipsec_spd_add_del_entry(*positional, **keywords)
        self.test.registry.register(self, self.test.logger)

    def remove_vpp_config(self):
        """Remove the entry: same arguments as the add, plus is_add=0."""
        positional, keywords = self._entry_call_args()
        keywords['is_add'] = 0
        self.test.vapi.ipsec_spd_add_del_entry(*positional, **keywords)

    def __str__(self):
        return self.object_id()

    def object_id(self):
        """Return the identifying string, built from the SPD id, priority,
        policy, direction, address family and remote start port."""
        fields = (self.spd.id,
                  self.priority,
                  self.policy,
                  self.is_outbound,
                  self.is_ipv6,
                  self.remote_port_start)
        return "spd-entry-%d-%d-%d-%d-%d-%d" % fields

    def query_vpp_config(self):
        """Return True if the dump of our SPD holds an entry that matches
        on SA id, direction, priority, policy, address family and remote
        start port."""
        return any((found.sa_id == self.sa_id and
                    found.is_outbound == self.is_outbound and
                    found.priority == self.priority and
                    found.policy == self.policy and
                    found.is_ipv6 == self.is_ipv6 and
                    found.remote_start_port == self.remote_port_start)
                   for found in self.test.vapi.ipsec_spd_dump(self.spd.id))
168
169
class VppIpsecSA(VppObject):
    """
    An entry in the VPP SAD
    """

    def __init__(self, test, id, spi,
                 integ_alg, integ_key,
                 crypto_alg, crypto_key,
                 proto,
                 tun_src=None, tun_dst=None,
                 use_anti_replay=0,
                 udp_encap=0):
        self.test = test
        self.id = id
        self.spi = spi
        self.integ_alg = integ_alg
        self.integ_key = integ_key
        self.crypto_alg = crypto_alg
        self.crypto_key = crypto_key
        self.proto = proto

        # A tunnel source switches the SA to tunnel mode, and its address
        # family decides the v6 flag.  Without one, both flags stay 0 and
        # the given (falsy) value is stored untouched.
        if tun_src:
            self.tun_src = ip_address(text_type(tun_src))
            self.is_tunnel = 1
            self.is_tunnel_v6 = 1 if self.tun_src.version == 6 else 0
        else:
            self.tun_src = tun_src
            self.is_tunnel = 0
            self.is_tunnel_v6 = 0

        # the destination is converted independently of the source and
        # does not affect the tunnel flags
        if tun_dst:
            self.tun_dst = ip_address(text_type(tun_dst))
        else:
            self.tun_dst = tun_dst

        self.use_anti_replay = use_anti_replay
        self.udp_encap = udp_encap

    def _sa_call_args(self):
        """Build the (positional, keyword) arguments that the add and the
        remove call to ipsec_sad_add_del_entry have in common."""
        # an unset tunnel endpoint is passed as an empty list
        src = self.tun_src.packed if self.tun_src else []
        dst = self.tun_dst.packed if self.tun_dst else []
        positional = (self.id,
                      self.spi,
                      self.integ_alg,
                      self.integ_key,
                      self.crypto_alg,
                      self.crypto_key,
                      self.proto,
                      src,
                      dst)
        keywords = {
            'is_tunnel': self.is_tunnel,
            'is_tunnel_ipv6': self.is_tunnel_v6,
            'use_anti_replay': self.use_anti_replay,
            'udp_encap': self.udp_encap,
        }
        return positional, keywords

    def add_vpp_config(self):
        """Add the SA through the API, then register this object with the
        test's registry."""
        positional, keywords = self._sa_call_args()
        # is_add is deliberately not passed: the API wrapper's default
        # applies on the add path
        self.test.vapi.ipsec_sad_add_del_entry(*positional, **keywords)
        self.test.registry.register(self, self.test.logger)

    def remove_vpp_config(self):
        """Remove the SA: same arguments as the add, plus is_add=0."""
        positional, keywords = self._sa_call_args()
        keywords['is_add'] = 0
        self.test.vapi.ipsec_sad_add_del_entry(*positional, **keywords)

    def __str__(self):
        return self.object_id()

    def object_id(self):
        """Return the string that names this SA ("ipsec-sa-<id>")."""
        return "ipsec-sa-%d" % self.id

    def query_vpp_config(self):
        """Return True if the SA dump contains an entry with our id."""
        return any(found.sa_id == self.id
                   for found in self.test.vapi.ipsec_sa_dump())
248                 return True
249         return False