ipsec: Use the new tunnel API types to add flow label and TTL copy
[vpp.git] / test / test_ipsec_ah.py
1 import socket
2 import unittest
3
4 from scapy.layers.ipsec import AH
5 from scapy.layers.inet import IP, UDP
6 from scapy.layers.inet6 import IPv6
7 from scapy.layers.l2 import Ether
8 from scapy.packet import Raw
9
10 from framework import VppTestRunner
11 from template_ipsec import TemplateIpsec, IpsecTra46Tests, IpsecTun46Tests, \
12     config_tun_params, config_tra_params, IPsecIPv4Params, IPsecIPv6Params, \
13     IpsecTra4, IpsecTun4, IpsecTra6, IpsecTun6, \
14     IpsecTun6HandoffTests, IpsecTun4HandoffTests
15 from template_ipsec import IpsecTcpTests
16 from vpp_ipsec import VppIpsecSA, VppIpsecSpd, VppIpsecSpdEntry,\
17         VppIpsecSpdItfBinding
18 from vpp_ip_route import VppIpRoute, VppRoutePath
19 from vpp_ip import DpoProto
20 from vpp_papi import VppEnum
21
22
class ConfigIpsecAH(TemplateIpsec):
    """
    Basic test for IPSEC using AH transport and Tunnel mode

    This class only provides the configuration helpers
    (config_network / unconfig_network); it defines no test methods
    itself.

    TRANSPORT MODE:

     ---   encrypt   ---
    |pg2| <-------> |VPP|
     ---   decrypt   ---

    TUNNEL MODE:

     ---   encrypt   ---   plain   ---
    |pg0| <-------  |VPP| <------ |pg1|
     ---             ---           ---

     ---   decrypt   ---   plain   ---
    |pg0| ------->  |VPP| ------> |pg1|
     ---             ---           ---
    """
    encryption_type = AH
    # Class-level default only; config_network() and the config_ah_*()
    # helpers always rebind an instance attribute instead of mutating it.
    net_objs = []
    # The same AH node names are used for transport (tra*) and tunnel (tun*)
    tra4_encrypt_node_name = "ah4-encrypt"
    tra4_decrypt_node_name = "ah4-decrypt"
    tra6_encrypt_node_name = "ah6-encrypt"
    tra6_decrypt_node_name = "ah6-decrypt"
    tun4_encrypt_node_name = "ah4-encrypt"
    tun4_decrypt_node_name = "ah4-decrypt"
    tun6_encrypt_node_name = "ah6-encrypt"
    tun6_decrypt_node_name = "ah6-decrypt"

    @classmethod
    def setUpClass(cls):
        super(ConfigIpsecAH, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(ConfigIpsecAH, cls).tearDownClass()

    def setUp(self):
        super(ConfigIpsecAH, self).setUp()

    def tearDown(self):
        super(ConfigIpsecAH, self).tearDown()

    def config_network(self, params):
        """
        Configure SPDs, interface bindings, SAs, policies and routes

        :param params: collection of parameter objects; it is iterated
                       three times, so it must be re-iterable.

        Every configured object is recorded in self.net_objs, in
        creation order, so unconfig_network() can remove it again.
        """
        def track(obj):
            # push the object to VPP, then remember it for removal
            obj.add_vpp_config()
            self.net_objs.append(obj)

        self.net_objs = []
        self.tun_if = self.pg0
        self.tra_if = self.pg2
        self.logger.info(self.vapi.ppcli("show int addr"))

        # one SPD for transport mode and one for tunnel mode
        self.tra_spd = VppIpsecSpd(self, self.tra_spd_id)
        track(self.tra_spd)
        self.tun_spd = VppIpsecSpd(self, self.tun_spd_id)
        track(self.tun_spd)

        # bind each SPD to its interface
        for spd, itf in ((self.tra_spd, self.tra_if),
                         (self.tun_spd, self.tun_if)):
            track(VppIpsecSpdItfBinding(self, spd, itf))

        # all transport config first, then all tunnel config
        for p in params:
            self.config_ah_tra(p)
            config_tra_params(p, self.encryption_type)
        for p in params:
            self.config_ah_tun(p)
            config_tun_params(p, self.encryption_type, self.tun_if)

        # a route to each remote tunnel host via the tunnel interface peer
        for p in params:
            if p.is_ipv6:
                proto = DpoProto.DPO_PROTO_IP6
            else:
                proto = DpoProto.DPO_PROTO_IP4
            path = VppRoutePath(self.tun_if.remote_addr[p.addr_type],
                                0xffffffff,
                                proto=proto)
            track(VppIpRoute(self, p.remote_tun_if_host, p.addr_len,
                             [path]))
        self.logger.info(self.vapi.ppcli("show ipsec all"))

    def unconfig_network(self):
        """ Remove all recorded objects, newest first, and reset the list """
        for obj in self.net_objs[::-1]:
            obj.remove_vpp_config()
        self.net_objs = []

    def config_ah_tun(self, params):
        """
        Create and configure the tunnel-mode SAs and SPD entries

        :param params: parameter object; SA ids, SPIs, algorithms, keys,
                       addresses, flags, tun_flags and dscp are read
                       from it. The created SAs and the two catch-all
                       policies are stored back on it (tun_sa_in,
                       tun_sa_out, spd_policy_in_any,
                       spd_policy_out_any), along with outer_hop_limit
                       and outer_flow_label.
        """
        af = params.addr_type
        scapy_sa_id = params.scapy_tun_sa_id
        vpp_sa_id = params.vpp_tun_sa_id
        integ_alg, integ_key = params.auth_algo_vpp_id, params.auth_key
        crypto_alg, crypto_key = params.crypt_algo_vpp_id, params.crypt_key
        host = params.remote_tun_if_host
        lo, hi = params.addr_any, params.addr_bcast
        sa_opts = dict(tun_flags=params.tun_flags,
                       flags=params.flags,
                       dscp=params.dscp)
        protect = (VppEnum.vl_api_ipsec_spd_action_t.
                   IPSEC_API_SPD_ACTION_PROTECT)

        params.outer_hop_limit = 253
        params.outer_flow_label = 0x12345

        tun_local = self.tun_if.local_addr[af]
        tun_remote = self.tun_if.remote_addr[af]

        # the two SAs use mirrored tunnel endpoint addresses
        params.tun_sa_in = VppIpsecSA(
            self, scapy_sa_id, params.scapy_tun_spi,
            integ_alg, integ_key,
            crypto_alg, crypto_key,
            self.vpp_ah_protocol,
            tun_local, tun_remote,
            **sa_opts)
        params.tun_sa_out = VppIpsecSA(
            self, vpp_sa_id, params.vpp_tun_spi,
            integ_alg, integ_key,
            crypto_alg, crypto_key,
            self.vpp_ah_protocol,
            tun_remote, tun_local,
            **sa_opts)

        # catch-all entries for the AH protocol; the first relies on the
        # constructor's default direction, the second sets is_outbound=0
        params.spd_policy_in_any = VppIpsecSpdEntry(
            self, self.tun_spd, vpp_sa_id,
            lo, hi,
            lo, hi,
            socket.IPPROTO_AH)
        params.spd_policy_out_any = VppIpsecSpdEntry(
            self, self.tun_spd, vpp_sa_id,
            lo, hi,
            lo, hi,
            socket.IPPROTO_AH,
            is_outbound=0)

        # NB: the 'out' policy is queued ahead of the 'in' policy
        new_objs = [params.tun_sa_in, params.tun_sa_out,
                    params.spd_policy_out_any, params.spd_policy_in_any]

        # a pair of PROTECT entries per inner address:
        #   pg1's remote address at priority 10,
        #   pg0's local address at priority 20
        for inner, prio in ((self.pg1.remote_addr[af], 10),
                            (self.pg0.local_addr[af], 20)):
            new_objs.append(VppIpsecSpdEntry(
                self, self.tun_spd, vpp_sa_id,
                host, host,
                inner, inner,
                0, priority=prio,
                policy=protect,
                is_outbound=0))
            new_objs.append(VppIpsecSpdEntry(
                self, self.tun_spd, scapy_sa_id,
                inner, inner,
                host, host,
                0, policy=protect,
                priority=prio))

        for obj in new_objs:
            obj.add_vpp_config()

        # only record the objects once all of them have been configured
        self.net_objs = self.net_objs + new_objs

    def config_ah_tra(self, params):
        """
        Create and configure the transport-mode SAs and SPD entries

        :param params: parameter object; SA ids, SPIs, algorithms, keys,
                       addresses and flags are read from it. The created
                       SAs are stored back on it (tra_sa_in, tra_sa_out).

        The anti-replay flag is OR'ed into the SA flags.
        """
        af = params.addr_type
        scapy_sa_id = params.scapy_tra_sa_id
        vpp_sa_id = params.vpp_tra_sa_id
        integ_alg, integ_key = params.auth_algo_vpp_id, params.auth_key
        crypto_alg, crypto_key = params.crypt_algo_vpp_id, params.crypt_key
        lo, hi = params.addr_any, params.addr_bcast
        sa_flags = params.flags | (VppEnum.vl_api_ipsec_sad_flags_t.
                                   IPSEC_API_SAD_FLAG_USE_ANTI_REPLAY)
        protect = (VppEnum.vl_api_ipsec_spd_action_t.
                   IPSEC_API_SPD_ACTION_PROTECT)

        params.tra_sa_in = VppIpsecSA(
            self, scapy_sa_id, params.scapy_tra_spi,
            integ_alg, integ_key,
            crypto_alg, crypto_key,
            self.vpp_ah_protocol,
            flags=sa_flags)
        params.tra_sa_out = VppIpsecSA(
            self, vpp_sa_id, params.vpp_tra_spi,
            integ_alg, integ_key,
            crypto_alg, crypto_key,
            self.vpp_ah_protocol,
            flags=sa_flags)

        tra_local = self.tra_if.local_addr[af]
        tra_remote = self.tra_if.remote_addr[af]

        new_objs = [
            params.tra_sa_in,
            params.tra_sa_out,
            # catch-all entries for the AH protocol
            VppIpsecSpdEntry(self, self.tra_spd, vpp_sa_id,
                             lo, hi,
                             lo, hi,
                             socket.IPPROTO_AH),
            VppIpsecSpdEntry(self, self.tra_spd, scapy_sa_id,
                             lo, hi,
                             lo, hi,
                             socket.IPPROTO_AH,
                             is_outbound=0),
            # PROTECT entries between the transport interface addresses
            VppIpsecSpdEntry(self, self.tra_spd, vpp_sa_id,
                             tra_local, tra_local,
                             tra_remote, tra_remote,
                             0, priority=10,
                             policy=protect,
                             is_outbound=0),
            VppIpsecSpdEntry(self, self.tra_spd, scapy_sa_id,
                             tra_local, tra_local,
                             tra_remote, tra_remote,
                             0, policy=protect,
                             priority=10),
        ]

        for obj in new_objs:
            obj.add_vpp_config()
        self.net_objs = self.net_objs + new_objs
267
268
class TemplateIpsecAh(ConfigIpsecAH):
    """
    Basic test for IPSEC using AH transport and Tunnel mode

    Adds per-test configuration on top of ConfigIpsecAH: the network
    is configured in setUp() and removed again in tearDown().

    TRANSPORT MODE:

     ---   encrypt   ---
    |pg2| <-------> |VPP|
     ---   decrypt   ---

    TUNNEL MODE:

     ---   encrypt   ---   plain   ---
    |pg0| <-------  |VPP| <------ |pg1|
     ---             ---           ---

     ---   decrypt   ---   plain   ---
    |pg0| ------->  |VPP| ------> |pg1|
     ---             ---           ---
    """
    @classmethod
    def setUpClass(cls):
        super(TemplateIpsecAh, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TemplateIpsecAh, cls).tearDownClass()

    def setUp(self):
        """ Run the base setUp, then configure from self.params """
        super(TemplateIpsecAh, self).setUp()
        all_params = self.params.values()
        self.config_network(all_params)

    def tearDown(self):
        """ Remove the configuration before the base tearDown runs """
        self.unconfig_network()
        super(TemplateIpsecAh, self).tearDown()
304
305
class TestIpsecAh1(TemplateIpsecAh, IpsecTcpTests):
    """ Ipsec AH - TCP tests """
    # Nothing is overridden here; everything is inherited from the bases.
309
310
class TestIpsecAh2(TemplateIpsecAh, IpsecTra46Tests, IpsecTun46Tests):
    """ Ipsec AH w/ SHA1 """
    # Nothing is overridden here; everything is inherited from the bases.
314
315
class TestIpsecAhTun(TemplateIpsecAh, IpsecTun46Tests):
    """ Ipsec AH - TUN encap tests """

    def setUp(self):
        """ IPv4 SAs copy the DSCP only; IPv6 SAs copy DSCP and ECN """
        self.ipv4_params = IPsecIPv4Params()
        self.ipv6_params = IPsecIPv6Params()

        encap_flags = VppEnum.vl_api_tunnel_encap_decap_flags_t
        copy_dscp = encap_flags.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_DSCP
        copy_ecn = encap_flags.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_ECN

        self.ipv4_params.tun_flags = copy_dscp
        self.ipv6_params.tun_flags = copy_dscp | copy_ecn

        super(TestIpsecAhTun, self).setUp()

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=54):
        """ Build 'count' IPv4/UDP packets with tos=5 """
        # tos=5 sets bits in both the DSCP (upper six) and the ECN
        # (lower two) parts of the field
        pkts = []
        for _ in range(count):
            pkts.append(
                Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IP(src=src, dst=dst, tos=5) /
                UDP(sport=4444, dport=4444) /
                Raw(b'X' * payload_size))
        return pkts

    def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=54):
        """ Build 'count' IPv6/UDP packets with tc=5 """
        # tc=5 sets bits in both the DSCP and the ECN parts of the field
        pkts = []
        for _ in range(count):
            pkts.append(
                Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IPv6(src=src, dst=dst, tc=5) /
                UDP(sport=4444, dport=4444) /
                Raw(b'X' * payload_size))
        return pkts

    def verify_encrypted(self, p, sa, rxs):
        """ Only the DSCP bits of tos=5 must be present: 5 & ~0x3 == 4 """
        for pkt in rxs:
            outer_tos = pkt[IP].tos
            self.assertEqual(outer_tos, 4)

    def verify_encrypted6(self, p, sa, rxs):
        """ DSCP and ECN are both copied, so tc must still be 5 """
        for pkt in rxs:
            outer_tc = pkt[IPv6].tc
            self.assertEqual(outer_tc, 5)
358
359
class TestIpsecAhTun2(TemplateIpsecAh, IpsecTun46Tests):
    """ Ipsec AH - TUN encap tests """

    def setUp(self):
        """ Give the SAs a fixed DSCP: 3 for IPv4 and 4 for IPv6 """
        self.ipv4_params = IPsecIPv4Params()
        self.ipv6_params = IPsecIPv6Params()

        # config_ah_tun() hands these values to the tunnel SAs (dscp=...)
        self.ipv4_params.dscp = 3
        self.ipv6_params.dscp = 4

        super(TestIpsecAhTun2, self).setUp()

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=54):
        """ Build 'count' IPv4/UDP packets with tos=0 """
        # the inner packet carries no DSCP/ECN bits at all
        pkts = []
        for _ in range(count):
            pkts.append(
                Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IP(src=src, dst=dst, tos=0) /
                UDP(sport=4444, dport=4444) /
                Raw(b'X' * payload_size))
        return pkts

    def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=54):
        """ Build 'count' IPv6/UDP packets with tc=0 """
        # the inner packet carries no DSCP/ECN bits at all
        pkts = []
        for _ in range(count):
            pkts.append(
                Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IPv6(src=src, dst=dst, tc=0) /
                UDP(sport=4444, dport=4444) /
                Raw(b'X' * payload_size))
        return pkts

    def verify_encrypted(self, p, sa, rxs):
        """ The tos must be the DSCP from setUp: 3 << 2 == 0xc """
        for pkt in rxs:
            outer_tos = pkt[IP].tos
            self.assertEqual(outer_tos, 0xc)

    def verify_encrypted6(self, p, sa, rxs):
        """ The tc must be the DSCP from setUp: 4 << 2 == 0x10 """
        for pkt in rxs:
            outer_tc = pkt[IPv6].tc
            self.assertEqual(outer_tc, 0x10)
397
398
class TestIpsecAhHandoff(TemplateIpsecAh,
                         IpsecTun6HandoffTests,
                         IpsecTun4HandoffTests):
    """ Ipsec AH Handoff """
    # Nothing is overridden here; everything is inherited from the bases.
404
405
class TestIpsecAhAll(ConfigIpsecAH,
                     IpsecTra4, IpsecTra6,
                     IpsecTun4, IpsecTun6):
    """ Ipsec AH all Algos """

    def setUp(self):
        super(TestIpsecAhAll, self).setUp()

    def tearDown(self):
        super(TestIpsecAhAll, self).tearDown()

    def test_integ_algs(self):
        """All Engines SHA[1_96, 256, 384, 512] w/ & w/o ESN"""
        # For every engine / integrity algorithm / ESN flag combination:
        # configure the network, run basic transport and tunnel traffic
        # for both address families, then remove the configuration.

        # foreach VPP crypto engine
        engines = ["ia32", "ipsecmb", "openssl"]

        # each integrity algorithm as a VPP API enum plus its scapy name
        algos = [{'vpp': VppEnum.vl_api_ipsec_integ_alg_t.
                  IPSEC_API_INTEG_ALG_SHA1_96,
                  'scapy': "HMAC-SHA1-96"},
                 {'vpp': VppEnum.vl_api_ipsec_integ_alg_t.
                  IPSEC_API_INTEG_ALG_SHA_256_128,
                  'scapy': "SHA2-256-128"},
                 {'vpp': VppEnum.vl_api_ipsec_integ_alg_t.
                  IPSEC_API_INTEG_ALG_SHA_384_192,
                  'scapy': "SHA2-384-192"},
                 {'vpp': VppEnum.vl_api_ipsec_integ_alg_t.
                  IPSEC_API_INTEG_ALG_SHA_512_256,
                  'scapy': "SHA2-512-256"}]

        # without and with extended sequence numbers
        flags = [0, (VppEnum.vl_api_ipsec_sad_flags_t.
                     IPSEC_API_SAD_FLAG_USE_ESN)]

        #
        # loop through the VPP engines
        #
        for engine in engines:
            self.vapi.cli("set crypto handler all %s" % engine)
            #
            # loop through each of the algorithms
            #
            for algo in algos:
                for flag in flags:
                    #
                    # set up the config parameters
                    #
                    self.ipv4_params = IPsecIPv4Params()
                    self.ipv6_params = IPsecIPv6Params()

                    self.params = {self.ipv4_params.addr_type:
                                   self.ipv4_params,
                                   self.ipv6_params.addr_type:
                                   self.ipv6_params}

                    for p in self.params.values():
                        p.auth_algo_vpp_id = algo['vpp']
                        p.auth_algo = algo['scapy']
                        p.flags = p.flags | flag

                    #
                    # configure the SPDs, SAs, etc
                    #
                    self.config_network(self.params.values())

                    try:
                        #
                        # run some traffic.
                        #  An exhaustive 4o6, 6o4 is not necessary for
                        #  each algo
                        #
                        self.verify_tra_basic6(count=17)
                        self.verify_tra_basic4(count=17)
                        self.verify_tun_66(self.params[socket.AF_INET6],
                                           count=17)
                        self.verify_tun_44(self.params[socket.AF_INET],
                                           count=17)
                    finally:
                        #
                        # remove the SPDs, SAs, etc - also when a
                        # verification step fails, so that nothing
                        # configured here is left behind
                        #
                        self.unconfig_network()
483
484
# Script entry point: run this module's tests with the VPP test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)