ipsec: remove dedicated IPSec tunnels
[vpp.git] / test / test_ipsec_ah.py
1 import socket
2 import unittest
3
4 from scapy.layers.ipsec import AH
5
6 from framework import VppTestRunner
7 from template_ipsec import TemplateIpsec, IpsecTra46Tests, IpsecTun46Tests, \
8     config_tun_params, config_tra_params, IPsecIPv4Params, IPsecIPv6Params, \
9     IpsecTra4, IpsecTun4, IpsecTra6, IpsecTun6
10 from template_ipsec import IpsecTcpTests
11 from vpp_ipsec import VppIpsecSA, VppIpsecSpd, VppIpsecSpdEntry,\
12         VppIpsecSpdItfBinding
13 from vpp_ip_route import VppIpRoute, VppRoutePath
14 from vpp_ip import DpoProto
15 from vpp_papi import VppEnum
16
17
18 class ConfigIpsecAH(TemplateIpsec):
19     """
20     Basic test for IPSEC using AH transport and Tunnel mode
21
22     TRANSPORT MODE:
23
24      ---   encrypt   ---
25     |pg2| <-------> |VPP|
26      ---   decrypt   ---
27
28     TUNNEL MODE:
29
30      ---   encrypt   ---   plain   ---
31     |pg0| <-------  |VPP| <------ |pg1|
32      ---             ---           ---
33
34      ---   decrypt   ---   plain   ---
35     |pg0| ------->  |VPP| ------> |pg1|
36      ---             ---           ---
37     """
38     encryption_type = AH
39     net_objs = []
40     tra4_encrypt_node_name = "ah4-encrypt"
41     tra4_decrypt_node_name = "ah4-decrypt"
42     tra6_encrypt_node_name = "ah6-encrypt"
43     tra6_decrypt_node_name = "ah6-decrypt"
44     tun4_encrypt_node_name = "ah4-encrypt"
45     tun4_decrypt_node_name = "ah4-decrypt"
46     tun6_encrypt_node_name = "ah6-encrypt"
47     tun6_decrypt_node_name = "ah6-decrypt"
48
49     @classmethod
50     def setUpClass(cls):
51         super(ConfigIpsecAH, cls).setUpClass()
52
53     @classmethod
54     def tearDownClass(cls):
55         super(ConfigIpsecAH, cls).tearDownClass()
56
57     def setUp(self):
58         super(ConfigIpsecAH, self).setUp()
59
60     def tearDown(self):
61         super(ConfigIpsecAH, self).tearDown()
62
63     def config_network(self, params):
64         self.net_objs = []
65         self.tun_if = self.pg0
66         self.tra_if = self.pg2
67         self.logger.info(self.vapi.ppcli("show int addr"))
68
69         self.tra_spd = VppIpsecSpd(self, self.tra_spd_id)
70         self.tra_spd.add_vpp_config()
71         self.net_objs.append(self.tra_spd)
72         self.tun_spd = VppIpsecSpd(self, self.tun_spd_id)
73         self.tun_spd.add_vpp_config()
74         self.net_objs.append(self.tun_spd)
75
76         b = VppIpsecSpdItfBinding(self, self.tra_spd,
77                                   self.tra_if)
78         b.add_vpp_config()
79         self.net_objs.append(b)
80
81         b = VppIpsecSpdItfBinding(self, self.tun_spd,
82                                   self.tun_if)
83         b.add_vpp_config()
84         self.net_objs.append(b)
85
86         for p in params:
87             self.config_ah_tra(p)
88             config_tra_params(p, self.encryption_type)
89         for p in params:
90             self.config_ah_tun(p)
91             config_tun_params(p, self.encryption_type, self.tun_if)
92         for p in params:
93             d = DpoProto.DPO_PROTO_IP6 if p.is_ipv6 else DpoProto.DPO_PROTO_IP4
94             r = VppIpRoute(self,  p.remote_tun_if_host, p.addr_len,
95                            [VppRoutePath(self.tun_if.remote_addr[p.addr_type],
96                                          0xffffffff,
97                                          proto=d)])
98             r.add_vpp_config()
99             self.net_objs.append(r)
100         self.logger.info(self.vapi.ppcli("show ipsec all"))
101
102     def unconfig_network(self):
103         for o in reversed(self.net_objs):
104             o.remove_vpp_config()
105         self.net_objs = []
106
107     def config_ah_tun(self, params):
108         addr_type = params.addr_type
109         scapy_tun_sa_id = params.scapy_tun_sa_id
110         scapy_tun_spi = params.scapy_tun_spi
111         vpp_tun_sa_id = params.vpp_tun_sa_id
112         vpp_tun_spi = params.vpp_tun_spi
113         auth_algo_vpp_id = params.auth_algo_vpp_id
114         auth_key = params.auth_key
115         crypt_algo_vpp_id = params.crypt_algo_vpp_id
116         crypt_key = params.crypt_key
117         remote_tun_if_host = params.remote_tun_if_host
118         addr_any = params.addr_any
119         addr_bcast = params.addr_bcast
120         flags = params.flags
121         e = VppEnum.vl_api_ipsec_spd_action_t
122         objs = []
123
124         params.tun_sa_in = VppIpsecSA(self, scapy_tun_sa_id, scapy_tun_spi,
125                                       auth_algo_vpp_id, auth_key,
126                                       crypt_algo_vpp_id, crypt_key,
127                                       self.vpp_ah_protocol,
128                                       self.tun_if.local_addr[addr_type],
129                                       self.tun_if.remote_addr[addr_type],
130                                       flags=flags)
131
132         params.tun_sa_out = VppIpsecSA(self, vpp_tun_sa_id, vpp_tun_spi,
133                                        auth_algo_vpp_id, auth_key,
134                                        crypt_algo_vpp_id, crypt_key,
135                                        self.vpp_ah_protocol,
136                                        self.tun_if.remote_addr[addr_type],
137                                        self.tun_if.local_addr[addr_type],
138                                        flags=flags)
139
140         objs.append(params.tun_sa_in)
141         objs.append(params.tun_sa_out)
142
143         params.spd_policy_in_any = VppIpsecSpdEntry(self, self.tun_spd,
144                                                     vpp_tun_sa_id,
145                                                     addr_any, addr_bcast,
146                                                     addr_any, addr_bcast,
147                                                     socket.IPPROTO_AH)
148         params.spd_policy_out_any = VppIpsecSpdEntry(self, self.tun_spd,
149                                                      vpp_tun_sa_id,
150                                                      addr_any, addr_bcast,
151                                                      addr_any, addr_bcast,
152                                                      socket.IPPROTO_AH,
153                                                      is_outbound=0)
154
155         objs.append(params.spd_policy_out_any)
156         objs.append(params.spd_policy_in_any)
157
158         e1 = VppIpsecSpdEntry(self, self.tun_spd, vpp_tun_sa_id,
159                               remote_tun_if_host,
160                               remote_tun_if_host,
161                               self.pg1.remote_addr[addr_type],
162                               self.pg1.remote_addr[addr_type],
163                               0, priority=10,
164                               policy=e.IPSEC_API_SPD_ACTION_PROTECT,
165                               is_outbound=0)
166         e2 = VppIpsecSpdEntry(self, self.tun_spd, scapy_tun_sa_id,
167                               self.pg1.remote_addr[addr_type],
168                               self.pg1.remote_addr[addr_type],
169                               remote_tun_if_host,
170                               remote_tun_if_host,
171                               0, policy=e.IPSEC_API_SPD_ACTION_PROTECT,
172                               priority=10)
173         e3 = VppIpsecSpdEntry(self, self.tun_spd, vpp_tun_sa_id,
174                               remote_tun_if_host,
175                               remote_tun_if_host,
176                               self.pg0.local_addr[addr_type],
177                               self.pg0.local_addr[addr_type],
178                               0, priority=20,
179                               policy=e.IPSEC_API_SPD_ACTION_PROTECT,
180                               is_outbound=0)
181         e4 = VppIpsecSpdEntry(self, self.tun_spd, scapy_tun_sa_id,
182                               self.pg0.local_addr[addr_type],
183                               self.pg0.local_addr[addr_type],
184                               remote_tun_if_host,
185                               remote_tun_if_host,
186                               0, policy=e.IPSEC_API_SPD_ACTION_PROTECT,
187                               priority=20)
188
189         objs = objs + [e1, e2, e3, e4]
190
191         for o in objs:
192             o.add_vpp_config()
193
194         self.net_objs = self.net_objs + objs
195
196     def config_ah_tra(self, params):
197         addr_type = params.addr_type
198         scapy_tra_sa_id = params.scapy_tra_sa_id
199         scapy_tra_spi = params.scapy_tra_spi
200         vpp_tra_sa_id = params.vpp_tra_sa_id
201         vpp_tra_spi = params.vpp_tra_spi
202         auth_algo_vpp_id = params.auth_algo_vpp_id
203         auth_key = params.auth_key
204         crypt_algo_vpp_id = params.crypt_algo_vpp_id
205         crypt_key = params.crypt_key
206         addr_any = params.addr_any
207         addr_bcast = params.addr_bcast
208         flags = params.flags | (VppEnum.vl_api_ipsec_sad_flags_t.
209                                 IPSEC_API_SAD_FLAG_USE_ANTI_REPLAY)
210         e = VppEnum.vl_api_ipsec_spd_action_t
211         objs = []
212
213         params.tra_sa_in = VppIpsecSA(self, scapy_tra_sa_id, scapy_tra_spi,
214                                       auth_algo_vpp_id, auth_key,
215                                       crypt_algo_vpp_id, crypt_key,
216                                       self.vpp_ah_protocol,
217                                       flags=flags)
218         params.tra_sa_out = VppIpsecSA(self, vpp_tra_sa_id, vpp_tra_spi,
219                                        auth_algo_vpp_id, auth_key,
220                                        crypt_algo_vpp_id, crypt_key,
221                                        self.vpp_ah_protocol,
222                                        flags=flags)
223
224         objs.append(params.tra_sa_in)
225         objs.append(params.tra_sa_out)
226
227         objs.append(VppIpsecSpdEntry(self, self.tra_spd, vpp_tra_sa_id,
228                                      addr_any, addr_bcast,
229                                      addr_any, addr_bcast,
230                                      socket.IPPROTO_AH))
231         objs.append(VppIpsecSpdEntry(self, self.tra_spd, scapy_tra_sa_id,
232                                      addr_any, addr_bcast,
233                                      addr_any, addr_bcast,
234                                      socket.IPPROTO_AH,
235                                      is_outbound=0))
236         objs.append(VppIpsecSpdEntry(self, self.tra_spd, vpp_tra_sa_id,
237                                      self.tra_if.local_addr[addr_type],
238                                      self.tra_if.local_addr[addr_type],
239                                      self.tra_if.remote_addr[addr_type],
240                                      self.tra_if.remote_addr[addr_type],
241                                      0, priority=10,
242                                      policy=e.IPSEC_API_SPD_ACTION_PROTECT,
243                                      is_outbound=0))
244         objs.append(VppIpsecSpdEntry(self, self.tra_spd, scapy_tra_sa_id,
245                                      self.tra_if.local_addr[addr_type],
246                                      self.tra_if.local_addr[addr_type],
247                                      self.tra_if.remote_addr[addr_type],
248                                      self.tra_if.remote_addr[addr_type],
249                                      0, policy=e.IPSEC_API_SPD_ACTION_PROTECT,
250                                      priority=10))
251
252         for o in objs:
253             o.add_vpp_config()
254         self.net_objs = self.net_objs + objs
255
256
257 class TemplateIpsecAh(ConfigIpsecAH):
258     """
259     Basic test for IPSEC using AH transport and Tunnel mode
260
261     TRANSPORT MODE:
262
263      ---   encrypt   ---
264     |pg2| <-------> |VPP|
265      ---   decrypt   ---
266
267     TUNNEL MODE:
268
269      ---   encrypt   ---   plain   ---
270     |pg0| <-------  |VPP| <------ |pg1|
271      ---             ---           ---
272
273      ---   decrypt   ---   plain   ---
274     |pg0| ------->  |VPP| ------> |pg1|
275      ---             ---           ---
276     """
277     @classmethod
278     def setUpClass(cls):
279         super(TemplateIpsecAh, cls).setUpClass()
280
281     @classmethod
282     def tearDownClass(cls):
283         super(TemplateIpsecAh, cls).tearDownClass()
284
285     def setUp(self):
286         super(TemplateIpsecAh, self).setUp()
287         self.config_network(self.params.values())
288
289     def tearDown(self):
290         self.unconfig_network()
291         super(TemplateIpsecAh, self).tearDown()
292
293
294 class TestIpsecAh1(TemplateIpsecAh, IpsecTcpTests):
295     """ Ipsec AH - TCP tests """
296     pass
297
298
299 class TestIpsecAh2(TemplateIpsecAh, IpsecTra46Tests, IpsecTun46Tests):
300     """ Ipsec AH w/ SHA1 """
301     pass
302
303
304 class TestIpsecAhAll(ConfigIpsecAH,
305                      IpsecTra4, IpsecTra6,
306                      IpsecTun4, IpsecTun6):
307     """ Ipsec AH all Algos """
308
309     def setUp(self):
310         super(TestIpsecAhAll, self).setUp()
311
312     def tearDown(self):
313         super(TestIpsecAhAll, self).tearDown()
314
315     def test_integ_algs(self):
316         """All Engines SHA[1_96, 256, 384, 512] w/ & w/o ESN"""
317         # foreach VPP crypto engine
318         engines = ["ia32", "ipsecmb", "openssl"]
319
320         algos = [{'vpp': VppEnum.vl_api_ipsec_integ_alg_t.
321                   IPSEC_API_INTEG_ALG_SHA1_96,
322                   'scapy': "HMAC-SHA1-96"},
323                  {'vpp': VppEnum.vl_api_ipsec_integ_alg_t.
324                   IPSEC_API_INTEG_ALG_SHA_256_128,
325                   'scapy': "SHA2-256-128"},
326                  {'vpp': VppEnum.vl_api_ipsec_integ_alg_t.
327                   IPSEC_API_INTEG_ALG_SHA_384_192,
328                   'scapy': "SHA2-384-192"},
329                  {'vpp': VppEnum.vl_api_ipsec_integ_alg_t.
330                   IPSEC_API_INTEG_ALG_SHA_512_256,
331                   'scapy': "SHA2-512-256"}]
332
333         flags = [0, (VppEnum.vl_api_ipsec_sad_flags_t.
334                      IPSEC_API_SAD_FLAG_USE_ESN)]
335
336         #
337         # loop through the VPP engines
338         #
339         for engine in engines:
340             self.vapi.cli("set crypto handler all %s" % engine)
341             #
342             # loop through each of the algorithms
343             #
344             for algo in algos:
345                 # with self.subTest(algo=algo['scapy']):
346                 for flag in flags:
347                     #
348                     # setup up the config paramters
349                     #
350                     self.ipv4_params = IPsecIPv4Params()
351                     self.ipv6_params = IPsecIPv6Params()
352
353                     self.params = {self.ipv4_params.addr_type:
354                                    self.ipv4_params,
355                                    self.ipv6_params.addr_type:
356                                    self.ipv6_params}
357
358                     for _, p in self.params.items():
359                         p.auth_algo_vpp_id = algo['vpp']
360                         p.auth_algo = algo['scapy']
361                         p.flags = p.flags | flag
362
363                     #
364                     # configure the SPDs. SAs, etc
365                     #
366                     self.config_network(self.params.values())
367
368                     #
369                     # run some traffic.
370                     #  An exhautsive 4o6, 6o4 is not necessary for each algo
371                     #
372                     self.verify_tra_basic6(count=17)
373                     self.verify_tra_basic4(count=17)
374                     self.verify_tun_66(self.params[socket.AF_INET6], count=17)
375                     self.verify_tun_44(self.params[socket.AF_INET], count=17)
376
377                     #
378                     # remove the SPDs, SAs, etc
379                     #
380                     self.unconfig_network()
381
382
383 if __name__ == '__main__':
384     unittest.main(testRunner=VppTestRunner)