Revert "ipsec: Use the new tunnel API types to add flow label and TTL copy"
[vpp.git] / test / test_ipsec_ah.py
1 import socket
2 import unittest
3
4 from scapy.layers.ipsec import AH
5 from scapy.layers.inet import IP, UDP
6 from scapy.layers.inet6 import IPv6
7 from scapy.layers.l2 import Ether
8 from scapy.packet import Raw
9
10 from framework import VppTestRunner
11 from template_ipsec import TemplateIpsec, IpsecTra46Tests, IpsecTun46Tests, \
12     config_tun_params, config_tra_params, IPsecIPv4Params, IPsecIPv6Params, \
13     IpsecTra4, IpsecTun4, IpsecTra6, IpsecTun6, \
14     IpsecTun6HandoffTests, IpsecTun4HandoffTests
15 from template_ipsec import IpsecTcpTests
16 from vpp_ipsec import VppIpsecSA, VppIpsecSpd, VppIpsecSpdEntry,\
17         VppIpsecSpdItfBinding
18 from vpp_ip_route import VppIpRoute, VppRoutePath
19 from vpp_ip import DpoProto
20 from vpp_papi import VppEnum
21
22
23 class ConfigIpsecAH(TemplateIpsec):
24     """
25     Basic test for IPSEC using AH transport and Tunnel mode
26
27     TRANSPORT MODE:
28
29      ---   encrypt   ---
30     |pg2| <-------> |VPP|
31      ---   decrypt   ---
32
33     TUNNEL MODE:
34
35      ---   encrypt   ---   plain   ---
36     |pg0| <-------  |VPP| <------ |pg1|
37      ---             ---           ---
38
39      ---   decrypt   ---   plain   ---
40     |pg0| ------->  |VPP| ------> |pg1|
41      ---             ---           ---
42     """
43     encryption_type = AH
44     net_objs = []
45     tra4_encrypt_node_name = "ah4-encrypt"
46     tra4_decrypt_node_name = "ah4-decrypt"
47     tra6_encrypt_node_name = "ah6-encrypt"
48     tra6_decrypt_node_name = "ah6-decrypt"
49     tun4_encrypt_node_name = "ah4-encrypt"
50     tun4_decrypt_node_name = "ah4-decrypt"
51     tun6_encrypt_node_name = "ah6-encrypt"
52     tun6_decrypt_node_name = "ah6-decrypt"
53
54     @classmethod
55     def setUpClass(cls):
56         super(ConfigIpsecAH, cls).setUpClass()
57
58     @classmethod
59     def tearDownClass(cls):
60         super(ConfigIpsecAH, cls).tearDownClass()
61
62     def setUp(self):
63         super(ConfigIpsecAH, self).setUp()
64
65     def tearDown(self):
66         super(ConfigIpsecAH, self).tearDown()
67
68     def config_network(self, params):
69         self.net_objs = []
70         self.tun_if = self.pg0
71         self.tra_if = self.pg2
72         self.logger.info(self.vapi.ppcli("show int addr"))
73
74         self.tra_spd = VppIpsecSpd(self, self.tra_spd_id)
75         self.tra_spd.add_vpp_config()
76         self.net_objs.append(self.tra_spd)
77         self.tun_spd = VppIpsecSpd(self, self.tun_spd_id)
78         self.tun_spd.add_vpp_config()
79         self.net_objs.append(self.tun_spd)
80
81         b = VppIpsecSpdItfBinding(self, self.tra_spd,
82                                   self.tra_if)
83         b.add_vpp_config()
84         self.net_objs.append(b)
85
86         b = VppIpsecSpdItfBinding(self, self.tun_spd,
87                                   self.tun_if)
88         b.add_vpp_config()
89         self.net_objs.append(b)
90
91         for p in params:
92             self.config_ah_tra(p)
93             config_tra_params(p, self.encryption_type)
94         for p in params:
95             self.config_ah_tun(p)
96             config_tun_params(p, self.encryption_type, self.tun_if)
97         for p in params:
98             d = DpoProto.DPO_PROTO_IP6 if p.is_ipv6 else DpoProto.DPO_PROTO_IP4
99             r = VppIpRoute(self,  p.remote_tun_if_host, p.addr_len,
100                            [VppRoutePath(self.tun_if.remote_addr[p.addr_type],
101                                          0xffffffff,
102                                          proto=d)])
103             r.add_vpp_config()
104             self.net_objs.append(r)
105         self.logger.info(self.vapi.ppcli("show ipsec all"))
106
107     def unconfig_network(self):
108         for o in reversed(self.net_objs):
109             o.remove_vpp_config()
110         self.net_objs = []
111
112     def config_ah_tun(self, params):
113         addr_type = params.addr_type
114         scapy_tun_sa_id = params.scapy_tun_sa_id
115         scapy_tun_spi = params.scapy_tun_spi
116         vpp_tun_sa_id = params.vpp_tun_sa_id
117         vpp_tun_spi = params.vpp_tun_spi
118         auth_algo_vpp_id = params.auth_algo_vpp_id
119         auth_key = params.auth_key
120         crypt_algo_vpp_id = params.crypt_algo_vpp_id
121         crypt_key = params.crypt_key
122         remote_tun_if_host = params.remote_tun_if_host
123         addr_any = params.addr_any
124         addr_bcast = params.addr_bcast
125         flags = params.flags
126         tun_flags = params.tun_flags
127         e = VppEnum.vl_api_ipsec_spd_action_t
128         objs = []
129
130         params.tun_sa_in = VppIpsecSA(self, scapy_tun_sa_id, scapy_tun_spi,
131                                       auth_algo_vpp_id, auth_key,
132                                       crypt_algo_vpp_id, crypt_key,
133                                       self.vpp_ah_protocol,
134                                       self.tun_if.local_addr[addr_type],
135                                       self.tun_if.remote_addr[addr_type],
136                                       tun_flags=tun_flags,
137                                       flags=flags,
138                                       dscp=params.dscp)
139
140         params.tun_sa_out = VppIpsecSA(self, vpp_tun_sa_id, vpp_tun_spi,
141                                        auth_algo_vpp_id, auth_key,
142                                        crypt_algo_vpp_id, crypt_key,
143                                        self.vpp_ah_protocol,
144                                        self.tun_if.remote_addr[addr_type],
145                                        self.tun_if.local_addr[addr_type],
146                                        tun_flags=tun_flags,
147                                        flags=flags,
148                                        dscp=params.dscp)
149
150         objs.append(params.tun_sa_in)
151         objs.append(params.tun_sa_out)
152
153         params.spd_policy_in_any = VppIpsecSpdEntry(self, self.tun_spd,
154                                                     vpp_tun_sa_id,
155                                                     addr_any, addr_bcast,
156                                                     addr_any, addr_bcast,
157                                                     socket.IPPROTO_AH)
158         params.spd_policy_out_any = VppIpsecSpdEntry(self, self.tun_spd,
159                                                      vpp_tun_sa_id,
160                                                      addr_any, addr_bcast,
161                                                      addr_any, addr_bcast,
162                                                      socket.IPPROTO_AH,
163                                                      is_outbound=0)
164
165         objs.append(params.spd_policy_out_any)
166         objs.append(params.spd_policy_in_any)
167
168         e1 = VppIpsecSpdEntry(self, self.tun_spd, vpp_tun_sa_id,
169                               remote_tun_if_host,
170                               remote_tun_if_host,
171                               self.pg1.remote_addr[addr_type],
172                               self.pg1.remote_addr[addr_type],
173                               0, priority=10,
174                               policy=e.IPSEC_API_SPD_ACTION_PROTECT,
175                               is_outbound=0)
176         e2 = VppIpsecSpdEntry(self, self.tun_spd, scapy_tun_sa_id,
177                               self.pg1.remote_addr[addr_type],
178                               self.pg1.remote_addr[addr_type],
179                               remote_tun_if_host,
180                               remote_tun_if_host,
181                               0, policy=e.IPSEC_API_SPD_ACTION_PROTECT,
182                               priority=10)
183         e3 = VppIpsecSpdEntry(self, self.tun_spd, vpp_tun_sa_id,
184                               remote_tun_if_host,
185                               remote_tun_if_host,
186                               self.pg0.local_addr[addr_type],
187                               self.pg0.local_addr[addr_type],
188                               0, priority=20,
189                               policy=e.IPSEC_API_SPD_ACTION_PROTECT,
190                               is_outbound=0)
191         e4 = VppIpsecSpdEntry(self, self.tun_spd, scapy_tun_sa_id,
192                               self.pg0.local_addr[addr_type],
193                               self.pg0.local_addr[addr_type],
194                               remote_tun_if_host,
195                               remote_tun_if_host,
196                               0, policy=e.IPSEC_API_SPD_ACTION_PROTECT,
197                               priority=20)
198
199         objs = objs + [e1, e2, e3, e4]
200
201         for o in objs:
202             o.add_vpp_config()
203
204         self.net_objs = self.net_objs + objs
205
206     def config_ah_tra(self, params):
207         addr_type = params.addr_type
208         scapy_tra_sa_id = params.scapy_tra_sa_id
209         scapy_tra_spi = params.scapy_tra_spi
210         vpp_tra_sa_id = params.vpp_tra_sa_id
211         vpp_tra_spi = params.vpp_tra_spi
212         auth_algo_vpp_id = params.auth_algo_vpp_id
213         auth_key = params.auth_key
214         crypt_algo_vpp_id = params.crypt_algo_vpp_id
215         crypt_key = params.crypt_key
216         addr_any = params.addr_any
217         addr_bcast = params.addr_bcast
218         flags = params.flags | (VppEnum.vl_api_ipsec_sad_flags_t.
219                                 IPSEC_API_SAD_FLAG_USE_ANTI_REPLAY)
220         e = VppEnum.vl_api_ipsec_spd_action_t
221         objs = []
222
223         params.tra_sa_in = VppIpsecSA(self, scapy_tra_sa_id, scapy_tra_spi,
224                                       auth_algo_vpp_id, auth_key,
225                                       crypt_algo_vpp_id, crypt_key,
226                                       self.vpp_ah_protocol,
227                                       flags=flags)
228         params.tra_sa_out = VppIpsecSA(self, vpp_tra_sa_id, vpp_tra_spi,
229                                        auth_algo_vpp_id, auth_key,
230                                        crypt_algo_vpp_id, crypt_key,
231                                        self.vpp_ah_protocol,
232                                        flags=flags)
233
234         objs.append(params.tra_sa_in)
235         objs.append(params.tra_sa_out)
236
237         objs.append(VppIpsecSpdEntry(self, self.tra_spd, vpp_tra_sa_id,
238                                      addr_any, addr_bcast,
239                                      addr_any, addr_bcast,
240                                      socket.IPPROTO_AH))
241         objs.append(VppIpsecSpdEntry(self, self.tra_spd, scapy_tra_sa_id,
242                                      addr_any, addr_bcast,
243                                      addr_any, addr_bcast,
244                                      socket.IPPROTO_AH,
245                                      is_outbound=0))
246         objs.append(VppIpsecSpdEntry(self, self.tra_spd, vpp_tra_sa_id,
247                                      self.tra_if.local_addr[addr_type],
248                                      self.tra_if.local_addr[addr_type],
249                                      self.tra_if.remote_addr[addr_type],
250                                      self.tra_if.remote_addr[addr_type],
251                                      0, priority=10,
252                                      policy=e.IPSEC_API_SPD_ACTION_PROTECT,
253                                      is_outbound=0))
254         objs.append(VppIpsecSpdEntry(self, self.tra_spd, scapy_tra_sa_id,
255                                      self.tra_if.local_addr[addr_type],
256                                      self.tra_if.local_addr[addr_type],
257                                      self.tra_if.remote_addr[addr_type],
258                                      self.tra_if.remote_addr[addr_type],
259                                      0, policy=e.IPSEC_API_SPD_ACTION_PROTECT,
260                                      priority=10))
261
262         for o in objs:
263             o.add_vpp_config()
264         self.net_objs = self.net_objs + objs
265
266
267 class TemplateIpsecAh(ConfigIpsecAH):
268     """
269     Basic test for IPSEC using AH transport and Tunnel mode
270
271     TRANSPORT MODE:
272
273      ---   encrypt   ---
274     |pg2| <-------> |VPP|
275      ---   decrypt   ---
276
277     TUNNEL MODE:
278
279      ---   encrypt   ---   plain   ---
280     |pg0| <-------  |VPP| <------ |pg1|
281      ---             ---           ---
282
283      ---   decrypt   ---   plain   ---
284     |pg0| ------->  |VPP| ------> |pg1|
285      ---             ---           ---
286     """
287     @classmethod
288     def setUpClass(cls):
289         super(TemplateIpsecAh, cls).setUpClass()
290
291     @classmethod
292     def tearDownClass(cls):
293         super(TemplateIpsecAh, cls).tearDownClass()
294
295     def setUp(self):
296         super(TemplateIpsecAh, self).setUp()
297         self.config_network(self.params.values())
298
299     def tearDown(self):
300         self.unconfig_network()
301         super(TemplateIpsecAh, self).tearDown()
302
303
304 class TestIpsecAh1(TemplateIpsecAh, IpsecTcpTests):
305     """ Ipsec AH - TCP tests """
306     pass
307
308
309 class TestIpsecAh2(TemplateIpsecAh, IpsecTra46Tests, IpsecTun46Tests):
310     """ Ipsec AH w/ SHA1 """
311     pass
312
313
314 class TestIpsecAhTun(TemplateIpsecAh, IpsecTun46Tests):
315     """ Ipsec AH - TUN encap tests """
316
317     def setUp(self):
318         self.ipv4_params = IPsecIPv4Params()
319         self.ipv6_params = IPsecIPv6Params()
320
321         c = (VppEnum.vl_api_tunnel_encap_decap_flags_t.
322              TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_DSCP)
323         c1 = c | (VppEnum.vl_api_tunnel_encap_decap_flags_t.
324                   TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_ECN)
325
326         self.ipv4_params.tun_flags = c
327         self.ipv6_params.tun_flags = c1
328
329         super(TestIpsecAhTun, self).setUp()
330
331     def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=54):
332         # set the DSCP + ECN - flags are set to copy only DSCP
333         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
334                 IP(src=src, dst=dst, tos=5) /
335                 UDP(sport=4444, dport=4444) /
336                 Raw(b'X' * payload_size)
337                 for i in range(count)]
338
339     def gen_pkts6(self, sw_intf, src, dst, count=1, payload_size=54):
340         # set the DSCP + ECN - flags are set to copy both
341         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
342                 IPv6(src=src, dst=dst, tc=5) /
343                 UDP(sport=4444, dport=4444) /
344                 Raw(b'X' * payload_size)
345                 for i in range(count)]
346
347     def verify_encrypted(self, p, sa, rxs):
348         # just check that only the DSCP is copied
349         for rx in rxs:
350             self.assertEqual(rx[IP].tos, 4)
351
352     def verify_encrypted6(self, p, sa, rxs):
353         # just check that the DSCP & ECN are copied
354         for rx in rxs:
355             self.assertEqual(rx[IPv6].tc, 5)
356
357
358 class TestIpsecAhTun2(TemplateIpsecAh, IpsecTun46Tests):
359     """ Ipsec AH - TUN encap tests """
360
361     def setUp(self):
362         self.ipv4_params = IPsecIPv4Params()
363         self.ipv6_params = IPsecIPv6Params()
364
365         self.ipv4_params.dscp = 3
366         self.ipv6_params.dscp = 4
367
368         super(TestIpsecAhTun2, self).setUp()
369
370     def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=54):
371         # set the DSCP + ECN - flags are set to copy only DSCP
372         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
373                 IP(src=src, dst=dst, tos=0) /
374                 UDP(sport=4444, dport=4444) /
375                 Raw(b'X' * payload_size)
376                 for i in range(count)]
377
378     def gen_pkts6(self, sw_intf, src, dst, count=1, payload_size=54):
379         # set the DSCP + ECN - flags are set to copy both
380         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
381                 IPv6(src=src, dst=dst, tc=0) /
382                 UDP(sport=4444, dport=4444) /
383                 Raw(b'X' * payload_size)
384                 for i in range(count)]
385
386     def verify_encrypted(self, p, sa, rxs):
387         # just check that only the DSCP is copied
388         for rx in rxs:
389             self.assertEqual(rx[IP].tos, 0xc)
390
391     def verify_encrypted6(self, p, sa, rxs):
392         # just check that the DSCP & ECN are copied
393         for rx in rxs:
394             self.assertEqual(rx[IPv6].tc, 0x10)
395
396
397 class TestIpsecAhHandoff(TemplateIpsecAh,
398                          IpsecTun6HandoffTests,
399                          IpsecTun4HandoffTests):
400     """ Ipsec AH Handoff """
401     pass
402
403
404 class TestIpsecAhAll(ConfigIpsecAH,
405                      IpsecTra4, IpsecTra6,
406                      IpsecTun4, IpsecTun6):
407     """ Ipsec AH all Algos """
408
409     def setUp(self):
410         super(TestIpsecAhAll, self).setUp()
411
412     def tearDown(self):
413         super(TestIpsecAhAll, self).tearDown()
414
415     def test_integ_algs(self):
416         """All Engines SHA[1_96, 256, 384, 512] w/ & w/o ESN"""
417         # foreach VPP crypto engine
418         engines = ["ia32", "ipsecmb", "openssl"]
419
420         algos = [{'vpp': VppEnum.vl_api_ipsec_integ_alg_t.
421                   IPSEC_API_INTEG_ALG_SHA1_96,
422                   'scapy': "HMAC-SHA1-96"},
423                  {'vpp': VppEnum.vl_api_ipsec_integ_alg_t.
424                   IPSEC_API_INTEG_ALG_SHA_256_128,
425                   'scapy': "SHA2-256-128"},
426                  {'vpp': VppEnum.vl_api_ipsec_integ_alg_t.
427                   IPSEC_API_INTEG_ALG_SHA_384_192,
428                   'scapy': "SHA2-384-192"},
429                  {'vpp': VppEnum.vl_api_ipsec_integ_alg_t.
430                   IPSEC_API_INTEG_ALG_SHA_512_256,
431                   'scapy': "SHA2-512-256"}]
432
433         flags = [0, (VppEnum.vl_api_ipsec_sad_flags_t.
434                      IPSEC_API_SAD_FLAG_USE_ESN)]
435
436         #
437         # loop through the VPP engines
438         #
439         for engine in engines:
440             self.vapi.cli("set crypto handler all %s" % engine)
441             #
442             # loop through each of the algorithms
443             #
444             for algo in algos:
445                 # with self.subTest(algo=algo['scapy']):
446                 for flag in flags:
447                     #
448                     # setup up the config paramters
449                     #
450                     self.ipv4_params = IPsecIPv4Params()
451                     self.ipv6_params = IPsecIPv6Params()
452
453                     self.params = {self.ipv4_params.addr_type:
454                                    self.ipv4_params,
455                                    self.ipv6_params.addr_type:
456                                    self.ipv6_params}
457
458                     for _, p in self.params.items():
459                         p.auth_algo_vpp_id = algo['vpp']
460                         p.auth_algo = algo['scapy']
461                         p.flags = p.flags | flag
462
463                     #
464                     # configure the SPDs. SAs, etc
465                     #
466                     self.config_network(self.params.values())
467
468                     #
469                     # run some traffic.
470                     #  An exhautsive 4o6, 6o4 is not necessary for each algo
471                     #
472                     self.verify_tra_basic6(count=17)
473                     self.verify_tra_basic4(count=17)
474                     self.verify_tun_66(self.params[socket.AF_INET6], count=17)
475                     self.verify_tun_44(self.params[socket.AF_INET], count=17)
476
477                     #
478                     # remove the SPDs, SAs, etc
479                     #
480                     self.unconfig_network()
481
482
483 if __name__ == '__main__':
484     unittest.main(testRunner=VppTestRunner)