4 from scapy.layers.ipsec import AH
5 from scapy.layers.inet import IP, UDP
6 from scapy.layers.inet6 import IPv6
7 from scapy.layers.l2 import Ether
8 from scapy.packet import Raw
10 from framework import VppTestRunner
11 from template_ipsec import (
23 IpsecTun6HandoffTests,
24 IpsecTun4HandoffTests,
26 from template_ipsec import IpsecTcpTests
27 from vpp_ipsec import VppIpsecSA, VppIpsecSpd, VppIpsecSpdEntry, VppIpsecSpdItfBinding
28 from vpp_ip_route import VppIpRoute, VppRoutePath
29 from vpp_ip import DpoProto
30 from vpp_papi import VppEnum
33 class ConfigIpsecAH(TemplateIpsec):
35 Basic test for IPSEC using AH transport and Tunnel mode
45 --- encrypt --- plain ---
46 |pg0| <------- |VPP| <------ |pg1|
49 --- decrypt --- plain ---
50 |pg0| -------> |VPP| ------> |pg1|
57 tra4_encrypt_node_name = "ah4-encrypt"
58 tra4_decrypt_node_name = ["ah4-decrypt", "ah4-decrypt"]
59 tra6_encrypt_node_name = "ah6-encrypt"
60 tra6_decrypt_node_name = ["ah6-decrypt", "ah6-decrypt"]
61 tun4_encrypt_node_name = "ah4-encrypt"
62 tun4_decrypt_node_name = ["ah4-decrypt", "ah4-decrypt"]
63 tun6_encrypt_node_name = "ah6-encrypt"
64 tun6_decrypt_node_name = ["ah6-decrypt", "ah6-decrypt"]
68 super(ConfigIpsecAH, cls).setUpClass()
71 def tearDownClass(cls):
72 super(ConfigIpsecAH, cls).tearDownClass()
75 super(ConfigIpsecAH, self).setUp()
78 super(ConfigIpsecAH, self).tearDown()
80 def config_network(self, params):
82 self.tun_if = self.pg0
83 self.tra_if = self.pg2
84 self.logger.info(self.vapi.ppcli("show int addr"))
86 self.tra_spd = VppIpsecSpd(self, self.tra_spd_id)
87 self.tra_spd.add_vpp_config()
88 self.net_objs.append(self.tra_spd)
89 self.tun_spd = VppIpsecSpd(self, self.tun_spd_id)
90 self.tun_spd.add_vpp_config()
91 self.net_objs.append(self.tun_spd)
93 b = VppIpsecSpdItfBinding(self, self.tra_spd, self.tra_if)
95 self.net_objs.append(b)
97 b = VppIpsecSpdItfBinding(self, self.tun_spd, self.tun_if)
99 self.net_objs.append(b)
102 self.config_ah_tra(p)
103 config_tra_params(p, self.encryption_type)
105 self.config_ah_tun(p)
106 config_tun_params(p, self.encryption_type, self.tun_if)
108 d = DpoProto.DPO_PROTO_IP6 if p.is_ipv6 else DpoProto.DPO_PROTO_IP4
111 p.remote_tun_if_host,
115 self.tun_if.remote_addr[p.addr_type], 0xFFFFFFFF, proto=d
120 self.net_objs.append(r)
121 self.logger.info(self.vapi.ppcli("show ipsec all"))
123 def unconfig_network(self):
124 for o in reversed(self.net_objs):
125 o.remove_vpp_config()
128 def config_ah_tun(self, params):
129 addr_type = params.addr_type
130 scapy_tun_sa_id = params.scapy_tun_sa_id
131 scapy_tun_spi = params.scapy_tun_spi
132 vpp_tun_sa_id = params.vpp_tun_sa_id
133 vpp_tun_spi = params.vpp_tun_spi
134 auth_algo_vpp_id = params.auth_algo_vpp_id
135 auth_key = params.auth_key
136 crypt_algo_vpp_id = params.crypt_algo_vpp_id
137 crypt_key = params.crypt_key
138 remote_tun_if_host = params.remote_tun_if_host
139 addr_any = params.addr_any
140 addr_bcast = params.addr_bcast
142 tun_flags = params.tun_flags
143 e = VppEnum.vl_api_ipsec_spd_action_t
145 params.outer_hop_limit = 253
146 params.outer_flow_label = 0x12345
148 params.tun_sa_in = VppIpsecSA(
156 self.vpp_ah_protocol,
157 self.tun_if.remote_addr[addr_type],
158 self.tun_if.local_addr[addr_type],
164 params.tun_sa_out = VppIpsecSA(
172 self.vpp_ah_protocol,
173 self.tun_if.local_addr[addr_type],
174 self.tun_if.remote_addr[addr_type],
180 objs.append(params.tun_sa_in)
181 objs.append(params.tun_sa_out)
183 params.spd_policy_in_any = VppIpsecSpdEntry(
193 params.spd_policy_out_any = VppIpsecSpdEntry(
205 objs.append(params.spd_policy_out_any)
206 objs.append(params.spd_policy_in_any)
208 e1 = VppIpsecSpdEntry(
214 self.pg1.remote_addr[addr_type],
215 self.pg1.remote_addr[addr_type],
218 policy=e.IPSEC_API_SPD_ACTION_PROTECT,
221 e2 = VppIpsecSpdEntry(
225 self.pg1.remote_addr[addr_type],
226 self.pg1.remote_addr[addr_type],
230 policy=e.IPSEC_API_SPD_ACTION_PROTECT,
233 e3 = VppIpsecSpdEntry(
239 self.pg0.local_addr[addr_type],
240 self.pg0.local_addr[addr_type],
243 policy=e.IPSEC_API_SPD_ACTION_PROTECT,
246 e4 = VppIpsecSpdEntry(
250 self.pg0.local_addr[addr_type],
251 self.pg0.local_addr[addr_type],
255 policy=e.IPSEC_API_SPD_ACTION_PROTECT,
259 objs = objs + [e1, e2, e3, e4]
264 self.net_objs = self.net_objs + objs
266 def config_ah_tra(self, params):
267 addr_type = params.addr_type
268 scapy_tra_sa_id = params.scapy_tra_sa_id
269 scapy_tra_spi = params.scapy_tra_spi
270 vpp_tra_sa_id = params.vpp_tra_sa_id
271 vpp_tra_spi = params.vpp_tra_spi
272 auth_algo_vpp_id = params.auth_algo_vpp_id
273 auth_key = params.auth_key
274 crypt_algo_vpp_id = params.crypt_algo_vpp_id
275 crypt_key = params.crypt_key
276 addr_any = params.addr_any
277 addr_bcast = params.addr_bcast
278 flags = params.flags | (
279 VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ANTI_REPLAY
281 e = VppEnum.vl_api_ipsec_spd_action_t
284 params.tra_sa_in = VppIpsecSA(
292 self.vpp_ah_protocol,
295 params.tra_sa_out = VppIpsecSA(
303 self.vpp_ah_protocol,
307 objs.append(params.tra_sa_in)
308 objs.append(params.tra_sa_out)
340 self.tra_if.local_addr[addr_type],
341 self.tra_if.local_addr[addr_type],
342 self.tra_if.remote_addr[addr_type],
343 self.tra_if.remote_addr[addr_type],
346 policy=e.IPSEC_API_SPD_ACTION_PROTECT,
355 self.tra_if.local_addr[addr_type],
356 self.tra_if.local_addr[addr_type],
357 self.tra_if.remote_addr[addr_type],
358 self.tra_if.remote_addr[addr_type],
360 policy=e.IPSEC_API_SPD_ACTION_PROTECT,
367 self.net_objs = self.net_objs + objs
370 class TemplateIpsecAh(ConfigIpsecAH):
372 Basic test for IPSEC using AH transport and Tunnel mode
377 |pg2| <-------> |VPP|
382 --- encrypt --- plain ---
383 |pg0| <------- |VPP| <------ |pg1|
386 --- decrypt --- plain ---
387 |pg0| -------> |VPP| ------> |pg1|
394 super(TemplateIpsecAh, cls).setUpClass()
397 def tearDownClass(cls):
398 super(TemplateIpsecAh, cls).tearDownClass()
401 super(TemplateIpsecAh, self).setUp()
402 self.config_network(self.params.values())
405 self.unconfig_network()
406 super(TemplateIpsecAh, self).tearDown()
409 class TestIpsecAh1(TemplateIpsecAh, IpsecTcpTests):
410 """Ipsec AH - TCP tests"""
415 class TestIpsecAh2(TemplateIpsecAh, IpsecTra46Tests, IpsecTun46Tests):
416 """Ipsec AH w/ SHA1"""
421 class TestIpsecAhTun(TemplateIpsecAh, IpsecTun46Tests):
422 """Ipsec AH - TUN encap tests"""
425 self.ipv4_params = IPsecIPv4Params()
426 self.ipv6_params = IPsecIPv6Params()
429 VppEnum.vl_api_tunnel_encap_decap_flags_t.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_DSCP
432 VppEnum.vl_api_tunnel_encap_decap_flags_t.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_ECN
435 self.ipv4_params.tun_flags = c
436 self.ipv6_params.tun_flags = c1
438 super(TestIpsecAhTun, self).setUp()
440 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=54):
441 # set the DSCP + ECN - flags are set to copy only DSCP
443 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
444 / IP(src=src, dst=dst, tos=5)
445 / UDP(sport=4444, dport=4444)
446 / Raw(b"X" * payload_size)
447 for i in range(count)
450 def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=54):
451 # set the DSCP + ECN - flags are set to copy both
453 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
454 / IPv6(src=src, dst=dst, tc=5)
455 / UDP(sport=4444, dport=4444)
456 / Raw(b"X" * payload_size)
457 for i in range(count)
460 def verify_encrypted(self, p, sa, rxs):
461 # just check that only the DSCP is copied
463 self.assertEqual(rx[IP].tos, 4)
465 def verify_encrypted6(self, p, sa, rxs):
466 # just check that the DSCP & ECN are copied
468 self.assertEqual(rx[IPv6].tc, 5)
471 class TestIpsecAhTun2(TemplateIpsecAh, IpsecTun46Tests):
472 """Ipsec AH - TUN encap tests"""
475 self.ipv4_params = IPsecIPv4Params()
476 self.ipv6_params = IPsecIPv6Params()
478 self.ipv4_params.dscp = 3
479 self.ipv6_params.dscp = 4
481 super(TestIpsecAhTun2, self).setUp()
483 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=54):
484 # send TOS 0 so nothing can be copied from the inner header; the test expects the TOS to reflect ipv4_params.dscp
486 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
487 / IP(src=src, dst=dst, tos=0)
488 / UDP(sport=4444, dport=4444)
489 / Raw(b"X" * payload_size)
490 for i in range(count)
493 def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=54):
494 # send TC 0 so nothing can be copied from the inner header; the test expects the TC to reflect ipv6_params.dscp
496 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
497 / IPv6(src=src, dst=dst, tc=0)
498 / UDP(sport=4444, dport=4444)
499 / Raw(b"X" * payload_size)
500 for i in range(count)
503 def verify_encrypted(self, p, sa, rxs):
504 # check the received TOS carries the configured DSCP value (3 << 2 == 0xC)
506 self.assertEqual(rx[IP].tos, 0xC)
508 def verify_encrypted6(self, p, sa, rxs):
509 # check the received TC carries the configured DSCP value (4 << 2 == 0x10)
511 self.assertEqual(rx[IPv6].tc, 0x10)
514 class TestIpsecAhHandoff(TemplateIpsecAh, IpsecTun6HandoffTests, IpsecTun4HandoffTests):
515 """Ipsec AH Handoff"""
520 class TestIpsecAhAll(ConfigIpsecAH, IpsecTra4, IpsecTra6, IpsecTun4, IpsecTun6):
521 """Ipsec AH all Algos"""
524 super(TestIpsecAhAll, self).setUp()
527 super(TestIpsecAhAll, self).tearDown()
529 def test_integ_algs(self):
530 """All Engines SHA[1_96, 256, 384, 512] w/ & w/o ESN"""
531 # foreach VPP crypto engine
532 engines = ["ia32", "ipsecmb", "openssl"]
536 "vpp": VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA1_96,
537 "scapy": "HMAC-SHA1-96",
540 "vpp": VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA_256_128,
541 "scapy": "SHA2-256-128",
544 "vpp": VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA_384_192,
545 "scapy": "SHA2-384-192",
548 "vpp": VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA_512_256,
549 "scapy": "SHA2-512-256",
553 flags = [0, (VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN)]
556 # loop through the VPP engines
558 for engine in engines:
559 self.vapi.cli("set crypto handler all %s" % engine)
561 # loop through each of the algorithms
564 # with self.subTest(algo=algo['scapy']):
567 # set up the config parameters
569 self.ipv4_params = IPsecIPv4Params()
570 self.ipv6_params = IPsecIPv6Params()
573 self.ipv4_params.addr_type: self.ipv4_params,
574 self.ipv6_params.addr_type: self.ipv6_params,
577 for _, p in self.params.items():
578 p.auth_algo_vpp_id = algo["vpp"]
579 p.auth_algo = algo["scapy"]
580 p.flags = p.flags | flag
583 # configure the SPDs, SAs, etc
585 self.config_network(self.params.values())
589 # An exhaustive 4o6, 6o4 is not necessary for each algo
591 self.verify_tra_basic6(count=17)
592 self.verify_tra_basic4(count=17)
593 self.verify_tun_66(self.params[socket.AF_INET6], count=17)
594 self.verify_tun_44(self.params[socket.AF_INET], count=17)
597 # remove the SPDs, SAs, etc
599 self.unconfig_network()
# When executed directly as a script, run this module's tests through
# unittest, using VppTestRunner as the test runner.
602 if __name__ == "__main__":
603 unittest.main(testRunner=VppTestRunner)