fib: fib api updates
[vpp.git] / test / test_ipsec_ah.py
1 import socket
2 import unittest
3
4 from scapy.layers.ipsec import AH
5
6 from framework import VppTestRunner
7 from template_ipsec import TemplateIpsec, IpsecTra46Tests, IpsecTun46Tests, \
8     config_tun_params, config_tra_params, IPsecIPv4Params, IPsecIPv6Params, \
9     IpsecTra4, IpsecTun4, IpsecTra6, IpsecTun6
10 from template_ipsec import IpsecTcpTests
11 from vpp_ipsec import VppIpsecSA, VppIpsecSpd, VppIpsecSpdEntry,\
12         VppIpsecSpdItfBinding
13 from vpp_ip_route import VppIpRoute, VppRoutePath
14 from vpp_ip import DpoProto
15 from vpp_papi import VppEnum
16
17
18 class ConfigIpsecAH(TemplateIpsec):
19     """
20     Basic test for IPSEC using AH transport and Tunnel mode
21
22     TRANSPORT MODE:
23
24      ---   encrypt   ---
25     |pg2| <-------> |VPP|
26      ---   decrypt   ---
27
28     TUNNEL MODE:
29
30      ---   encrypt   ---   plain   ---
31     |pg0| <-------  |VPP| <------ |pg1|
32      ---             ---           ---
33
34      ---   decrypt   ---   plain   ---
35     |pg0| ------->  |VPP| ------> |pg1|
36      ---             ---           ---
37     """
38     encryption_type = AH
39     net_objs = []
40     tra4_encrypt_node_name = "ah4-encrypt"
41     tra4_decrypt_node_name = "ah4-decrypt"
42     tra6_encrypt_node_name = "ah6-encrypt"
43     tra6_decrypt_node_name = "ah6-decrypt"
44     tun4_encrypt_node_name = "ah4-encrypt"
45     tun4_decrypt_node_name = "ah4-decrypt"
46     tun6_encrypt_node_name = "ah6-encrypt"
47     tun6_decrypt_node_name = "ah6-decrypt"
48
49     @classmethod
50     def setUpClass(cls):
51         super(ConfigIpsecAH, cls).setUpClass()
52
53     @classmethod
54     def tearDownClass(cls):
55         super(ConfigIpsecAH, cls).tearDownClass()
56
57     def setUp(self):
58         super(ConfigIpsecAH, self).setUp()
59
60     def tearDown(self):
61         super(ConfigIpsecAH, self).tearDown()
62
63     def config_network(self, params):
64         self.net_objs = []
65         self.tun_if = self.pg0
66         self.tra_if = self.pg2
67         self.logger.info(self.vapi.ppcli("show int addr"))
68
69         self.tra_spd = VppIpsecSpd(self, self.tra_spd_id)
70         self.tra_spd.add_vpp_config()
71         self.net_objs.append(self.tra_spd)
72         self.tun_spd = VppIpsecSpd(self, self.tun_spd_id)
73         self.tun_spd.add_vpp_config()
74         self.net_objs.append(self.tun_spd)
75
76         b = VppIpsecSpdItfBinding(self, self.tra_spd,
77                                   self.tra_if)
78         b.add_vpp_config()
79         self.net_objs.append(b)
80
81         b = VppIpsecSpdItfBinding(self, self.tun_spd,
82                                   self.tun_if)
83         b.add_vpp_config()
84         self.net_objs.append(b)
85
86         for p in params:
87             self.config_ah_tra(p)
88             config_tra_params(p, self.encryption_type)
89         for p in params:
90             self.config_ah_tun(p)
91         for p in params:
92             d = DpoProto.DPO_PROTO_IP6 if p.is_ipv6 else DpoProto.DPO_PROTO_IP4
93             r = VppIpRoute(self,  p.remote_tun_if_host, p.addr_len,
94                            [VppRoutePath(self.tun_if.remote_addr[p.addr_type],
95                                          0xffffffff,
96                                          proto=d)])
97             r.add_vpp_config()
98             self.net_objs.append(r)
99         self.logger.info(self.vapi.ppcli("show ipsec all"))
100
101     def unconfig_network(self):
102         for o in reversed(self.net_objs):
103             o.remove_vpp_config()
104         self.net_objs = []
105
106     def config_ah_tun(self, params):
107         addr_type = params.addr_type
108         scapy_tun_sa_id = params.scapy_tun_sa_id
109         scapy_tun_spi = params.scapy_tun_spi
110         vpp_tun_sa_id = params.vpp_tun_sa_id
111         vpp_tun_spi = params.vpp_tun_spi
112         auth_algo_vpp_id = params.auth_algo_vpp_id
113         auth_key = params.auth_key
114         crypt_algo_vpp_id = params.crypt_algo_vpp_id
115         crypt_key = params.crypt_key
116         remote_tun_if_host = params.remote_tun_if_host
117         addr_any = params.addr_any
118         addr_bcast = params.addr_bcast
119         flags = params.flags
120         e = VppEnum.vl_api_ipsec_spd_action_t
121         objs = []
122
123         params.tun_sa_in = VppIpsecSA(self, scapy_tun_sa_id, scapy_tun_spi,
124                                       auth_algo_vpp_id, auth_key,
125                                       crypt_algo_vpp_id, crypt_key,
126                                       self.vpp_ah_protocol,
127                                       self.tun_if.local_addr[addr_type],
128                                       self.tun_if.remote_addr[addr_type],
129                                       flags=flags)
130
131         params.tun_sa_out = VppIpsecSA(self, vpp_tun_sa_id, vpp_tun_spi,
132                                        auth_algo_vpp_id, auth_key,
133                                        crypt_algo_vpp_id, crypt_key,
134                                        self.vpp_ah_protocol,
135                                        self.tun_if.remote_addr[addr_type],
136                                        self.tun_if.local_addr[addr_type],
137                                        flags=flags)
138
139         objs.append(params.tun_sa_in)
140         objs.append(params.tun_sa_out)
141
142         params.spd_policy_in_any = VppIpsecSpdEntry(self, self.tun_spd,
143                                                     vpp_tun_sa_id,
144                                                     addr_any, addr_bcast,
145                                                     addr_any, addr_bcast,
146                                                     socket.IPPROTO_AH)
147         params.spd_policy_out_any = VppIpsecSpdEntry(self, self.tun_spd,
148                                                      vpp_tun_sa_id,
149                                                      addr_any, addr_bcast,
150                                                      addr_any, addr_bcast,
151                                                      socket.IPPROTO_AH,
152                                                      is_outbound=0)
153
154         objs.append(params.spd_policy_out_any)
155         objs.append(params.spd_policy_in_any)
156
157         e1 = VppIpsecSpdEntry(self, self.tun_spd, vpp_tun_sa_id,
158                               remote_tun_if_host,
159                               remote_tun_if_host,
160                               self.pg1.remote_addr[addr_type],
161                               self.pg1.remote_addr[addr_type],
162                               0, priority=10,
163                               policy=e.IPSEC_API_SPD_ACTION_PROTECT,
164                               is_outbound=0)
165         e2 = VppIpsecSpdEntry(self, self.tun_spd, scapy_tun_sa_id,
166                               self.pg1.remote_addr[addr_type],
167                               self.pg1.remote_addr[addr_type],
168                               remote_tun_if_host,
169                               remote_tun_if_host,
170                               0, policy=e.IPSEC_API_SPD_ACTION_PROTECT,
171                               priority=10)
172         e3 = VppIpsecSpdEntry(self, self.tun_spd, vpp_tun_sa_id,
173                               remote_tun_if_host,
174                               remote_tun_if_host,
175                               self.pg0.local_addr[addr_type],
176                               self.pg0.local_addr[addr_type],
177                               0, priority=20,
178                               policy=e.IPSEC_API_SPD_ACTION_PROTECT,
179                               is_outbound=0)
180         e4 = VppIpsecSpdEntry(self, self.tun_spd, scapy_tun_sa_id,
181                               self.pg0.local_addr[addr_type],
182                               self.pg0.local_addr[addr_type],
183                               remote_tun_if_host,
184                               remote_tun_if_host,
185                               0, policy=e.IPSEC_API_SPD_ACTION_PROTECT,
186                               priority=20)
187
188         objs = objs + [e1, e2, e3, e4]
189
190         for o in objs:
191             o.add_vpp_config()
192
193         self.net_objs = self.net_objs + objs
194
195     def config_ah_tra(self, params):
196         addr_type = params.addr_type
197         scapy_tra_sa_id = params.scapy_tra_sa_id
198         scapy_tra_spi = params.scapy_tra_spi
199         vpp_tra_sa_id = params.vpp_tra_sa_id
200         vpp_tra_spi = params.vpp_tra_spi
201         auth_algo_vpp_id = params.auth_algo_vpp_id
202         auth_key = params.auth_key
203         crypt_algo_vpp_id = params.crypt_algo_vpp_id
204         crypt_key = params.crypt_key
205         addr_any = params.addr_any
206         addr_bcast = params.addr_bcast
207         flags = params.flags | (VppEnum.vl_api_ipsec_sad_flags_t.
208                                 IPSEC_API_SAD_FLAG_USE_ANTI_REPLAY)
209         e = VppEnum.vl_api_ipsec_spd_action_t
210         objs = []
211
212         params.tra_sa_in = VppIpsecSA(self, scapy_tra_sa_id, scapy_tra_spi,
213                                       auth_algo_vpp_id, auth_key,
214                                       crypt_algo_vpp_id, crypt_key,
215                                       self.vpp_ah_protocol,
216                                       flags=flags)
217         params.tra_sa_out = VppIpsecSA(self, vpp_tra_sa_id, vpp_tra_spi,
218                                        auth_algo_vpp_id, auth_key,
219                                        crypt_algo_vpp_id, crypt_key,
220                                        self.vpp_ah_protocol,
221                                        flags=flags)
222
223         objs.append(params.tra_sa_in)
224         objs.append(params.tra_sa_out)
225
226         objs.append(VppIpsecSpdEntry(self, self.tra_spd, vpp_tra_sa_id,
227                                      addr_any, addr_bcast,
228                                      addr_any, addr_bcast,
229                                      socket.IPPROTO_AH))
230         objs.append(VppIpsecSpdEntry(self, self.tra_spd, scapy_tra_sa_id,
231                                      addr_any, addr_bcast,
232                                      addr_any, addr_bcast,
233                                      socket.IPPROTO_AH,
234                                      is_outbound=0))
235         objs.append(VppIpsecSpdEntry(self, self.tra_spd, vpp_tra_sa_id,
236                                      self.tra_if.local_addr[addr_type],
237                                      self.tra_if.local_addr[addr_type],
238                                      self.tra_if.remote_addr[addr_type],
239                                      self.tra_if.remote_addr[addr_type],
240                                      0, priority=10,
241                                      policy=e.IPSEC_API_SPD_ACTION_PROTECT,
242                                      is_outbound=0))
243         objs.append(VppIpsecSpdEntry(self, self.tra_spd, scapy_tra_sa_id,
244                                      self.tra_if.local_addr[addr_type],
245                                      self.tra_if.local_addr[addr_type],
246                                      self.tra_if.remote_addr[addr_type],
247                                      self.tra_if.remote_addr[addr_type],
248                                      0, policy=e.IPSEC_API_SPD_ACTION_PROTECT,
249                                      priority=10))
250
251         for o in objs:
252             o.add_vpp_config()
253         self.net_objs = self.net_objs + objs
254
255
256 class TemplateIpsecAh(ConfigIpsecAH):
257     """
258     Basic test for IPSEC using AH transport and Tunnel mode
259
260     TRANSPORT MODE:
261
262      ---   encrypt   ---
263     |pg2| <-------> |VPP|
264      ---   decrypt   ---
265
266     TUNNEL MODE:
267
268      ---   encrypt   ---   plain   ---
269     |pg0| <-------  |VPP| <------ |pg1|
270      ---             ---           ---
271
272      ---   decrypt   ---   plain   ---
273     |pg0| ------->  |VPP| ------> |pg1|
274      ---             ---           ---
275     """
276     @classmethod
277     def setUpClass(cls):
278         super(TemplateIpsecAh, cls).setUpClass()
279
280     @classmethod
281     def tearDownClass(cls):
282         super(TemplateIpsecAh, cls).tearDownClass()
283
284     def setUp(self):
285         super(TemplateIpsecAh, self).setUp()
286         self.config_network(self.params.values())
287
288     def tearDown(self):
289         self.unconfig_network()
290         super(TemplateIpsecAh, self).tearDown()
291
292
293 class TestIpsecAh1(TemplateIpsecAh, IpsecTcpTests):
294     """ Ipsec AH - TCP tests """
295     pass
296
297
298 class TestIpsecAh2(TemplateIpsecAh, IpsecTra46Tests, IpsecTun46Tests):
299     """ Ipsec AH w/ SHA1 """
300     pass
301
302
303 class TestIpsecAhAll(ConfigIpsecAH,
304                      IpsecTra4, IpsecTra6,
305                      IpsecTun4, IpsecTun6):
306     """ Ipsec AH all Algos """
307
308     def setUp(self):
309         super(TestIpsecAhAll, self).setUp()
310
311     def tearDown(self):
312         super(TestIpsecAhAll, self).tearDown()
313
314     def test_integ_algs(self):
315         """All Engines SHA[1_96, 256, 384, 512] w/ & w/o ESN"""
316         # foreach VPP crypto engine
317         engines = ["ia32", "ipsecmb", "openssl"]
318
319         algos = [{'vpp': VppEnum.vl_api_ipsec_integ_alg_t.
320                   IPSEC_API_INTEG_ALG_SHA1_96,
321                   'scapy': "HMAC-SHA1-96"},
322                  {'vpp': VppEnum.vl_api_ipsec_integ_alg_t.
323                   IPSEC_API_INTEG_ALG_SHA_256_128,
324                   'scapy': "SHA2-256-128"},
325                  {'vpp': VppEnum.vl_api_ipsec_integ_alg_t.
326                   IPSEC_API_INTEG_ALG_SHA_384_192,
327                   'scapy': "SHA2-384-192"},
328                  {'vpp': VppEnum.vl_api_ipsec_integ_alg_t.
329                   IPSEC_API_INTEG_ALG_SHA_512_256,
330                   'scapy': "SHA2-512-256"}]
331
332         flags = [0, (VppEnum.vl_api_ipsec_sad_flags_t.
333                      IPSEC_API_SAD_FLAG_USE_ESN)]
334
335         #
336         # loop through the VPP engines
337         #
338         for engine in engines:
339             self.vapi.cli("set crypto handler all %s" % engine)
340             #
341             # loop through each of the algorithms
342             #
343             for algo in algos:
344                 # with self.subTest(algo=algo['scapy']):
345                 for flag in flags:
346                     #
347                     # setup up the config paramters
348                     #
349                     self.ipv4_params = IPsecIPv4Params()
350                     self.ipv6_params = IPsecIPv6Params()
351
352                     self.params = {self.ipv4_params.addr_type:
353                                    self.ipv4_params,
354                                    self.ipv6_params.addr_type:
355                                    self.ipv6_params}
356
357                     for _, p in self.params.items():
358                         p.auth_algo_vpp_id = algo['vpp']
359                         p.auth_algo = algo['scapy']
360                         p.flags = p.flags | flag
361
362                     #
363                     # configure the SPDs. SAs, etc
364                     #
365                     self.config_network(self.params.values())
366
367                     #
368                     # run some traffic.
369                     #  An exhautsive 4o6, 6o4 is not necessary for each algo
370                     #
371                     self.verify_tra_basic6(count=17)
372                     self.verify_tra_basic4(count=17)
373                     self.verify_tun_66(self.params[socket.AF_INET6], count=17)
374                     self.verify_tun_44(self.params[socket.AF_INET], count=17)
375
376                     #
377                     # remove the SPDs, SAs, etc
378                     #
379                     self.unconfig_network()
380
381
382 if __name__ == '__main__':
383     unittest.main(testRunner=VppTestRunner)