- def config_esp_tra(cls):
- cls.vapi.ipsec_sad_add_del_entry(cls.scapy_tra_sa_id,
- cls.scapy_tra_spi,
- cls.auth_algo_vpp_id, cls.auth_key,
- cls.crypt_algo_vpp_id,
- cls.crypt_key, cls.vpp_esp_protocol,
- is_tunnel=0)
- cls.vapi.ipsec_sad_add_del_entry(cls.vpp_tra_sa_id,
- cls.vpp_tra_spi,
- cls.auth_algo_vpp_id, cls.auth_key,
- cls.crypt_algo_vpp_id,
- cls.crypt_key, cls.vpp_esp_protocol,
- is_tunnel=0)
- cls.vapi.ipsec_spd_add_del(cls.tra_spd_id)
- cls.vapi.ipsec_interface_add_del_spd(cls.tra_spd_id,
- cls.tra_if.sw_if_index)
- l_startaddr = r_startaddr = socket.inet_pton(socket.AF_INET,
- "0.0.0.0")
- l_stopaddr = r_stopaddr = socket.inet_pton(socket.AF_INET,
- "255.255.255.255")
- cls.vapi.ipsec_spd_add_del_entry(cls.tra_spd_id, cls.vpp_tra_sa_id,
- l_startaddr, l_stopaddr, r_startaddr,
- r_stopaddr,
- protocol=socket.IPPROTO_ESP)
- cls.vapi.ipsec_spd_add_del_entry(cls.tra_spd_id, cls.vpp_tra_sa_id,
- l_startaddr, l_stopaddr, r_startaddr,
- r_stopaddr, is_outbound=0,
- protocol=socket.IPPROTO_ESP)
- l_startaddr = l_stopaddr = cls.tra_if.local_ip4n
- r_startaddr = r_stopaddr = cls.tra_if.remote_ip4n
- cls.vapi.ipsec_spd_add_del_entry(cls.tra_spd_id, cls.vpp_tra_sa_id,
- l_startaddr, l_stopaddr, r_startaddr,
- r_stopaddr, priority=10, policy=3,
- is_outbound=0)
- cls.vapi.ipsec_spd_add_del_entry(cls.tra_spd_id, cls.scapy_tra_sa_id,
- l_startaddr, l_stopaddr, r_startaddr,
- r_stopaddr, priority=10, policy=3)
-
-
-class TestIpsecEsp1(TemplateIpsecEsp, IpsecTraTests, IpsecTunTests):
- """ Ipsec ESP - TUN & TRA tests """
- pass
-
-
-class TestIpsecEsp2(TemplateIpsecEsp, IpsecTcpTests):
- """ Ipsec ESP - TCP tests """
- pass
+ def configEspTra(cls):
+ try:
+ spd_id = 2
+ remote_sa_id = 30
+ local_sa_id = 40
+ remote_tra_spi = 2001
+ local_tra_spi = 2000
+ cls.vapi.ipsec_sad_add_del_entry(
+ remote_sa_id,
+ remote_tra_spi,
+ integrity_key_length=20,
+ crypto_key_length=16,
+ protocol=1,
+ is_tunnel=0)
+ cls.vapi.ipsec_sad_add_del_entry(
+ local_sa_id,
+ local_tra_spi,
+ integrity_key_length=20,
+ crypto_key_length=16,
+ protocol=1,
+ is_tunnel=0)
+ cls.vapi.ipsec_spd_add_del(spd_id)
+ cls.vapi.ipsec_interface_add_del_spd(spd_id, cls.pg2.sw_if_index)
+ l_startaddr = r_startaddr = socket.inet_pton(
+ socket.AF_INET, "0.0.0.0")
+ l_stopaddr = r_stopaddr = socket.inet_pton(
+ socket.AF_INET, "255.255.255.255")
+ cls.vapi.ipsec_spd_add_del_entry(
+ spd_id,
+ l_startaddr,
+ l_stopaddr,
+ r_startaddr,
+ r_stopaddr,
+ protocol=socket.IPPROTO_ESP)
+ cls.vapi.ipsec_spd_add_del_entry(
+ spd_id,
+ l_startaddr,
+ l_stopaddr,
+ r_startaddr,
+ r_stopaddr,
+ protocol=socket.IPPROTO_ESP,
+ is_outbound=0)
+ l_startaddr = l_stopaddr = cls.pg2.local_ip4n
+ r_startaddr = r_stopaddr = cls.pg2.remote_ip4n
+ cls.vapi.ipsec_spd_add_del_entry(
+ spd_id,
+ l_startaddr,
+ l_stopaddr,
+ r_startaddr,
+ r_stopaddr,
+ priority=10,
+ policy=3,
+ is_outbound=0,
+ sa_id=local_sa_id)
+ cls.vapi.ipsec_spd_add_del_entry(
+ spd_id,
+ l_startaddr,
+ l_stopaddr,
+ r_startaddr,
+ r_stopaddr,
+ priority=10,
+ policy=3,
+ sa_id=remote_sa_id)
+ except Exception:
+ raise
+
+ def configScapySA(self, is_tun=False):
+ if is_tun:
+ self.remote_tun_sa = SecurityAssociation(
+ ESP,
+ spi=0x000003e8,
+ crypt_algo='AES-CBC',
+ crypt_key='JPjyOWBeVEQiMe7h',
+ auth_algo='HMAC-SHA1-96',
+ auth_key='C91KUR9GYMm5GfkEvNjX',
+ tunnel_header=IP(
+ src=self.pg0.remote_ip4,
+ dst=self.pg0.local_ip4))
+ self.local_tun_sa = SecurityAssociation(
+ ESP,
+ spi=0x000003e9,
+ crypt_algo='AES-CBC',
+ crypt_key='JPjyOWBeVEQiMe7h',
+ auth_algo='HMAC-SHA1-96',
+ auth_key='C91KUR9GYMm5GfkEvNjX',
+ tunnel_header=IP(
+ dst=self.pg0.remote_ip4,
+ src=self.pg0.local_ip4))
+ else:
+ self.remote_tra_sa = SecurityAssociation(
+ ESP,
+ spi=0x000007d0,
+ crypt_algo='AES-CBC',
+ crypt_key='JPjyOWBeVEQiMe7h',
+ auth_algo='HMAC-SHA1-96',
+ auth_key='C91KUR9GYMm5GfkEvNjX')
+ self.local_tra_sa = SecurityAssociation(
+ ESP,
+ spi=0x000007d1,
+ crypt_algo='AES-CBC',
+ crypt_key='JPjyOWBeVEQiMe7h',
+ auth_algo='HMAC-SHA1-96',
+ auth_key='C91KUR9GYMm5GfkEvNjX')
+
+ def tearDown(self):
+ super(TestIpsecEsp, self).tearDown()
+ if not self.vpp_dead:
+ self.vapi.cli("show hardware")
+
+ def send_and_expect(self, input, pkts, output, count=1):
+ input.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ rx = output.get_capture(count)
+ return rx
+
+ def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1):
+ return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
+ sa.encrypt(IP(src=src, dst=dst) / ICMP() /
+ "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX")
+ ] * count
+
+ def gen_pkts(self, sw_intf, src, dst, count=1):
+ return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
+ IP(src=src, dst=dst) / ICMP() /
+ "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
+ ] * count
+
+ def test_ipsec_esp_tra_basic(self, count=1):
+ """ ipsec esp v4 transport basic test """
+ try:
+ self.configScapySA()
+ send_pkts = self.gen_encrypt_pkts(
+ self.remote_tra_sa,
+ self.pg2,
+ src=self.pg2.remote_ip4,
+ dst=self.pg2.local_ip4,
+ count=count)
+ recv_pkts = self.send_and_expect(
+ self.pg2, send_pkts, self.pg2, count=count)
+ # ESP TRA VPP encryption/decryption verification
+ for Pkts in recv_pkts:
+ self.local_tra_sa.decrypt(Pkts[IP])
+ finally:
+ self.logger.info(self.vapi.ppcli("show error"))
+ self.logger.info(self.vapi.ppcli("show ipsec"))
+
+ def test_ipsec_esp_tra_burst(self):
+ """ ipsec esp v4 transport burst test """
+ try:
+ self.test_ipsec_esp_tra_basic(count=257)
+ finally:
+ self.logger.info(self.vapi.ppcli("show error"))
+ self.logger.info(self.vapi.ppcli("show ipsec"))
+
+ def test_ipsec_esp_tun_basic(self, count=1):
+ """ ipsec esp 4o4 tunnel basic test """
+ try:
+ self.configScapySA(is_tun=True)
+ send_pkts = self.gen_encrypt_pkts(
+ self.remote_tun_sa,
+ self.pg0,
+ src=self.remote_pg0_lb_addr,
+ dst=self.remote_pg1_lb_addr,
+ count=count)
+ recv_pkts = self.send_and_expect(
+ self.pg0, send_pkts, self.pg1, count=count)
+ # ESP TUN VPP decryption verification
+ for recv_pkt in recv_pkts:
+ self.assert_equal(recv_pkt[IP].src, self.remote_pg0_lb_addr)
+ self.assert_equal(recv_pkt[IP].dst, self.remote_pg1_lb_addr)
+ send_pkts = self.gen_pkts(
+ self.pg1,
+ src=self.remote_pg1_lb_addr,
+ dst=self.remote_pg0_lb_addr,
+ count=count)
+ recv_pkts = self.send_and_expect(
+ self.pg1, send_pkts, self.pg0, count=count)
+ # ESP TUN VPP encryption verification
+ for recv_pkt in recv_pkts:
+ decrypt_pkt = self.local_tun_sa.decrypt(recv_pkt[IP])
+ self.assert_equal(decrypt_pkt.src, self.remote_pg1_lb_addr)
+ self.assert_equal(decrypt_pkt.dst, self.remote_pg0_lb_addr)
+ finally:
+ self.logger.info(self.vapi.ppcli("show error"))
+ self.logger.info(self.vapi.ppcli("show ipsec"))
+
+ def test_ipsec_esp_tun_burst(self):
+ """ ipsec esp 4o4 tunnel burst test """
+ try:
+ self.test_ipsec_esp_tun_basic(count=257)
+ finally:
+ self.logger.info(self.vapi.ppcli("show error"))
+ self.logger.info(self.vapi.ppcli("show ipsec"))