6788876dad228893f34087c32b27cd746e631c31
[vpp.git] / test / test_ipsec_ah.py
1 import socket
2 import unittest
3
4 from scapy.layers.ipsec import AH
5
6 from framework import VppTestRunner
7 from template_ipsec import TemplateIpsec, IpsecTra46Tests, IpsecTun46Tests, \
8     config_tun_params, config_tra_params, IPsecIPv4Params, IPsecIPv6Params
9 from template_ipsec import IpsecTcpTests
10 from vpp_ipsec import VppIpsecSA, VppIpsecSpd, VppIpsecSpdEntry,\
11         VppIpsecSpdItfBinding
12 from vpp_ip_route import VppIpRoute, VppRoutePath
13 from vpp_ip import DpoProto
14 from vpp_papi import VppEnum
15
16
class TemplateIpsecAh(TemplateIpsec):
    """
    Common fixture for IPSEC tests using AH in transport and tunnel mode

    TRANSPORT MODE:

     ---   encrypt   ---
    |pg2| <-------> |VPP|
     ---   decrypt   ---

    TUNNEL MODE:

     ---   encrypt   ---   plain   ---
    |pg0| <-------  |VPP| <------ |pg1|
     ---             ---           ---

     ---   decrypt   ---   plain   ---
    |pg0| ------->  |VPP| ------> |pg1|
     ---             ---           ---
    """

    def setUp(self):
        """
        Create the transport and tunnel SPDs, bind them to their
        interfaces, then configure AH SAs/policies and a route towards
        the remote tunnel host for every entry in self.params.
        """
        super(TemplateIpsecAh, self).setUp()

        self.encryption_type = AH
        self.tun_if = self.pg0
        self.tra_if = self.pg2
        self.logger.info(self.vapi.ppcli("show int addr"))

        # transport SPD, bound to the transport interface
        self.tra_spd = VppIpsecSpd(self, self.tra_spd_id)
        self.tra_spd.add_vpp_config()
        tra_binding = VppIpsecSpdItfBinding(self, self.tra_spd, self.tra_if)
        tra_binding.add_vpp_config()

        # tunnel SPD, bound to the tunnel interface
        self.tun_spd = VppIpsecSpd(self, self.tun_spd_id)
        self.tun_spd.add_vpp_config()
        tun_binding = VppIpsecSpdItfBinding(self, self.tun_spd, self.tun_if)
        tun_binding.add_vpp_config()

        # three separate passes: all transport config first, then all
        # tunnel config, then the routes
        for p in self.params.values():
            self.config_ah_tra(p)
            config_tra_params(p, self.encryption_type)
            self.logger.info(self.vapi.ppcli("show ipsec"))

        for p in self.params.values():
            self.config_ah_tun(p)
            self.logger.info(self.vapi.ppcli("show ipsec"))

        for p in self.params.values():
            if p.is_ipv6:
                dpo_proto = DpoProto.DPO_PROTO_IP6
            else:
                dpo_proto = DpoProto.DPO_PROTO_IP4
            via_tun_peer = VppRoutePath(
                self.tun_if.remote_addr[p.addr_type],
                0xffffffff,
                proto=dpo_proto)
            host_route = VppIpRoute(self, p.remote_tun_if_host, p.addr_len,
                                    [via_tun_peer],
                                    is_ip6=p.is_ipv6)
            host_route.add_vpp_config()

    def tearDown(self):
        """
        Run the parent tear-down, then issue "show hardware" if VPP is
        still alive.
        """
        super(TemplateIpsecAh, self).tearDown()
        if self.vpp_dead:
            return
        self.vapi.cli("show hardware")

    def config_ah_tun(self, params):
        """
        Configure the tunnel-mode AH SAs and the SPD entries on
        self.tun_spd for one parameter set.

        The two SAs and the two catch-all AH policies are stored on
        params as tun_sa_in, tun_sa_out, spd_policy_in_any and
        spd_policy_out_any.

        :param params: parameter object supplying SA ids, SPIs, keys,
                       algorithms, flags and addresses
        """
        af = params.addr_type
        sa_id_in, spi_in = params.scapy_tun_sa_id, params.scapy_tun_spi
        sa_id_out, spi_out = params.vpp_tun_sa_id, params.vpp_tun_spi
        integ_alg, integ_key = params.auth_algo_vpp_id, params.auth_key
        crypto_alg, crypto_key = params.crypt_algo_vpp_id, params.crypt_key
        host = params.remote_tun_if_host
        any_lo, any_hi = params.addr_any, params.addr_bcast
        sa_flags = params.flags
        protect = (VppEnum.vl_api_ipsec_spd_action_t.
                   IPSEC_API_SPD_ACTION_PROTECT)

        tun_local = self.tun_if.local_addr[af]
        tun_remote = self.tun_if.remote_addr[af]

        # the two SAs mirror each other's tunnel endpoint addresses
        sa_in = VppIpsecSA(self, sa_id_in, spi_in,
                           integ_alg, integ_key,
                           crypto_alg, crypto_key,
                           self.vpp_ah_protocol,
                           tun_local, tun_remote,
                           flags=sa_flags)
        params.tun_sa_in = sa_in
        sa_in.add_vpp_config()

        sa_out = VppIpsecSA(self, sa_id_out, spi_out,
                            integ_alg, integ_key,
                            crypto_alg, crypto_key,
                            self.vpp_ah_protocol,
                            tun_remote, tun_local,
                            flags=sa_flags)
        params.tun_sa_out = sa_out
        sa_out.add_vpp_config()

        # catch-all policies for AH protocol packets, one per direction
        any_default_dir = VppIpsecSpdEntry(self, self.tun_spd, sa_id_out,
                                           any_lo, any_hi,
                                           any_lo, any_hi,
                                           socket.IPPROTO_AH)
        params.spd_policy_in_any = any_default_dir
        any_default_dir.add_vpp_config()

        any_not_outbound = VppIpsecSpdEntry(self, self.tun_spd, sa_id_out,
                                            any_lo, any_hi,
                                            any_lo, any_hi,
                                            socket.IPPROTO_AH,
                                            is_outbound=0)
        params.spd_policy_out_any = any_not_outbound
        any_not_outbound.add_vpp_config()

        # PROTECT policies between the remote tunnel host and, in turn,
        # the pg1 peer (priority 10) and pg0's local address (priority 20)
        protected_peers = (
            (self.pg1.remote_addr[af], 10),
            (self.pg0.local_addr[af], 20),
        )
        for peer, prio in protected_peers:
            VppIpsecSpdEntry(self, self.tun_spd, sa_id_out,
                             host, host,
                             peer, peer,
                             0, priority=prio,
                             policy=protect,
                             is_outbound=0).add_vpp_config()
            VppIpsecSpdEntry(self, self.tun_spd, sa_id_in,
                             peer, peer,
                             host, host,
                             0, policy=protect,
                             priority=prio).add_vpp_config()

    def config_ah_tra(self, params):
        """
        Configure the transport-mode AH SAs and the SPD entries on
        self.tra_spd for one parameter set.

        Both SAs are created with the anti-replay flag OR-ed into
        params.flags and are stored on params as tra_sa_in and
        tra_sa_out.

        :param params: parameter object supplying SA ids, SPIs, keys,
                       algorithms, flags and addresses
        """
        af = params.addr_type
        sa_id_in, spi_in = params.scapy_tra_sa_id, params.scapy_tra_spi
        sa_id_out, spi_out = params.vpp_tra_sa_id, params.vpp_tra_spi
        integ_alg, integ_key = params.auth_algo_vpp_id, params.auth_key
        crypto_alg, crypto_key = params.crypt_algo_vpp_id, params.crypt_key
        any_lo, any_hi = params.addr_any, params.addr_bcast
        anti_replay = (VppEnum.vl_api_ipsec_sad_flags_t.
                       IPSEC_API_SAD_FLAG_USE_ANTI_REPLAY)
        sa_flags = params.flags | anti_replay
        protect = (VppEnum.vl_api_ipsec_spd_action_t.
                   IPSEC_API_SPD_ACTION_PROTECT)

        # transport SAs are created without tunnel endpoint addresses
        sa_in = VppIpsecSA(self, sa_id_in, spi_in,
                           integ_alg, integ_key,
                           crypto_alg, crypto_key,
                           self.vpp_ah_protocol,
                           flags=sa_flags)
        params.tra_sa_in = sa_in
        sa_in.add_vpp_config()

        sa_out = VppIpsecSA(self, sa_id_out, spi_out,
                            integ_alg, integ_key,
                            crypto_alg, crypto_key,
                            self.vpp_ah_protocol,
                            flags=sa_flags)
        params.tra_sa_out = sa_out
        sa_out.add_vpp_config()

        # catch-all policies for AH protocol packets, one per direction
        VppIpsecSpdEntry(self, self.tra_spd, sa_id_out,
                         any_lo, any_hi,
                         any_lo, any_hi,
                         socket.IPPROTO_AH).add_vpp_config()
        VppIpsecSpdEntry(self, self.tra_spd, sa_id_in,
                         any_lo, any_hi,
                         any_lo, any_hi,
                         socket.IPPROTO_AH,
                         is_outbound=0).add_vpp_config()

        # PROTECT policies between the transport interface's own
        # address and its peer
        tra_local = self.tra_if.local_addr[af]
        tra_remote = self.tra_if.remote_addr[af]
        VppIpsecSpdEntry(self, self.tra_spd, sa_id_out,
                         tra_local, tra_local,
                         tra_remote, tra_remote,
                         0, priority=10,
                         policy=protect,
                         is_outbound=0).add_vpp_config()
        VppIpsecSpdEntry(self, self.tra_spd, sa_id_in,
                         tra_local, tra_local,
                         tra_remote, tra_remote,
                         0, policy=protect,
                         priority=10).add_vpp_config()
208
209
class TestIpsecAh1(TemplateIpsecAh, IpsecTcpTests):
    """ Ipsec AH - TCP tests """
    # No body needed: the fixture comes from TemplateIpsecAh and the
    # test cases from IpsecTcpTests.
213
214
class TestIpsecAh2(TemplateIpsecAh, IpsecTra46Tests, IpsecTun46Tests):
    """ Ipsec AH w/ SHA1 """
    # Transport and tunnel tests use the same AH graph node names,
    # split only by IP version and direction.
    tra4_encrypt_node_name = tun4_encrypt_node_name = "ah4-encrypt"
    tra4_decrypt_node_name = tun4_decrypt_node_name = "ah4-decrypt"
    tra6_encrypt_node_name = tun6_encrypt_node_name = "ah6-encrypt"
    tra6_decrypt_node_name = tun6_decrypt_node_name = "ah6-decrypt"
225
226
class TestIpsecAh3(TemplateIpsecAh, IpsecTra46Tests, IpsecTun46Tests):
    """ Ipsec AH w/ SHA1 & ESN """

    # Transport and tunnel tests use the same AH graph node names,
    # split only by IP version and direction.
    tra4_encrypt_node_name = tun4_encrypt_node_name = "ah4-encrypt"
    tra4_decrypt_node_name = tun4_decrypt_node_name = "ah4-decrypt"
    tra6_encrypt_node_name = tun6_encrypt_node_name = "ah6-encrypt"
    tra6_decrypt_node_name = tun6_decrypt_node_name = "ah6-decrypt"

    def setup_params(self):
        """
        Build default IPv4 and IPv6 parameter sets, set the USE_ESN SA
        flag on both, and index them by address type in self.params.
        """
        esn_flag = (VppEnum.vl_api_ipsec_sad_flags_t.
                    IPSEC_API_SAD_FLAG_USE_ESN)
        self.ipv4_params = IPsecIPv4Params()
        self.ipv6_params = IPsecIPv6Params()
        self.params = {}
        for p in (self.ipv4_params, self.ipv6_params):
            p.flags = esn_flag
            self.params[p.addr_type] = p
247
248
class TestIpsecAh4(TemplateIpsecAh, IpsecTra46Tests, IpsecTun46Tests):
    """ Ipsec AH w/ SHA256 """

    # Transport and tunnel tests use the same AH graph node names,
    # split only by IP version and direction.
    tra4_encrypt_node_name = tun4_encrypt_node_name = "ah4-encrypt"
    tra4_decrypt_node_name = tun4_decrypt_node_name = "ah4-decrypt"
    tra6_encrypt_node_name = tun6_encrypt_node_name = "ah6-encrypt"
    tra6_decrypt_node_name = tun6_decrypt_node_name = "ah6-decrypt"

    def setup_params(self):
        """
        Build IPv4 and IPv6 parameter sets that use SHA-256-128 as the
        integrity algorithm, and index them by address type in
        self.params.
        """
        self.ipv4_params = IPsecIPv4Params()
        self.ipv6_params = IPsecIPv6Params()
        self.params = {}
        for p in (self.ipv4_params, self.ipv6_params):
            p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
                                  IPSEC_API_INTEG_ALG_SHA_256_128)
            p.auth_algo = 'SHA2-256-128'  # scapy name
            self.params[p.addr_type] = p
273
274
class TestIpsecAh5(TemplateIpsecAh, IpsecTra46Tests, IpsecTun46Tests):
    """ Ipsec AH w/ SHA384 """

    # Transport and tunnel tests use the same AH graph node names,
    # split only by IP version and direction.
    tra4_encrypt_node_name = tun4_encrypt_node_name = "ah4-encrypt"
    tra4_decrypt_node_name = tun4_decrypt_node_name = "ah4-decrypt"
    tra6_encrypt_node_name = tun6_encrypt_node_name = "ah6-encrypt"
    tra6_decrypt_node_name = tun6_decrypt_node_name = "ah6-decrypt"

    def setup_params(self):
        """
        Build IPv4 and IPv6 parameter sets that use SHA-384-192 as the
        integrity algorithm, and index them by address type in
        self.params.
        """
        self.ipv4_params = IPsecIPv4Params()
        self.ipv6_params = IPsecIPv6Params()
        self.params = {}
        for p in (self.ipv4_params, self.ipv6_params):
            p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
                                  IPSEC_API_INTEG_ALG_SHA_384_192)
            p.auth_algo = 'SHA2-384-192'  # scapy name
            self.params[p.addr_type] = p
299
300
class TestIpsecAh6(TemplateIpsecAh, IpsecTra46Tests, IpsecTun46Tests):
    """ Ipsec AH w/ SHA512 """

    # Transport and tunnel tests use the same AH graph node names,
    # split only by IP version and direction.
    tra4_encrypt_node_name = tun4_encrypt_node_name = "ah4-encrypt"
    tra4_decrypt_node_name = tun4_decrypt_node_name = "ah4-decrypt"
    tra6_encrypt_node_name = tun6_encrypt_node_name = "ah6-encrypt"
    tra6_decrypt_node_name = tun6_decrypt_node_name = "ah6-decrypt"

    def setup_params(self):
        """
        Build IPv4 and IPv6 parameter sets that use SHA-512-256 as the
        integrity algorithm, and index them by address type in
        self.params.
        """
        self.ipv4_params = IPsecIPv4Params()
        self.ipv6_params = IPsecIPv6Params()
        self.params = {}
        for p in (self.ipv4_params, self.ipv6_params):
            p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
                                  IPSEC_API_INTEG_ALG_SHA_512_256)
            p.auth_algo = 'SHA2-512-256'  # scapy name
            self.params[p.addr_type] = p
325
326
# When executed directly, run this module's tests with the VPP test runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)