ikev2: refactor and test profile dump API
[vpp.git] / src / plugins / ikev2 / test / vpp_ikev2.py
1 from vpp_object import VppObject
2 from vpp_papi import VppEnum
3
4
5 class AuthMethod:
6     v = {'rsa-sig': 1,
7          'shared-key': 2}
8
9     @staticmethod
10     def value(key): return AuthMethod.v[key]
11
12
13 class IDType:
14     v = {'ip4-addr': 1,
15          'fqdn': 2}
16
17     @staticmethod
18     def value(key): return IDType.v[key]
19
20
21 class Profile(VppObject):
22     """ IKEv2 profile """
23     def __init__(self, test, profile_name):
24         self.test = test
25         self.vapi = test.vapi
26         self.profile_name = profile_name
27         self.udp_encap = False
28
29     def add_auth(self, method, data, is_hex=False):
30         if isinstance(method, int):
31             m = method
32         elif isinstance(method, str):
33             m = AuthMethod.value(method)
34         else:
35             raise Exception('unsupported type {}'.format(method))
36         self.auth = {'auth_method': m,
37                      'data': data,
38                      'is_hex': is_hex}
39
40     def add_local_id(self, id_type, data):
41         if isinstance(id_type, str):
42             t = IDType.value(id_type)
43         self.local_id = {'id_type': t,
44                          'data': data,
45                          'is_local': True}
46
47     def add_remote_id(self, id_type, data):
48         if isinstance(id_type, str):
49             t = IDType.value(id_type)
50         self.remote_id = {'id_type': t,
51                           'data': data,
52                           'is_local': False}
53
54     def add_local_ts(self, start_addr, end_addr, start_port=0, end_port=0xffff,
55                      proto=0):
56         self.local_ts = {'is_local': True,
57                          'protocol_id': proto,
58                          'start_port': start_port,
59                          'end_port': end_port,
60                          'start_addr': start_addr,
61                          'end_addr': end_addr}
62
63     def add_remote_ts(self, start_addr, end_addr, start_port=0,
64                       end_port=0xffff, proto=0):
65         self.remote_ts = {'is_local': False,
66                           'protocol_id': proto,
67                           'start_port': start_port,
68                           'end_port': end_port,
69                           'start_addr': start_addr,
70                           'end_addr': end_addr}
71
72     def add_responder(self, responder):
73         self.responder = responder
74
75     def add_ike_transforms(self, tr):
76         self.ike_transforms = tr
77
78     def add_esp_transforms(self, tr):
79         self.esp_transforms = tr
80
81     def set_udp_encap(self, udp_encap):
82         self.udp_encap = udp_encap
83
84     def set_lifetime_data(self, data):
85         self.lifetime_data = data
86
87     def set_ipsec_over_udp_port(self, port):
88         self.ipsec_udp_port = {'is_set': 1,
89                                'port': port}
90
91     def set_tunnel_interface(self, sw_if_index):
92         self.tun_itf = sw_if_index
93
94     def object_id(self):
95         return 'ikev2-profile-%s' % self.profile_name
96
97     def remove_vpp_config(self):
98         self.vapi.ikev2_profile_add_del(name=self.profile_name, is_add=False)
99
100     def add_vpp_config(self):
101         self.vapi.ikev2_profile_add_del(name=self.profile_name, is_add=True)
102         if hasattr(self, 'auth'):
103             self.vapi.ikev2_profile_set_auth(name=self.profile_name,
104                                              data_len=len(self.auth['data']),
105                                              **self.auth)
106         if hasattr(self, 'local_id'):
107             self.vapi.ikev2_profile_set_id(name=self.profile_name,
108                                            data_len=len(self.local_id
109                                                         ['data']),
110                                            **self.local_id)
111         if hasattr(self, 'remote_id'):
112             self.vapi.ikev2_profile_set_id(name=self.profile_name,
113                                            data_len=len(self.remote_id
114                                                         ['data']),
115                                            **self.remote_id)
116         if hasattr(self, 'local_ts'):
117             self.vapi.ikev2_profile_set_ts(name=self.profile_name,
118                                            ts={**self.local_ts})
119
120         if hasattr(self, 'remote_ts'):
121             self.vapi.ikev2_profile_set_ts(name=self.profile_name,
122                                            ts={**self.remote_ts})
123
124         if hasattr(self, 'responder'):
125             self.vapi.ikev2_set_responder(name=self.profile_name,
126                                           responder={**self.responder})
127
128         if hasattr(self, 'ike_transforms'):
129             self.vapi.ikev2_set_ike_transforms(name=self.profile_name,
130                                                tr={**self.ike_transforms})
131
132         if hasattr(self, 'esp_transforms'):
133             self.vapi.ikev2_set_esp_transforms(name=self.profile_name,
134                                                tr=self.esp_transforms)
135
136         if self.udp_encap:
137             self.vapi.ikev2_profile_set_udp_encap(name=self.profile_name)
138
139         if hasattr(self, 'lifetime_data'):
140             self.vapi.ikev2_set_sa_lifetime(name=self.profile_name,
141                                             **self.lifetime_data)
142
143         if hasattr(self, 'ipsec_udp_port'):
144             self.vapi.ikev2_profile_set_ipsec_udp_port(name=self.profile_name,
145                                                        **self.ipsec_udp_port)
146         if hasattr(self, 'tun_itf'):
147             self.vapi.ikev2_set_tunnel_interface(name=self.profile_name,
148                                                  sw_if_index=self.tun_itf)
149
150     def query_vpp_config(self):
151         res = self.vapi.ikev2_profile_dump()
152         for r in res:
153             if r.profile.name == self.profile_name:
154                 return r.profile
155         return None