http: fix client receiving large data
[vpp.git] / test / test_ipsec_ah.py
1 import socket
2 import unittest
3
4 from scapy.layers.ipsec import AH
5 from scapy.layers.inet import IP, UDP
6 from scapy.layers.inet6 import IPv6
7 from scapy.layers.l2 import Ether
8 from scapy.packet import Raw
9
10 from asfframework import VppTestRunner
11 from template_ipsec import (
12     TemplateIpsec,
13     IpsecTra46Tests,
14     IpsecTun46Tests,
15     config_tun_params,
16     config_tra_params,
17     IPsecIPv4Params,
18     IPsecIPv6Params,
19     IpsecTra4,
20     IpsecTun4,
21     IpsecTra6,
22     IpsecTun6,
23     IpsecTun6HandoffTests,
24     IpsecTun4HandoffTests,
25 )
26 from template_ipsec import IpsecTcpTests
27 from vpp_ipsec import VppIpsecSA, VppIpsecSpd, VppIpsecSpdEntry, VppIpsecSpdItfBinding
28 from vpp_ip_route import VppIpRoute, VppRoutePath
29 from vpp_ip import DpoProto
30 from vpp_papi import VppEnum
31
32
class ConfigIpsecAH(TemplateIpsec):
    """
    Basic test for IPSEC using AH transport and Tunnel mode

    TRANSPORT MODE::

         ---   encrypt   ---
        |pg2| <-------> |VPP|
         ---   decrypt   ---

    TUNNEL MODE::

         ---   encrypt   ---   plain   ---
        |pg0| <-------  |VPP| <------ |pg1|
         ---             ---           ---

         ---   decrypt   ---   plain   ---
        |pg0| ------->  |VPP| ------> |pg1|
         ---             ---           ---

    config_network() creates the SPDs, interface bindings, SAs, SPD entries
    and routes, and records each created object in ``net_objs``;
    unconfig_network() removes them again in reverse order.
    """

    encryption_type = AH
    # Class-level default only: config_network() rebinds a fresh list on the
    # instance before anything is appended.
    net_objs = []
    tra4_encrypt_node_name = "ah4-encrypt"
    tra4_decrypt_node_name = ["ah4-decrypt", "ah4-decrypt"]
    tra6_encrypt_node_name = "ah6-encrypt"
    tra6_decrypt_node_name = ["ah6-decrypt", "ah6-decrypt"]
    tun4_encrypt_node_name = "ah4-encrypt"
    tun4_decrypt_node_name = ["ah4-decrypt", "ah4-decrypt"]
    tun6_encrypt_node_name = "ah6-encrypt"
    tun6_decrypt_node_name = ["ah6-decrypt", "ah6-decrypt"]

    @classmethod
    def setUpClass(cls):
        """Delegate class-level set-up to the parent class."""
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level tear-down to the parent class."""
        super().tearDownClass()

    def setUp(self):
        """Delegate per-test set-up to the parent class."""
        super().setUp()

    def tearDown(self):
        """Delegate per-test tear-down to the parent class."""
        super().tearDown()

    def config_network(self, params):
        """Configure SPDs, bindings, SAs, SPD entries and routes.

        :param params: iterable of per-address-family parameter objects; it
            is iterated three times (transport config, tunnel config, routes).
        """
        self.net_objs = []
        self.tun_if = self.pg0
        self.tra_if = self.pg2
        self.logger.info(self.vapi.ppcli("show int addr"))

        def track(obj):
            # Push the object to VPP, then remember it for unconfig_network().
            obj.add_vpp_config()
            self.net_objs.append(obj)

        self.tra_spd = VppIpsecSpd(self, self.tra_spd_id)
        track(self.tra_spd)
        self.tun_spd = VppIpsecSpd(self, self.tun_spd_id)
        track(self.tun_spd)

        # Bind each SPD to its interface: transport first, then tunnel.
        for spd, intf in ((self.tra_spd, self.tra_if), (self.tun_spd, self.tun_if)):
            track(VppIpsecSpdItfBinding(self, spd, intf))

        for p in params:
            self.config_ah_tra(p)
            config_tra_params(p, self.encryption_type)

        for p in params:
            self.config_ah_tun(p)
            config_tun_params(p, self.encryption_type, self.tun_if)

        # Route each remote tunnel host via the tunnel interface's remote address.
        for p in params:
            if p.is_ipv6:
                proto = DpoProto.DPO_PROTO_IP6
            else:
                proto = DpoProto.DPO_PROTO_IP4
            route = VppIpRoute(
                self,
                p.remote_tun_if_host,
                p.addr_len,
                [
                    VppRoutePath(
                        self.tun_if.remote_addr[p.addr_type], 0xFFFFFFFF, proto=proto
                    )
                ],
            )
            track(route)

        self.logger.info(self.vapi.ppcli("show ipsec all"))

    def unconfig_network(self):
        """Remove every recorded object, newest first, and reset the record."""
        for obj in reversed(self.net_objs):
            obj.remove_vpp_config()
        self.net_objs = []

    def config_ah_tun(self, params):
        """Create the tunnel SAs and tunnel-SPD entries for *params*.

        Stores the created SAs and the two wildcard entries back on *params*
        (``tun_sa_in``, ``tun_sa_out``, ``spd_policy_in_any``,
        ``spd_policy_out_any``) and also sets ``outer_hop_limit`` and
        ``outer_flow_label`` there.  All created objects are appended to
        ``self.net_objs``.
        """
        af = params.addr_type
        scapy_sa_id = params.scapy_tun_sa_id
        vpp_sa_id = params.vpp_tun_sa_id
        host = params.remote_tun_if_host
        protect = VppEnum.vl_api_ipsec_spd_action_t.IPSEC_API_SPD_ACTION_PROTECT

        # Arguments shared by both SAs.
        sa_crypto = (
            params.auth_algo_vpp_id,
            params.auth_key,
            params.crypt_algo_vpp_id,
            params.crypt_key,
            self.vpp_ah_protocol,
        )
        sa_options = dict(
            tun_flags=params.tun_flags,
            flags=params.flags,
            dscp=params.dscp,
        )
        # (start, stop, start, stop) range passed for both address pairs.
        wildcard = (params.addr_any, params.addr_bcast) * 2

        params.outer_hop_limit = 253
        params.outer_flow_label = 0x12345

        tun_remote = self.tun_if.remote_addr[af]
        tun_local = self.tun_if.local_addr[af]

        params.tun_sa_in = VppIpsecSA(
            self,
            scapy_sa_id,
            params.scapy_tun_spi,
            *sa_crypto,
            tun_remote,
            tun_local,
            **sa_options,
        )
        params.tun_sa_out = VppIpsecSA(
            self,
            vpp_sa_id,
            params.vpp_tun_spi,
            *sa_crypto,
            tun_local,
            tun_remote,
            **sa_options,
        )

        # NOTE(review): "in_any" is built without is_outbound while "out_any"
        # passes is_outbound=0 -- the names look swapped relative to the flag.
        # Confirm against the users of these attributes before changing.
        params.spd_policy_in_any = VppIpsecSpdEntry(
            self, self.tun_spd, vpp_sa_id, *wildcard, socket.IPPROTO_AH
        )
        params.spd_policy_out_any = VppIpsecSpdEntry(
            self, self.tun_spd, vpp_sa_id, *wildcard, socket.IPPROTO_AH, is_outbound=0
        )

        created = [
            params.tun_sa_in,
            params.tun_sa_out,
            params.spd_policy_out_any,
            params.spd_policy_in_any,
        ]

        # One PROTECT pair per inner peer: pg1's remote address at priority 10,
        # then pg0's local address at priority 20.
        inner_peers = (
            (self.pg1.remote_addr[af], 10),
            (self.pg0.local_addr[af], 20),
        )
        for inner, prio in inner_peers:
            created.append(
                VppIpsecSpdEntry(
                    self,
                    self.tun_spd,
                    scapy_sa_id,
                    host,
                    host,
                    inner,
                    inner,
                    socket.IPPROTO_RAW,
                    priority=prio,
                    policy=protect,
                    is_outbound=0,
                )
            )
            created.append(
                VppIpsecSpdEntry(
                    self,
                    self.tun_spd,
                    vpp_sa_id,
                    inner,
                    inner,
                    host,
                    host,
                    socket.IPPROTO_RAW,
                    policy=protect,
                    priority=prio,
                )
            )

        for obj in created:
            obj.add_vpp_config()

        self.net_objs = self.net_objs + created

    def config_ah_tra(self, params):
        """Create the transport SAs and transport-SPD entries for *params*.

        The SAs are created with the anti-replay flag OR-ed into
        ``params.flags`` and stored back on *params* as ``tra_sa_in`` and
        ``tra_sa_out``.  All created objects are appended to ``self.net_objs``.
        """
        af = params.addr_type
        scapy_sa_id = params.scapy_tra_sa_id
        vpp_sa_id = params.vpp_tra_sa_id
        protect = VppEnum.vl_api_ipsec_spd_action_t.IPSEC_API_SPD_ACTION_PROTECT
        sa_flags = (
            params.flags
            | VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ANTI_REPLAY
        )

        # Arguments shared by both SAs.
        sa_crypto = (
            params.auth_algo_vpp_id,
            params.auth_key,
            params.crypt_algo_vpp_id,
            params.crypt_key,
            self.vpp_ah_protocol,
        )
        # (start, stop, start, stop) range passed for both address pairs.
        wildcard = (params.addr_any, params.addr_bcast) * 2

        params.tra_sa_in = VppIpsecSA(
            self, scapy_sa_id, params.scapy_tra_spi, *sa_crypto, flags=sa_flags
        )
        params.tra_sa_out = VppIpsecSA(
            self, vpp_sa_id, params.vpp_tra_spi, *sa_crypto, flags=sa_flags
        )

        local = self.tra_if.local_addr[af]
        remote = self.tra_if.remote_addr[af]

        created = [
            params.tra_sa_in,
            params.tra_sa_out,
            VppIpsecSpdEntry(
                self, self.tra_spd, vpp_sa_id, *wildcard, socket.IPPROTO_AH
            ),
            VppIpsecSpdEntry(
                self,
                self.tra_spd,
                scapy_sa_id,
                *wildcard,
                socket.IPPROTO_AH,
                is_outbound=0,
            ),
            VppIpsecSpdEntry(
                self,
                self.tra_spd,
                scapy_sa_id,
                local,
                local,
                remote,
                remote,
                socket.IPPROTO_RAW,
                priority=10,
                policy=protect,
                is_outbound=0,
            ),
            VppIpsecSpdEntry(
                self,
                self.tra_spd,
                vpp_sa_id,
                local,
                local,
                remote,
                remote,
                socket.IPPROTO_RAW,
                policy=protect,
                priority=10,
            ),
        ]

        for obj in created:
            obj.add_vpp_config()
        self.net_objs = self.net_objs + created
368
369
class TemplateIpsecAh(ConfigIpsecAH):
    """
    Basic test for IPSEC using AH transport and Tunnel mode

    TRANSPORT MODE::

         ---   encrypt   ---
        |pg2| <-------> |VPP|
         ---   decrypt   ---

    TUNNEL MODE::

         ---   encrypt   ---   plain   ---
        |pg0| <-------  |VPP| <------ |pg1|
         ---             ---           ---

         ---   decrypt   ---   plain   ---
        |pg0| ------->  |VPP| ------> |pg1|
         ---             ---           ---

    Unlike its parent, this class configures the network in setUp() and
    removes the configuration again in tearDown().
    """

    @classmethod
    def setUpClass(cls):
        """Delegate class-level set-up to the parent class."""
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level tear-down to the parent class."""
        super().tearDownClass()

    def setUp(self):
        """Run the parent set-up, then configure the network from self.params."""
        super().setUp()
        self.config_network(self.params.values())

    def tearDown(self):
        """Remove the network configuration before the parent tear-down runs."""
        self.unconfig_network()
        super().tearDown()
407
408
class TestIpsecAh1(TemplateIpsecAh, IpsecTcpTests):
    """Ipsec AH - TCP tests"""

    # No body of its own: the test methods come from IpsecTcpTests and the
    # network set-up/tear-down from TemplateIpsecAh.
413
414
class TestIpsecAh2(TemplateIpsecAh, IpsecTra46Tests, IpsecTun46Tests):
    """Ipsec AH w/ SHA1"""

    # No body of its own: the test methods come from the IpsecTra46Tests and
    # IpsecTun46Tests mixins, the network set-up from TemplateIpsecAh.
419
420
class TestIpsecAhTun(TemplateIpsecAh, IpsecTun46Tests):
    """Ipsec AH - TUN encap tests"""

    def setUp(self):
        """Select the encap copy flags per address family, then set up."""
        self.ipv4_params = IPsecIPv4Params()
        self.ipv6_params = IPsecIPv6Params()

        encap_flags = VppEnum.vl_api_tunnel_encap_decap_flags_t
        copy_dscp = encap_flags.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_DSCP
        copy_dscp_and_ecn = (
            copy_dscp | encap_flags.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_ECN
        )

        # IPv4 copies the DSCP only; IPv6 copies both DSCP and ECN.
        self.ipv4_params.tun_flags = copy_dscp
        self.ipv6_params.tun_flags = copy_dscp_and_ecn

        super().setUp()

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=54):
        """Build *count* IPv4/UDP packets with tos=5 (DSCP and ECN bits set)."""
        pkts = []
        for _ in range(count):
            pkts.append(
                Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
                / IP(src=src, dst=dst, tos=5)
                / UDP(sport=4444, dport=4444)
                / Raw(b"X" * payload_size)
            )
        return pkts

    def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=54):
        """Build *count* IPv6/UDP packets with tc=5 (DSCP and ECN bits set)."""
        pkts = []
        for _ in range(count):
            pkts.append(
                Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
                / IPv6(src=src, dst=dst, tc=5)
                / UDP(sport=4444, dport=4444)
                / Raw(b"X" * payload_size)
            )
        return pkts

    def verify_encrypted(self, p, sa, rxs):
        """Check the received IPv4 tos is 4: DSCP copied, ECN bits not copied."""
        for pkt in rxs:
            self.assertEqual(pkt[IP].tos, 4)

    def verify_encrypted6(self, p, sa, rxs):
        """Check the received IPv6 tc is 5: both DSCP and ECN copied."""
        for pkt in rxs:
            self.assertEqual(pkt[IPv6].tc, 5)
469
470
class TestIpsecAhTun2(TemplateIpsecAh, IpsecTun46Tests):
    """Ipsec AH - TUN encap tests"""

    def setUp(self):
        """Give each address family a fixed DSCP value, then set up."""
        self.ipv4_params = IPsecIPv4Params()
        self.ipv6_params = IPsecIPv6Params()

        # No copy flags are set here; the DSCP comes from the parameters.
        self.ipv4_params.dscp = 3
        self.ipv6_params.dscp = 4

        super().setUp()

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=54):
        """Build *count* IPv4/UDP packets with tos=0."""
        pkts = []
        for _ in range(count):
            pkts.append(
                Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
                / IP(src=src, dst=dst, tos=0)
                / UDP(sport=4444, dport=4444)
                / Raw(b"X" * payload_size)
            )
        return pkts

    def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=54):
        """Build *count* IPv6/UDP packets with tc=0."""
        pkts = []
        for _ in range(count):
            pkts.append(
                Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
                / IPv6(src=src, dst=dst, tc=0)
                / UDP(sport=4444, dport=4444)
                / Raw(b"X" * payload_size)
            )
        return pkts

    def verify_encrypted(self, p, sa, rxs):
        """Check the received IPv4 tos is 0xC, i.e. the configured dscp 3 << 2."""
        for pkt in rxs:
            self.assertEqual(pkt[IP].tos, 0xC)

    def verify_encrypted6(self, p, sa, rxs):
        """Check the received IPv6 tc is 0x10, i.e. the configured dscp 4 << 2."""
        for pkt in rxs:
            self.assertEqual(pkt[IPv6].tc, 0x10)
512
513
class TestIpsecAhHandoff(TemplateIpsecAh, IpsecTun6HandoffTests, IpsecTun4HandoffTests):
    """Ipsec AH Handoff"""

    # No body of its own: the test methods come from the two handoff mixins,
    # the network set-up from TemplateIpsecAh.
518
519
class TestIpsecAhAll(ConfigIpsecAH, IpsecTra4, IpsecTra6, IpsecTun4, IpsecTun6):
    """Ipsec AH all Algos"""

    def setUp(self):
        """Delegate per-test set-up to the parent class."""
        super().setUp()

    def tearDown(self):
        """Delegate per-test tear-down to the parent class."""
        super().tearDown()

    def test_integ_algs(self):
        """All Engines SHA[1_96, 256, 384, 512] w/ & w/o ESN"""
        integ = VppEnum.vl_api_ipsec_integ_alg_t

        # VPP crypto engines to exercise.
        engines = ("ia32", "ipsecmb", "openssl")

        # (VPP algorithm id, scapy algorithm name) pairs.
        algos = (
            (integ.IPSEC_API_INTEG_ALG_SHA1_96, "HMAC-SHA1-96"),
            (integ.IPSEC_API_INTEG_ALG_SHA_256_128, "SHA2-256-128"),
            (integ.IPSEC_API_INTEG_ALG_SHA_384_192, "SHA2-384-192"),
            (integ.IPSEC_API_INTEG_ALG_SHA_512_256, "SHA2-512-256"),
        )

        # Each algorithm is run without and with the ESN flag.
        extra_flags = (0, VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN)

        for engine in engines:
            self.vapi.cli("set crypto handler all %s" % engine)

            for vpp_algo, scapy_algo in algos:
                for extra in extra_flags:
                    # Set up fresh config parameters for this combination.
                    self.ipv4_params = IPsecIPv4Params()
                    self.ipv6_params = IPsecIPv6Params()
                    self.params = {
                        self.ipv4_params.addr_type: self.ipv4_params,
                        self.ipv6_params.addr_type: self.ipv6_params,
                    }

                    for p in self.params.values():
                        p.auth_algo_vpp_id = vpp_algo
                        p.auth_algo = scapy_algo
                        p.flags = p.flags | extra

                    # Configure the SPDs, SAs, etc.
                    self.config_network(self.params.values())

                    # Run some traffic.  An exhaustive 4o6, 6o4 is not
                    # necessary for each algorithm.
                    self.verify_tra_basic6(count=17)
                    self.verify_tra_basic4(count=17)
                    self.verify_tun_66(self.params[socket.AF_INET6], count=17)
                    self.verify_tun_44(self.params[socket.AF_INET], count=17)

                    # Remove the SPDs, SAs, etc.
                    self.unconfig_network()
600
601
# Allow running this file directly as a script: hand the tests in this module
# to unittest using the VPP test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)