+from ipaddress import IPv4Address, AddressValueError
from vpp_object import VppObject
from vpp_papi import VppEnum
class IDType:
v = {'ip4-addr': 1,
- 'fqdn': 2}
+ 'fqdn': 2,
+ 'ip6-addr': 5}
@staticmethod
def value(key): return IDType.v[key]
self.test = test
self.vapi = test.vapi
self.profile_name = profile_name
+ self.udp_encap = False
+ self.natt = True
+
+ def disable_natt(self):
+ self.natt = False
def add_auth(self, method, data, is_hex=False):
if isinstance(method, int):
'is_local': False}
def add_local_ts(self, start_addr, end_addr, start_port=0, end_port=0xffff,
- proto=0):
+ proto=0, is_ip4=True):
+ self.ts_is_ip4 = is_ip4
self.local_ts = {'is_local': True,
- 'proto': proto,
+ 'protocol_id': proto,
'start_port': start_port,
'end_port': end_port,
'start_addr': start_addr,
def add_remote_ts(self, start_addr, end_addr, start_port=0,
end_port=0xffff, proto=0):
+ try:
+ IPv4Address(start_addr)
+ is_ip4 = True
+ except AddressValueError:
+ is_ip4 = False
+ self.ts_is_ip4 = is_ip4
self.remote_ts = {'is_local': False,
- 'proto': proto,
+ 'protocol_id': proto,
'start_port': start_port,
'end_port': end_port,
'start_addr': start_addr,
'end_addr': end_addr}
+ def add_responder_hostname(self, hn):
+ self.responder_hostname = hn
+
+ def add_responder(self, responder):
+ self.responder = responder
+
+ def add_ike_transforms(self, tr):
+ self.ike_transforms = tr
+
+ def add_esp_transforms(self, tr):
+ self.esp_transforms = tr
+
+ def set_udp_encap(self, udp_encap):
+ self.udp_encap = udp_encap
+
+ def set_lifetime_data(self, data):
+ self.lifetime_data = data
+
+ def set_ipsec_over_udp_port(self, port):
+ self.ipsec_udp_port = {'is_set': 1,
+ 'port': port}
+
+ def set_tunnel_interface(self, sw_if_index):
+ self.tun_itf = sw_if_index
+
def object_id(self):
return 'ikev2-profile-%s' % self.profile_name
**self.remote_id)
if hasattr(self, 'local_ts'):
self.vapi.ikev2_profile_set_ts(name=self.profile_name,
- **self.local_ts)
+ ts=self.local_ts)
+
if hasattr(self, 'remote_ts'):
self.vapi.ikev2_profile_set_ts(name=self.profile_name,
- **self.remote_ts)
+ ts=self.remote_ts)
+
+ if hasattr(self, 'responder'):
+ self.vapi.ikev2_set_responder(name=self.profile_name,
+ responder=self.responder)
+
+ if hasattr(self, 'responder_hostname'):
+ print(self.responder_hostname)
+ self.vapi.ikev2_set_responder_hostname(name=self.profile_name,
+ **self.responder_hostname)
+
+ if hasattr(self, 'ike_transforms'):
+ self.vapi.ikev2_set_ike_transforms(name=self.profile_name,
+ tr=self.ike_transforms)
+
+ if hasattr(self, 'esp_transforms'):
+ self.vapi.ikev2_set_esp_transforms(name=self.profile_name,
+ tr=self.esp_transforms)
+
+ if self.udp_encap:
+ self.vapi.ikev2_profile_set_udp_encap(name=self.profile_name)
+
+ if hasattr(self, 'lifetime_data'):
+ self.vapi.ikev2_set_sa_lifetime(name=self.profile_name,
+ **self.lifetime_data)
+
+ if hasattr(self, 'ipsec_udp_port'):
+ self.vapi.ikev2_profile_set_ipsec_udp_port(name=self.profile_name,
+ **self.ipsec_udp_port)
+ if hasattr(self, 'tun_itf'):
+ self.vapi.ikev2_set_tunnel_interface(name=self.profile_name,
+ sw_if_index=self.tun_itf)
+
+ if not self.natt:
+ self.vapi.ikev2_profile_disable_natt(name=self.profile_name)
def query_vpp_config(self):
- raise NotImplementedError()
+ res = self.vapi.ikev2_profile_dump()
+ for r in res:
+ if r.profile.name == self.profile_name:
+ return r.profile
+ return None