566ed3474181c11bb5cc29e887aca0c5504b4e28
[vpp.git] / test / test_ipsec_esp.py
1 import socket
2 import unittest
3 import struct
4 from scapy.layers.ipsec import ESP
5 from scapy.layers.inet import UDP
6
7 from framework import VppTestRunner
8 from template_ipsec import IpsecTra46Tests, IpsecTun46Tests, TemplateIpsec, \
9     IpsecTcpTests, IpsecTun4Tests, IpsecTra4Tests, config_tra_params, \
10     IPsecIPv4Params, IPsecIPv6Params, \
11     IpsecTra4, IpsecTun4, IpsecTra6, IpsecTun6
12 from vpp_ipsec import VppIpsecSpd, VppIpsecSpdEntry, VppIpsecSA,\
13     VppIpsecSpdItfBinding
14 from vpp_ip_route import VppIpRoute, VppRoutePath
15 from vpp_ip import DpoProto
16 from vpp_papi import VppEnum
17
18
19 class ConfigIpsecESP(TemplateIpsec):
20     encryption_type = ESP
21     tra4_encrypt_node_name = "esp4-encrypt"
22     tra4_decrypt_node_name = "esp4-decrypt"
23     tra6_encrypt_node_name = "esp6-encrypt"
24     tra6_decrypt_node_name = "esp6-decrypt"
25     tun4_encrypt_node_name = "esp4-encrypt"
26     tun4_decrypt_node_name = "esp4-decrypt"
27     tun6_encrypt_node_name = "esp6-encrypt"
28     tun6_decrypt_node_name = "esp6-decrypt"
29
30     @classmethod
31     def setUpClass(cls):
32         super(ConfigIpsecESP, cls).setUpClass()
33
34     @classmethod
35     def tearDownClass(cls):
36         super(ConfigIpsecESP, cls).tearDownClass()
37
38     def setUp(self):
39         super(ConfigIpsecESP, self).setUp()
40
41     def tearDown(self):
42         super(ConfigIpsecESP, self).tearDown()
43
44     def config_network(self, params):
45         self.net_objs = []
46         self.tun_if = self.pg0
47         self.tra_if = self.pg2
48         self.logger.info(self.vapi.ppcli("show int addr"))
49
50         self.tra_spd = VppIpsecSpd(self, self.tra_spd_id)
51         self.tra_spd.add_vpp_config()
52         self.net_objs.append(self.tra_spd)
53         self.tun_spd = VppIpsecSpd(self, self.tun_spd_id)
54         self.tun_spd.add_vpp_config()
55         self.net_objs.append(self.tun_spd)
56
57         b = VppIpsecSpdItfBinding(self, self.tun_spd,
58                                   self.tun_if)
59         b.add_vpp_config()
60         self.net_objs.append(b)
61
62         b = VppIpsecSpdItfBinding(self, self.tra_spd,
63                                   self.tra_if)
64         b.add_vpp_config()
65         self.net_objs.append(b)
66
67         for p in params:
68             self.config_esp_tra(p)
69             config_tra_params(p, self.encryption_type)
70         for p in params:
71             self.config_esp_tun(p)
72
73         for p in params:
74             d = DpoProto.DPO_PROTO_IP6 if p.is_ipv6 else DpoProto.DPO_PROTO_IP4
75             r = VppIpRoute(self,  p.remote_tun_if_host, p.addr_len,
76                            [VppRoutePath(self.tun_if.remote_addr[p.addr_type],
77                                          0xffffffff,
78                                          proto=d)],
79                            is_ip6=p.is_ipv6)
80             r.add_vpp_config()
81             self.net_objs.append(r)
82
83         self.logger.info(self.vapi.ppcli("show ipsec all"))
84
85     def unconfig_network(self):
86         for o in reversed(self.net_objs):
87             o.remove_vpp_config()
88         self.net_objs = []
89
90     def config_esp_tun(self, params):
91         addr_type = params.addr_type
92         scapy_tun_sa_id = params.scapy_tun_sa_id
93         scapy_tun_spi = params.scapy_tun_spi
94         vpp_tun_sa_id = params.vpp_tun_sa_id
95         vpp_tun_spi = params.vpp_tun_spi
96         auth_algo_vpp_id = params.auth_algo_vpp_id
97         auth_key = params.auth_key
98         crypt_algo_vpp_id = params.crypt_algo_vpp_id
99         crypt_key = params.crypt_key
100         remote_tun_if_host = params.remote_tun_if_host
101         addr_any = params.addr_any
102         addr_bcast = params.addr_bcast
103         e = VppEnum.vl_api_ipsec_spd_action_t
104         flags = params.flags
105         objs = []
106
107         params.tun_sa_in = VppIpsecSA(self, scapy_tun_sa_id, scapy_tun_spi,
108                                       auth_algo_vpp_id, auth_key,
109                                       crypt_algo_vpp_id, crypt_key,
110                                       self.vpp_esp_protocol,
111                                       self.tun_if.local_addr[addr_type],
112                                       self.tun_if.remote_addr[addr_type],
113                                       flags=flags)
114         params.tun_sa_out = VppIpsecSA(self, vpp_tun_sa_id, vpp_tun_spi,
115                                        auth_algo_vpp_id, auth_key,
116                                        crypt_algo_vpp_id, crypt_key,
117                                        self.vpp_esp_protocol,
118                                        self.tun_if.remote_addr[addr_type],
119                                        self.tun_if.local_addr[addr_type],
120                                        flags=flags)
121         objs.append(params.tun_sa_in)
122         objs.append(params.tun_sa_out)
123
124         params.spd_policy_in_any = VppIpsecSpdEntry(self, self.tun_spd,
125                                                     scapy_tun_sa_id,
126                                                     addr_any, addr_bcast,
127                                                     addr_any, addr_bcast,
128                                                     socket.IPPROTO_ESP)
129         params.spd_policy_out_any = VppIpsecSpdEntry(self, self.tun_spd,
130                                                      scapy_tun_sa_id,
131                                                      addr_any, addr_bcast,
132                                                      addr_any, addr_bcast,
133                                                      socket.IPPROTO_ESP,
134                                                      is_outbound=0)
135         objs.append(params.spd_policy_out_any)
136         objs.append(params.spd_policy_in_any)
137
138         objs.append(VppIpsecSpdEntry(self, self.tun_spd, vpp_tun_sa_id,
139                                      remote_tun_if_host, remote_tun_if_host,
140                                      self.pg1.remote_addr[addr_type],
141                                      self.pg1.remote_addr[addr_type],
142                                      0,
143                                      priority=10,
144                                      policy=e.IPSEC_API_SPD_ACTION_PROTECT,
145                                      is_outbound=0))
146         objs.append(VppIpsecSpdEntry(self, self.tun_spd, scapy_tun_sa_id,
147                                      self.pg1.remote_addr[addr_type],
148                                      self.pg1.remote_addr[addr_type],
149                                      remote_tun_if_host, remote_tun_if_host,
150                                      0,
151                                      policy=e.IPSEC_API_SPD_ACTION_PROTECT,
152                                      priority=10))
153         objs.append(VppIpsecSpdEntry(self, self.tun_spd, vpp_tun_sa_id,
154                                      remote_tun_if_host, remote_tun_if_host,
155                                      self.pg0.local_addr[addr_type],
156                                      self.pg0.local_addr[addr_type],
157                                      0,
158                                      priority=20,
159                                      policy=e.IPSEC_API_SPD_ACTION_PROTECT,
160                                      is_outbound=0))
161         objs.append(VppIpsecSpdEntry(self, self.tun_spd, scapy_tun_sa_id,
162                                      self.pg0.local_addr[addr_type],
163                                      self.pg0.local_addr[addr_type],
164                                      remote_tun_if_host, remote_tun_if_host,
165                                      0,
166                                      policy=e.IPSEC_API_SPD_ACTION_PROTECT,
167                                      priority=20))
168         for o in objs:
169             o.add_vpp_config()
170         self.net_objs = self.net_objs + objs
171
172     def config_esp_tra(self, params):
173         addr_type = params.addr_type
174         scapy_tra_sa_id = params.scapy_tra_sa_id
175         scapy_tra_spi = params.scapy_tra_spi
176         vpp_tra_sa_id = params.vpp_tra_sa_id
177         vpp_tra_spi = params.vpp_tra_spi
178         auth_algo_vpp_id = params.auth_algo_vpp_id
179         auth_key = params.auth_key
180         crypt_algo_vpp_id = params.crypt_algo_vpp_id
181         crypt_key = params.crypt_key
182         addr_any = params.addr_any
183         addr_bcast = params.addr_bcast
184         flags = (VppEnum.vl_api_ipsec_sad_flags_t.
185                  IPSEC_API_SAD_FLAG_USE_ANTI_REPLAY)
186         e = VppEnum.vl_api_ipsec_spd_action_t
187         flags = params.flags | flags
188         objs = []
189
190         params.tra_sa_in = VppIpsecSA(self, scapy_tra_sa_id, scapy_tra_spi,
191                                       auth_algo_vpp_id, auth_key,
192                                       crypt_algo_vpp_id, crypt_key,
193                                       self.vpp_esp_protocol,
194                                       flags=flags)
195         params.tra_sa_out = VppIpsecSA(self, vpp_tra_sa_id, vpp_tra_spi,
196                                        auth_algo_vpp_id, auth_key,
197                                        crypt_algo_vpp_id, crypt_key,
198                                        self.vpp_esp_protocol,
199                                        flags=flags)
200         objs.append(params.tra_sa_in)
201         objs.append(params.tra_sa_out)
202
203         objs.append(VppIpsecSpdEntry(self, self.tra_spd, vpp_tra_sa_id,
204                                      addr_any, addr_bcast,
205                                      addr_any, addr_bcast,
206                                      socket.IPPROTO_ESP))
207         objs.append(VppIpsecSpdEntry(self, self.tra_spd, vpp_tra_sa_id,
208                                      addr_any, addr_bcast,
209                                      addr_any, addr_bcast,
210                                      socket.IPPROTO_ESP,
211                                      is_outbound=0))
212         objs.append(VppIpsecSpdEntry(self, self.tra_spd, vpp_tra_sa_id,
213                                      self.tra_if.local_addr[addr_type],
214                                      self.tra_if.local_addr[addr_type],
215                                      self.tra_if.remote_addr[addr_type],
216                                      self.tra_if.remote_addr[addr_type],
217                                      0, priority=10,
218                                      policy=e.IPSEC_API_SPD_ACTION_PROTECT,
219                                      is_outbound=0))
220         objs.append(VppIpsecSpdEntry(self, self.tra_spd, scapy_tra_sa_id,
221                                      self.tra_if.local_addr[addr_type],
222                                      self.tra_if.local_addr[addr_type],
223                                      self.tra_if.remote_addr[addr_type],
224                                      self.tra_if.remote_addr[addr_type],
225                                      0, policy=e.IPSEC_API_SPD_ACTION_PROTECT,
226                                      priority=10))
227         for o in objs:
228             o.add_vpp_config()
229         self.net_objs = self.net_objs + objs
230
231
232 class TemplateIpsecEsp(ConfigIpsecESP):
233     """
234     Basic test for ipsec esp sanity - tunnel and transport modes.
235
236     Below 4 cases are covered as part of this test
237     1) ipsec esp v4 transport basic test  - IPv4 Transport mode
238         scenario using HMAC-SHA1-96 integrity algo
239     2) ipsec esp v4 transport burst test
240         Above test for 257 pkts
241     3) ipsec esp 4o4 tunnel basic test    - IPv4 Tunnel mode
242         scenario using HMAC-SHA1-96 integrity algo
243     4) ipsec esp 4o4 tunnel burst test
244         Above test for 257 pkts
245
246     TRANSPORT MODE:
247
248      ---   encrypt   ---
249     |pg2| <-------> |VPP|
250      ---   decrypt   ---
251
252     TUNNEL MODE:
253
254      ---   encrypt   ---   plain   ---
255     |pg0| <-------  |VPP| <------ |pg1|
256      ---             ---           ---
257
258      ---   decrypt   ---   plain   ---
259     |pg0| ------->  |VPP| ------> |pg1|
260      ---             ---           ---
261     """
262
263     @classmethod
264     def setUpClass(cls):
265         super(TemplateIpsecEsp, cls).setUpClass()
266
267     @classmethod
268     def tearDownClass(cls):
269         super(TemplateIpsecEsp, cls).tearDownClass()
270
271     def setUp(self):
272         super(TemplateIpsecEsp, self).setUp()
273         self.config_network(self.params.values())
274
275     def tearDown(self):
276         self.unconfig_network()
277         super(TemplateIpsecEsp, self).tearDown()
278
279
280 class TestIpsecEsp1(TemplateIpsecEsp, IpsecTra46Tests, IpsecTun46Tests):
281     """ Ipsec ESP - TUN & TRA tests """
282     pass
283
284
285 class TestIpsecEsp2(TemplateIpsecEsp, IpsecTcpTests):
286     """ Ipsec ESP - TCP tests """
287     pass
288
289
290 class TemplateIpsecEspUdp(ConfigIpsecESP):
291     """
292     UDP encapped ESP
293     """
294
295     @classmethod
296     def setUpClass(cls):
297         super(TemplateIpsecEspUdp, cls).setUpClass()
298
299     @classmethod
300     def tearDownClass(cls):
301         super(TemplateIpsecEspUdp, cls).tearDownClass()
302
303     def setUp(self):
304         super(TemplateIpsecEspUdp, self).setUp()
305         self.net_objs = []
306         self.tun_if = self.pg0
307         self.tra_if = self.pg2
308         self.logger.info(self.vapi.ppcli("show int addr"))
309
310         p = self.ipv4_params
311         p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
312                    IPSEC_API_SAD_FLAG_UDP_ENCAP)
313         p.nat_header = UDP(sport=5454, dport=4500)
314
315         self.tra_spd = VppIpsecSpd(self, self.tra_spd_id)
316         self.tra_spd.add_vpp_config()
317         VppIpsecSpdItfBinding(self, self.tra_spd,
318                               self.tra_if).add_vpp_config()
319
320         self.config_esp_tra(p)
321         config_tra_params(p, self.encryption_type)
322
323         self.tun_spd = VppIpsecSpd(self, self.tun_spd_id)
324         self.tun_spd.add_vpp_config()
325         VppIpsecSpdItfBinding(self, self.tun_spd,
326                               self.tun_if).add_vpp_config()
327
328         self.config_esp_tun(p)
329         self.logger.info(self.vapi.ppcli("show ipsec all"))
330
331         d = DpoProto.DPO_PROTO_IP4
332         VppIpRoute(self,  p.remote_tun_if_host, p.addr_len,
333                    [VppRoutePath(self.tun_if.remote_addr[p.addr_type],
334                                  0xffffffff,
335                                  proto=d)]).add_vpp_config()
336
337     def tearDown(self):
338         super(TemplateIpsecEspUdp, self).tearDown()
339
340     def show_commands_at_teardown(self):
341         self.logger.info(self.vapi.cli("show hardware"))
342
343
344 class TestIpsecEspUdp(TemplateIpsecEspUdp, IpsecTra4Tests):
345     """ Ipsec NAT-T ESP UDP tests """
346     pass
347
348
349 class TestIpsecEspAll(ConfigIpsecESP,
350                       IpsecTra4, IpsecTra6,
351                       IpsecTun4, IpsecTun6):
352     """ Ipsec ESP all Algos """
353
354     def setUp(self):
355         super(TestIpsecEspAll, self).setUp()
356
357     def tearDown(self):
358         super(TestIpsecEspAll, self).tearDown()
359
360     def test_crypto_algs(self):
361         """All engines AES-[CBC, GCM]-[128, 192, 256] w/ & w/o ESN"""
362
363         # foreach VPP crypto engine
364         engines = ["ia32", "ipsecmb", "openssl"]
365
366         # foreach crypto algorithm
367         algos = [{'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
368                                  IPSEC_API_CRYPTO_ALG_AES_GCM_128),
369                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
370                                 IPSEC_API_INTEG_ALG_NONE),
371                   'scapy-crypto': "AES-GCM",
372                   'scapy-integ': "NULL",
373                   'key': "JPjyOWBeVEQiMe7h",
374                   'salt': struct.pack("!L", 0)},
375                  {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
376                                  IPSEC_API_CRYPTO_ALG_AES_GCM_256),
377                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
378                                 IPSEC_API_INTEG_ALG_NONE),
379                   'scapy-crypto': "AES-GCM",
380                   'scapy-integ': "NULL",
381                   'key': "JPjyOWBeVEQiMe7h0123456787654321",
382                   'salt': struct.pack("!L", 0)},
383                  {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
384                                  IPSEC_API_CRYPTO_ALG_AES_CBC_128),
385                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
386                                 IPSEC_API_INTEG_ALG_SHA1_96),
387                   'scapy-crypto': "AES-CBC",
388                   'scapy-integ': "HMAC-SHA1-96",
389                   'salt': '',
390                   'key': "JPjyOWBeVEQiMe7h"},
391                  {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
392                                  IPSEC_API_CRYPTO_ALG_AES_CBC_192),
393                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
394                                 IPSEC_API_INTEG_ALG_SHA1_96),
395                   'scapy-crypto': "AES-CBC",
396                   'scapy-integ': "HMAC-SHA1-96",
397                   'salt': '',
398                   'key': "JPjyOWBeVEQiMe7hJPjyOWBe"},
399                  {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
400                                  IPSEC_API_CRYPTO_ALG_AES_CBC_256),
401                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
402                                 IPSEC_API_INTEG_ALG_SHA1_96),
403                   'scapy-crypto': "AES-CBC",
404                   'scapy-integ': "HMAC-SHA1-96",
405                   'salt': '',
406                   'key': "JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"}]
407
408         # with and without ESN
409         flags = [0,
410                  VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN]
411
412         #
413         # loop through the VPP engines
414         #
415         for engine in engines:
416             self.vapi.cli("set crypto handler all %s" % engine)
417             #
418             # loop through each of the algorithms
419             #
420             for algo in algos:
421                 # with self.subTest(algo=algo['scapy']):
422                 for flag in flags:
423                     #
424                     # setup up the config paramters
425                     #
426                     self.ipv4_params = IPsecIPv4Params()
427                     self.ipv6_params = IPsecIPv6Params()
428
429                     self.params = {self.ipv4_params.addr_type:
430                                    self.ipv4_params,
431                                    self.ipv6_params.addr_type:
432                                    self.ipv6_params}
433
434                     for _, p in self.params.items():
435                         p.auth_algo_vpp_id = algo['vpp-integ']
436                         p.crypt_algo_vpp_id = algo['vpp-crypto']
437                         p.crypt_algo = algo['scapy-crypto']
438                         p.auth_algo = algo['scapy-integ']
439                         p.crypt_key = algo['key']
440                         p.crypt_salt = algo['salt']
441                         p.flags = p.flags | flag
442
443                     #
444                     # configure the SPDs. SAs, etc
445                     #
446                     self.config_network(self.params.values())
447
448                     #
449                     # run some traffic.
450                     #  An exhautsive 4o6, 6o4 is not necessary
451                     #  for each algo
452                     #
453                     self.verify_tra_basic6(count=17)
454                     self.verify_tra_basic4(count=17)
455                     self.verify_tun_66(self.params[socket.AF_INET6], 17)
456                     self.verify_tun_44(self.params[socket.AF_INET], 17)
457
458                     #
459                     # remove the SPDs, SAs, etc
460                     #
461                     self.unconfig_network()
462
463
464 if __name__ == '__main__':
465     unittest.main(testRunner=VppTestRunner)