1 from ipaddress import IPv4Address, AddressValueError
2 from vpp_object import VppObject
3 from vpp_papi import VppEnum
def value(key):
    """Look up *key* in ``AuthMethod.v`` and return the stored value.

    add_auth() calls this when the method is given as a string and sends
    the result as ``auth_method``. A key that is missing from
    ``AuthMethod.v`` is not handled here; the lookup error reaches the caller.
    """
    code = AuthMethod.v[key]
    return code
def value(key):
    """Look up *key* in ``IDType.v`` and return the stored value.

    add_local_id() and add_remote_id() call this when the identity type is
    given as a string and store the result as ``id_type``. A key that is
    missing from ``IDType.v`` is not handled here; the lookup error reaches
    the caller.
    """
    code = IDType.v[key]
    return code
23 class Profile(VppObject):
25 def __init__(self, test, profile_name):
28 self.profile_name = profile_name
29 self.udp_encap = False
32 def disable_natt(self):
35 def add_auth(self, method, data, is_hex=False):
36 if isinstance(method, int):
38 elif isinstance(method, str):
39 m = AuthMethod.value(method)
41 raise Exception('unsupported type {}'.format(method))
42 self.auth = {'auth_method': m,
46 def add_local_id(self, id_type, data):
47 if isinstance(id_type, str):
48 t = IDType.value(id_type)
49 self.local_id = {'id_type': t,
53 def add_remote_id(self, id_type, data):
54 if isinstance(id_type, str):
55 t = IDType.value(id_type)
56 self.remote_id = {'id_type': t,
60 def add_local_ts(self, start_addr, end_addr, start_port=0, end_port=0xffff,
61 proto=0, is_ip4=True):
62 self.ts_is_ip4 = is_ip4
63 self.local_ts = {'is_local': True,
65 'start_port': start_port,
67 'start_addr': start_addr,
70 def add_remote_ts(self, start_addr, end_addr, start_port=0,
71 end_port=0xffff, proto=0):
73 IPv4Address(start_addr)
75 except AddressValueError:
77 self.ts_is_ip4 = is_ip4
78 self.remote_ts = {'is_local': False,
80 'start_port': start_port,
82 'start_addr': start_addr,
def add_responder(self, responder):
    """Remember the responder settings for this profile.

    Nothing is sent to VPP here; add_vpp_config() passes the stored value
    unchanged as ``responder`` to ``ikev2_set_responder``.
    """
    self.responder = responder
def add_ike_transforms(self, tr):
    """Remember the IKE transform set for this profile.

    The value is stored without any validation and add_vpp_config() hands it
    to ``ikev2_set_ike_transforms`` as ``tr``, so the choice of algorithms is
    entirely the caller's responsibility.
    """
    self.ike_transforms = tr
def add_esp_transforms(self, tr):
    """Remember the ESP transform set for this profile.

    The value is stored without any validation and add_vpp_config() hands it
    to ``ikev2_set_esp_transforms`` as ``tr``, so the choice of algorithms is
    entirely the caller's responsibility.
    """
    self.esp_transforms = tr
def set_udp_encap(self, udp_encap):
    """Record whether UDP encapsulation is wanted for this profile.

    The attribute starts out as False in __init__ and is only stored here;
    this method makes no API call.
    """
    self.udp_encap = udp_encap
def set_lifetime_data(self, data):
    """Remember the SA lifetime settings for this profile.

    *data* is kept as given. add_vpp_config() unpacks it as keyword
    arguments to ``ikev2_set_sa_lifetime``, so it has to be a mapping whose
    keys are parameter names accepted by that call.
    """
    self.lifetime_data = data
100 def set_ipsec_over_udp_port(self, port):
101 self.ipsec_udp_port = {'is_set': 1,
def set_tunnel_interface(self, sw_if_index):
    """Remember the interface index to use as the profile's tunnel interface.

    add_vpp_config() forwards it as ``sw_if_index`` to
    ``ikev2_set_tunnel_interface``; the index is not checked here.
    """
    self.tun_itf = sw_if_index
108 return 'ikev2-profile-%s' % self.profile_name
def remove_vpp_config(self):
    """Delete this profile from VPP.

    Issues ``ikev2_profile_add_del`` for ``self.profile_name`` with
    ``is_add=False``. The API reply is not inspected here.
    """
    profile_add_del = self.vapi.ikev2_profile_add_del
    profile_add_del(name=self.profile_name, is_add=False)
113 def add_vpp_config(self):
114 self.vapi.ikev2_profile_add_del(name=self.profile_name, is_add=True)
115 if hasattr(self, 'auth'):
116 self.vapi.ikev2_profile_set_auth(name=self.profile_name,
117 data_len=len(self.auth['data']),
119 if hasattr(self, 'local_id'):
120 self.vapi.ikev2_profile_set_id(name=self.profile_name,
121 data_len=len(self.local_id
124 if hasattr(self, 'remote_id'):
125 self.vapi.ikev2_profile_set_id(name=self.profile_name,
126 data_len=len(self.remote_id
129 if hasattr(self, 'local_ts'):
130 self.vapi.ikev2_profile_set_ts(name=self.profile_name,
133 if hasattr(self, 'remote_ts'):
134 self.vapi.ikev2_profile_set_ts(name=self.profile_name,
137 if hasattr(self, 'responder'):
138 self.vapi.ikev2_set_responder(name=self.profile_name,
139 responder=self.responder)
141 if hasattr(self, 'ike_transforms'):
142 self.vapi.ikev2_set_ike_transforms(name=self.profile_name,
143 tr=self.ike_transforms)
145 if hasattr(self, 'esp_transforms'):
146 self.vapi.ikev2_set_esp_transforms(name=self.profile_name,
147 tr=self.esp_transforms)
150 self.vapi.ikev2_profile_set_udp_encap(name=self.profile_name)
152 if hasattr(self, 'lifetime_data'):
153 self.vapi.ikev2_set_sa_lifetime(name=self.profile_name,
154 **self.lifetime_data)
156 if hasattr(self, 'ipsec_udp_port'):
157 self.vapi.ikev2_profile_set_ipsec_udp_port(name=self.profile_name,
158 **self.ipsec_udp_port)
159 if hasattr(self, 'tun_itf'):
160 self.vapi.ikev2_set_tunnel_interface(name=self.profile_name,
161 sw_if_index=self.tun_itf)
164 self.vapi.ikev2_profile_disable_natt(name=self.profile_name)
166 def query_vpp_config(self):
167 res = self.vapi.ikev2_profile_dump()
169 if r.profile.name == self.profile_name: