IPSEC Tests: to per-test setup and tearDown
[vpp.git] / test / test_ipsec_ah.py
1 import socket
2 import unittest
3
4 from scapy.layers.ipsec import AH
5
6 from framework import VppTestRunner
7 from template_ipsec import TemplateIpsec, IpsecTraTests, IpsecTunTests
8 from template_ipsec import IpsecTcpTests
9
10
class TemplateIpsecAh(TemplateIpsec):
    """
    Basic test for IPSEC using AH transport and Tunnel mode

    TRANSPORT MODE:

     ---   encrypt   ---
    |pg2| <-------> |VPP|
     ---   decrypt   ---

    TUNNEL MODE:

     ---   encrypt   ---   plain   ---
    |pg0| <-------  |VPP| <------ |pg1|
     ---             ---           ---

     ---   decrypt   ---   plain   ---
    |pg0| ------->  |VPP| ------> |pg1|
     ---             ---           ---

    Every group of SAD/SPD entries is described exactly once, in a private
    ``_ah_*`` helper.  The public ``config_ah_*`` methods call the helpers
    without extra arguments; the ``unconfig_ah_*`` methods call the same
    helpers with ``is_add=0``, so the add and the delete side cannot drift
    apart.
    """

    def setUp(self):
        """Create both SPDs, install all AH SAs/policies and host routes."""
        super(TemplateIpsecAh, self).setUp()

        self.encryption_type = AH
        self.tun_if = self.pg0
        self.tra_if = self.pg2
        self.logger.info(self.vapi.ppcli("show int addr"))

        # Create each SPD first, then bind it to its interface.
        for spd_id, intf in ((self.tun_spd_id, self.tun_if),
                             (self.tra_spd_id, self.tra_if)):
            self.vapi.ipsec_spd_add_del(spd_id)
            self.vapi.ipsec_interface_add_del_spd(spd_id, intf.sw_if_index)

        # Three separate passes on purpose: the transport configuration of
        # every parameter set is pushed before any tunnel configuration,
        # and the routes are added last.
        for p in self.params.values():
            self.config_ah_tra(p)
            self.configure_sa_tra(p)
            self.logger.info(self.vapi.ppcli("show ipsec"))
        for p in self.params.values():
            self.config_ah_tun(p)
            self.logger.info(self.vapi.ppcli("show ipsec"))
        self._ah_tun_host_routes()

    # ------------------------------------------------------------------
    # Shared helpers.  ``vapi_kwargs`` is forwarded verbatim to every vapi
    # call: empty when adding (the vapi default applies), ``is_add=0``
    # when removing.
    # ------------------------------------------------------------------

    def _ah_tun_host_routes(self, **vapi_kwargs):
        """Add/remove the route to each remote tunnel host via tun_if."""
        for p in self.params.values():
            dst = socket.inet_pton(p.addr_type, p.remote_tun_if_host)
            self.vapi.ip_add_del_route(
                dst, p.addr_len, self.tun_if.remote_addr_n[p.addr_type],
                is_ipv6=p.is_ipv6, **vapi_kwargs)

    def _ah_tun_sad_entries(self, params, **vapi_kwargs):
        """Add/remove the two tunnel-mode SAs (scapy side, then VPP side)."""
        local = self.tun_if.local_addr_n[params.addr_type]
        remote = self.tun_if.remote_addr_n[params.addr_type]
        # The two SAs use the same pair of tun_if addresses in opposite
        # order (presumably tunnel source/destination -- confirm against
        # the vapi signature).
        for sa_id, spi, endpoint_a, endpoint_b in (
                (params.scapy_tun_sa_id, params.scapy_tun_spi,
                 local, remote),
                (params.vpp_tun_sa_id, params.vpp_tun_spi,
                 remote, local)):
            self.vapi.ipsec_sad_add_del_entry(
                sa_id, spi,
                params.auth_algo_vpp_id, params.auth_key,
                params.crypt_algo_vpp_id, params.crypt_key,
                self.vpp_ah_protocol, endpoint_a, endpoint_b,
                is_tunnel=1, is_tunnel_ipv6=params.is_ipv6,
                **vapi_kwargs)

    def _ah_tun_spd_ah_entries(self, params, **vapi_kwargs):
        """Add/remove the any-to-any AH-protocol entries of the tunnel SPD.

        One outbound and one inbound entry, both spanning addr_any to
        addr_bcast on the local and the remote side.
        """
        lo = socket.inet_pton(params.addr_type, params.addr_any)
        hi = socket.inet_pton(params.addr_type, params.addr_bcast)
        self.vapi.ipsec_spd_add_del_entry(
            self.tun_spd_id, params.vpp_tun_sa_id, lo, hi, lo, hi,
            is_ipv6=params.is_ipv6, protocol=socket.IPPROTO_AH,
            **vapi_kwargs)
        self.vapi.ipsec_spd_add_del_entry(
            self.tun_spd_id, params.vpp_tun_sa_id, lo, hi, lo, hi,
            is_outbound=0, is_ipv6=params.is_ipv6,
            protocol=socket.IPPROTO_AH, **vapi_kwargs)

    def _ah_tun_spd_host_pair(self, params, peer, priority, **vapi_kwargs):
        """Add/remove the inbound+outbound tunnel SPD entries for one peer.

        :param params: IPsec parameter set being configured.
        :param peer: packed address paired with the remote tunnel host.
        :param priority: SPD priority used for both entries.
        """
        host = socket.inet_pton(params.addr_type, params.remote_tun_if_host)
        # policy=3 is taken over unchanged from the original calls
        # (presumably "protect" -- confirm against the IPsec API).
        self.vapi.ipsec_spd_add_del_entry(
            self.tun_spd_id, params.vpp_tun_sa_id,
            host, host, peer, peer,
            priority=priority, policy=3, is_outbound=0,
            is_ipv6=params.is_ipv6, **vapi_kwargs)
        # Outbound entry: same two addresses with the sides swapped.
        self.vapi.ipsec_spd_add_del_entry(
            self.tun_spd_id, params.scapy_tun_sa_id,
            peer, peer, host, host,
            priority=priority, policy=3,
            is_ipv6=params.is_ipv6, **vapi_kwargs)

    def _ah_tra_sad_entries(self, params, **vapi_kwargs):
        """Add/remove the two transport-mode SAs (scapy side, then VPP)."""
        for sa_id, spi in ((params.scapy_tra_sa_id, params.scapy_tra_spi),
                           (params.vpp_tra_sa_id, params.vpp_tra_spi)):
            self.vapi.ipsec_sad_add_del_entry(
                sa_id, spi,
                params.auth_algo_vpp_id, params.auth_key,
                params.crypt_algo_vpp_id, params.crypt_key,
                self.vpp_ah_protocol, is_tunnel=0, is_tunnel_ipv6=0,
                use_anti_replay=1, **vapi_kwargs)

    def _ah_tra_spd_ah_entries(self, params, **vapi_kwargs):
        """Add/remove the any-to-any AH-protocol entries of the transport SPD.

        The outbound entry references the VPP SA, the inbound one the
        scapy SA.
        """
        lo = socket.inet_pton(params.addr_type, params.addr_any)
        hi = socket.inet_pton(params.addr_type, params.addr_bcast)
        self.vapi.ipsec_spd_add_del_entry(
            self.tra_spd_id, params.vpp_tra_sa_id, lo, hi, lo, hi,
            is_ipv6=params.is_ipv6, protocol=socket.IPPROTO_AH,
            **vapi_kwargs)
        self.vapi.ipsec_spd_add_del_entry(
            self.tra_spd_id, params.scapy_tra_sa_id, lo, hi, lo, hi,
            is_outbound=0, is_ipv6=params.is_ipv6,
            protocol=socket.IPPROTO_AH, **vapi_kwargs)

    def _ah_tra_spd_host_pair(self, params, **vapi_kwargs):
        """Add/remove the priority-10 transport SPD entries for tra_if."""
        local = self.tra_if.local_addr_n[params.addr_type]
        remote = self.tra_if.remote_addr_n[params.addr_type]
        # Unlike the tunnel variant, both entries keep the same
        # local/remote argument order.
        self.vapi.ipsec_spd_add_del_entry(
            self.tra_spd_id, params.vpp_tra_sa_id,
            local, local, remote, remote,
            priority=10, policy=3, is_outbound=0,
            is_ipv6=params.is_ipv6, **vapi_kwargs)
        self.vapi.ipsec_spd_add_del_entry(
            self.tra_spd_id, params.scapy_tra_sa_id,
            local, local, remote, remote,
            priority=10, policy=3,
            is_ipv6=params.is_ipv6, **vapi_kwargs)

    # ------------------------------------------------------------------
    # Public configuration entry points.
    # ------------------------------------------------------------------

    def config_ah_tun(self, params):
        """Install the tunnel-mode SAs and SPD entries for ``params``."""
        addr_type = params.addr_type
        self._ah_tun_sad_entries(params)
        self._ah_tun_spd_ah_entries(params)
        self._ah_tun_spd_host_pair(
            params, self.pg1.remote_addr_n[addr_type], 10)
        self._ah_tun_spd_host_pair(
            params, self.pg0.local_addr_n[addr_type], 20)

    def unconfig_ah_tun(self, params):
        """Remove what config_ah_tun installed, groups in reverse order."""
        addr_type = params.addr_type
        self._ah_tun_spd_host_pair(
            params, self.pg0.local_addr_n[addr_type], 20, is_add=0)
        self._ah_tun_spd_host_pair(
            params, self.pg1.remote_addr_n[addr_type], 10, is_add=0)
        self._ah_tun_spd_ah_entries(params, is_add=0)
        # The SAs go last, after every SPD entry that references them.
        self._ah_tun_sad_entries(params, is_add=0)

    def config_ah_tra(self, params):
        """Install the transport-mode SAs and SPD entries for ``params``."""
        self._ah_tra_sad_entries(params)
        self._ah_tra_spd_ah_entries(params)
        self._ah_tra_spd_host_pair(params)

    def unconfig_ah_tra(self, params):
        """Remove what config_ah_tra installed: SPD entries, then SAs."""
        self._ah_tra_spd_ah_entries(params, is_add=0)
        self._ah_tra_spd_host_pair(params, is_add=0)
        self._ah_tra_sad_entries(params, is_add=0)

    def tearDown(self):
        """Undo everything setUp configured, then run the base tearDown."""
        # NOTE(review): if any call below raises, the base-class tearDown
        # is skipped -- confirm that this is the intended behaviour.
        for p in self.params.values():
            self.unconfig_ah_tun(p)
        for p in self.params.values():
            self.unconfig_ah_tra(p)

        # Unbind each SPD from its interface before deleting the SPD.
        for spd_id, intf in ((self.tun_spd_id, self.tun_if),
                             (self.tra_spd_id, self.tra_if)):
            self.vapi.ipsec_interface_add_del_spd(spd_id, intf.sw_if_index,
                                                  is_add=0)
            self.vapi.ipsec_spd_add_del(spd_id, is_add=0)
        self._ah_tun_host_routes(is_add=0)

        super(TemplateIpsecAh, self).tearDown()
        if not self.vpp_dead:
            self.vapi.cli("show hardware")
309
310
class TestIpsecAh1(TemplateIpsecAh, IpsecTraTests, IpsecTunTests):
    """ Ipsec AH - TUN & TRA tests """
    # Transport (tra*) and tunnel (tun*) attributes are given the same
    # node names in this class, so each pair is assigned in one statement.
    tra4_encrypt_node_name = tun4_encrypt_node_name = "ah4-encrypt"
    tra4_decrypt_node_name = tun4_decrypt_node_name = "ah4-decrypt"
    tra6_encrypt_node_name = tun6_encrypt_node_name = "ah6-encrypt"
    tra6_decrypt_node_name = tun6_decrypt_node_name = "ah6-decrypt"
321
322
class TestIpsecAh2(TemplateIpsecAh, IpsecTcpTests):
    """ Ipsec AH - TCP tests """
    # No body of its own: everything comes from the two base classes.
326
327
# Run the module's tests with the VPP test runner when executed directly.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)