X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftest_ipsec_esp.py;h=50c6f5c8db5089f26770b3474df16f65fe1f723c;hb=fc81134a26458a8358483b0d2908a6b83afb7f11;hp=209298a30a45779e2d16f1aa6840d569c66b4be6;hpb=3a9bd7608f74594ab6ebc2fb20786bceaca72dea;p=vpp.git diff --git a/test/test_ipsec_esp.py b/test/test_ipsec_esp.py index 209298a30a4..50c6f5c8db5 100644 --- a/test/test_ipsec_esp.py +++ b/test/test_ipsec_esp.py @@ -466,6 +466,90 @@ class TestIpsecEsp2(TemplateIpsecEsp, IpsecTcpTests): pass +class TestIpsecEspAsync(TemplateIpsecEsp): + """ Ipsec ESP - Aysnc tests """ + + worker_config = "workers 2" + + def setUp(self): + super(TestIpsecEspAsync, self).setUp() + + self.vapi.ipsec_set_async_mode(async_enable=True) + self.p4 = IPsecIPv4Params() + + self.p4.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t. + IPSEC_API_CRYPTO_ALG_AES_CBC_256) + self.p4.crypt_algo = 'AES-CBC' # scapy name + self.p4.crypt_key = b'JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h' + + self.p4.scapy_tun_sa_id += 0xf0000 + self.p4.scapy_tun_spi += 0xf0000 + self.p4.vpp_tun_sa_id += 0xf0000 + self.p4.vpp_tun_spi += 0xf0000 + self.p4.remote_tun_if_host = "2.2.2.2" + e = VppEnum.vl_api_ipsec_spd_action_t + + self.p4.sa = VppIpsecSA( + self, + self.p4.vpp_tun_sa_id, + self.p4.vpp_tun_spi, + self.p4.auth_algo_vpp_id, + self.p4.auth_key, + self.p4.crypt_algo_vpp_id, + self.p4.crypt_key, + self.vpp_esp_protocol, + self.tun_if.local_addr[self.p4.addr_type], + self.tun_if.remote_addr[self.p4.addr_type]).add_vpp_config() + self.p4.spd = VppIpsecSpdEntry( + self, + self.tun_spd, + self.p4.vpp_tun_sa_id, + self.pg1.remote_addr[self.p4.addr_type], + self.pg1.remote_addr[self.p4.addr_type], + self.p4.remote_tun_if_host, + self.p4.remote_tun_if_host, + 0, + priority=1, + policy=e.IPSEC_API_SPD_ACTION_PROTECT, + is_outbound=1).add_vpp_config() + VppIpRoute(self, self.p4.remote_tun_if_host, self.p4.addr_len, + [VppRoutePath(self.tun_if.remote_addr[self.p4.addr_type], + 0xffffffff)]).add_vpp_config() + config_tun_params(self.p4, self.encryption_type, self.tun_if) + + def test_dual_stream(self): + """ Alternating SAs """ + p = self.params[self.p4.addr_type] + + pkts = [(Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / + IP(src=self.pg1.remote_ip4, + dst=self.p4.remote_tun_if_host) / + UDP(sport=4444, dport=4444) / + Raw(b'0x0' * 200)), + (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / + IP(src=self.pg1.remote_ip4, + dst=p.remote_tun_if_host) / + UDP(sport=4444, dport=4444) / + Raw(b'0x0' * 200))] + pkts *= 1023 + + rxs = self.send_and_expect(self.pg1, pkts, self.pg0) + + self.assertEqual(len(rxs), len(pkts)) + + for rx in rxs: + if rx[ESP].spi == p.scapy_tun_spi: + decrypted = p.vpp_tun_sa.decrypt(rx[IP]) + elif rx[ESP].spi == self.p4.vpp_tun_spi: + decrypted = self.p4.scapy_tun_sa.decrypt(rx[IP]) + else: + rx.show() + self.assertTrue(False) + + self.p4.spd.remove_vpp_config() + self.p4.sa.remove_vpp_config() + + class TestIpsecEspHandoff(TemplateIpsecEsp, IpsecTun6HandoffTests, IpsecTun4HandoffTests):