1 from vpp_object import *
2 from ipaddress import ip_address
10 class VppIpsecSpd(VppObject):
15 def __init__(self, test, id):
19 def add_vpp_config(self):
20 self.test.vapi.ipsec_spd_add_del(self.id)
21 self.test.registry.register(self, self.test.logger)
23 def remove_vpp_config(self):
24 self.test.vapi.ipsec_spd_add_del(self.id, is_add=0)
27 return self.object_id()
30 return "ipsec-spd-%d" % self.id
32 def query_vpp_config(self):
33 spds = self.test.vapi.ipsec_spds_dump()
35 if spd.spd_id == self.id:
40 class VppIpsecSpdItfBinding(VppObject):
42 VPP SPD DB to interface binding
43 (i.e. this SPD is used on this interfce)
46 def __init__(self, test, spd, itf):
51 def add_vpp_config(self):
52 self.test.vapi.ipsec_interface_add_del_spd(self.spd.id,
54 self.test.registry.register(self, self.test.logger)
56 def remove_vpp_config(self):
57 self.test.vapi.ipsec_interface_add_del_spd(self.spd.id,
62 return self.object_id()
65 return "bind-%s-to-%s" % (self.spd.id, self.itf)
67 def query_vpp_config(self):
68 bs = self.test.vapi.ipsec_spd_interface_dump()
70 if b.sw_if_index == self.itf.sw_if_index:
75 class VppIpsecSpdEntry(VppObject):
80 def __init__(self, test, spd, sa_id,
81 local_start, local_stop,
82 remote_start, remote_stop,
88 remote_port_stop=65535,
90 local_port_stop=65535):
94 self.local_start = ip_address(text_type(local_start))
95 self.local_stop = ip_address(text_type(local_stop))
96 self.remote_start = ip_address(text_type(remote_start))
97 self.remote_stop = ip_address(text_type(remote_stop))
99 self.is_outbound = is_outbound
100 self.priority = priority
102 self.is_ipv6 = (0 if self.local_start.version == 4 else 1)
103 self.local_port_start = local_port_start
104 self.local_port_stop = local_port_stop
105 self.remote_port_start = remote_port_start
106 self.remote_port_stop = remote_port_stop
108 def add_vpp_config(self):
109 self.test.vapi.ipsec_spd_add_del_entry(
112 self.local_start.packed,
113 self.local_stop.packed,
114 self.remote_start.packed,
115 self.remote_stop.packed,
117 is_ipv6=self.is_ipv6,
118 is_outbound=self.is_outbound,
119 priority=self.priority,
121 local_port_start=self.local_port_start,
122 local_port_stop=self.local_port_stop,
123 remote_port_start=self.remote_port_start,
124 remote_port_stop=self.remote_port_stop)
125 self.test.registry.register(self, self.test.logger)
127 def remove_vpp_config(self):
128 self.test.vapi.ipsec_spd_add_del_entry(
131 self.local_start.packed,
132 self.local_stop.packed,
133 self.remote_start.packed,
134 self.remote_stop.packed,
136 is_ipv6=self.is_ipv6,
137 is_outbound=self.is_outbound,
138 priority=self.priority,
140 local_port_start=self.local_port_start,
141 local_port_stop=self.local_port_stop,
142 remote_port_start=self.remote_port_start,
143 remote_port_stop=self.remote_port_stop,
147 return self.object_id()
150 return "spd-entry-%d-%d-%d-%d-%d-%d" % (self.spd.id,
155 self.remote_port_start)
157 def query_vpp_config(self):
158 ss = self.test.vapi.ipsec_spd_dump(self.spd.id)
160 if s.sa_id == self.sa_id and \
161 s.is_outbound == self.is_outbound and \
162 s.priority == self.priority and \
163 s.policy == self.policy and \
164 s.is_ipv6 == self.is_ipv6 and \
165 s.remote_start_port == self.remote_port_start:
170 class VppIpsecSA(VppObject):
175 def __init__(self, test, id, spi,
176 integ_alg, integ_key,
177 crypto_alg, crypto_key,
179 tun_src=None, tun_dst=None,
185 self.integ_alg = integ_alg
186 self.integ_key = integ_key
187 self.crypto_alg = crypto_alg
188 self.crypto_key = crypto_key
191 self.is_tunnel_v6 = 0
192 self.tun_src = tun_src
193 self.tun_dst = tun_dst
195 self.tun_src = ip_address(text_type(tun_src))
197 if (self.tun_src.version == 6):
198 self.is_tunnel_v6 = 1
200 self.tun_dst = ip_address(text_type(tun_dst))
201 self.use_anti_replay = use_anti_replay
202 self.udp_encap = udp_encap
204 def add_vpp_config(self):
205 self.test.vapi.ipsec_sad_add_del_entry(
213 (self.tun_src.packed if self.tun_src else []),
214 (self.tun_dst.packed if self.tun_dst else []),
215 is_tunnel=self.is_tunnel,
216 is_tunnel_ipv6=self.is_tunnel_v6,
217 use_anti_replay=self.use_anti_replay,
218 udp_encap=self.udp_encap)
219 self.test.registry.register(self, self.test.logger)
221 def remove_vpp_config(self):
222 self.test.vapi.ipsec_sad_add_del_entry(
230 (self.tun_src.packed if self.tun_src else []),
231 (self.tun_dst.packed if self.tun_dst else []),
232 is_tunnel=self.is_tunnel,
233 is_tunnel_ipv6=self.is_tunnel_v6,
234 use_anti_replay=self.use_anti_replay,
235 udp_encap=self.udp_encap,
239 return self.object_id()
242 return "ipsec-sa-%d" % self.id
244 def query_vpp_config(self):
245 bs = self.test.vapi.ipsec_sa_dump()
247 if b.sa_id == self.id: