- super(TestIpsecAh, self).tearDown()
- if not self.vpp_dead:
- self.vapi.cli("show hardware")
-
- def send_and_expect(self, input, pkts, output, count=1):
- input.add_stream(pkts)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- rx = output.get_capture(count)
- return rx
-
- def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1):
- return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
- sa.encrypt(IP(src=src, dst=dst) / ICMP() /
- "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX")
- ] * count
-
- def gen_pkts(self, sw_intf, src, dst, count=1):
- return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
- IP(src=src, dst=dst) / ICMP() /
- "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
- ] * count
-
- def test_ipsec_ah_tra_basic(self, count=1):
- """ ipsec ah v4 transport basic test """
- try:
- self.configScapySA()
- send_pkts = self.gen_encrypt_pkts(
- self.remote_tra_sa,
- self.pg2,
- src=self.pg2.remote_ip4,
- dst=self.pg2.local_ip4,
- count=count)
- recv_pkts = self.send_and_expect(
- self.pg2, send_pkts, self.pg2, count=count)
- # ESP TRA VPP encryption/decryption verification
- for Pkts in recv_pkts:
- Pkts[AH].padding = Pkts[AH].icv[12:]
- Pkts[AH].icv = Pkts[AH].icv[:12]
- decrypt_pkt = self.local_tra_sa.decrypt(Pkts[IP])
- finally:
- self.logger.info(self.vapi.ppcli("show error"))
- self.logger.info(self.vapi.ppcli("show ipsec"))
-
- def test_ipsec_ah_tra_burst(self):
- """ ipsec ah v4 transport burst test """
- try:
- self.test_ipsec_ah_tra_basic(count=257)
- finally:
- self.logger.info(self.vapi.ppcli("show error"))
- self.logger.info(self.vapi.ppcli("show ipsec"))
-
- def test_ipsec_ah_tun_basic(self, count=1):
- """ ipsec ah 4o4 tunnel basic test """
- try:
- self.configScapySA(is_tun=True)
- send_pkts = self.gen_encrypt_pkts(
- self.remote_tun_sa,
- self.pg0,
- src=self.remote_pg0_lb_addr,
- dst=self.remote_pg1_lb_addr,
- count=count)
- recv_pkts = self.send_and_expect(
- self.pg0, send_pkts, self.pg1, count=count)
- # ESP TUN VPP decryption verification
- for recv_pkt in recv_pkts:
- self.assert_equal(recv_pkt[IP].src, self.remote_pg0_lb_addr)
- self.assert_equal(recv_pkt[IP].dst, self.remote_pg1_lb_addr)
- send_pkts = self.gen_pkts(
- self.pg1,
- src=self.remote_pg1_lb_addr,
- dst=self.remote_pg0_lb_addr,
- count=count)
- recv_pkts = self.send_and_expect(
- self.pg1, send_pkts, self.pg0, count=count)
- # ESP TUN VPP encryption verification
- for recv_pkt in recv_pkts:
- recv_pkt[IP] = recv_pkt[IP] / IP(recv_pkt[AH].icv[12:])
- recv_pkt[AH].icv = recv_pkt[AH].icv[:12]
- decrypt_pkt = self.local_tun_sa.decrypt(recv_pkt[IP])
- self.assert_equal(decrypt_pkt.src, self.remote_pg1_lb_addr)
- self.assert_equal(decrypt_pkt.dst, self.remote_pg0_lb_addr)
- finally:
- self.logger.info(self.vapi.ppcli("show error"))
- self.logger.info(self.vapi.ppcli("show ipsec"))
-
- def test_ipsec_ah_tun_burst(self):
- """ ipsec ah 4o4 tunnel burst test """
- try:
- self.test_ipsec_ah_tun_basic(count=257)
- finally:
- self.logger.info(self.vapi.ppcli("show error"))
- self.logger.info(self.vapi.ppcli("show ipsec"))
-
-
-if __name__ == '__main__':
+ self.unconfig_network()
+ super(TemplateIpsecAh, self).tearDown()
+
+
+class TestIpsecAh1(TemplateIpsecAh, IpsecTcpTests):
+ """Ipsec AH - TCP tests"""
+
+ pass
+
+
+class TestIpsecAh2(TemplateIpsecAh, IpsecTra46Tests, IpsecTun46Tests):
+ """Ipsec AH w/ SHA1"""
+
+ pass
+
+
+class TestIpsecAhTun(TemplateIpsecAh, IpsecTun46Tests):
+ """Ipsec AH - TUN encap tests"""
+
+ def setUp(self):
+ self.ipv4_params = IPsecIPv4Params()
+ self.ipv6_params = IPsecIPv6Params()
+
+ c = (
+ VppEnum.vl_api_tunnel_encap_decap_flags_t.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_DSCP
+ )
+ c1 = c | (
+ VppEnum.vl_api_tunnel_encap_decap_flags_t.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_ECN
+ )
+
+ self.ipv4_params.tun_flags = c
+ self.ipv6_params.tun_flags = c1
+
+ super(TestIpsecAhTun, self).setUp()
+
+ def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=54):
+ # set the DSCP + ECN - flags are set to copy only DSCP
+ return [
+ Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
+ / IP(src=src, dst=dst, tos=5)
+ / UDP(sport=4444, dport=4444)
+ / Raw(b"X" * payload_size)
+ for i in range(count)
+ ]
+
+ def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=54):
+ # set the DSCP + ECN - flags are set to copy both
+ return [
+ Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
+ / IPv6(src=src, dst=dst, tc=5)
+ / UDP(sport=4444, dport=4444)
+ / Raw(b"X" * payload_size)
+ for i in range(count)
+ ]
+
+ def verify_encrypted(self, p, sa, rxs):
+ # just check that only the DSCP is copied
+ for rx in rxs:
+ self.assertEqual(rx[IP].tos, 4)
+
+ def verify_encrypted6(self, p, sa, rxs):
+ # just check that the DSCP & ECN are copied
+ for rx in rxs:
+ self.assertEqual(rx[IPv6].tc, 5)
+
+
+class TestIpsecAhTun2(TemplateIpsecAh, IpsecTun46Tests):
+ """Ipsec AH - TUN encap tests"""
+
+ def setUp(self):
+ self.ipv4_params = IPsecIPv4Params()
+ self.ipv6_params = IPsecIPv6Params()
+
+ self.ipv4_params.dscp = 3
+ self.ipv6_params.dscp = 4
+
+ super(TestIpsecAhTun2, self).setUp()
+
+ def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=54):
+ # set the DSCP + ECN - flags are set to copy only DSCP
+ return [
+ Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
+ / IP(src=src, dst=dst, tos=0)
+ / UDP(sport=4444, dport=4444)
+ / Raw(b"X" * payload_size)
+ for i in range(count)
+ ]
+
+ def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=54):
+ # set the DSCP + ECN - flags are set to copy both
+ return [
+ Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
+ / IPv6(src=src, dst=dst, tc=0)
+ / UDP(sport=4444, dport=4444)
+ / Raw(b"X" * payload_size)
+ for i in range(count)
+ ]
+
+ def verify_encrypted(self, p, sa, rxs):
+ # just check that only the DSCP is copied
+ for rx in rxs:
+ self.assertEqual(rx[IP].tos, 0xC)
+
+ def verify_encrypted6(self, p, sa, rxs):
+ # just check that the DSCP & ECN are copied
+ for rx in rxs:
+ self.assertEqual(rx[IPv6].tc, 0x10)
+
+
+class TestIpsecAhHandoff(TemplateIpsecAh, IpsecTun6HandoffTests, IpsecTun4HandoffTests):
+ """Ipsec AH Handoff"""
+
+ pass
+
+
+class TestIpsecAhAll(ConfigIpsecAH, IpsecTra4, IpsecTra6, IpsecTun4, IpsecTun6):
+ """Ipsec AH all Algos"""
+
+ def setUp(self):
+ super(TestIpsecAhAll, self).setUp()
+
+ def tearDown(self):
+ super(TestIpsecAhAll, self).tearDown()
+
+ def test_integ_algs(self):
+ """All Engines SHA[1_96, 256, 384, 512] w/ & w/o ESN"""
+ # foreach VPP crypto engine
+ engines = ["ia32", "ipsecmb", "openssl"]
+
+ algos = [
+ {
+ "vpp": VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA1_96,
+ "scapy": "HMAC-SHA1-96",
+ },
+ {
+ "vpp": VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA_256_128,
+ "scapy": "SHA2-256-128",
+ },
+ {
+ "vpp": VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA_384_192,
+ "scapy": "SHA2-384-192",
+ },
+ {
+ "vpp": VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA_512_256,
+ "scapy": "SHA2-512-256",
+ },
+ ]
+
+ flags = [0, (VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN)]
+
+ #
+ # loop through the VPP engines
+ #
+ for engine in engines:
+ self.vapi.cli("set crypto handler all %s" % engine)
+ #
+ # loop through each of the algorithms
+ #
+ for algo in algos:
+ # with self.subTest(algo=algo['scapy']):
+ for flag in flags:
+ #
+ # setup up the config paramters
+ #
+ self.ipv4_params = IPsecIPv4Params()
+ self.ipv6_params = IPsecIPv6Params()
+
+ self.params = {
+ self.ipv4_params.addr_type: self.ipv4_params,
+ self.ipv6_params.addr_type: self.ipv6_params,
+ }
+
+ for _, p in self.params.items():
+ p.auth_algo_vpp_id = algo["vpp"]
+ p.auth_algo = algo["scapy"]
+ p.flags = p.flags | flag
+
+ #
+ # configure the SPDs. SAs, etc
+ #
+ self.config_network(self.params.values())
+
+ #
+ # run some traffic.
+ # An exhautsive 4o6, 6o4 is not necessary for each algo
+ #
+ self.verify_tra_basic6(count=17)
+ self.verify_tra_basic4(count=17)
+ self.verify_tun_66(self.params[socket.AF_INET6], count=17)
+ self.verify_tun_44(self.params[socket.AF_INET], count=17)
+
+ #
+ # remove the SPDs, SAs, etc
+ #
+ self.unconfig_network()
+
+
+if __name__ == "__main__":