vlib: pass node runtime to vlib_buffer_enqueue_to_thread()
[vpp.git] / test / vpp_ikev2.py
1 from ipaddress import IPv4Address, AddressValueError
2 from vpp_object import VppObject
3 from vpp_papi import VppEnum
4
5
6 class AuthMethod:
7     v = {'rsa-sig': 1,
8          'shared-key': 2}
9
10     @staticmethod
11     def value(key): return AuthMethod.v[key]
12
13
14 class IDType:
15     v = {'ip4-addr': 1,
16          'fqdn': 2,
17          'ip6-addr': 5}
18
19     @staticmethod
20     def value(key): return IDType.v[key]
21
22
23 class Profile(VppObject):
24     """ IKEv2 profile """
25     def __init__(self, test, profile_name):
26         self.test = test
27         self.vapi = test.vapi
28         self.profile_name = profile_name
29         self.udp_encap = False
30         self.natt = True
31
32     def disable_natt(self):
33         self.natt = False
34
35     def add_auth(self, method, data, is_hex=False):
36         if isinstance(method, int):
37             m = method
38         elif isinstance(method, str):
39             m = AuthMethod.value(method)
40         else:
41             raise Exception('unsupported type {}'.format(method))
42         self.auth = {'auth_method': m,
43                      'data': data,
44                      'is_hex': is_hex}
45
46     def add_local_id(self, id_type, data):
47         if isinstance(id_type, str):
48             t = IDType.value(id_type)
49         self.local_id = {'id_type': t,
50                          'data': data,
51                          'is_local': True}
52
53     def add_remote_id(self, id_type, data):
54         if isinstance(id_type, str):
55             t = IDType.value(id_type)
56         self.remote_id = {'id_type': t,
57                           'data': data,
58                           'is_local': False}
59
60     def add_local_ts(self, start_addr, end_addr, start_port=0, end_port=0xffff,
61                      proto=0, is_ip4=True):
62         self.ts_is_ip4 = is_ip4
63         self.local_ts = {'is_local': True,
64                          'protocol_id': proto,
65                          'start_port': start_port,
66                          'end_port': end_port,
67                          'start_addr': start_addr,
68                          'end_addr': end_addr}
69
70     def add_remote_ts(self, start_addr, end_addr, start_port=0,
71                       end_port=0xffff, proto=0):
72         try:
73             IPv4Address(start_addr)
74             is_ip4 = True
75         except AddressValueError:
76             is_ip4 = False
77         self.ts_is_ip4 = is_ip4
78         self.remote_ts = {'is_local': False,
79                           'protocol_id': proto,
80                           'start_port': start_port,
81                           'end_port': end_port,
82                           'start_addr': start_addr,
83                           'end_addr': end_addr}
84
85     def add_responder_hostname(self, hn):
86         self.responder_hostname = hn
87
88     def add_responder(self, responder):
89         self.responder = responder
90
91     def add_ike_transforms(self, tr):
92         self.ike_transforms = tr
93
94     def add_esp_transforms(self, tr):
95         self.esp_transforms = tr
96
97     def set_udp_encap(self, udp_encap):
98         self.udp_encap = udp_encap
99
100     def set_lifetime_data(self, data):
101         self.lifetime_data = data
102
103     def set_ipsec_over_udp_port(self, port):
104         self.ipsec_udp_port = {'is_set': 1,
105                                'port': port}
106
107     def set_tunnel_interface(self, sw_if_index):
108         self.tun_itf = sw_if_index
109
110     def object_id(self):
111         return 'ikev2-profile-%s' % self.profile_name
112
113     def remove_vpp_config(self):
114         self.vapi.ikev2_profile_add_del(name=self.profile_name, is_add=False)
115
116     def add_vpp_config(self):
117         self.vapi.ikev2_profile_add_del(name=self.profile_name, is_add=True)
118         if hasattr(self, 'auth'):
119             self.vapi.ikev2_profile_set_auth(name=self.profile_name,
120                                              data_len=len(self.auth['data']),
121                                              **self.auth)
122         if hasattr(self, 'local_id'):
123             self.vapi.ikev2_profile_set_id(name=self.profile_name,
124                                            data_len=len(self.local_id
125                                                         ['data']),
126                                            **self.local_id)
127         if hasattr(self, 'remote_id'):
128             self.vapi.ikev2_profile_set_id(name=self.profile_name,
129                                            data_len=len(self.remote_id
130                                                         ['data']),
131                                            **self.remote_id)
132         if hasattr(self, 'local_ts'):
133             self.vapi.ikev2_profile_set_ts(name=self.profile_name,
134                                            ts=self.local_ts)
135
136         if hasattr(self, 'remote_ts'):
137             self.vapi.ikev2_profile_set_ts(name=self.profile_name,
138                                            ts=self.remote_ts)
139
140         if hasattr(self, 'responder'):
141             self.vapi.ikev2_set_responder(name=self.profile_name,
142                                           responder=self.responder)
143
144         if hasattr(self, 'responder_hostname'):
145             print(self.responder_hostname)
146             self.vapi.ikev2_set_responder_hostname(name=self.profile_name,
147                                                    **self.responder_hostname)
148
149         if hasattr(self, 'ike_transforms'):
150             self.vapi.ikev2_set_ike_transforms(name=self.profile_name,
151                                                tr=self.ike_transforms)
152
153         if hasattr(self, 'esp_transforms'):
154             self.vapi.ikev2_set_esp_transforms(name=self.profile_name,
155                                                tr=self.esp_transforms)
156
157         if self.udp_encap:
158             self.vapi.ikev2_profile_set_udp_encap(name=self.profile_name)
159
160         if hasattr(self, 'lifetime_data'):
161             self.vapi.ikev2_set_sa_lifetime(name=self.profile_name,
162                                             **self.lifetime_data)
163
164         if hasattr(self, 'ipsec_udp_port'):
165             self.vapi.ikev2_profile_set_ipsec_udp_port(name=self.profile_name,
166                                                        **self.ipsec_udp_port)
167         if hasattr(self, 'tun_itf'):
168             self.vapi.ikev2_set_tunnel_interface(name=self.profile_name,
169                                                  sw_if_index=self.tun_itf)
170
171         if not self.natt:
172             self.vapi.ikev2_profile_disable_natt(name=self.profile_name)
173
174     def query_vpp_config(self):
175         res = self.vapi.ikev2_profile_dump()
176         for r in res:
177             if r.profile.name == self.profile_name:
178                 return r.profile
179         return None